Skip to content

Commit 0790c25

Browse files
committed
Improve Http2Pool connection reuse for concurrent acquires
Signed-off-by: zimatars <11007575+zimatars@users.noreply.github.com>
1 parent ee8d6d6 commit 0790c25

File tree

2 files changed

+36
-8
lines changed

2 files changed

+36
-8
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -407,8 +407,11 @@ void drainLoop() {
407407
log.debug(format(slot.connection.channel(), "Channel activated"));
408408
}
409409
ACQUIRED.incrementAndGet(this);
410+
// Reserve concurrency and re-offer the slot before async deliver so concurrent acquires can reuse the connection
411+
slot.incrementConcurrencyAndGet();
412+
slot.deactivate();
410413
slot.connection.channel().eventLoop().execute(() -> {
411-
borrower.deliver(new Http2PooledRef(slot));
414+
borrower.deliver(new Http2PooledRef(slot), true);
412415
drain();
413416
});
414417
}
@@ -836,17 +839,31 @@ public String toString() {
836839
}
837840

838841
void deliver(Http2PooledRef poolSlot) {
842+
deliver(poolSlot, false);
843+
}
844+
845+
void deliver(Http2PooledRef poolSlot, boolean alreadyReserved) {
839846
assert poolSlot.slot.connection.channel().eventLoop().inEventLoop();
840847
poolSlot.slot.updateMaxConcurrentStreams();
841-
if (!poolSlot.slot.canOpenStream()) {
842-
poolSlot.slot.deactivate();
848+
849+
int effectiveConcurrency = poolSlot.slot.concurrency() - (alreadyReserved ? 1 : 0);
850+
if (!poolSlot.slot.canOpenStream(effectiveConcurrency)) {
851+
if (alreadyReserved) {
852+
// Concurrency was reserved in drainLoop, rollback reservation
853+
poolSlot.slot.decrementConcurrencyAndGet();
854+
}
855+
else {
856+
poolSlot.slot.deactivate();
857+
}
843858
pool.addPending(pool.pending, this, true);
844859
return;
845860
}
846861
stopPendingCountdown(true);
847-
// Increment concurrency BEFORE deactivate so that canOpenStream() is correct for other threads
848-
poolSlot.slot.incrementConcurrencyAndGet();
849-
poolSlot.slot.deactivate();
862+
if (!alreadyReserved) {
863+
// Increment concurrency BEFORE deactivate so that canOpenStream() is correct for other threads
864+
poolSlot.slot.incrementConcurrencyAndGet();
865+
poolSlot.slot.deactivate();
866+
}
850867
if (get()) {
851868
//CANCELLED or timeout reached
852869
poolSlot.invalidate().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty()));
@@ -1037,7 +1054,13 @@ private long computeMaxConcurrentStreams() {
10371054
}
10381055

10391056
boolean canOpenStream() {
1040-
int concurrency = this.concurrency;
1057+
return canOpenStream(this.concurrency);
1058+
}
1059+
1060+
boolean canOpenStream(int concurrency) {
1061+
if (concurrency < 0) {
1062+
return false;
1063+
}
10411064
long max = this.maxConcurrentStreams;
10421065
// For non-HTTP/2 connections (max == 0), allow opening a stream if concurrency is 0
10431066
// For HTTP/2 connections, check that we haven't reached max concurrent streams

reactor-netty-http/src/main/java/reactor/netty/http/client/Http3Pool.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024-2025 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2024-2026 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,6 +88,11 @@ boolean canOpenStream() {
8888
return true;
8989
}
9090

91+
@Override
92+
boolean canOpenStream(int concurrency) {
93+
return true;
94+
}
95+
9196
@Override
9297
boolean goAwayReceived() {
9398
ChannelHandlerContext connectionHandlerCtx = http3ClientConnectionHandlerCtx();

0 commit comments

Comments
 (0)