From f4e3c10d1c823aaf66da96e3e3a4db1fd2007401 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Wed, 15 Sep 2021 15:05:04 +0100 Subject: [PATCH] Add result consumption and disposal to reactive testkit backend session close --- .../backend/holder/RxResultHolder.java | 2 + .../backend/holder/RxSessionHolder.java | 12 ++ .../messages/requests/SessionClose.java | 114 +++++++++++++++++- .../backend/messages/requests/StartTest.java | 7 -- 4 files changed, 127 insertions(+), 8 deletions(-) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java index 27cecd1b88..56631f33eb 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java @@ -38,11 +38,13 @@ public class RxResultHolder extends AbstractResultHolder> getSubscriber() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java index 73e66b2920..607ce07229 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java @@ -18,13 +18,25 @@ */ package neo4j.org.testkit.backend.holder; +import lombok.Setter; + +import java.util.Optional; + import org.neo4j.driver.SessionConfig; import org.neo4j.driver.reactive.RxSession; public class RxSessionHolder extends AbstractSessionHolder { + @Setter + private RxResultHolder resultHolder; + public RxSessionHolder( DriverHolder driverHolder, RxSession session, SessionConfig config ) { super( driverHolder, session, config ); } + + public Optional getResultHolder() + { + return Optional.ofNullable( resultHolder ); + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index d3680a3f49..fce604a3e0 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -20,12 +20,18 @@ import lombok.Getter; import lombok.Setter; +import neo4j.org.testkit.backend.RxBlockingSubscriber; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.RxResultHolder; import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicLong; + +import org.neo4j.driver.Record; @Setter @Getter @@ -52,15 +58,121 @@ public CompletionStage processAsync( TestkitState testkitState public Mono processRx( TestkitState testkitState ) { return testkitState.getRxSessionHolder( data.getSessionId() ) - .flatMap( sessionHolder -> Mono.fromDirect( sessionHolder.getSession().close() ) ) + .flatMap( sessionHolder -> sessionHolder.getResultHolder() + .map( this::consumeRequestedDemandAndCancelIfSubscribed ) + .orElse( Mono.empty() ) + .then( Mono.fromDirect( sessionHolder.getSession().close() ) ) ) .then( Mono.just( createResponse() ) ); } + private Mono consumeRequestedDemandAndCancelIfSubscribed( RxResultHolder resultHolder ) + { + return resultHolder.getSubscriber() + .map( subscriber -> Mono.fromCompletionStage( consumeRequestedDemandAndCancelIfSubscribed( resultHolder, subscriber ) ) ) + .orElse( Mono.empty() ); + } + + private CompletionStage consumeRequestedDemandAndCancelIfSubscribed( RxResultHolder resultHolder, RxBlockingSubscriber subscriber ) + { + if ( subscriber.getCompletionStage().toCompletableFuture().isDone() ) + { + return CompletableFuture.completedFuture( null ); + } + + return new DemandConsumer<>( subscriber, resultHolder.getRequestedRecordsCounter() ) + .getCompletedStage() + .thenCompose( completionReason -> + { + CompletionStage result; + switch ( completionReason ) + { + case REQUESTED_DEMAND_CONSUMED: + result = subscriber.getSubscriptionStage().thenApply( subscription -> + { + subscription.cancel(); + return null; + } ); + break; + case RECORD_STREAM_EXHAUSTED: + result = CompletableFuture.completedFuture( null ); + break; + default: + result = new CompletableFuture<>(); + result.toCompletableFuture() + .completeExceptionally( new RuntimeException( "Unexpected completion reason: " + completionReason ) ); + } + return result; + } ); + } + private Session createResponse() { return Session.builder().data( Session.SessionBody.builder().id( data.getSessionId() ).build() ).build(); } + private static class DemandConsumer + { + private final RxBlockingSubscriber subscriber; + private final AtomicLong unfulfilledDemandCounter; + @Getter + private final CompletableFuture completedStage = new CompletableFuture<>(); + + private enum CompletionReason + { + REQUESTED_DEMAND_CONSUMED, + RECORD_STREAM_EXHAUSTED + } + + private DemandConsumer( RxBlockingSubscriber subscriber, AtomicLong unfulfilledDemandCounter ) + { + this.subscriber = subscriber; + this.unfulfilledDemandCounter = unfulfilledDemandCounter; + + subscriber.getCompletionStage().whenComplete( this::onComplete ); + if ( this.unfulfilledDemandCounter.get() > 0 ) + { + setupNextSignalConsumer(); + } + } + + private void setupNextSignalConsumer() + { + CompletableFuture consumer = new CompletableFuture<>(); + subscriber.setNextSignalConsumer( consumer ); + consumer.whenComplete( this::onNext ); + } + + private void onNext( T ignored, Throwable throwable ) + { + if ( throwable != null ) + { + completedStage.completeExceptionally( throwable ); + return; + } + + if ( unfulfilledDemandCounter.decrementAndGet() > 0 ) + { + setupNextSignalConsumer(); + } + else + { + completedStage.complete( CompletionReason.REQUESTED_DEMAND_CONSUMED ); + } + } + + private void onComplete( Void ignored, Throwable throwable ) + { + if ( throwable != null ) + { + completedStage.completeExceptionally( throwable ); + } + else + { + completedStage.complete( CompletionReason.RECORD_STREAM_EXHAUSTED ); + } + } + } + @Setter @Getter private static class SessionCloseBody diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 0e6f3c8e47..0cb300c632 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -61,10 +61,6 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_after_hello$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_run$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_on_tx_run$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRetry\\..*$", "Unfinished results consumption" ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRetryClustering\\..*$", "Unfinished results consumption" ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$", - "Does not support partially consumed state" ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" ); skipMessage = "Requires investigation"; REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage ); @@ -79,12 +75,9 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRunParameters\\..*$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_rollback_tx_on_session_close_unfinished_result$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_rollback_tx_on_session_close_untouched_result$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_autocommit_transactions_should_support_timeout$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_bad_syntax$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_missing_parameter$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_larger_than_fetch_size$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_partial_iteration$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxFuncRun\\.test_iteration_nested$", skipMessage );