diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 6633f7cb2c..2dc68900a0 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -132,4 +132,9 @@ echo "Running tests with Java ${JAVA_VERSION}" ${MULTI_MONGOS_URI_SYSTEM_PROPERTY} ${API_VERSION} ${GRADLE_EXTRA_VARS} \ ${JAVA_SYSPROP_ASYNC_TRANSPORT} ${JAVA_SYSPROP_NETTY_SSL_PROVIDER} \ -Dorg.mongodb.test.fle.on.demand.credential.test.failure.enabled=true \ - --stacktrace --info --continue ${TESTS} + --stacktrace --info --continue ${TESTS} | tee -a logs.txt + +if grep -q 'LEAK:' logs.txt ; then + echo "Netty Leak detected, please inspect build log" + exit 1 +fi diff --git a/buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts b/buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts index 77aeebb6a6..4708c742d4 100644 --- a/buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts +++ b/buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts @@ -34,6 +34,8 @@ tasks.withType { useJUnitPlatform() + jvmArgs.add("-Dio.netty.leakDetection.level=paranoid") + // Pass any `org.mongodb.*` system settings systemProperties = System.getProperties() diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index fb840d9ad0..5e7ff54d43 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -188,7 +188,9 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera @Override public void selectServerAsync(final ServerSelector serverSelector, final OperationContext operationContext, final SingleResultCallback callback) { - isTrue("open", !isClosed()); + if (isClosed()) { + callback.onResult(null, new MongoClientException("Cluster was closed during server selection.")); + } Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout(); ServerSelectionRequest request = new ServerSelectionRequest( diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index c1b12f9f18..bf009aa1b0 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -75,8 +75,8 @@ import static com.mongodb.assertions.Assertions.assertNull; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; -import static com.mongodb.internal.async.AsyncRunnable.beginAsync; import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException; +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate; import static com.mongodb.internal.connection.CommandHelper.HELLO; @@ -355,7 +355,7 @@ private Compressor createCompressor(final MongoCompressor mongoCompressor) { public void close() { // All but the first call is a no-op if (!isClosed.getAndSet(true) && (stream != null)) { - stream.close(); + stream.close(); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java index 21124d81d3..c81cc87dee 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java +++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java @@ -256,7 +256,7 @@ public ByteBuf asReadOnly() { @Override public ByteBuf duplicate() { - return new NettyByteBuf(proxied.duplicate().retain(), isWriting); + return new NettyByteBuf(proxied.retainedDuplicate(), isWriting); } @Override diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java index b28054e7d3..0d02dc0741 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java @@ -307,8 +307,7 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler composite.addComponent(next); iter.remove(); } else { - next.retain(); - composite.addComponent(next.readSlice(bytesNeededFromCurrentBuffer)); + composite.addComponent(next.readRetainedSlice(bytesNeededFromCurrentBuffer)); } composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer); bytesNeeded -= bytesNeededFromCurrentBuffer; @@ -349,7 +348,11 @@ private boolean hasBytesAvailable(final int numBytes) { private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) { PendingReader localPendingReader = withLock(lock, () -> { if (buffer != null) { - pendingInboundBuffers.add(buffer.retain()); + if (isClosed) { + pendingException = new MongoSocketException("Received data after the stream was closed.", address); + } else { + pendingInboundBuffers.add(buffer.retain()); + } } else { pendingException = t; } @@ -378,7 +381,7 @@ public void close() { for (Iterator iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) { io.netty.buffer.ByteBuf nextByteBuf = iterator.next(); iterator.remove(); - nextByteBuf.release(); + nextByteBuf.release(nextByteBuf.refCnt()); } }); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java index c54332b0f1..8988ea3d6d 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java @@ -495,6 +495,7 @@ void shouldWriteUtf8CString(final boolean useBranch, final BufferProvider buffer @ParameterizedTest(name = "should get byte buffers as little endian. Parameters: useBranch={0}, bufferProvider={1}") @MethodSource("bufferProvidersWithBranches") void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferProvider bufferProvider) { + List byteBuffers = new ArrayList<>(); try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) { byte[] v = {1, 0, 0, 0}; if (useBranch) { @@ -504,7 +505,11 @@ void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferPro } else { out.writeBytes(v); } - assertEquals(1, out.getByteBuffers().get(0).getInt()); + + byteBuffers = out.getByteBuffers(); + assertEquals(1, byteBuffers.get(0).getInt()); + } finally { + byteBuffers.forEach(ByteBuf::release); } } @@ -1017,6 +1022,7 @@ void shouldWriteInt32WithinSpanningBuffers( final int expectedLastBufferPosition, final BufferProvider bufferProvider) { + List buffers = new ArrayList<>(); try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Integer.BYTES))) { @@ -1028,12 +1034,14 @@ void shouldWriteInt32WithinSpanningBuffers( //then //getByteBuffers returns ByteBuffers with limit() set to position, position set to 0. - List buffers = output.getByteBuffers(); + buffers = output.getByteBuffers(); assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch"); assertBufferContents(expectedBuffers, buffers); assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit()); assertEquals(expectedOutputPosition, output.getPosition()); + } finally { + buffers.forEach(ByteBuf::release); } } @@ -1049,6 +1057,7 @@ void shouldWriteInt64WithinSpanningBuffers( final int expectedLastBufferPosition, final BufferProvider bufferProvider) { + List buffers = new ArrayList<>(); try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) { @@ -1060,12 +1069,14 @@ void shouldWriteInt64WithinSpanningBuffers( //then //getByteBuffers returns ByteBuffers with limit() set to position, position set to 0. - List buffers = output.getByteBuffers(); + buffers = output.getByteBuffers(); assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch"); assertBufferContents(expectedBuffers, buffers); assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit()); assertEquals(expectedOutputPosition, output.getPosition()); + } finally { + buffers.forEach(ByteBuf::release); } } @@ -1081,6 +1092,7 @@ void shouldWriteDoubleWithinSpanningBuffers( final int expectedLastBufferPosition, final BufferProvider bufferProvider) { + List buffers = new ArrayList<>(); try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) { @@ -1092,12 +1104,14 @@ void shouldWriteDoubleWithinSpanningBuffers( //then //getByteBuffers returns ByteBuffers with limit() set to position, position set to 0. - List buffers = output.getByteBuffers(); + buffers = output.getByteBuffers(); assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch"); assertBufferContents(expectedBuffers, buffers); assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit()); assertEquals(expectedOutputPosition, output.getPosition()); + } finally { + buffers.forEach(ByteBuf::release); } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index 0a4b0318d1..702a2918a9 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -47,6 +47,7 @@ import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; import static com.mongodb.ReadPreference.primary; +import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.TimeoutContext.createTimeoutContext; import static com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.sinkToCallback; @@ -73,6 +74,7 @@ public class OperationExecutorImpl implements OperationExecutor { @Override public Mono execute(final AsyncReadOperation operation, final ReadPreference readPreference, final ReadConcern readConcern, @Nullable final ClientSession session) { + isTrue("open", !mongoClient.getCluster().isClosed()); notNull("operation", operation); notNull("readPreference", readPreference); notNull("readConcern", readConcern); @@ -109,6 +111,7 @@ public Mono execute(final AsyncReadOperation operation, final ReadPref @Override public Mono execute(final AsyncWriteOperation operation, final ReadConcern readConcern, @Nullable final ClientSession session) { + isTrue("open", !mongoClient.getCluster().isClosed()); notNull("operation", operation); notNull("readConcern", readConcern);