diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java
index ba3b8eb0ac5..b4561e6fdc6 100644
--- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java
+++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java
@@ -17,8 +17,6 @@
import com.mongodb.MongoClientException;
import com.mongodb.MongoOperationTimeoutException;
-import com.mongodb.internal.async.AsyncRunnable;
-import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.CommandMessage;
import com.mongodb.internal.time.StartTime;
import com.mongodb.internal.time.Timeout;
@@ -26,17 +24,13 @@
import com.mongodb.session.ClientSession;
import java.util.Objects;
-import java.util.Optional;
import java.util.function.LongConsumer;
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
-import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE;
-import static java.util.Optional.empty;
-import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -121,6 +115,12 @@ public TimeoutContext copyTimeoutContext() {
return new TimeoutContext(getTimeoutSettings(), getTimeout());
}
+ public TimeoutContext withNewlyStartedTimeout() {
+ TimeoutContext newContext = copyTimeoutContext();
+ newContext.timeout = startTimeout(newContext.timeoutSettings.getTimeoutMS());
+ return newContext;
+ }
+
public TimeoutContext(final TimeoutSettings timeoutSettings) {
this(false, timeoutSettings, startTimeout(timeoutSettings.getTimeoutMS()));
}
@@ -297,56 +297,18 @@ public int getConnectTimeoutMs() {
/**
* @see #hasTimeoutMS()
- * @see #doWithResetTimeout(Runnable)
- * @see #doWithResetTimeout(AsyncRunnable, SingleResultCallback)
*/
+ @Deprecated // TODO-JAVA-5640 REMOVE method
public void resetTimeoutIfPresent() {
- getAndResetTimeoutIfPresent();
- }
-
- /**
- * @see #hasTimeoutMS()
- * @return A {@linkplain Optional#isPresent() non-empty} previous {@linkplain Timeout} iff {@link #hasTimeoutMS()},
- * i.e., iff it was reset.
- */
- private Optional getAndResetTimeoutIfPresent() {
- Timeout result = timeout;
if (hasTimeoutMS()) {
timeout = startTimeout(timeoutSettings.getTimeoutMS());
- return ofNullable(result);
- }
- return empty();
- }
-
- /**
- * @see #resetTimeoutIfPresent()
- */
- public void doWithResetTimeout(final Runnable action) {
- Optional originalTimeout = getAndResetTimeoutIfPresent();
- try {
- action.run();
- } finally {
- originalTimeout.ifPresent(original -> timeout = original);
}
}
- /**
- * @see #resetTimeoutIfPresent()
- */
- public void doWithResetTimeout(final AsyncRunnable action, final SingleResultCallback callback) {
- beginAsync().thenRun(c -> {
- Optional originalTimeout = getAndResetTimeoutIfPresent();
- beginAsync().thenRun(c2 -> {
- action.finish(c2);
- }).thenAlwaysRunAndFinish(() -> {
- originalTimeout.ifPresent(original -> timeout = original);
- }, c);
- }).finish(callback);
- }
-
/**
* Resets the timeout if this timeout context is being used by pool maintenance
*/
+ @Deprecated // TODO-JAVA-5640 REMOVE method
public void resetMaintenanceTimeout() {
if (!isMaintenanceContext) {
return;
diff --git a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java
index 632e453d0c0..de0771f4b57 100644
--- a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java
+++ b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java
@@ -29,6 +29,8 @@
*This class is not part of the public API and may be removed or changed at any time
*/
public interface SingleResultCallback {
+ public static SingleResultCallback THEN_DO_NOTHING = (r, t) -> {};
+
/**
* Called when the function completes. This method must not complete abruptly, see {@link AsyncCallbackFunction} for more details.
*
diff --git a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java
index e1cecf721fc..5c9f756a94d 100644
--- a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java
+++ b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java
@@ -47,7 +47,7 @@
@NotThreadSafe
public final class RetryState {
public static final int RETRIES = 1;
- private static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;
+ public static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;
private final LoopState loopState;
private final int attempts;
@@ -67,19 +67,16 @@ public final class RetryState {
*
*
* @param retries A positive number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
- * @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
+ * @param retryUntilTimeoutThrowsException // TODO-JAVA-5640 shouldn't a timeout always stop retries?
* @see #attempts()
*/
- public static RetryState withRetryableState(final int retries, final TimeoutContext timeoutContext) {
+ public static RetryState withRetryableState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries > 0);
- if (timeoutContext.hasTimeoutMS()){
- return new RetryState(INFINITE_ATTEMPTS, timeoutContext);
- }
- return new RetryState(retries, null);
+ return new RetryState(retries, retryUntilTimeoutThrowsException);
}
public static RetryState withNonRetryableState() {
- return new RetryState(0, null);
+ return new RetryState(0, false);
}
/**
@@ -94,19 +91,19 @@ public static RetryState withNonRetryableState() {
* @see #attempts()
*/
public RetryState(final TimeoutContext timeoutContext) {
- this(INFINITE_ATTEMPTS, timeoutContext);
+ this(INFINITE_ATTEMPTS, timeoutContext.hasTimeoutMS());
}
/**
* @param retries A non-negative number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
- * @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
+ * @param retryUntilTimeoutThrowsException
* @see #attempts()
*/
- private RetryState(final int retries, @Nullable final TimeoutContext timeoutContext) {
+ private RetryState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries >= 0);
loopState = new LoopState();
attempts = retries == INFINITE_ATTEMPTS ? INFINITE_ATTEMPTS : retries + 1;
- this.retryUntilTimeoutThrowsException = timeoutContext != null && timeoutContext.hasTimeoutMS();
+ this.retryUntilTimeoutThrowsException = retryUntilTimeoutThrowsException;
}
/**
diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
index bf29ebc051b..5e6adeff530 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
@@ -101,8 +101,7 @@ public ServerApi getServerApi() {
return serverApi;
}
- @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
- public OperationContext(final long id,
+ private OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
@@ -138,6 +137,11 @@ public ServerDeprioritization getServerDeprioritization() {
return serverDeprioritization;
}
+ public OperationContext withNewlyStartedTimeout() {
+ TimeoutContext tc = this.timeoutContext.withNewlyStartedTimeout();
+ return this.withTimeoutContext(tc);
+ }
+
public static final class ServerDeprioritization {
@Nullable
private ServerAddress candidate;
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
index 942721a27ad..a391458ce6f 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
@@ -53,6 +53,7 @@
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.doesNotThrow;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
+import static com.mongodb.internal.async.SingleResultCallback.THEN_DO_NOTHING;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
@@ -71,8 +72,8 @@ class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor
private final int maxWireVersion;
private final boolean firstBatchEmpty;
private final ResourceManager resourceManager;
- private final OperationContext operationContext;
private final TimeoutMode timeoutMode;
+ private OperationContext operationContext;
private final AtomicBoolean processedInitial = new AtomicBoolean();
private int batchSize;
private volatile CommandCursorResult commandCursorResult;
@@ -94,10 +95,10 @@ class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor
this.comment = comment;
this.maxWireVersion = connectionDescription.getMaxWireVersion();
this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
- operationContext = connectionSource.getOperationContext();
this.timeoutMode = timeoutMode;
- operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);
+ operationContext = connectionSource.getOperationContext();
+ operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); // TODO-JAVA-5640 with?
AsyncConnection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER
? connection : null;
@@ -175,7 +176,7 @@ public int getMaxWireVersion() {
void checkTimeoutModeAndResetTimeoutContextIfIteration() {
if (timeoutMode == TimeoutMode.ITERATION) {
- operationContext.getTimeoutContext().resetTimeoutIfPresent();
+ operationContext = operationContext.withNewlyStartedTimeout();
}
}
@@ -229,7 +230,8 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA
/**
* Configures the cursor to {@link #close()}
- * without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}.
+ * without {@linkplain TimeoutContext#withNewlyStartedTimeout() starting a new}
+ * {@linkplain TimeoutContext#getTimeout() timeout}.
* This is useful when managing the {@link #close()} behavior externally.
*/
AsyncCommandBatchCursor disableTimeoutResetWhenClosing() {
@@ -277,15 +279,14 @@ void markAsPinned(final AsyncConnection connectionToPin, final Connection.Pinnin
void doClose() {
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
timeoutContext.resetToDefaultMaxTime();
- SingleResultCallback thenDoNothing = (r, t) -> {};
if (resetTimeoutWhenClosing) {
- timeoutContext.doWithResetTimeout(this::releaseResourcesAsync, thenDoNothing);
+ releaseResourcesAsync(operationContext.withNewlyStartedTimeout(), THEN_DO_NOTHING);
} else {
- releaseResourcesAsync(thenDoNothing);
+ releaseResourcesAsync(operationContext, THEN_DO_NOTHING);
}
}
- private void releaseResourcesAsync(final SingleResultCallback callback) {
+ private void releaseResourcesAsync(final OperationContext operationContext, final SingleResultCallback callback) {
beginAsync().thenRunTryCatchAsyncBlocks(c -> {
if (isSkipReleasingServerResourcesOnClose()) {
unsetServerCursor();
@@ -295,7 +296,7 @@ private void releaseResourcesAsync(final SingleResultCallback callback) {
getConnection(c2);
}).thenConsume((connection, c3) -> {
beginAsync().thenRun(c4 -> {
- releaseServerResourcesAsync(connection, c4);
+ releaseServerResourcesAsync(connection, operationContext, c4);
}).thenAlwaysRunAndFinish(() -> {
connection.release();
}, c3);
@@ -346,11 +347,12 @@ private void getConnection(final SingleResultCallback callback)
}
}
- private void releaseServerResourcesAsync(final AsyncConnection connection, final SingleResultCallback callback) {
+ private void releaseServerResourcesAsync(final AsyncConnection connection, final OperationContext operationContext,
+ final SingleResultCallback callback) {
beginAsync().thenRun((c) -> {
ServerCursor localServerCursor = super.getServerCursor();
if (localServerCursor != null) {
- killServerCursorAsync(getNamespace(), localServerCursor, connection, callback);
+ killServerCursorAsync(getNamespace(), localServerCursor, connection, operationContext, callback);
} else {
c.complete(c);
}
@@ -359,11 +361,22 @@ private void releaseServerResourcesAsync(final AsyncConnection connection, final
}, callback);
}
- private void killServerCursorAsync(final MongoNamespace namespace, final ServerCursor localServerCursor,
- final AsyncConnection localConnection, final SingleResultCallback callback) {
- localConnection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
- NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(),
- operationContext, (r, t) -> callback.onResult(null, null));
+ private void killServerCursorAsync(
+ final MongoNamespace namespace,
+ final ServerCursor localServerCursor,
+ final AsyncConnection localConnection,
+ final OperationContext operationContext,
+ final SingleResultCallback callback) {
+ beginAsync().thenRun(c -> {
+ localConnection.commandAsync(
+ namespace.getDatabaseName(),
+ getKillCursorsCommand(namespace, localServerCursor),
+ NoOpFieldNameValidator.INSTANCE,
+ ReadPreference.primary(),
+ new BsonDocumentCodec(),
+ operationContext,
+ (r, t) -> c.complete(c));
+ }).finish(callback);
}
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
index 4ef28c796cb..c648bbe1813 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
@@ -22,6 +22,7 @@
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.TimeoutContext;
+import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
@@ -64,7 +65,7 @@ public class ChangeStreamOperation implements AsyncReadOperation pipeline, final Decoder decoder) {
this(namespace, fullDocument, fullDocumentBeforeChange, pipeline, decoder, ChangeStreamLevel.COLLECTION);
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java
index d201976e5ed..2a23f9d0711 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java
@@ -67,8 +67,8 @@ class CommandBatchCursor implements AggregateResponseBatchCursor {
private final int maxWireVersion;
private final boolean firstBatchEmpty;
private final ResourceManager resourceManager;
- private final OperationContext operationContext;
private final TimeoutMode timeoutMode;
+ private OperationContext operationContext;
private int batchSize;
private CommandCursorResult commandCursorResult;
@@ -92,10 +92,10 @@ class CommandBatchCursor implements AggregateResponseBatchCursor {
this.comment = comment;
this.maxWireVersion = connectionDescription.getMaxWireVersion();
this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
- operationContext = connectionSource.getOperationContext();
this.timeoutMode = timeoutMode;
- operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);
+ operationContext = connectionSource.getOperationContext();
+ operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); // TODO-JAVA-5640 with?
Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null;
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
@@ -235,7 +235,7 @@ public int getMaxWireVersion() {
void checkTimeoutModeAndResetTimeoutContextIfIteration() {
if (timeoutMode == TimeoutMode.ITERATION) {
- operationContext.getTimeoutContext().resetTimeoutIfPresent();
+ operationContext = operationContext.withNewlyStartedTimeout();
}
}
@@ -271,7 +271,8 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA
/**
* Configures the cursor to {@link #close()}
- * without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}.
+ * without {@linkplain TimeoutContext#withNewlyStartedTimeout() starting a new}
+ * {@linkplain TimeoutContext#getTimeout() timeout}.
* This is useful when managing the {@link #close()} behavior externally.
*/
CommandBatchCursor disableTimeoutResetWhenClosing() {
@@ -316,14 +317,14 @@ void markAsPinned(final Connection connectionToPin, final Connection.PinningMode
void doClose() {
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
timeoutContext.resetToDefaultMaxTime();
- if (resetTimeoutWhenClosing) {
- timeoutContext.doWithResetTimeout(this::releaseResources);
+ if (resetTimeoutWhenClosing) { // TODO-JAVA-5640 don't we always reset when closing?
+ releaseResources(operationContext.withNewlyStartedTimeout());
} else {
- releaseResources();
+ releaseResources(operationContext);
}
}
- private void releaseResources() {
+ private void releaseResources(final OperationContext operationContext) {
try {
if (isSkipReleasingServerResourcesOnClose()) {
unsetServerCursor();
@@ -331,7 +332,7 @@ private void releaseResources() {
if (super.getServerCursor() != null) {
Connection connection = getConnection();
try {
- releaseServerResources(connection);
+ releaseServerResources(connection, operationContext);
} finally {
connection.release();
}
@@ -373,21 +374,29 @@ private Connection getConnection() {
}
}
- private void releaseServerResources(final Connection connection) {
+ private void releaseServerResources(final Connection connection, final OperationContext operationContext) {
try {
ServerCursor localServerCursor = super.getServerCursor();
if (localServerCursor != null) {
- killServerCursor(getNamespace(), localServerCursor, connection);
+ killServerCursor(getNamespace(), localServerCursor, connection, operationContext);
}
} finally {
unsetServerCursor();
}
}
- private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor,
- final Connection localConnection) {
- localConnection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
- NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(), operationContext);
+ private void killServerCursor(
+ final MongoNamespace namespace,
+ final ServerCursor localServerCursor,
+ final Connection localConnection,
+ final OperationContext operationContext) {
+ localConnection.command(
+ namespace.getDatabaseName(),
+ getKillCursorsCommand(namespace, localServerCursor),
+ NoOpFieldNameValidator.INSTANCE,
+ ReadPreference.primary(),
+ new BsonDocumentCodec(),
+ operationContext);
}
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
index db6870f52e8..8332ad916fb 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
@@ -45,6 +45,7 @@
import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.internal.async.function.RetryState.INFINITE_ATTEMPTS;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static java.lang.String.format;
import static java.util.Arrays.asList;
@@ -122,7 +123,9 @@ private static Throwable chooseRetryableWriteException(
static RetryState initialRetryState(final boolean retry, final TimeoutContext timeoutContext) {
if (retry) {
- return RetryState.withRetryableState(RetryState.RETRIES, timeoutContext);
+ boolean retryUntilTimeoutThrowsException = timeoutContext.hasTimeoutMS();
+ int retries = retryUntilTimeoutThrowsException ? INFINITE_ATTEMPTS : RetryState.RETRIES;
+ return RetryState.withRetryableState(retries, retryUntilTimeoutThrowsException);
}
return RetryState.withNonRetryableState();
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java
index 61ccaa320fe..a9407799dc7 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java
@@ -143,9 +143,9 @@ public Mono encrypt(final String databaseName, final RawBsonDoc
*
* @param commandResponse the encrypted command response
*/
- public Mono decrypt(final RawBsonDocument commandResponse, @Nullable final Timeout operationTimeout) {
+ public Mono decrypt(final RawBsonDocument commandResponse, @Nullable final Timeout timeout) {
notNull("commandResponse", commandResponse);
- return executeStateMachine(() -> mongoCrypt.createDecryptionContext(commandResponse), operationTimeout)
+ return executeStateMachine(() -> mongoCrypt.createDecryptionContext(commandResponse), timeout)
.onErrorMap(this::wrapInClientException);
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java
index c05bfb663f2..f7febad164c 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java
@@ -121,14 +121,14 @@ public void commandAsync(final String database, final BsonDocument command,
: new SplittablePayloadBsonWriter(bsonBinaryWriter, bsonOutput, createSplittablePayloadMessageSettings(), payload,
MAX_SPLITTABLE_DOCUMENT_SIZE);
- Timeout operationTimeout = operationContext.getTimeoutContext().getTimeout();
+ Timeout timeout = operationContext.getTimeoutContext().getTimeout();
getEncoder(command).encode(writer, command, EncoderContext.builder().build());
- crypt.encrypt(database, new RawBsonDocument(bsonOutput.getInternalBuffer(), 0, bsonOutput.getSize()), operationTimeout)
+ crypt.encrypt(database, new RawBsonDocument(bsonOutput.getInternalBuffer(), 0, bsonOutput.getSize()), timeout)
.flatMap((Function>) encryptedCommand ->
Mono.create(sink -> wrapped.commandAsync(database, encryptedCommand, commandFieldNameValidator, readPreference,
new RawBsonDocumentCodec(), operationContext, responseExpected, EmptyMessageSequences.INSTANCE, sinkToCallback(sink))))
- .flatMap(rawBsonDocument -> crypt.decrypt(rawBsonDocument, operationTimeout))
+ .flatMap(rawBsonDocument -> crypt.decrypt(rawBsonDocument, timeout))
.map(decryptedResponse ->
commandResultDecoder.decode(new BsonBinaryReader(decryptedResponse.getByteBuffer().asNIO()),
DecoderContext.builder().build())
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java
index 948c666489c..0b3a166e698 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java
@@ -232,7 +232,7 @@ public GridFSDownloadPublisher downloadToPublisher(final String filename) {
@Override
public GridFSDownloadPublisher downloadToPublisher(final String filename, final GridFSDownloadOptions options) {
Function findPublisherCreator =
- operationTimeout -> createGridFSFindPublisher(filesCollection, null, filename, options, operationTimeout);
+ timeout -> createGridFSFindPublisher(filesCollection, null, filename, options, timeout);
return createGridFSDownloadPublisher(chunksCollection, null, findPublisherCreator);
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSPublisherCreator.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSPublisherCreator.java
index 166abca6a0b..cf50a9a2dc5 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSPublisherCreator.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSPublisherCreator.java
@@ -93,9 +93,9 @@ public static GridFSFindPublisher createGridFSFindPublisher(
final MongoCollection filesCollection,
@Nullable final ClientSession clientSession,
@Nullable final Bson filter,
- @Nullable final Timeout operationTimeout) {
+ @Nullable final Timeout timeout) {
notNull("filesCollection", filesCollection);
- return new GridFSFindPublisherImpl(createFindPublisher(filesCollection, clientSession, filter, operationTimeout));
+ return new GridFSFindPublisherImpl(createFindPublisher(filesCollection, clientSession, filter, timeout));
}
public static GridFSFindPublisher createGridFSFindPublisher(
@@ -103,7 +103,7 @@ public static GridFSFindPublisher createGridFSFindPublisher(
@Nullable final ClientSession clientSession,
final String filename,
final GridFSDownloadOptions options,
- @Nullable final Timeout operationTimeout) {
+ @Nullable final Timeout timeout) {
notNull("filesCollection", filesCollection);
notNull("filename", filename);
notNull("options", options);
@@ -119,7 +119,8 @@ public static GridFSFindPublisher createGridFSFindPublisher(
sort = -1;
}
- return createGridFSFindPublisher(filesCollection, clientSession, new Document("filename", filename), operationTimeout).skip(skip)
+ return createGridFSFindPublisher(filesCollection, clientSession, new Document("filename", filename), timeout)
+ .skip(skip)
.sort(new Document("uploadDate", sort));
}
@@ -127,19 +128,19 @@ public static FindPublisher createFindPublisher(
final MongoCollection filesCollection,
@Nullable final ClientSession clientSession,
@Nullable final Bson filter,
- @Nullable final Timeout operationTimeout) {
+ @Nullable final Timeout timeout) {
notNull("filesCollection", filesCollection);
FindPublisher publisher;
if (clientSession == null) {
- publisher = collectionWithTimeout(filesCollection, operationTimeout).find();
+ publisher = collectionWithTimeout(filesCollection, timeout).find();
} else {
- publisher = collectionWithTimeout(filesCollection, operationTimeout).find(clientSession);
+ publisher = collectionWithTimeout(filesCollection, timeout).find(clientSession);
}
if (filter != null) {
publisher = publisher.filter(filter);
}
- if (operationTimeout != null) {
+ if (timeout != null) {
publisher.timeoutMode(TimeoutMode.CURSOR_LIFETIME);
}
return publisher;
@@ -175,8 +176,8 @@ public static Publisher createDeletePublisher(final MongoCollection {
- Timeout operationTimeout = startTimeout(filesCollection.getTimeout(MILLISECONDS));
- return collectionWithTimeoutMono(filesCollection, operationTimeout)
+ Timeout timeout = startTimeout(filesCollection.getTimeout(MILLISECONDS));
+ return collectionWithTimeoutMono(filesCollection, timeout)
.flatMap(wrappedCollection -> {
if (clientSession == null) {
return Mono.from(wrappedCollection.deleteOne(filter));
@@ -187,7 +188,7 @@ public static Publisher createDeletePublisher(final MongoCollection {
if (clientSession == null) {
return Mono.from(wrappedCollection.deleteMany(new BsonDocument("files_id", id)));
@@ -228,15 +229,15 @@ public static Publisher createDropPublisher(final MongoCollection {
- Timeout operationTimeout = startTimeout(filesCollection.getTimeout(MILLISECONDS));
- return collectionWithTimeoutMono(filesCollection, operationTimeout)
+ Timeout timeout = startTimeout(filesCollection.getTimeout(MILLISECONDS));
+ return collectionWithTimeoutMono(filesCollection, timeout)
.flatMap(wrappedCollection -> {
if (clientSession == null) {
return Mono.from(wrappedCollection.drop());
} else {
return Mono.from(wrappedCollection.drop(clientSession));
}
- }).then(collectionWithTimeoutDeferred(chunksCollection, operationTimeout))
+ }).then(collectionWithTimeoutDeferred(chunksCollection, timeout))
.flatMap(wrappedCollection -> {
if (clientSession == null) {
return Mono.from(wrappedCollection.drop());
diff --git a/driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java b/driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java
index 9d02a1e8756..9ce34fd18a9 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java
@@ -36,8 +36,8 @@ class CollectionInfoRetriever {
this.client = notNull("client", client);
}
- public List filter(final String databaseName, final BsonDocument filter, @Nullable final Timeout operationTimeout) {
- return databaseWithTimeout(client.getDatabase(databaseName), TIMEOUT_ERROR_MESSAGE, operationTimeout)
+ public List filter(final String databaseName, final BsonDocument filter, @Nullable final Timeout timeout) {
+ return databaseWithTimeout(client.getDatabase(databaseName), TIMEOUT_ERROR_MESSAGE, timeout)
.listCollections(BsonDocument.class)
.filter(filter)
.into(new ArrayList<>());
diff --git a/driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java b/driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java
index 73eed8efd01..d694cf682a3 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java
@@ -84,11 +84,11 @@ class CommandMarker implements Closeable {
}
}
- RawBsonDocument mark(final String databaseName, final RawBsonDocument command, @Nullable final Timeout operationTimeout) {
+ RawBsonDocument mark(final String databaseName, final RawBsonDocument command, @Nullable final Timeout timeout) {
if (client != null) {
try {
try {
- return executeCommand(databaseName, command, operationTimeout);
+ return executeCommand(databaseName, command, timeout);
} catch (MongoOperationTimeoutException e){
throw e;
} catch (MongoTimeoutException e) {
@@ -96,7 +96,7 @@ RawBsonDocument mark(final String databaseName, final RawBsonDocument command, @
throw e;
}
startProcess(processBuilder);
- return executeCommand(databaseName, command, operationTimeout);
+ return executeCommand(databaseName, command, timeout);
}
} catch (MongoException e) {
throw wrapInClientException(e);
@@ -113,14 +113,17 @@ public void close() {
}
}
- private RawBsonDocument executeCommand(final String databaseName, final RawBsonDocument markableCommand, @Nullable final Timeout operationTimeout) {
+ private RawBsonDocument executeCommand(
+ final String databaseName,
+ final RawBsonDocument markableCommand,
+ @Nullable final Timeout timeout) {
assertNotNull(client);
MongoDatabase mongoDatabase = client.getDatabase(databaseName)
.withReadConcern(ReadConcern.DEFAULT)
.withReadPreference(ReadPreference.primary());
- return databaseWithTimeout(mongoDatabase, TIMEOUT_ERROR_MESSAGE, operationTimeout)
+ return databaseWithTimeout(mongoDatabase, TIMEOUT_ERROR_MESSAGE, timeout)
.runCommand(markableCommand, RawBsonDocument.class);
}
diff --git a/driver-sync/src/main/com/mongodb/client/internal/Crypt.java b/driver-sync/src/main/com/mongodb/client/internal/Crypt.java
index 15ba16e66da..ae7a75ae626 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/Crypt.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/Crypt.java
@@ -132,7 +132,10 @@ public class Crypt implements Closeable {
* @param command the unencrypted command
* @return the encrypted command
*/
- RawBsonDocument encrypt(final String databaseName, final RawBsonDocument command, @Nullable final Timeout timeoutOperation) {
+ RawBsonDocument encrypt(
+ final String databaseName,
+ final RawBsonDocument command,
+ @Nullable final Timeout timeout) {
notNull("databaseName", databaseName);
notNull("command", command);
@@ -141,7 +144,7 @@ RawBsonDocument encrypt(final String databaseName, final RawBsonDocument command
}
try (MongoCryptContext encryptionContext = mongoCrypt.createEncryptionContext(databaseName, command)) {
- return executeStateMachine(encryptionContext, databaseName, timeoutOperation);
+ return executeStateMachine(encryptionContext, databaseName, timeout);
} catch (MongoCryptException e) {
throw wrapInMongoException(e);
}
@@ -274,24 +277,27 @@ public void close() {
}
}
- private RawBsonDocument executeStateMachine(final MongoCryptContext cryptContext, @Nullable final String databaseName, @Nullable final Timeout operationTimeout) {
+ private RawBsonDocument executeStateMachine(
+ final MongoCryptContext cryptContext,
+ @Nullable final String databaseName,
+ @Nullable final Timeout timeout) {
while (true) {
State state = cryptContext.getState();
switch (state) {
case NEED_MONGO_COLLINFO:
- collInfo(cryptContext, notNull("databaseName", databaseName), operationTimeout);
+ collInfo(cryptContext, notNull("databaseName", databaseName), timeout);
break;
case NEED_MONGO_MARKINGS:
- mark(cryptContext, notNull("databaseName", databaseName), operationTimeout);
+ mark(cryptContext, notNull("databaseName", databaseName), timeout);
break;
case NEED_KMS_CREDENTIALS:
fetchCredentials(cryptContext);
break;
case NEED_MONGO_KEYS:
- fetchKeys(cryptContext, operationTimeout);
+ fetchKeys(cryptContext, timeout);
break;
case NEED_KMS:
- decryptKeys(cryptContext, operationTimeout);
+ decryptKeys(cryptContext, timeout);
break;
case READY:
return cryptContext.finish();
@@ -320,9 +326,9 @@ private void collInfo(final MongoCryptContext cryptContext, final String databas
}
}
- private void mark(final MongoCryptContext cryptContext, final String databaseName, @Nullable final Timeout operationTimeout) {
+ private void mark(final MongoCryptContext cryptContext, final String databaseName, @Nullable final Timeout timeout) {
try {
- RawBsonDocument markedCommand = assertNotNull(commandMarker).mark(databaseName, cryptContext.getMongoOperation(), operationTimeout);
+ RawBsonDocument markedCommand = assertNotNull(commandMarker).mark(databaseName, cryptContext.getMongoOperation(), timeout);
cryptContext.addMongoOperationResult(markedCommand);
cryptContext.completeMongoOperation();
} catch (Throwable t) {
diff --git a/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java b/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java
index 803df89a6b6..b5c814edb04 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java
@@ -112,9 +112,11 @@ public T command(final String database, final BsonDocument command, final Fi
getEncoder(command).encode(writer, command, EncoderContext.builder().build());
- Timeout operationTimeout = operationContext.getTimeoutContext().getTimeout();
- RawBsonDocument encryptedCommand = crypt.encrypt(database,
- new RawBsonDocument(bsonOutput.getInternalBuffer(), 0, bsonOutput.getSize()), operationTimeout);
+ Timeout timeout = operationContext.getTimeoutContext().getTimeout();
+ RawBsonDocument encryptedCommand = crypt.encrypt(
+ database,
+ new RawBsonDocument(bsonOutput.getInternalBuffer(), 0, bsonOutput.getSize()),
+ timeout);
RawBsonDocument encryptedResponse = wrapped.command(database, encryptedCommand, commandFieldNameValidator, readPreference,
new RawBsonDocumentCodec(), operationContext, responseExpected, EmptyMessageSequences.INSTANCE);
@@ -123,7 +125,7 @@ public T command(final String database, final BsonDocument command, final Fi
return null;
}
- RawBsonDocument decryptedResponse = crypt.decrypt(encryptedResponse, operationTimeout);
+ RawBsonDocument decryptedResponse = crypt.decrypt(encryptedResponse, timeout);
BsonBinaryReader reader = new BsonBinaryReader(decryptedResponse.getByteBuffer().asNIO());