Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -616,14 +616,26 @@ public void repinRoundRobinAddress(InetAddress actualAddress) {
}

public void acquirePartitionLockLazily() throws IOException {
acquirePartitionLockLazily(false);
}

/**
* Lazily acquires this request's per-host connection permit.
*
* @param nonBlocking when {@code true}, acquire the permit without waiting (fail fast) — required when
* called on a Netty event-loop thread, where a blocking acquire would freeze the
* loop. Off the loop (the initial {@code execute()} on the caller thread) pass
* {@code false} to keep the configured acquire-timeout wait.
*/
public void acquirePartitionLockLazily(boolean nonBlocking) throws IOException {
if (connectionSemaphore == null || partitionKeyLock != null) {
return;
}

// Semaphore is keyed per host (base key), not the round-robin per-IP override: the permit is
// taken before the target IP is known and the connector may fail over to another IP.
Object partitionKey = basePartitionKey();
connectionSemaphore.acquireChannelLock(partitionKey);
connectionSemaphore.acquireChannelLock(partitionKey, nonBlocking);
Object prevKey = PARTITION_KEY_LOCK_FIELD.getAndSet(this, partitionKey);
if (prevKey != null) {
// self-check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ public void acquireChannelLock(Object partitionKey) throws IOException {
}
}

@Override
public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException {
if (!nonBlocking) {
acquireChannelLock(partitionKey);
return;
}
// nonBlocking (the caller is on the event loop): take the global permit without waiting, then the
// per-host permit without waiting, releasing the global one if the per-host permit is unavailable.
globalMaxConnectionSemaphore.acquireChannelLock(partitionKey, true);
if (!getFreeConnectionsForHost(partitionKey).tryAcquire()) {
releaseGlobal(partitionKey);
throw tooManyConnectionsPerHost;
}
}

