From a19be26c3771d04bd8a150cff8186d3ab46670ea Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Wed, 12 Mar 2025 16:19:19 -0600 Subject: [PATCH 1/4] Rename operationTimeout --- .../client/internal/crypt/Crypt.java | 4 +-- .../internal/crypt/CryptConnection.java | 6 ++-- .../internal/gridfs/GridFSBucketImpl.java | 2 +- .../gridfs/GridFSPublisherCreator.java | 29 ++++++++++--------- .../internal/CollectionInfoRetriever.java | 4 +-- .../client/internal/CommandMarker.java | 13 +++++---- .../com/mongodb/client/internal/Crypt.java | 24 +++++++++------ .../client/internal/CryptConnection.java | 10 ++++--- 8 files changed, 52 insertions(+), 40 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java index 61ccaa320fe..a9407799dc7 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java @@ -143,9 +143,9 @@ public Mono encrypt(final String databaseName, final RawBsonDoc * * @param commandResponse the encrypted command response */ - public Mono decrypt(final RawBsonDocument commandResponse, @Nullable final Timeout operationTimeout) { + public Mono decrypt(final RawBsonDocument commandResponse, @Nullable final Timeout timeout) { notNull("commandResponse", commandResponse); - return executeStateMachine(() -> mongoCrypt.createDecryptionContext(commandResponse), operationTimeout) + return executeStateMachine(() -> mongoCrypt.createDecryptionContext(commandResponse), timeout) .onErrorMap(this::wrapInClientException); } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java index c05bfb663f2..f7febad164c 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java @@ -121,14 +121,14 @@ public void commandAsync(final String database, final BsonDocument command, : new SplittablePayloadBsonWriter(bsonBinaryWriter, bsonOutput, createSplittablePayloadMessageSettings(), payload, MAX_SPLITTABLE_DOCUMENT_SIZE); - Timeout operationTimeout = operationContext.getTimeoutContext().getTimeout(); + Timeout timeout = operationContext.getTimeoutContext().getTimeout(); getEncoder(command).encode(writer, command, EncoderContext.builder().build()); - crypt.encrypt(database, new RawBsonDocument(bsonOutput.getInternalBuffer(), 0, bsonOutput.getSize()), operationTimeout) + crypt.encrypt(database, new RawBsonDocument(bsonOutput.getInternalBuffer(), 0, bsonOutput.getSize()), timeout) .flatMap((Function>) encryptedCommand -> Mono.create(sink -> wrapped.commandAsync(database, encryptedCommand, commandFieldNameValidator, readPreference, new RawBsonDocumentCodec(), operationContext, responseExpected, EmptyMessageSequences.INSTANCE, sinkToCallback(sink)))) - .flatMap(rawBsonDocument -> crypt.decrypt(rawBsonDocument, operationTimeout)) + .flatMap(rawBsonDocument -> crypt.decrypt(rawBsonDocument, timeout)) .map(decryptedResponse -> commandResultDecoder.decode(new BsonBinaryReader(decryptedResponse.getByteBuffer().asNIO()), DecoderContext.builder().build()) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java index 948c666489c..0b3a166e698 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java @@ -232,7 +232,7 @@ public GridFSDownloadPublisher downloadToPublisher(final String filename) { @Override public GridFSDownloadPublisher downloadToPublisher(final String filename, final GridFSDownloadOptions options) { Function findPublisherCreator = - operationTimeout -> createGridFSFindPublisher(filesCollection, null, filename, options, operationTimeout); + timeout -> createGridFSFindPublisher(filesCollection, null, filename, options, timeout); return createGridFSDownloadPublisher(chunksCollection, null, findPublisherCreator); } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSPublisherCreator.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSPublisherCreator.java index 166abca6a0b..cf50a9a2dc5 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSPublisherCreator.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSPublisherCreator.java @@ -93,9 +93,9 @@ public static GridFSFindPublisher createGridFSFindPublisher( final MongoCollection filesCollection, @Nullable final ClientSession clientSession, @Nullable final Bson filter, - @Nullable final Timeout operationTimeout) { + @Nullable final Timeout timeout) { notNull("filesCollection", filesCollection); - return new GridFSFindPublisherImpl(createFindPublisher(filesCollection, clientSession, filter, operationTimeout)); + return new GridFSFindPublisherImpl(createFindPublisher(filesCollection, clientSession, filter, timeout)); } public static GridFSFindPublisher createGridFSFindPublisher( @@ -103,7 +103,7 @@ public static GridFSFindPublisher createGridFSFindPublisher( @Nullable final ClientSession clientSession, final String filename, final GridFSDownloadOptions options, - @Nullable final Timeout operationTimeout) { + @Nullable final Timeout timeout) { notNull("filesCollection", filesCollection); notNull("filename", filename); notNull("options", options); @@ -119,7 +119,8 @@ public static GridFSFindPublisher createGridFSFindPublisher( sort = -1; } - return createGridFSFindPublisher(filesCollection, clientSession, new Document("filename", filename), operationTimeout).skip(skip) + return createGridFSFindPublisher(filesCollection, clientSession, new Document("filename", filename), timeout) + .skip(skip) .sort(new Document("uploadDate", sort)); } @@ -127,19 +128,19 @@ public static FindPublisher createFindPublisher( final MongoCollection filesCollection, @Nullable final ClientSession clientSession, @Nullable final Bson filter, - @Nullable final Timeout operationTimeout) { + @Nullable final Timeout timeout) { notNull("filesCollection", filesCollection); FindPublisher publisher; if (clientSession == null) { - publisher = collectionWithTimeout(filesCollection, operationTimeout).find(); + publisher = collectionWithTimeout(filesCollection, timeout).find(); } else { - publisher = collectionWithTimeout(filesCollection, operationTimeout).find(clientSession); + publisher = collectionWithTimeout(filesCollection, timeout).find(clientSession); } if (filter != null) { publisher = publisher.filter(filter); } - if (operationTimeout != null) { + if (timeout != null) { publisher.timeoutMode(TimeoutMode.CURSOR_LIFETIME); } return publisher; @@ -175,8 +176,8 @@ public static Publisher createDeletePublisher(final MongoCollection { - Timeout operationTimeout = startTimeout(filesCollection.getTimeout(MILLISECONDS)); - return collectionWithTimeoutMono(filesCollection, operationTimeout) + Timeout timeout = startTimeout(filesCollection.getTimeout(MILLISECONDS)); + return collectionWithTimeoutMono(filesCollection, timeout) .flatMap(wrappedCollection -> { if (clientSession == null) { return Mono.from(wrappedCollection.deleteOne(filter)); @@ -187,7 +188,7 @@ public static Publisher createDeletePublisher(final MongoCollection { if (clientSession == null) { return Mono.from(wrappedCollection.deleteMany(new BsonDocument("files_id", id))); @@ -228,15 +229,15 @@ public static Publisher createDropPublisher(final MongoCollection { - Timeout operationTimeout = startTimeout(filesCollection.getTimeout(MILLISECONDS)); - return collectionWithTimeoutMono(filesCollection, operationTimeout) + Timeout timeout = startTimeout(filesCollection.getTimeout(MILLISECONDS)); + return collectionWithTimeoutMono(filesCollection, timeout) .flatMap(wrappedCollection -> { if (clientSession == null) { return Mono.from(wrappedCollection.drop()); } else { return Mono.from(wrappedCollection.drop(clientSession)); } - }).then(collectionWithTimeoutDeferred(chunksCollection, operationTimeout)) + }).then(collectionWithTimeoutDeferred(chunksCollection, timeout)) .flatMap(wrappedCollection -> { if (clientSession == null) { return Mono.from(wrappedCollection.drop()); diff --git a/driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java b/driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java index 9d02a1e8756..9ce34fd18a9 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java +++ b/driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java @@ -36,8 +36,8 @@ class CollectionInfoRetriever { this.client = notNull("client", client); } - public List filter(final String databaseName, final BsonDocument filter, @Nullable final Timeout operationTimeout) { - return databaseWithTimeout(client.getDatabase(databaseName), TIMEOUT_ERROR_MESSAGE, operationTimeout) + public List filter(final String databaseName, final BsonDocument filter, @Nullable final Timeout timeout) { + return databaseWithTimeout(client.getDatabase(databaseName), TIMEOUT_ERROR_MESSAGE, timeout) .listCollections(BsonDocument.class) .filter(filter) .into(new ArrayList<>()); diff --git a/driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java b/driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java index 73eed8efd01..d694cf682a3 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java +++ b/driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java @@ -84,11 +84,11 @@ class CommandMarker implements Closeable { } } - RawBsonDocument mark(final String databaseName, final RawBsonDocument command, @Nullable final Timeout operationTimeout) { + RawBsonDocument mark(final String databaseName, final RawBsonDocument command, @Nullable final Timeout timeout) { if (client != null) { try { try { - return executeCommand(databaseName, command, operationTimeout); + return executeCommand(databaseName, command, timeout); } catch (MongoOperationTimeoutException e){ throw e; } catch (MongoTimeoutException e) { @@ -96,7 +96,7 @@ RawBsonDocument mark(final String databaseName, final RawBsonDocument command, @ throw e; } startProcess(processBuilder); - return executeCommand(databaseName, command, operationTimeout); + return executeCommand(databaseName, command, timeout); } } catch (MongoException e) { throw wrapInClientException(e); @@ -113,14 +113,17 @@ public void close() { } } - private RawBsonDocument executeCommand(final String databaseName, final RawBsonDocument markableCommand, @Nullable final Timeout operationTimeout) { + private RawBsonDocument executeCommand( + final String databaseName, + final RawBsonDocument markableCommand, + @Nullable final Timeout timeout) { assertNotNull(client); MongoDatabase mongoDatabase = client.getDatabase(databaseName) .withReadConcern(ReadConcern.DEFAULT) .withReadPreference(ReadPreference.primary()); - return databaseWithTimeout(mongoDatabase, TIMEOUT_ERROR_MESSAGE, operationTimeout) + return databaseWithTimeout(mongoDatabase, TIMEOUT_ERROR_MESSAGE, timeout) .runCommand(markableCommand, RawBsonDocument.class); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/Crypt.java b/driver-sync/src/main/com/mongodb/client/internal/Crypt.java index 15ba16e66da..ae7a75ae626 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/Crypt.java +++ b/driver-sync/src/main/com/mongodb/client/internal/Crypt.java @@ -132,7 +132,10 @@ public class Crypt implements Closeable { * @param command the unencrypted command * @return the encrypted command */ - RawBsonDocument encrypt(final String databaseName, final RawBsonDocument command, @Nullable final Timeout timeoutOperation) { + RawBsonDocument encrypt( + final String databaseName, + final RawBsonDocument command, + @Nullable final Timeout timeout) { notNull("databaseName", databaseName); notNull("command", command); @@ -141,7 +144,7 @@ RawBsonDocument encrypt(final String databaseName, final RawBsonDocument command } try (MongoCryptContext encryptionContext = mongoCrypt.createEncryptionContext(databaseName, command)) { - return executeStateMachine(encryptionContext, databaseName, timeoutOperation); + return executeStateMachine(encryptionContext, databaseName, timeout); } catch (MongoCryptException e) { throw wrapInMongoException(e); } @@ -274,24 +277,27 @@ public void close() { } } - private RawBsonDocument executeStateMachine(final MongoCryptContext cryptContext, @Nullable final String databaseName, @Nullable final Timeout operationTimeout) { + private RawBsonDocument executeStateMachine( + final MongoCryptContext cryptContext, + @Nullable final String databaseName, + @Nullable final Timeout timeout) { while (true) { State state = cryptContext.getState(); switch (state) { case NEED_MONGO_COLLINFO: - collInfo(cryptContext, notNull("databaseName", databaseName), operationTimeout); + collInfo(cryptContext, notNull("databaseName", databaseName), timeout); break; case NEED_MONGO_MARKINGS: - mark(cryptContext, notNull("databaseName", databaseName), operationTimeout); + mark(cryptContext, notNull("databaseName", databaseName), timeout); break; case NEED_KMS_CREDENTIALS: fetchCredentials(cryptContext); break; case NEED_MONGO_KEYS: - fetchKeys(cryptContext, operationTimeout); + fetchKeys(cryptContext, timeout); break; case NEED_KMS: - decryptKeys(cryptContext, operationTimeout); + decryptKeys(cryptContext, timeout); break; case READY: return cryptContext.finish(); @@ -320,9 +326,9 @@ private void collInfo(final MongoCryptContext cryptContext, final String databas } } - private void mark(final MongoCryptContext cryptContext, final String databaseName, @Nullable final Timeout operationTimeout) { + private void mark(final MongoCryptContext cryptContext, final String databaseName, @Nullable final Timeout timeout) { try { - RawBsonDocument markedCommand = assertNotNull(commandMarker).mark(databaseName, cryptContext.getMongoOperation(), operationTimeout); + RawBsonDocument markedCommand = assertNotNull(commandMarker).mark(databaseName, cryptContext.getMongoOperation(), timeout); cryptContext.addMongoOperationResult(markedCommand); cryptContext.completeMongoOperation(); } catch (Throwable t) { diff --git a/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java b/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java index 803df89a6b6..b5c814edb04 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java +++ b/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java @@ -112,9 +112,11 @@ public T command(final String database, final BsonDocument command, final Fi getEncoder(command).encode(writer, command, EncoderContext.builder().build()); - Timeout operationTimeout = operationContext.getTimeoutContext().getTimeout(); - RawBsonDocument encryptedCommand = crypt.encrypt(database, - new RawBsonDocument(bsonOutput.getInternalBuffer(), 0, bsonOutput.getSize()), operationTimeout); + Timeout timeout = operationContext.getTimeoutContext().getTimeout(); + RawBsonDocument encryptedCommand = crypt.encrypt( + database, + new RawBsonDocument(bsonOutput.getInternalBuffer(), 0, bsonOutput.getSize()), + timeout); RawBsonDocument encryptedResponse = wrapped.command(database, encryptedCommand, commandFieldNameValidator, readPreference, new RawBsonDocumentCodec(), operationContext, responseExpected, EmptyMessageSequences.INSTANCE); @@ -123,7 +125,7 @@ public T command(final String database, final BsonDocument command, final Fi return null; } - RawBsonDocument decryptedResponse = crypt.decrypt(encryptedResponse, operationTimeout); + RawBsonDocument decryptedResponse = crypt.decrypt(encryptedResponse, timeout); BsonBinaryReader reader = new BsonBinaryReader(decryptedResponse.getByteBuffer().asNIO()); From 489d6d93af8c2c24aada7fd38c18eb24cef4c740 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 13 Mar 2025 15:57:45 -0600 Subject: [PATCH 2/4] Remove doWithResetTimeout --- .../com/mongodb/internal/TimeoutContext.java | 54 +++---------------- .../internal/async/SingleResultCallback.java | 2 + .../internal/connection/OperationContext.java | 8 ++- .../operation/AsyncCommandBatchCursor.java | 36 ++++++++----- .../operation/ChangeStreamOperation.java | 3 +- .../operation/CommandBatchCursor.java | 28 ++++++---- 6 files changed, 60 insertions(+), 71 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index ba3b8eb0ac5..b4561e6fdc6 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -17,8 +17,6 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoOperationTimeoutException; -import com.mongodb.internal.async.AsyncRunnable; -import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.connection.CommandMessage; import com.mongodb.internal.time.StartTime; import com.mongodb.internal.time.Timeout; @@ -26,17 +24,13 @@ import com.mongodb.session.ClientSession; import java.util.Objects; -import java.util.Optional; import java.util.function.LongConsumer; import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.assertNull; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; -import static com.mongodb.internal.async.AsyncRunnable.beginAsync; import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE; -import static java.util.Optional.empty; -import static java.util.Optional.ofNullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -121,6 +115,12 @@ public TimeoutContext copyTimeoutContext() { return new TimeoutContext(getTimeoutSettings(), getTimeout()); } + public TimeoutContext withNewlyStartedTimeout() { + TimeoutContext newContext = copyTimeoutContext(); + newContext.timeout = startTimeout(newContext.timeoutSettings.getTimeoutMS()); + return newContext; + } + public TimeoutContext(final TimeoutSettings timeoutSettings) { this(false, timeoutSettings, startTimeout(timeoutSettings.getTimeoutMS())); } @@ -297,56 +297,18 @@ public int getConnectTimeoutMs() { /** * @see #hasTimeoutMS() - * @see #doWithResetTimeout(Runnable) - * @see #doWithResetTimeout(AsyncRunnable, SingleResultCallback) */ + @Deprecated // TODO-JAVA-5640 REMOVE method public void resetTimeoutIfPresent() { - getAndResetTimeoutIfPresent(); - } - - /** - * @see #hasTimeoutMS() - * @return A {@linkplain Optional#isPresent() non-empty} previous {@linkplain Timeout} iff {@link #hasTimeoutMS()}, - * i.e., iff it was reset. - */ - private Optional getAndResetTimeoutIfPresent() { - Timeout result = timeout; if (hasTimeoutMS()) { timeout = startTimeout(timeoutSettings.getTimeoutMS()); - return ofNullable(result); - } - return empty(); - } - - /** - * @see #resetTimeoutIfPresent() - */ - public void doWithResetTimeout(final Runnable action) { - Optional originalTimeout = getAndResetTimeoutIfPresent(); - try { - action.run(); - } finally { - originalTimeout.ifPresent(original -> timeout = original); } } - /** - * @see #resetTimeoutIfPresent() - */ - public void doWithResetTimeout(final AsyncRunnable action, final SingleResultCallback callback) { - beginAsync().thenRun(c -> { - Optional originalTimeout = getAndResetTimeoutIfPresent(); - beginAsync().thenRun(c2 -> { - action.finish(c2); - }).thenAlwaysRunAndFinish(() -> { - originalTimeout.ifPresent(original -> timeout = original); - }, c); - }).finish(callback); - } - /** * Resets the timeout if this timeout context is being used by pool maintenance */ + @Deprecated // TODO-JAVA-5640 REMOVE method public void resetMaintenanceTimeout() { if (!isMaintenanceContext) { return; diff --git a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java index 632e453d0c0..de0771f4b57 100644 --- a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java +++ b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java @@ -29,6 +29,8 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public interface SingleResultCallback { + public static SingleResultCallback THEN_DO_NOTHING = (r, t) -> {}; + /** * Called when the function completes. This method must not complete abruptly, see {@link AsyncCallbackFunction} for more details. * diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java index bf29ebc051b..5e6adeff530 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java +++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java @@ -101,8 +101,7 @@ public ServerApi getServerApi() { return serverApi; } - @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) - public OperationContext(final long id, + private OperationContext(final long id, final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, @@ -138,6 +137,11 @@ public ServerDeprioritization getServerDeprioritization() { return serverDeprioritization; } + public OperationContext withNewlyStartedTimeout() { + TimeoutContext tc = this.timeoutContext.withNewlyStartedTimeout(); + return this.withTimeoutContext(tc); + } + public static final class ServerDeprioritization { @Nullable private ServerAddress candidate; diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java index 942721a27ad..a7052565aaf 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java @@ -53,6 +53,7 @@ import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.doesNotThrow; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; +import static com.mongodb.internal.async.SingleResultCallback.THEN_DO_NOTHING; import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH; import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR; import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH; @@ -277,15 +278,14 @@ void markAsPinned(final AsyncConnection connectionToPin, final Connection.Pinnin void doClose() { TimeoutContext timeoutContext = operationContext.getTimeoutContext(); timeoutContext.resetToDefaultMaxTime(); - SingleResultCallback thenDoNothing = (r, t) -> {}; if (resetTimeoutWhenClosing) { - timeoutContext.doWithResetTimeout(this::releaseResourcesAsync, thenDoNothing); + releaseResourcesAsync(operationContext.withNewlyStartedTimeout(), THEN_DO_NOTHING); } else { - releaseResourcesAsync(thenDoNothing); + releaseResourcesAsync(operationContext, THEN_DO_NOTHING); } } - private void releaseResourcesAsync(final SingleResultCallback callback) { + private void releaseResourcesAsync(final OperationContext operationContext, final SingleResultCallback callback) { beginAsync().thenRunTryCatchAsyncBlocks(c -> { if (isSkipReleasingServerResourcesOnClose()) { unsetServerCursor(); @@ -295,7 +295,7 @@ private void releaseResourcesAsync(final SingleResultCallback callback) { getConnection(c2); }).thenConsume((connection, c3) -> { beginAsync().thenRun(c4 -> { - releaseServerResourcesAsync(connection, c4); + releaseServerResourcesAsync(connection, operationContext, c4); }).thenAlwaysRunAndFinish(() -> { connection.release(); }, c3); @@ -346,11 +346,12 @@ private void getConnection(final SingleResultCallback callback) } } - private void releaseServerResourcesAsync(final AsyncConnection connection, final SingleResultCallback callback) { + private void releaseServerResourcesAsync(final AsyncConnection connection, final OperationContext operationContext, + final SingleResultCallback callback) { beginAsync().thenRun((c) -> { ServerCursor localServerCursor = super.getServerCursor(); if (localServerCursor != null) { - killServerCursorAsync(getNamespace(), localServerCursor, connection, callback); + killServerCursorAsync(getNamespace(), localServerCursor, connection, operationContext, callback); } else { c.complete(c); } @@ -359,11 +360,22 @@ private void releaseServerResourcesAsync(final AsyncConnection connection, final }, callback); } - private void killServerCursorAsync(final MongoNamespace namespace, final ServerCursor localServerCursor, - final AsyncConnection localConnection, final SingleResultCallback callback) { - localConnection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor), - NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(), - operationContext, (r, t) -> callback.onResult(null, null)); + private void killServerCursorAsync( + final MongoNamespace namespace, + final ServerCursor localServerCursor, + final AsyncConnection localConnection, + final OperationContext operationContext, + final SingleResultCallback callback) { + beginAsync().thenRun(c -> { + localConnection.commandAsync( + namespace.getDatabaseName(), + getKillCursorsCommand(namespace, localServerCursor), + NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + new BsonDocumentCodec(), + operationContext, + (r, t) -> c.complete(c)); + }).finish(callback); } } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 4ef28c796cb..c648bbe1813 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -22,6 +22,7 @@ import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; @@ -64,7 +65,7 @@ public class ChangeStreamOperation implements AsyncReadOperation pipeline, final Decoder decoder) { this(namespace, fullDocument, fullDocumentBeforeChange, pipeline, decoder, ChangeStreamLevel.COLLECTION); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java index d201976e5ed..528e6827bb5 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java @@ -317,13 +317,13 @@ void doClose() { TimeoutContext timeoutContext = operationContext.getTimeoutContext(); timeoutContext.resetToDefaultMaxTime(); if (resetTimeoutWhenClosing) { - timeoutContext.doWithResetTimeout(this::releaseResources); + releaseResources(operationContext.withNewlyStartedTimeout()); } else { - releaseResources(); + releaseResources(operationContext); } } - private void releaseResources() { + private void releaseResources(final OperationContext operationContext) { try { if (isSkipReleasingServerResourcesOnClose()) { unsetServerCursor(); @@ -331,7 +331,7 @@ private void releaseResources() { if (super.getServerCursor() != null) { Connection connection = getConnection(); try { - releaseServerResources(connection); + releaseServerResources(connection, operationContext); } finally { connection.release(); } @@ -373,21 +373,29 @@ private Connection getConnection() { } } - private void releaseServerResources(final Connection connection) { + private void releaseServerResources(final Connection connection, final OperationContext operationContext) { try { ServerCursor localServerCursor = super.getServerCursor(); if (localServerCursor != null) { - killServerCursor(getNamespace(), localServerCursor, connection); + killServerCursor(getNamespace(), localServerCursor, connection, operationContext); } } finally { unsetServerCursor(); } } - private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor, - final Connection localConnection) { - localConnection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor), - NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(), operationContext); + private void killServerCursor( + final MongoNamespace namespace, + final ServerCursor localServerCursor, + final Connection localConnection, + final OperationContext operationContext) { + localConnection.command( + namespace.getDatabaseName(), + getKillCursorsCommand(namespace, localServerCursor), + NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + new BsonDocumentCodec(), + operationContext); } } } From fedc7643b324febb71f1bc8cdf0d0759406d7e95 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Fri, 14 Mar 2025 16:37:37 -0600 Subject: [PATCH 3/4] Remove resetTimeoutIfPresent from CommandBatchCursor --- .../internal/operation/AsyncCommandBatchCursor.java | 11 ++++++----- .../internal/operation/CommandBatchCursor.java | 13 +++++++------ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java index a7052565aaf..a391458ce6f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java @@ -72,8 +72,8 @@ class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor private final int maxWireVersion; private final boolean firstBatchEmpty; private final ResourceManager resourceManager; - private final OperationContext operationContext; private final TimeoutMode timeoutMode; + private OperationContext operationContext; private final AtomicBoolean processedInitial = new AtomicBoolean(); private int batchSize; private volatile CommandCursorResult commandCursorResult; @@ -95,10 +95,10 @@ class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor this.comment = comment; this.maxWireVersion = connectionDescription.getMaxWireVersion(); this.firstBatchEmpty = commandCursorResult.getResults().isEmpty(); - operationContext = connectionSource.getOperationContext(); this.timeoutMode = timeoutMode; - operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); + operationContext = connectionSource.getOperationContext(); + operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); // TODO-JAVA-5640 with? AsyncConnection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null; @@ -176,7 +176,7 @@ public int getMaxWireVersion() { void checkTimeoutModeAndResetTimeoutContextIfIteration() { if (timeoutMode == TimeoutMode.ITERATION) { - operationContext.getTimeoutContext().resetTimeoutIfPresent(); + operationContext = operationContext.withNewlyStartedTimeout(); } } @@ -230,7 +230,8 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA /** * Configures the cursor to {@link #close()} - * without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}. + * without {@linkplain TimeoutContext#withNewlyStartedTimeout() starting a new} + * {@linkplain TimeoutContext#getTimeout() timeout}. * This is useful when managing the {@link #close()} behavior externally. */ AsyncCommandBatchCursor disableTimeoutResetWhenClosing() { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java index 528e6827bb5..2a23f9d0711 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java @@ -67,8 +67,8 @@ class CommandBatchCursor implements AggregateResponseBatchCursor { private final int maxWireVersion; private final boolean firstBatchEmpty; private final ResourceManager resourceManager; - private final OperationContext operationContext; private final TimeoutMode timeoutMode; + private OperationContext operationContext; private int batchSize; private CommandCursorResult commandCursorResult; @@ -92,10 +92,10 @@ class CommandBatchCursor implements AggregateResponseBatchCursor { this.comment = comment; this.maxWireVersion = connectionDescription.getMaxWireVersion(); this.firstBatchEmpty = commandCursorResult.getResults().isEmpty(); - operationContext = connectionSource.getOperationContext(); this.timeoutMode = timeoutMode; - operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); + operationContext = connectionSource.getOperationContext(); + operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); // TODO-JAVA-5640 with? Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null; resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor()); @@ -235,7 +235,7 @@ public int getMaxWireVersion() { void checkTimeoutModeAndResetTimeoutContextIfIteration() { if (timeoutMode == TimeoutMode.ITERATION) { - operationContext.getTimeoutContext().resetTimeoutIfPresent(); + operationContext = operationContext.withNewlyStartedTimeout(); } } @@ -271,7 +271,8 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA /** * Configures the cursor to {@link #close()} - * without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}. + * without {@linkplain TimeoutContext#withNewlyStartedTimeout() starting a new} + * {@linkplain TimeoutContext#getTimeout() timeout}. * This is useful when managing the {@link #close()} behavior externally. */ CommandBatchCursor disableTimeoutResetWhenClosing() { @@ -316,7 +317,7 @@ void markAsPinned(final Connection connectionToPin, final Connection.PinningMode void doClose() { TimeoutContext timeoutContext = operationContext.getTimeoutContext(); timeoutContext.resetToDefaultMaxTime(); - if (resetTimeoutWhenClosing) { + if (resetTimeoutWhenClosing) { // TODO-JAVA-5640 don't we always reset when closing? releaseResources(operationContext.withNewlyStartedTimeout()); } else { releaseResources(operationContext); From 3a81c6120f254fba6a79f04abe0c4eace58a114a Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Fri, 14 Mar 2025 16:49:48 -0600 Subject: [PATCH 4/4] Remove Timeout from RetryState --- .../internal/async/function/RetryState.java | 21 ++++++++----------- .../operation/CommandOperationHelper.java | 5 ++++- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java index e1cecf721fc..5c9f756a94d 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java @@ -47,7 +47,7 @@ @NotThreadSafe public final class RetryState { public static final int RETRIES = 1; - private static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE; + public static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE; private final LoopState loopState; private final int attempts; @@ -67,19 +67,16 @@ public final class RetryState { *

* * @param retries A positive number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited. - * @param timeoutContext A timeout context that will be used to determine if the operation has timed out. + * @param retryUntilTimeoutThrowsException // TODO-JAVA-5640 shouldn't a timeout always stop retries? * @see #attempts() */ - public static RetryState withRetryableState(final int retries, final TimeoutContext timeoutContext) { + public static RetryState withRetryableState(final int retries, final boolean retryUntilTimeoutThrowsException) { assertTrue(retries > 0); - if (timeoutContext.hasTimeoutMS()){ - return new RetryState(INFINITE_ATTEMPTS, timeoutContext); - } - return new RetryState(retries, null); + return new RetryState(retries, retryUntilTimeoutThrowsException); } public static RetryState withNonRetryableState() { - return new RetryState(0, null); + return new RetryState(0, false); } /** @@ -94,19 +91,19 @@ public static RetryState withNonRetryableState() { * @see #attempts() */ public RetryState(final TimeoutContext timeoutContext) { - this(INFINITE_ATTEMPTS, timeoutContext); + this(INFINITE_ATTEMPTS, timeoutContext.hasTimeoutMS()); } /** * @param retries A non-negative number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited. - * @param timeoutContext A timeout context that will be used to determine if the operation has timed out. + * @param retryUntilTimeoutThrowsException * @see #attempts() */ - private RetryState(final int retries, @Nullable final TimeoutContext timeoutContext) { + private RetryState(final int retries, final boolean retryUntilTimeoutThrowsException) { assertTrue(retries >= 0); loopState = new LoopState(); attempts = retries == INFINITE_ATTEMPTS ? INFINITE_ATTEMPTS : retries + 1; - this.retryUntilTimeoutThrowsException = timeoutContext != null && timeoutContext.hasTimeoutMS(); + this.retryUntilTimeoutThrowsException = retryUntilTimeoutThrowsException; } /** diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java index db6870f52e8..8332ad916fb 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java @@ -45,6 +45,7 @@ import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.internal.async.function.RetryState.INFINITE_ATTEMPTS; import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static java.lang.String.format; import static java.util.Arrays.asList; @@ -122,7 +123,9 @@ private static Throwable chooseRetryableWriteException( static RetryState initialRetryState(final boolean retry, final TimeoutContext timeoutContext) { if (retry) { - return RetryState.withRetryableState(RetryState.RETRIES, timeoutContext); + boolean retryUntilTimeoutThrowsException = timeoutContext.hasTimeoutMS(); + int retries = retryUntilTimeoutThrowsException ? INFINITE_ATTEMPTS : RetryState.RETRIES; + return RetryState.withRetryableState(retries, retryUntilTimeoutThrowsException); } return RetryState.withNonRetryableState(); }