Skip to content

Commit ee8d6d6

Browse files
committed
Improve Http2Pool connection reuse for concurrent acquires
Signed-off-by: zimatars <11007575+zimatars@users.noreply.github.com>
1 parent c44561a commit ee8d6d6

File tree

4 files changed

+31
-50
lines changed

4 files changed

+31
-50
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ void drainLoop() {
391391
// when cached connections are below minimum connections, then allocate a new connection
392392
boolean belowMinConnections = minConnections > 0 &&
393393
poolConfig.allocationStrategy().permitGranted() < minConnections;
394-
Slot slot = belowMinConnections ? null : findConnection(resources);
394+
int resourcesCount = idleSize;
395+
Slot slot = belowMinConnections ? null : findConnection(resources, resourcesCount);
395396
if (slot != null) {
396397
Borrower borrower = pollPending(borrowers, true);
397398
if (borrower == null || borrower.get()) {
@@ -406,19 +407,18 @@ void drainLoop() {
406407
log.debug(format(slot.connection.channel(), "Channel activated"));
407408
}
408409
ACQUIRED.incrementAndGet(this);
409-
// Reserve concurrency and re-offer the slot before async deliver so concurrent acquires can reuse the connection
410-
slot.incrementConcurrencyAndGet();
411-
slot.deactivate();
412410
slot.connection.channel().eventLoop().execute(() -> {
413-
borrower.deliver(new Http2PooledRef(slot), true);
411+
borrower.deliver(new Http2PooledRef(slot));
414412
drain();
415413
});
416414
}
417415
else {
418-
int resourcesCount = idleSize;
419416
if (minConnections > 0 &&
420417
poolConfig.allocationStrategy().permitGranted() >= minConnections &&
421-
resourcesCount == 0) {
418+
poolConfig.allocationStrategy().permitGranted() > resourcesCount) {
419+
// connections allocations were triggered
420+
}
421+
else if (minConnections == 0 && poolConfig.allocationStrategy().permitGranted() > resourcesCount) {
422422
// connections allocations were triggered
423423
}
424424
else {
@@ -542,8 +542,7 @@ void evictInBackground() {
542542
scheduleEviction();
543543
}
544544

545-
@Nullable Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
546-
int resourcesCount = idleSize;
545+
@Nullable Slot findConnection(ConcurrentLinkedQueue<Slot> resources, int resourcesCount) {
547546
while (resourcesCount > 0) {
548547
// There are connections in the queue
549548

@@ -837,31 +836,17 @@ public String toString() {
837836
}
838837

839838
void deliver(Http2PooledRef poolSlot) {
840-
deliver(poolSlot, false);
841-
}
842-
843-
void deliver(Http2PooledRef poolSlot, boolean alreadyReserved) {
844839
assert poolSlot.slot.connection.channel().eventLoop().inEventLoop();
845840
poolSlot.slot.updateMaxConcurrentStreams();
846-
847-
int effectiveConcurrency = poolSlot.slot.concurrency() - (alreadyReserved ? 1 : 0);
848-
if (!poolSlot.slot.canOpenStream(effectiveConcurrency)) {
849-
if (alreadyReserved) {
850-
// Concurrency was reserved in drainLoop, rollback reservation
851-
poolSlot.slot.decrementConcurrencyAndGet();
852-
}
853-
else {
854-
poolSlot.slot.deactivate();
855-
}
841+
if (!poolSlot.slot.canOpenStream()) {
842+
poolSlot.slot.deactivate();
856843
pool.addPending(pool.pending, this, true);
857844
return;
858845
}
859846
stopPendingCountdown(true);
860-
if (!alreadyReserved) {
861-
// Increment concurrency BEFORE deactivate so that canOpenStream() is correct for other threads
862-
poolSlot.slot.incrementConcurrencyAndGet();
863-
poolSlot.slot.deactivate();
864-
}
847+
// Increment concurrency BEFORE deactivate so that canOpenStream() is correct for other threads
848+
poolSlot.slot.incrementConcurrencyAndGet();
849+
poolSlot.slot.deactivate();
865850
if (get()) {
866851
//CANCELLED or timeout reached
867852
poolSlot.invalidate().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty()));
@@ -1052,13 +1037,7 @@ private long computeMaxConcurrentStreams() {
10521037
}
10531038

10541039
boolean canOpenStream() {
1055-
return canOpenStream(this.concurrency);
1056-
}
1057-
1058-
boolean canOpenStream(int concurrency) {
1059-
if (concurrency < 0) {
1060-
return false;
1061-
}
1040+
int concurrency = this.concurrency;
10621041
long max = this.maxConcurrentStreams;
10631042
// For non-HTTP/2 connections (max == 0), allow opening a stream if concurrency is 0
10641043
// For HTTP/2 connections, check that we haven't reached max concurrent streams

reactor-netty-http/src/main/java/reactor/netty/http/client/Http3Pool.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024-2026 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2024-2025 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,11 +88,6 @@ boolean canOpenStream() {
8888
return true;
8989
}
9090

91-
@Override
92-
boolean canOpenStream(int concurrency) {
93-
return true;
94-
}
95-
9691
@Override
9792
boolean goAwayReceived() {
9893
ChannelHandlerContext connectionHandlerCtx = http3ClientConnectionHandlerCtx();

reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PooledConnectionProviderCustomMetricsTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,6 +42,7 @@
4242

4343
import static org.assertj.core.api.Assertions.assertThat;
4444
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
45+
import static org.awaitility.Awaitility.await;
4546
import static reactor.netty.http.HttpProtocol.H2;
4647

4748
class Http2PooledConnectionProviderCustomMetricsTest extends BaseHttpTest {
@@ -78,7 +79,7 @@ void measureActiveStreamsSize() throws InterruptedException {
7879
.maxConnections(10)
7980
.build();
8081

81-
CountDownLatch latch = new CountDownLatch(5);
82+
CountDownLatch latch = new CountDownLatch(1);
8283
HttpClient httpClient =
8384
createClient(pool, disposableServer::address)
8485
.protocol(H2)
@@ -94,10 +95,14 @@ void measureActiveStreamsSize() throws InterruptedException {
9495
.subscribe());
9596

9697
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
97-
assertThat(isRegistered.get()).isTrue();
98-
HttpConnectionPoolMetrics actual = metrics.get();
99-
assertThat(actual).isNotNull();
100-
assertThat(actual.activeStreamSize()).isEqualTo(5);
98+
99+
await().atMost(5, TimeUnit.SECONDS)
100+
.untilAsserted(() -> {
101+
assertThat(isRegistered.get()).isTrue();
102+
HttpConnectionPoolMetrics actual = metrics.get();
103+
assertThat(actual).isNotNull();
104+
assertThat(actual.activeStreamSize()).isEqualTo(5);
105+
});
101106
}
102107
finally {
103108
pool.disposeLater().block(Duration.ofSeconds(5));

reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2011-2026 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1854,7 +1854,9 @@ void testGracefulShutdownH2(HttpProtocol[] serverProtocols, HttpProtocol[] clien
18541854
private void doTestGracefulShutdown(HttpServer server, HttpClient client) throws Exception {
18551855
CountDownLatch latch1 = new CountDownLatch(2);
18561856
CountDownLatch latch2 = new CountDownLatch(2);
1857-
CountDownLatch latchGoAway = new CountDownLatch(2);
1857+
// HTTP/2 GOAWAY is a connection-level frame. With multiplexing enabled, both concurrent requests
1858+
// can share the same connection, so we should only expect at least one GOAWAY to be observed.
1859+
CountDownLatch latchGoAway = new CountDownLatch(1);
18581860
CountDownLatch latch3 = new CountDownLatch(1);
18591861
LoopResources loop = LoopResources.create("testGracefulShutdown");
18601862
group = new DefaultChannelGroup(executor);
@@ -1906,7 +1908,7 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg)
19061908
// Stop accepting incoming requests, wait at most 3s for the active requests to finish
19071909
disposableServer.disposeNow();
19081910

1909-
assertThat(latchGoAway.await(30, TimeUnit.SECONDS)).as("2 GOAWAY should have been received").isTrue();
1911+
assertThat(latchGoAway.await(30, TimeUnit.SECONDS)).as("GOAWAY should have been received").isTrue();
19101912
assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue();
19111913

19121914
// Dispose the event loop

0 commit comments

Comments
 (0)