protected void releaseGlobal(Object partitionKey) {
globalMaxConnectionSemaphore.releaseChannelLock(partitionKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,27 @@ public interface ConnectionSemaphore {

void acquireChannelLock(Object partitionKey) throws IOException;

/**
* Acquires a connection permit for {@code partitionKey}, optionally without blocking.
*
* <p>When {@code nonBlocking} is {@code true} a permit must be taken without waiting — the
* too-many-connections exception is thrown immediately if none is free. This is used when the caller
* runs on a Netty event-loop thread (a redirect / 401 / 407 / retry replay re-enters the send path on
* the event loop), where waiting for the configured acquire timeout would freeze the loop and stall
* every other connection it serves. When {@code false} this behaves exactly like
* {@link #acquireChannelLock(Object)}.
*
* <p>The default implementation ignores the hint and delegates to the blocking
* {@link #acquireChannelLock(Object)}, preserving the behaviour of custom implementations; the built-in
* limiters override it to honour {@code nonBlocking}.
*
* @param partitionKey the per-host partition key the permit is scoped to
* @param nonBlocking {@code true} to fail fast instead of waiting for a permit
* @throws IOException if no permit could be acquired
*/
default void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException {
acquireChannelLock(partitionKey);
}

void releaseChannelLock(Object partitionKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,18 @@ public class MaxConnectionSemaphore implements ConnectionSemaphore {

@Override
public void acquireChannelLock(Object partitionKey) throws IOException {
acquireChannelLock(partitionKey, false);
}

@Override
public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException {
try {
if (!freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) {
// nonBlocking (the caller is on the event loop): try once and fail fast rather than parking
// the loop for up to acquireTimeout.
boolean acquired = nonBlocking
? freeChannels.tryAcquire()
: freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
if (!acquired) {
throw tooManyConnections;
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,19 @@ public class PerHostConnectionSemaphore implements ConnectionSemaphore {

@Override
public void acquireChannelLock(Object partitionKey) throws IOException {
acquireChannelLock(partitionKey, false);
}

@Override
public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException {
try {
if (!getFreeConnectionsForHost(partitionKey).tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) {
Semaphore freeConnections = getFreeConnectionsForHost(partitionKey);
// nonBlocking (the caller is on the event loop): try once and fail fast rather than parking
// the loop for up to acquireTimeout.
boolean acquired = nonBlocking
? freeConnections.tryAcquire()
: freeConnections.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
if (!acquired) {
throw tooManyConnectionsPerHost;
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,12 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request, Proxy
// Do not throw an exception when we need an extra connection for a
// redirect.
try {
future.acquirePartitionLockLazily();
// On the event loop (a redirect / 401 / 407 / retry replay re-enters sendRequest here),
// acquire the connection permit WITHOUT blocking: parking the loop for
// acquireFreeChannelTimeout would stall every other connection it serves (and the permit
// may be released only by a task queued on this same loop). Off the loop — the initial
// execute() on the caller thread — keep the configured blocking wait.
future.acquirePartitionLockLazily(isOnEventLoop());
} catch (IOException semaphoreException) {
// If HTTP/2 is enabled, another thread may be establishing an H2 connection.
// Poll the H2 registry with brief retries before giving up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.asynchttpclient.exception.TooManyConnectionsException;
import org.asynchttpclient.exception.TooManyConnectionsPerHostException;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -171,4 +172,66 @@ private void checkRelease(ConnectionSemaphore semaphore) throws IOException {
}
assertFalse(tooManyCaught);
}

// ---- non-blocking acquire (event-loop path): must fail fast, never wait for acquireTimeout ----

// Deliberately large: a blocking acquire on an exhausted semaphore would blow the 1s @Timeout below,
// so these tests fail loudly if the non-blocking path ever waits.
static final int NON_BLOCKING__LONG_TIMEOUT = 5000;

@Test
@Timeout(unit = TimeUnit.MILLISECONDS, value = 1000)
public void maxConnectionNonBlockingFailsFastWhenExhausted() throws IOException {
nonBlockingFailsFast(new MaxConnectionSemaphore(1, NON_BLOCKING__LONG_TIMEOUT));
}

@Test
@Timeout(unit = TimeUnit.MILLISECONDS, value = 1000)
public void perHostNonBlockingFailsFastWhenExhausted() throws IOException {
nonBlockingFailsFast(new PerHostConnectionSemaphore(1, NON_BLOCKING__LONG_TIMEOUT));
}

@Test
@Timeout(unit = TimeUnit.MILLISECONDS, value = 1000)
public void combinedNonBlockingFailsFastWhenExhausted() throws IOException {
nonBlockingFailsFast(new CombinedConnectionSemaphore(1, 1, NON_BLOCKING__LONG_TIMEOUT));
}

private void nonBlockingFailsFast(ConnectionSemaphore semaphore) throws IOException {
semaphore.acquireChannelLock(PK); // consume the only permit
boolean tooManyCaught = false;
long start = System.currentTimeMillis();
try {
semaphore.acquireChannelLock(PK, true);
} catch (TooManyConnectionsException | TooManyConnectionsPerHostException e) {
tooManyCaught = true;
}
long elapsed = System.currentTimeMillis() - start;
assertTrue(tooManyCaught, "non-blocking acquire must fail fast when no permit is free");
assertTrue(elapsed < 500, "non-blocking acquire must not wait for the acquire timeout, waited " + elapsed + " ms");

// A freed permit is immediately acquirable via the non-blocking path.
semaphore.releaseChannelLock(PK);
semaphore.acquireChannelLock(PK, true); // must not throw
}

@Test
@Timeout(unit = TimeUnit.MILLISECONDS, value = 1000)
public void defaultNonBlockingOverloadDelegatesToBlockingAcquire() throws IOException {
// A custom ConnectionSemaphore that implements only the required methods inherits the default
// two-arg overload, which must delegate to the blocking acquireChannelLock(Object).
boolean[] called = {false};
ConnectionSemaphore custom = new ConnectionSemaphore() {
@Override
public void acquireChannelLock(Object partitionKey) {
called[0] = true;
}

@Override
public void releaseChannelLock(Object partitionKey) {
}
};
custom.acquireChannelLock(PK, true);
assertTrue(called[0], "default overload must delegate to acquireChannelLock(Object)");
}
}
Loading