Skip to content

Commit fe2d12f

Browse files
authored
Add proper initialisation of max concurrent streams when successful HTTP/1.1 to HTTP/2 upgrade (#3832)
- Reset maxConcurrentStreams to 0 when invalidating a connection - Add logging when an upgrade stream is opened Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com>
1 parent 1310364 commit fe2d12f

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -972,7 +972,6 @@ static class Slot extends AtomicBoolean implements PooledRefMetadata {
972972
this.applicationProtocol = null;
973973
}
974974
initMaxConcurrentStreams();
975-
TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams);
976975
}
977976

978977
void initMaxConcurrentStreams() {
@@ -982,6 +981,7 @@ void initMaxConcurrentStreams() {
982981
this.maxConcurrentStreams = pool.maxConcurrentStreams == -1 ? maxConcurrentStreams :
983982
Math.min(pool.maxConcurrentStreams, maxConcurrentStreams);
984983
}
984+
TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams);
985985
}
986986

987987
boolean canOpenStream() {
@@ -1072,6 +1072,7 @@ void invalidate() {
10721072
}
10731073
pool.poolConfig.allocationStrategy().returnPermits(1);
10741074
TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams);
1075+
maxConcurrentStreams = 0;
10751076
}
10761077
}
10771078

reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@
9999
import static reactor.netty.ReactorNetty.format;
100100
import static reactor.netty.ReactorNetty.setChannelContext;
101101
import static reactor.netty.http.client.Http2ConnectionProvider.OWNER;
102+
import static reactor.netty.http.client.Http2ConnectionProvider.http2PooledRef;
103+
import static reactor.netty.http.client.Http2ConnectionProvider.logStreamsState;
102104
import static reactor.netty.http.client.Http3Codec.newHttp3ClientConnectionHandler;
103105

104106
/**
@@ -892,9 +894,11 @@ public void handlerAdded(ChannelHandlerContext ctx) {
892894
ConnectionObserver channelOwner = ctx.channel().attr(OWNER).get();
893895
Http2ConnectionProvider.DisposableAcquire owner = null;
894896
ConnectionObserver obs = null;
897+
Http2Pool.Http2PooledRef http2PooledRef = null;
895898
if (channelOwner instanceof Http2ConnectionProvider.DisposableAcquire) {
896899
owner = (Http2ConnectionProvider.DisposableAcquire) channelOwner;
897900
obs = owner.obs;
901+
http2PooledRef = http2PooledRef(owner.pooledRef);
898902
}
899903
if (responseTimeoutHandler != null) {
900904
pipeline.remove(NettyPipeline.ResponseTimeoutHandler);
@@ -913,6 +917,10 @@ public void handlerAdded(ChannelHandlerContext ctx) {
913917
}
914918
pipeline.remove(NettyPipeline.ReactiveBridge);
915919
pipeline.remove(this);
920+
921+
if (http2PooledRef != null) {
922+
http2PooledRef.slot.initMaxConcurrentStreams();
923+
}
916924
}
917925
}
918926

@@ -972,6 +980,9 @@ protected void initChannel(Channel ch) {
972980
}
973981
addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory,
974982
acceptGzip, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutMillis, uriTagValue);
983+
if (log.isDebugEnabled()) {
984+
logStreamsState(ch, http2PooledRef(owner.pooledRef).slot, "Stream opened");
985+
}
975986
}
976987
else {
977988
// Handle server pushes (inbound streams)

0 commit comments

Comments
 (0)