From e95a992e5e87629307ec2f37b77887e1ab459784 Mon Sep 17 00:00:00 2001 From: Pavel Ptashyts <49400901+pavel-ptashyts@users.noreply.github.com> Date: Wed, 1 Jul 2026 18:00:11 +0200 Subject: [PATCH] Never block the event loop acquiring a connection permit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On a redirect / 401 / 407 / retry replay, sendNextRequest re-enters NettyRequestSender.sendRequestWithNewChannel from AsyncHttpClientHandler.channelRead — on the Netty event loop. acquirePartitionLockLazily then called the connection semaphore's blocking acquireChannelLock (Semaphore.tryAcquire(acquireFreeChannelTimeout)), parking the event-loop thread for up to the configured timeout when a finite maxConnections / maxConnectionsPerHost limit is saturated. Blocking the loop stalls every other connection it serves, and the permit may only be released by a task queued on that same loop. The sibling waitForHttp2Connection already guards this with isOnEventLoop(); this path did not. Acquire the permit non-blocking when on the event loop: fail fast (the request still gets its single non-blocking HTTP/2-reuse poll before aborting) instead of parking the loop. Off the loop — the initial execute() on the caller thread — the configured blocking wait is unchanged. - ConnectionSemaphore: add a default acquireChannelLock(key, nonBlocking) overload (default delegates to the blocking form, so custom implementations are unaffected). - Max/PerHost/Combined limiters override it with a non-blocking tryAcquire(). - NettyResponseFuture.acquirePartitionLockLazily(boolean) threads the flag through; the call site passes isOnEventLoop(). Gated behind non-default config (a positive cap AND a positive acquireFreeChannelTimeout); inert under defaults. Adds SemaphoreTest coverage that the non-blocking acquire fails fast instead of waiting and that the default overload delegates. 
--- .../netty/NettyResponseFuture.java | 14 ++++- .../channel/CombinedConnectionSemaphore.java | 15 +++++ .../netty/channel/ConnectionSemaphore.java | 22 +++++++ .../netty/channel/MaxConnectionSemaphore.java | 12 +++- .../channel/PerHostConnectionSemaphore.java | 13 +++- .../netty/request/NettyRequestSender.java | 7 ++- .../netty/channel/SemaphoreTest.java | 63 +++++++++++++++++++ 7 files changed, 142 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java b/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java index 99b964735c..058f6aaac6 100755 --- a/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java +++ b/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java @@ -616,6 +616,18 @@ public void repinRoundRobinAddress(InetAddress actualAddress) { } public void acquirePartitionLockLazily() throws IOException { + acquirePartitionLockLazily(false); + } + + /** + * Lazily acquires this request's per-host connection permit. + * + * @param nonBlocking when {@code true}, acquire the permit without waiting (fail fast) — required when + * called on a Netty event-loop thread, where a blocking acquire would freeze the + * loop. Off the loop (the initial {@code execute()} on the caller thread) pass + * {@code false} to keep the configured acquire-timeout wait. + */ + public void acquirePartitionLockLazily(boolean nonBlocking) throws IOException { if (connectionSemaphore == null || partitionKeyLock != null) { return; } @@ -623,7 +635,7 @@ public void acquirePartitionLockLazily() throws IOException { // Semaphore is keyed per host (base key), not the round-robin per-IP override: the permit is // taken before the target IP is known and the connector may fail over to another IP. 
Object partitionKey = basePartitionKey(); - connectionSemaphore.acquireChannelLock(partitionKey); + connectionSemaphore.acquireChannelLock(partitionKey, nonBlocking); Object prevKey = PARTITION_KEY_LOCK_FIELD.getAndSet(this, partitionKey); if (prevKey != null) { // self-check diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/CombinedConnectionSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/CombinedConnectionSemaphore.java index 36748b077f..5d626c59cd 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/CombinedConnectionSemaphore.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/CombinedConnectionSemaphore.java @@ -44,6 +44,21 @@ public void acquireChannelLock(Object partitionKey) throws IOException { } } + @Override + public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException { + if (!nonBlocking) { + acquireChannelLock(partitionKey); + return; + } + // nonBlocking (the caller is on the event loop): take the global permit without waiting, then the + // per-host permit without waiting, releasing the global one if the per-host permit is unavailable. 
+ globalMaxConnectionSemaphore.acquireChannelLock(partitionKey, true); + if (!getFreeConnectionsForHost(partitionKey).tryAcquire()) { + releaseGlobal(partitionKey); + throw tooManyConnectionsPerHost; + } + } + protected void releaseGlobal(Object partitionKey) { globalMaxConnectionSemaphore.releaseChannelLock(partitionKey); } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ConnectionSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/ConnectionSemaphore.java index 300d0a8cd4..af5ccb5a2a 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ConnectionSemaphore.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ConnectionSemaphore.java @@ -24,5 +24,27 @@ public interface ConnectionSemaphore { void acquireChannelLock(Object partitionKey) throws IOException; + /** + * Acquires a connection permit for {@code partitionKey}, optionally without blocking. + * + *

<p>When {@code nonBlocking} is {@code true} a permit must be taken without waiting — the + * too-many-connections exception is thrown immediately if none is free. This is used when the caller + * runs on a Netty event-loop thread (a redirect / 401 / 407 / retry replay re-enters the send path on + * the event loop), where waiting for the configured acquire timeout would freeze the loop and stall + * every other connection it serves. When {@code false} this behaves exactly like + * {@link #acquireChannelLock(Object)}. + * + *

<p>The default implementation ignores the hint and delegates to the blocking + * {@link #acquireChannelLock(Object)}, preserving the behaviour of custom implementations; the built-in + * limiters override it to honour {@code nonBlocking}. + * + * @param partitionKey the per-host partition key the permit is scoped to + * @param nonBlocking {@code true} to fail fast instead of waiting for a permit + * @throws IOException if no permit could be acquired + */ + default void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException { + acquireChannelLock(partitionKey); + } + void releaseChannelLock(Object partitionKey); } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java index 7640b0e1fa..9e43a23d1c 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java @@ -43,8 +43,18 @@ public class MaxConnectionSemaphore implements ConnectionSemaphore { @Override public void acquireChannelLock(Object partitionKey) throws IOException { + acquireChannelLock(partitionKey, false); + } + + @Override + public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException { try { - if (!freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) { + // nonBlocking (the caller is on the event loop): try once and fail fast rather than parking + // the loop for up to acquireTimeout. + boolean acquired = nonBlocking + ? 
freeChannels.tryAcquire() + : freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS); + if (!acquired) { throw tooManyConnections; } } catch (InterruptedException e) { diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/PerHostConnectionSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/PerHostConnectionSemaphore.java index 5930c0e959..73468a9ee3 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/PerHostConnectionSemaphore.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/PerHostConnectionSemaphore.java @@ -43,8 +43,19 @@ public class PerHostConnectionSemaphore implements ConnectionSemaphore { @Override public void acquireChannelLock(Object partitionKey) throws IOException { + acquireChannelLock(partitionKey, false); + } + + @Override + public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException { try { - if (!getFreeConnectionsForHost(partitionKey).tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) { + Semaphore freeConnections = getFreeConnectionsForHost(partitionKey); + // nonBlocking (the caller is on the event loop): try once and fail fast rather than parking + // the loop for up to acquireTimeout. + boolean acquired = nonBlocking + ? 
freeConnections.tryAcquire() + : freeConnections.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS); + if (!acquired) { throw tooManyConnectionsPerHost; } } catch (InterruptedException e) { diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index a92789fb76..a6bb2c072c 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -428,7 +428,12 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request, Proxy // Do not throw an exception when we need an extra connection for a // redirect. try { - future.acquirePartitionLockLazily(); + // On the event loop (a redirect / 401 / 407 / retry replay re-enters sendRequest here), + // acquire the connection permit WITHOUT blocking: parking the loop for + // acquireFreeChannelTimeout would stall every other connection it serves (and the permit + // may be released only by a task queued on this same loop). Off the loop — the initial + // execute() on the caller thread — keep the configured blocking wait. + future.acquirePartitionLockLazily(isOnEventLoop()); } catch (IOException semaphoreException) { // If HTTP/2 is enabled, another thread may be establishing an H2 connection. // Poll the H2 registry with brief retries before giving up. 
diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreTest.java index 1c9f1db1d4..962ee4fd0a 100644 --- a/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreTest.java +++ b/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreTest.java @@ -19,6 +19,7 @@ import org.asynchttpclient.exception.TooManyConnectionsException; import org.asynchttpclient.exception.TooManyConnectionsPerHostException; import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -171,4 +172,66 @@ private void checkRelease(ConnectionSemaphore semaphore) throws IOException { } assertFalse(tooManyCaught); } + + // ---- non-blocking acquire (event-loop path): must fail fast, never wait for acquireTimeout ---- + + // Deliberately large: a blocking acquire on an exhausted semaphore would blow the 1s @Timeout below, + // so these tests fail loudly if the non-blocking path ever waits. 
+ static final int NON_BLOCKING__LONG_TIMEOUT = 5000; + + @Test + @Timeout(unit = TimeUnit.MILLISECONDS, value = 1000) + public void maxConnectionNonBlockingFailsFastWhenExhausted() throws IOException { + nonBlockingFailsFast(new MaxConnectionSemaphore(1, NON_BLOCKING__LONG_TIMEOUT)); + } + + @Test + @Timeout(unit = TimeUnit.MILLISECONDS, value = 1000) + public void perHostNonBlockingFailsFastWhenExhausted() throws IOException { + nonBlockingFailsFast(new PerHostConnectionSemaphore(1, NON_BLOCKING__LONG_TIMEOUT)); + } + + @Test + @Timeout(unit = TimeUnit.MILLISECONDS, value = 1000) + public void combinedNonBlockingFailsFastWhenExhausted() throws IOException { + nonBlockingFailsFast(new CombinedConnectionSemaphore(1, 1, NON_BLOCKING__LONG_TIMEOUT)); + } + + private void nonBlockingFailsFast(ConnectionSemaphore semaphore) throws IOException { + semaphore.acquireChannelLock(PK); // consume the only permit + boolean tooManyCaught = false; + long start = System.currentTimeMillis(); + try { + semaphore.acquireChannelLock(PK, true); + } catch (TooManyConnectionsException | TooManyConnectionsPerHostException e) { + tooManyCaught = true; + } + long elapsed = System.currentTimeMillis() - start; + assertTrue(tooManyCaught, "non-blocking acquire must fail fast when no permit is free"); + assertTrue(elapsed < 500, "non-blocking acquire must not wait for the acquire timeout, waited " + elapsed + " ms"); + + // A freed permit is immediately acquirable via the non-blocking path. + semaphore.releaseChannelLock(PK); + semaphore.acquireChannelLock(PK, true); // must not throw + } + + @Test + @Timeout(unit = TimeUnit.MILLISECONDS, value = 1000) + public void defaultNonBlockingOverloadDelegatesToBlockingAcquire() throws IOException { + // A custom ConnectionSemaphore that implements only the required methods inherits the default + // two-arg overload, which must delegate to the blocking acquireChannelLock(Object). 
+ boolean[] called = {false}; + ConnectionSemaphore custom = new ConnectionSemaphore() { + @Override + public void acquireChannelLock(Object partitionKey) { + called[0] = true; + } + + @Override + public void releaseChannelLock(Object partitionKey) { + } + }; + custom.acquireChannelLock(PK, true); + assertTrue(called[0], "default overload must delegate to acquireChannelLock(Object)"); + } }