Skip to content

Fix Netty reference leak. #1762

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .evergreen/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,9 @@ echo "Running tests with Java ${JAVA_VERSION}"
${MULTI_MONGOS_URI_SYSTEM_PROPERTY} ${API_VERSION} ${GRADLE_EXTRA_VARS} \
${JAVA_SYSPROP_ASYNC_TRANSPORT} ${JAVA_SYSPROP_NETTY_SSL_PROVIDER} \
-Dorg.mongodb.test.fle.on.demand.credential.test.failure.enabled=true \
--stacktrace --info --continue ${TESTS}
--stacktrace --info --continue ${TESTS} | tee -a logs.txt

if grep -q 'LEAK:' logs.txt ; then
echo "Netty Leak detected, please inspect build log"
exit 1
fi
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ tasks.withType<Test> {

useJUnitPlatform()

jvmArgs.add("-Dio.netty.leakDetection.level=paranoid")

// Pass any `org.mongodb.*` system settings
systemProperties =
System.getProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
@Override
public void selectServerAsync(final ServerSelector serverSelector, final OperationContext operationContext,
final SingleResultCallback<ServerTuple> callback) {
isTrue("open", !isClosed());
if (isClosed()) {
callback.onResult(null, new MongoClientException("Cluster was closed during server selection."));
}

Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout();
ServerSelectionRequest request = new ServerSelectionRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
import static com.mongodb.internal.connection.CommandHelper.HELLO;
Expand Down Expand Up @@ -355,7 +355,7 @@ private Compressor createCompressor(final MongoCompressor mongoCompressor) {
public void close() {
// All but the first call is a no-op
if (!isClosed.getAndSet(true) && (stream != null)) {
stream.close();
stream.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public ByteBuf asReadOnly() {

@Override
public ByteBuf duplicate() {
return new NettyByteBuf(proxied.duplicate().retain(), isWriting);
return new NettyByteBuf(proxied.retainedDuplicate(), isWriting);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
composite.addComponent(next);
iter.remove();
} else {
next.retain();
composite.addComponent(next.readSlice(bytesNeededFromCurrentBuffer));
composite.addComponent(next.readRetainedSlice(bytesNeededFromCurrentBuffer));
}
composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer);
bytesNeeded -= bytesNeededFromCurrentBuffer;
Expand Down Expand Up @@ -349,7 +348,11 @@ private boolean hasBytesAvailable(final int numBytes) {
private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) {
PendingReader localPendingReader = withLock(lock, () -> {
if (buffer != null) {
pendingInboundBuffers.add(buffer.retain());
if (isClosed) {
pendingException = new MongoSocketException("Received data after the stream was closed.", address);
} else {
pendingInboundBuffers.add(buffer.retain());
}
} else {
pendingException = t;
}
Expand Down Expand Up @@ -378,7 +381,7 @@ public void close() {
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
iterator.remove();
nextByteBuf.release();
nextByteBuf.release(nextByteBuf.refCnt());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ void shouldWriteUtf8CString(final boolean useBranch, final BufferProvider buffer
@ParameterizedTest(name = "should get byte buffers as little endian. Parameters: useBranch={0}, bufferProvider={1}")
@MethodSource("bufferProvidersWithBranches")
void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferProvider bufferProvider) {
List<ByteBuf> byteBuffers = new ArrayList<>();
try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
byte[] v = {1, 0, 0, 0};
if (useBranch) {
Expand All @@ -504,7 +505,11 @@ void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferPro
} else {
out.writeBytes(v);
}
assertEquals(1, out.getByteBuffers().get(0).getInt());

byteBuffers = out.getByteBuffers();
assertEquals(1, byteBuffers.get(0).getInt());
} finally {
byteBuffers.forEach(ByteBuf::release);
}
}

Expand Down Expand Up @@ -1017,6 +1022,7 @@ void shouldWriteInt32WithinSpanningBuffers(
final int expectedLastBufferPosition,
final BufferProvider bufferProvider) {

List<ByteBuf> buffers = new ArrayList<>();
try (ByteBufferBsonOutput output =
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Integer.BYTES))) {

Expand All @@ -1028,12 +1034,14 @@ void shouldWriteInt32WithinSpanningBuffers(

//then
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
List<ByteBuf> buffers = output.getByteBuffers();
buffers = output.getByteBuffers();
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
assertBufferContents(expectedBuffers, buffers);

assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
assertEquals(expectedOutputPosition, output.getPosition());
} finally {
buffers.forEach(ByteBuf::release);
}
}

Expand All @@ -1049,6 +1057,7 @@ void shouldWriteInt64WithinSpanningBuffers(
final int expectedLastBufferPosition,
final BufferProvider bufferProvider) {

List<ByteBuf> buffers = new ArrayList<>();
try (ByteBufferBsonOutput output =
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {

Expand All @@ -1060,12 +1069,14 @@ void shouldWriteInt64WithinSpanningBuffers(

//then
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
List<ByteBuf> buffers = output.getByteBuffers();
buffers = output.getByteBuffers();
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
assertBufferContents(expectedBuffers, buffers);

assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
assertEquals(expectedOutputPosition, output.getPosition());
} finally {
buffers.forEach(ByteBuf::release);
}
}

Expand All @@ -1081,6 +1092,7 @@ void shouldWriteDoubleWithinSpanningBuffers(
final int expectedLastBufferPosition,
final BufferProvider bufferProvider) {

List<ByteBuf> buffers = new ArrayList<>();
try (ByteBufferBsonOutput output =
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {

Expand All @@ -1092,12 +1104,14 @@ void shouldWriteDoubleWithinSpanningBuffers(

//then
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
List<ByteBuf> buffers = output.getByteBuffers();
buffers = output.getByteBuffers();
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
assertBufferContents(expectedBuffers, buffers);

assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
assertEquals(expectedOutputPosition, output.getPosition());
} finally {
buffers.forEach(ByteBuf::release);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL;
import static com.mongodb.ReadPreference.primary;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.TimeoutContext.createTimeoutContext;
import static com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.sinkToCallback;
Expand All @@ -73,6 +74,7 @@ public class OperationExecutorImpl implements OperationExecutor {
@Override
public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference, final ReadConcern readConcern,
@Nullable final ClientSession session) {
isTrue("open", !mongoClient.getCluster().isClosed());
notNull("operation", operation);
notNull("readPreference", readPreference);
notNull("readConcern", readConcern);
Expand Down Expand Up @@ -109,6 +111,7 @@ public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPref
@Override
public <T> Mono<T> execute(final AsyncWriteOperation<T> operation, final ReadConcern readConcern,
@Nullable final ClientSession session) {
isTrue("open", !mongoClient.getCluster().isClosed());
notNull("operation", operation);
notNull("readConcern", readConcern);

Expand Down