Skip to content

JAVA-5640 refactor csot #1760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 8 additions & 46 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,20 @@

import com.mongodb.MongoClientException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.internal.async.AsyncRunnable;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.CommandMessage;
import com.mongodb.internal.time.StartTime;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.session.ClientSession;

import java.util.Objects;
import java.util.Optional;
import java.util.function.LongConsumer;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand Down Expand Up @@ -121,6 +115,12 @@ public TimeoutContext copyTimeoutContext() {
return new TimeoutContext(getTimeoutSettings(), getTimeout());
}

public TimeoutContext withNewlyStartedTimeout() {
TimeoutContext newContext = copyTimeoutContext();
newContext.timeout = startTimeout(newContext.timeoutSettings.getTimeoutMS());
return newContext;
}

public TimeoutContext(final TimeoutSettings timeoutSettings) {
this(false, timeoutSettings, startTimeout(timeoutSettings.getTimeoutMS()));
}
Expand Down Expand Up @@ -297,56 +297,18 @@ public int getConnectTimeoutMs() {

/**
* @see #hasTimeoutMS()
* @see #doWithResetTimeout(Runnable)
* @see #doWithResetTimeout(AsyncRunnable, SingleResultCallback)
*/
@Deprecated // TODO-JAVA-5640 REMOVE method
public void resetTimeoutIfPresent() {
getAndResetTimeoutIfPresent();
}

/**
* @see #hasTimeoutMS()
* @return A {@linkplain Optional#isPresent() non-empty} previous {@linkplain Timeout} iff {@link #hasTimeoutMS()},
* i.e., iff it was reset.
*/
private Optional<Timeout> getAndResetTimeoutIfPresent() {
Timeout result = timeout;
if (hasTimeoutMS()) {
timeout = startTimeout(timeoutSettings.getTimeoutMS());
return ofNullable(result);
}
return empty();
}

/**
* @see #resetTimeoutIfPresent()
*/
public void doWithResetTimeout(final Runnable action) {
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
try {
action.run();
} finally {
originalTimeout.ifPresent(original -> timeout = original);
}
}

/**
* @see #resetTimeoutIfPresent()
*/
public void doWithResetTimeout(final AsyncRunnable action, final SingleResultCallback<Void> callback) {
beginAsync().thenRun(c -> {
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
beginAsync().thenRun(c2 -> {
action.finish(c2);
}).thenAlwaysRunAndFinish(() -> {
originalTimeout.ifPresent(original -> timeout = original);
}, c);
}).finish(callback);
}

/**
* Resets the timeout if this timeout context is being used by pool maintenance
*/
@Deprecated // TODO-JAVA-5640 REMOVE method
public void resetMaintenanceTimeout() {
if (!isMaintenanceContext) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*<p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface SingleResultCallback<T> {
public static SingleResultCallback<Void> THEN_DO_NOTHING = (r, t) -> {};

/**
* Called when the function completes. This method must not complete abruptly, see {@link AsyncCallbackFunction} for more details.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@NotThreadSafe
public final class RetryState {
public static final int RETRIES = 1;
private static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;
public static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;

private final LoopState loopState;
private final int attempts;
Expand All @@ -67,19 +67,16 @@ public final class RetryState {
* </p>
*
* @param retries A positive number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
* @param retryUntilTimeoutThrowsException // TODO-JAVA-5640 shouldn't a timeout always stop retries?
* @see #attempts()
*/
public static RetryState withRetryableState(final int retries, final TimeoutContext timeoutContext) {
public static RetryState withRetryableState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries > 0);
if (timeoutContext.hasTimeoutMS()){
return new RetryState(INFINITE_ATTEMPTS, timeoutContext);
}
return new RetryState(retries, null);
return new RetryState(retries, retryUntilTimeoutThrowsException);
}

public static RetryState withNonRetryableState() {
return new RetryState(0, null);
return new RetryState(0, false);
}

/**
Expand All @@ -94,19 +91,19 @@ public static RetryState withNonRetryableState() {
* @see #attempts()
*/
public RetryState(final TimeoutContext timeoutContext) {
this(INFINITE_ATTEMPTS, timeoutContext);
this(INFINITE_ATTEMPTS, timeoutContext.hasTimeoutMS());
}

/**
* @param retries A non-negative number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
* @param retryUntilTimeoutThrowsException
* @see #attempts()
*/
private RetryState(final int retries, @Nullable final TimeoutContext timeoutContext) {
private RetryState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries >= 0);
loopState = new LoopState();
attempts = retries == INFINITE_ATTEMPTS ? INFINITE_ATTEMPTS : retries + 1;
this.retryUntilTimeoutThrowsException = timeoutContext != null && timeoutContext.hasTimeoutMS();
this.retryUntilTimeoutThrowsException = retryUntilTimeoutThrowsException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public ServerApi getServerApi() {
return serverApi;
}

@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
public OperationContext(final long id,
private OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
Expand Down Expand Up @@ -138,6 +137,11 @@ public ServerDeprioritization getServerDeprioritization() {
return serverDeprioritization;
}

public OperationContext withNewlyStartedTimeout() {
TimeoutContext tc = this.timeoutContext.withNewlyStartedTimeout();
return this.withTimeoutContext(tc);
}

public static final class ServerDeprioritization {
@Nullable
private ServerAddress candidate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.doesNotThrow;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.async.SingleResultCallback.THEN_DO_NOTHING;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
Expand All @@ -71,8 +72,8 @@ class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T>
private final int maxWireVersion;
private final boolean firstBatchEmpty;
private final ResourceManager resourceManager;
private final OperationContext operationContext;
private final TimeoutMode timeoutMode;
private OperationContext operationContext;
private final AtomicBoolean processedInitial = new AtomicBoolean();
private int batchSize;
private volatile CommandCursorResult<T> commandCursorResult;
Expand All @@ -94,10 +95,10 @@ class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T>
this.comment = comment;
this.maxWireVersion = connectionDescription.getMaxWireVersion();
this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
operationContext = connectionSource.getOperationContext();
this.timeoutMode = timeoutMode;

operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);
operationContext = connectionSource.getOperationContext();
operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); // TODO-JAVA-5640 with?

AsyncConnection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER
? connection : null;
Expand Down Expand Up @@ -175,7 +176,7 @@ public int getMaxWireVersion() {

void checkTimeoutModeAndResetTimeoutContextIfIteration() {
if (timeoutMode == TimeoutMode.ITERATION) {
operationContext.getTimeoutContext().resetTimeoutIfPresent();
operationContext = operationContext.withNewlyStartedTimeout();
}
}

Expand Down Expand Up @@ -229,7 +230,8 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA

/**
* Configures the cursor to {@link #close()}
* without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}.
* without {@linkplain TimeoutContext#withNewlyStartedTimeout() starting a new}
* {@linkplain TimeoutContext#getTimeout() timeout}.
* This is useful when managing the {@link #close()} behavior externally.
*/
AsyncCommandBatchCursor<T> disableTimeoutResetWhenClosing() {
Expand Down Expand Up @@ -277,15 +279,14 @@ void markAsPinned(final AsyncConnection connectionToPin, final Connection.Pinnin
void doClose() {
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
timeoutContext.resetToDefaultMaxTime();
SingleResultCallback<Void> thenDoNothing = (r, t) -> {};
if (resetTimeoutWhenClosing) {
timeoutContext.doWithResetTimeout(this::releaseResourcesAsync, thenDoNothing);
releaseResourcesAsync(operationContext.withNewlyStartedTimeout(), THEN_DO_NOTHING);
} else {
releaseResourcesAsync(thenDoNothing);
releaseResourcesAsync(operationContext, THEN_DO_NOTHING);
}
}

private void releaseResourcesAsync(final SingleResultCallback<Void> callback) {
private void releaseResourcesAsync(final OperationContext operationContext, final SingleResultCallback<Void> callback) {
beginAsync().thenRunTryCatchAsyncBlocks(c -> {
if (isSkipReleasingServerResourcesOnClose()) {
unsetServerCursor();
Expand All @@ -295,7 +296,7 @@ private void releaseResourcesAsync(final SingleResultCallback<Void> callback) {
getConnection(c2);
}).thenConsume((connection, c3) -> {
beginAsync().thenRun(c4 -> {
releaseServerResourcesAsync(connection, c4);
releaseServerResourcesAsync(connection, operationContext, c4);
}).thenAlwaysRunAndFinish(() -> {
connection.release();
}, c3);
Expand Down Expand Up @@ -346,11 +347,12 @@ private void getConnection(final SingleResultCallback<AsyncConnection> callback)
}
}

private void releaseServerResourcesAsync(final AsyncConnection connection, final SingleResultCallback<Void> callback) {
private void releaseServerResourcesAsync(final AsyncConnection connection, final OperationContext operationContext,
final SingleResultCallback<Void> callback) {
beginAsync().thenRun((c) -> {
ServerCursor localServerCursor = super.getServerCursor();
if (localServerCursor != null) {
killServerCursorAsync(getNamespace(), localServerCursor, connection, callback);
killServerCursorAsync(getNamespace(), localServerCursor, connection, operationContext, callback);
} else {
c.complete(c);
}
Expand All @@ -359,11 +361,22 @@ private void releaseServerResourcesAsync(final AsyncConnection connection, final
}, callback);
}

private void killServerCursorAsync(final MongoNamespace namespace, final ServerCursor localServerCursor,
final AsyncConnection localConnection, final SingleResultCallback<Void> callback) {
localConnection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(),
operationContext, (r, t) -> callback.onResult(null, null));
private void killServerCursorAsync(
final MongoNamespace namespace,
final ServerCursor localServerCursor,
final AsyncConnection localConnection,
final OperationContext operationContext,
final SingleResultCallback<Void> callback) {
beginAsync().thenRun(c -> {
localConnection.commandAsync(
namespace.getDatabaseName(),
getKillCursorsCommand(namespace, localServerCursor),
NoOpFieldNameValidator.INSTANCE,
ReadPreference.primary(),
new BsonDocumentCodec(),
operationContext,
(r, t) -> c.complete(c));
}).finish(callback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
Expand Down Expand Up @@ -64,7 +65,7 @@ public class ChangeStreamOperation<T> implements AsyncReadOperation<AsyncBatchCu
private BsonTimestamp startAtOperationTime;
private boolean showExpandedEvents;


@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument fullDocument,
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<BsonDocument> pipeline, final Decoder<T> decoder) {
this(namespace, fullDocument, fullDocumentBeforeChange, pipeline, decoder, ChangeStreamLevel.COLLECTION);
Expand Down
Loading