Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down Expand Up @@ -359,9 +358,6 @@ public void run(Timeout timeout) {
int totalCount = 0;

for (ConcurrentLinkedDeque<Channel> partition : partitions.values()) {

// store in intermediate unsynchronized lists to minimize
// the impact on the ConcurrentLinkedDeque
if (LOGGER.isDebugEnabled()) {
totalCount += partition.size();
}
Expand All @@ -380,28 +376,38 @@ public void run(Timeout timeout) {
}

/**
* One pass over a partition. A channel is dropped from the deque when it is a removeAll
* tombstone, remotely closed, idle-timeout expired or TTL expired. Tombstoned/concurrently
* leased channels are only unlinked (their owner closes them); expired channels are closed
* here, but only after this cleaner exclusively claims them, so a channel that {@code poll()}
* is leasing concurrently is never closed. Returns the number of channels closed by this tick.
* One pass over a partition. A channel is dropped from the deque when it is a
* {@code removeAll(Channel)} tombstone, remotely closed, idle-timeout expired or TTL expired.
* Tombstoned/concurrently leased channels are only unlinked (their owner closes them); expired
* channels are closed here, but only after this cleaner exclusively claims them, so a channel that
* {@code poll()} is leasing concurrently is never closed. Returns the number of channels closed by
* this tick.
*
* <p>Drop-worthy channels are unlinked in place through the iterator (O(1) amortized each) as the
* scan reaches them. The earlier approach collected them into a list and called
* {@link java.util.concurrent.ConcurrentLinkedDeque#removeAll(java.util.Collection) removeAll} after
* the scan, which re-walks every node doing an O(m) list {@code contains()} per node — O(n*m),
* degenerating toward O(n^2) when a whole partition is dropped in one tick (a load spike's
* connections idling out together, or a peer dropping many keep-alives at once). Unlinking via the
* iterator keeps the whole pass O(n).
*/
private int reapPartition(ConcurrentLinkedDeque<Channel> partition, long now) {
List<Channel> toRemove = null;
int closed = 0;

for (Channel channel : partition) {
Iterator<Channel> it = partition.iterator();
while (it.hasNext()) {
Channel channel = it.next();
IdleState idleState = channel.attr(IDLE_STATE_ATTRIBUTE_KEY).get();
if (idleState == null) {
continue;
}

if (idleState.isOwned()) {
// In-deque + owned ==> a removeAll() tombstone, or a node a concurrent poll() has
// already leased and unlinked. Either way: unlink, never close — the owner of the
// claim is responsible for closing it. removeAll() on an already-unlinked node is a
// harmless no-op.
toRemove = lazyAdd(toRemove, channel);
// In-deque + owned ==> a removeAll(Channel) tombstone, or a node a concurrent poll()
// has already leased and unlinked. Either way: unlink, never close — the owner of the
// claim is responsible for closing it. Unlinking an already-unlinked node through the
// iterator is a harmless no-op.
it.remove();
continue;
}

Expand All @@ -415,7 +421,7 @@ private int reapPartition(ConcurrentLinkedDeque<Channel> partition, long now) {
long startSnapshot = idleState.start();
// Claim before closing so we never close a channel poll() is leasing concurrently.
if (!idleState.takeOwnership()) {
continue; // poll() (or removeAll) won the claim; that owner now handles the channel
continue; // poll() (or removeAll(Channel)) won the claim; that owner now handles the channel
}
if (idleState.start() != startSnapshot) {
// The channel was leased and re-offered (fresh start) between the expiry check and
Expand All @@ -428,21 +434,10 @@ private int reapPartition(ConcurrentLinkedDeque<Channel> partition, long now) {
channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired);
close(channel);
closed++;
toRemove = lazyAdd(toRemove, channel);
it.remove();
}

if (toRemove != null) {
partition.removeAll(toRemove);
}
return closed;
}

private List<Channel> lazyAdd(List<Channel> list, Channel channel) {
if (list == null) {
list = new ArrayList<>(1);
}
list.add(channel);
return list;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,103 @@ public void channelReofferedAfterExpiryIsNotReaped() throws Exception {
pool.destroy();
}

// ---- reap pass unlinks many channels in a single tick (O(n) iterator-remove) ----

@Test
public void cleanerReapsManyIdleExpiredChannelsInOneTick() throws Exception {
// Exercises the reap pass unlinking MANY channels in a single tick — the O(n) iterator-remove
// path that replaced the old collect-then-ConcurrentLinkedDeque.removeAll (which was O(n*m)).
// All channels expire together, as they would when a load spike's connections idle out as a wave.
CapturingTimer timer = new CapturingTimer();
DefaultChannelPool pool = idlePool(timer, Duration.ofMillis(1));

final int count = 50;
Channel[] channels = new Channel[count];
for (int i = 0; i < count; i++) {
channels[i] = new EmbeddedChannel();
pool.offer(channels[i], KEY);
}
assertEquals(count, partitionSize(pool, KEY));
Thread.sleep(40); // now - start >= 1ms for every channel

timer.fire();

assertEquals(0, partitionSize(pool, KEY), "every idle-expired channel must be unlinked in one tick");
for (Channel c : channels) {
assertFalse(c.isActive(), "each idle-expired channel must be closed");
}
assertNull(pool.poll(KEY));

pool.destroy();
}

@Test
public void cleanerReapsExpiredButKeepsHealthyInSameTick() throws Exception {
// A single reap pass must drop the expired channels AND keep the fresh ones leasable: the
// iterator has to remove some nodes while continuing past the ones it keeps.
final long maxIdle = 200;
CapturingTimer timer = new CapturingTimer();
DefaultChannelPool pool = idlePool(timer, Duration.ofMillis(maxIdle));

final int expiredCount = 6;
Channel[] expired = new Channel[expiredCount];
for (int i = 0; i < expiredCount; i++) {
expired[i] = new EmbeddedChannel();
pool.offer(expired[i], KEY);
}
Thread.sleep(maxIdle + 150); // these are now well past maxIdleTime

final int healthyCount = 6;
Channel[] healthy = new Channel[healthyCount];
for (int i = 0; i < healthyCount; i++) {
healthy[i] = new EmbeddedChannel();
pool.offer(healthy[i], KEY); // fresh start, comfortably inside maxIdleTime
}
assertEquals(expiredCount + healthyCount, partitionSize(pool, KEY));

timer.fire();

assertEquals(healthyCount, partitionSize(pool, KEY), "only the fresh channels must survive the tick");
for (Channel c : expired) {
assertFalse(c.isActive(), "expired channels must be closed");
}
for (Channel c : healthy) {
assertTrue(c.isActive(), "fresh channels must not be touched");
}
int leased = 0;
while (pool.poll(KEY) != null) {
leased++;
}
assertEquals(healthyCount, leased, "every surviving channel must remain leasable");

pool.destroy();
}

@Test
public void cleanerUnlinksManyTombstonesInOneTick() throws Exception {
// Many tombstones (from removeAll(Channel)) must all be unlinked in a single pass, none closed.
CapturingTimer timer = new CapturingTimer();
DefaultChannelPool pool = ttlPool(timer);

final int count = 40;
Channel[] channels = new Channel[count];
for (int i = 0; i < count; i++) {
channels[i] = new EmbeddedChannel();
pool.offer(channels[i], KEY);
assertTrue(pool.removeAll(channels[i]));
}
assertEquals(count, partitionSize(pool, KEY), "tombstones linger until the cleaner ticks");

timer.fire();

assertEquals(0, partitionSize(pool, KEY), "every tombstone must be unlinked in one tick");
for (Channel c : channels) {
assertTrue(c.isActive(), "cleaner must not close tombstoned channels");
}

pool.destroy();
}

// ---- concurrency: no leaked tombstones, never leases a claimed channel ----

@Test
Expand Down
Loading