diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 6b2f9c7fda..53222d1f55 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -20,15 +20,20 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import org.neo4j.driver.internal.async.InternalStatementResultCursor; import org.neo4j.driver.internal.async.QueryRunner; +import org.neo4j.driver.internal.async.ResultCursorsHolder; import org.neo4j.driver.internal.handlers.BeginTxResponseHandler; import org.neo4j.driver.internal.handlers.CommitTxResponseHandler; import org.neo4j.driver.internal.handlers.NoOpResponseHandler; import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.types.InternalTypeSystem; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; @@ -43,6 +48,7 @@ import static java.util.Collections.emptyMap; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.util.Futures.completionErrorCause; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.internal.util.Futures.getBlocking; import static org.neo4j.driver.v1.Values.value; @@ -56,28 +62,36 @@ public class ExplicitTransaction implements Transaction private enum State { /** The transaction is running with no explicit success or failure marked */ - ACTIVE, + ACTIVE( true ), /** Running, user marked for success, meaning it'll value committed */ - MARKED_SUCCESS, + MARKED_SUCCESS( true ), /** User marked as failed, meaning it'll be rolled back. */ - MARKED_FAILED, + MARKED_FAILED( true ), /** * This transaction has been explicitly terminated by calling {@link Session#reset()}. */ - TERMINATED, + TERMINATED( false ), /** This transaction has successfully committed */ - COMMITTED, + COMMITTED( false ), /** This transaction has been rolled back */ - ROLLED_BACK + ROLLED_BACK( false ); + + final boolean txOpen; + + State( boolean txOpen ) + { + this.txOpen = txOpen; + } } private final Connection connection; private final NetworkSession session; + private final ResultCursorsHolder resultCursors; private volatile Bookmark bookmark = Bookmark.empty(); private volatile State state = State.ACTIVE; @@ -86,6 +100,7 @@ public ExplicitTransaction( Connection connection, NetworkSession session ) { this.connection = connection; this.session = session; + this.resultCursors = new ResultCursorsHolder(); } public CompletionStage beginAsync( Bookmark initialBookmark ) @@ -162,7 +177,9 @@ else if ( state == State.TERMINATED ) } else { - return doCommitAsync().whenComplete( transactionClosed( State.COMMITTED ) ); + return resultCursors.retrieveNotConsumedError() + .thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) ) + .whenComplete( transactionClosed( State.COMMITTED ) ); } } @@ -185,38 +202,12 @@ else if ( state == State.TERMINATED ) } else { - return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) ); + return resultCursors.retrieveNotConsumedError() + .thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) ) + .whenComplete( transactionClosed( State.ROLLED_BACK ) ); } } - private BiConsumer transactionClosed( State newState ) - { - return ( ignore, error ) -> - { - state = newState; - connection.releaseInBackground(); - session.setBookmark( bookmark ); - }; - } - - private CompletionStage doCommitAsync() - { - CompletableFuture commitFuture = new CompletableFuture<>(); - connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, - new CommitTxResponseHandler( commitFuture, this ) ); - - return commitFuture.thenRun( () -> state = State.COMMITTED ); - } - - private CompletionStage doRollbackAsync() - { - CompletableFuture rollbackFuture = new CompletableFuture<>(); - connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, - new RollbackTxResponseHandler( rollbackFuture ) ); - - return rollbackFuture.thenRun( () -> state = State.ROLLED_BACK ); - } - @Override public StatementResult run( String statementText, Value statementParameters ) { @@ -273,23 +264,31 @@ public CompletionStage runAsync( String statementTemplate @Override public StatementResult run( Statement statement ) { - ensureCanRunQueries(); - StatementResultCursor cursor = getBlocking( QueryRunner.runAsBlocking( connection, statement, this ) ); + StatementResultCursor cursor = getBlocking( run( statement, false ) ); return new InternalStatementResult( cursor ); } @Override public CompletionStage runAsync( Statement statement ) { - ensureCanRunQueries(); //noinspection unchecked - return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this ); + return (CompletionStage) run( statement, true ); } - @Override - public boolean isOpen() + private CompletionStage run( Statement statement, boolean asAsync ) { - return state != State.COMMITTED && state != State.ROLLED_BACK && state != State.TERMINATED; + ensureCanRunQueries(); + CompletionStage cursorStage; + if ( asAsync ) + { + cursorStage = QueryRunner.runAsAsync( connection, statement, this ); + } + else + { + cursorStage = QueryRunner.runAsBlocking( connection, statement, this ); + } + resultCursors.add( cursorStage ); + return cursorStage; } private void ensureCanRunQueries() @@ -317,6 +316,12 @@ else if ( state == State.TERMINATED ) } } + @Override + public boolean isOpen() + { + return state.txOpen; + } + @Override public TypeSystem typeSystem() { @@ -340,4 +345,49 @@ public void setBookmark( Bookmark bookmark ) this.bookmark = bookmark; } } + + private CompletionStage doCommitAsync() + { + CompletableFuture commitFuture = new CompletableFuture<>(); + ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this ); + connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler ); + return commitFuture; + } + + private CompletionStage doRollbackAsync() + { + CompletableFuture rollbackFuture = new CompletableFuture<>(); + ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture ); + connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler ); + return rollbackFuture; + } + + private BiFunction handleCommitOrRollback( Throwable cursorFailure ) + { + return ( ignore, commitOrRollbackError ) -> + { + if ( cursorFailure != null ) + { + throw new CompletionException( completionErrorCause( cursorFailure ) ); + } + else if ( commitOrRollbackError != null ) + { + throw new CompletionException( completionErrorCause( commitOrRollbackError ) ); + } + else + { + return null; + } + }; + } + + private BiConsumer transactionClosed( State newState ) + { + return ( ignore, error ) -> + { + state = newState; + connection.releaseInBackground(); + session.setBookmark( bookmark ); + }; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 0bbe5fdc71..e61ce516fe 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -26,6 +26,7 @@ import org.neo4j.driver.internal.async.InternalStatementResultCursor; import org.neo4j.driver.internal.async.QueryRunner; +import org.neo4j.driver.internal.async.ResultCursorsHolder; import org.neo4j.driver.internal.logging.DelegatingLogger; import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.Connection; @@ -59,12 +60,12 @@ public class NetworkSession implements Session private final ConnectionProvider connectionProvider; private final AccessMode mode; private final RetryLogic retryLogic; + private final ResultCursorsHolder resultCursors; protected final Logger logger; private volatile Bookmark bookmark = Bookmark.empty(); private volatile CompletionStage transactionStage = completedFuture( null ); private volatile CompletionStage connectionStage = completedFuture( null ); - private volatile CompletionStage lastResultStage = completedFuture( null ); private final AtomicBoolean open = new AtomicBoolean( true ); @@ -74,6 +75,7 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R this.connectionProvider = connectionProvider; this.mode = mode; this.retryLogic = retryLogic; + this.resultCursors = new ResultCursorsHolder(); this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) ); } @@ -153,45 +155,34 @@ public boolean isOpen() @Override public void close() { - if ( open.compareAndSet( true, false ) ) - { - // todo: should closeAsync() also do this waiting for buffered result? - // todo: unit test result buffering? - getBlocking( lastResultStage - .exceptionally( error -> null ) - .thenCompose( this::ensureBuffered ) - .thenCompose( error -> releaseResources().thenApply( ignore -> - { - if ( error != null ) - { - throw new CompletionException( error ); - } - return null; - } ) ) ); - } + getBlocking( closeAsync() ); } @Override public CompletionStage closeAsync() { - // todo: wait for buffered result? if ( open.compareAndSet( true, false ) ) { - return releaseResources(); + return resultCursors.retrieveNotConsumedError() + .thenCompose( error -> releaseResources().thenApply( ignore -> + { + Throwable queryError = Futures.completionErrorCause( error ); + if ( queryError != null ) + { + // connection has been acquired and there is an unconsumed error in result cursor + throw new CompletionException( queryError ); + } + else + { + // either connection acquisition failed or + // there are no unconsumed errors in the result cursor + return null; + } + } ) ); } return completedFuture( null ); } - // todo: test this method - CompletionStage ensureBuffered( InternalStatementResultCursor cursor ) - { - if ( cursor == null ) - { - return completedFuture( null ); - } - return cursor.resultBuffered(); - } - @Override public Transaction beginTransaction() { @@ -421,7 +412,7 @@ private CompletionStage runAsync( Statement state { ensureSessionIsOpen(); - lastResultStage = ensureNoOpenTxBeforeRunningQuery() + CompletionStage cursorStage = ensureNoOpenTxBeforeRunningQuery() .thenCompose( ignore -> acquireConnection( mode ) ) .thenCompose( connection -> { @@ -435,7 +426,8 @@ private CompletionStage runAsync( Statement state } } ); - return lastResultStage; + resultCursors.add( cursorStage ); + return cursorStage; } private CompletionStage beginTransactionAsync( AccessMode mode ) @@ -496,7 +488,7 @@ private CompletionStage rollbackTransaction() } ).exceptionally( error -> { Throwable cause = Futures.completionErrorCause( error ); - logger.error( "Failed to rollback active transaction", cause ); + logger.warn( "Active transaction rolled back with an error", cause ); return null; } ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java index eecbdb2172..1f2e0ae6f2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java @@ -34,6 +34,7 @@ import org.neo4j.driver.v1.util.Function; import org.neo4j.driver.v1.util.Functions; +// todo: unit tests public class InternalStatementResultCursor implements StatementResultCursor { // todo: maybe smth better than these two string constants? @@ -142,10 +143,9 @@ public CompletionStage> listAsync( Function mapFunction ) return resultFuture; } - // todo: test this method and give it better name - public CompletionStage resultBuffered() + public CompletionStage failureAsync() { - return pullAllHandler.resultBuffered(); + return pullAllHandler.failureAsync(); } private void internalForEachAsync( Consumer action, CompletableFuture resultFuture ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java index ebcf7031af..9097e735ea 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java @@ -48,8 +48,7 @@ public static CompletionStage runAsBlocking( Conn } public static CompletionStage runAsBlocking( Connection connection, - Statement statement, - ExplicitTransaction tx ) + Statement statement, ExplicitTransaction tx ) { return runAsAsync( connection, statement, tx, false ); } @@ -61,8 +60,7 @@ public static CompletionStage runAsAsync( Connect } public static CompletionStage runAsAsync( Connection connection, - Statement statement, - ExplicitTransaction tx ) + Statement statement, ExplicitTransaction tx ) { return runAsAsync( connection, statement, tx, true ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java new file mode 100644 index 0000000000..819d7f56e2 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletionStage; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class ResultCursorsHolder +{ + private final List> cursorStages = new ArrayList<>(); + + public void add( CompletionStage cursorStage ) + { + Objects.requireNonNull( cursorStage ); + cursorStages.add( cursorStage ); + } + + public CompletionStage retrieveNotConsumedError() + { + return cursorStages.stream() + .map( this::retrieveFailure ) + .reduce( completedFuture( null ), this::nonNullFailureFromEither ); + } + + private CompletionStage retrieveFailure( CompletionStage cursorStage ) + { + return cursorStage + .exceptionally( cursor -> null ) + .thenCompose( cursor -> cursor == null ? completedFuture( null ) : cursor.failureAsync() ); + } + + private CompletionStage nonNullFailureFromEither( CompletionStage stage1, + CompletionStage stage2 ) + { + return stage1.thenCompose( value -> + { + if ( value != null ) + { + return completedFuture( value ); + } + return stage2; + } ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index cee8595b50..e957fcd3a6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -49,6 +49,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.neo4j.driver.internal.util.Futures.failedFuture; +// todo: unit tests public abstract class PullAllResponseHandler implements ResponseHandler { private static final boolean TOUCH_AUTO_READ = false; @@ -59,17 +60,15 @@ public abstract class PullAllResponseHandler implements ResponseHandler private final Queue records = new LinkedList<>(); - // todo: use presence of summary as a "finished" indicator and remove this field private boolean finished; private Throwable failure; private ResultSummary summary; private CompletableFuture recordFuture; private CompletableFuture summaryFuture; - private CompletableFuture resultBufferedFuture; + private CompletableFuture failureFuture; - public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, - Connection connection ) + public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection ) { this.statement = requireNonNull( statement ); this.runResponseHandler = requireNonNull( runResponseHandler ); @@ -86,7 +85,7 @@ public synchronized void onSuccess( Map metadata ) completeRecordFuture( null ); completeSummaryFuture( summary ); - completeResultBufferedFuture( null ); + completeFailureFuture( null ); } protected abstract void afterSuccess(); @@ -104,7 +103,7 @@ public synchronized void onFailure( Throwable error ) { // error propagated through record future, complete other two completeSummaryFuture( summary ); - completeResultBufferedFuture( null ); + completeFailureFuture( null ); } else { @@ -112,13 +111,14 @@ public synchronized void onFailure( Throwable error ) if ( failedSummaryFuture ) { // error propagated through summary future, complete other one - completeResultBufferedFuture( null ); + completeFailureFuture( null ); } else { - boolean completedResultBufferedFuture = completeResultBufferedFuture( error ); - if ( !completedResultBufferedFuture ) + boolean completedFailureFuture = completeFailureFuture( error ); + if ( !completedFailureFuture ) { + // error has not been propagated to the user, remember it failure = error; } } @@ -142,9 +142,7 @@ public synchronized CompletionStage peekAsync() { if ( failure != null ) { - Throwable error = failure; - failure = null; // propagate failure only once - return failedFuture( error ); + return failedFuture( extractFailure() ); } if ( finished ) @@ -166,16 +164,16 @@ public synchronized CompletionStage peekAsync() public synchronized CompletionStage nextAsync() { - return peekAsync().thenApply( record -> - { - dequeueRecord(); - return record; - } ); + return peekAsync().thenApply( ignore -> dequeueRecord() ); } public synchronized CompletionStage summaryAsync() { - if ( summary != null ) + if ( failure != null ) + { + return failedFuture( extractFailure() ); + } + else if ( summary != null ) { return completedFuture( summary ); } @@ -189,13 +187,11 @@ public synchronized CompletionStage summaryAsync() } } - public synchronized CompletionStage resultBuffered() + public synchronized CompletionStage failureAsync() { if ( failure != null ) { - Throwable error = failure; - failure = null; // propagate failure only once - return completedFuture( error ); + return completedFuture( extractFailure() ); } else if ( finished ) { @@ -203,11 +199,11 @@ else if ( finished ) } else { - if ( resultBufferedFuture == null ) + if ( failureFuture == null ) { - resultBufferedFuture = new CompletableFuture<>(); + failureFuture = new CompletableFuture<>(); } - return resultBufferedFuture; + return failureFuture; } } @@ -236,6 +232,18 @@ private Record dequeueRecord() return record; } + private Throwable extractFailure() + { + if ( failure == null ) + { + throw new IllegalStateException( "Can't extract failure because it does not exist" ); + } + + Throwable error = failure; + failure = null; // propagate failure only once + return error; + } + private void completeRecordFuture( Record record ) { if ( recordFuture != null ) @@ -280,12 +288,12 @@ private boolean failSummaryFuture( Throwable error ) return false; } - private boolean completeResultBufferedFuture( Throwable error ) + private boolean completeFailureFuture( Throwable error ) { - if ( resultBufferedFuture != null ) + if ( failureFuture != null ) { - CompletableFuture future = resultBufferedFuture; - resultBufferedFuture = null; + CompletableFuture future = failureFuture; + failureFuture = null; future.complete( error ); return true; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java index 28ec1d8f7d..3b2c1dc130 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java @@ -46,13 +46,13 @@ public void onSuccess( Map metadata ) statementKeys = extractKeys( metadata ); resultAvailableAfter = extractResultAvailableAfter( metadata ); - runCompletedFuture.complete( null ); + completeRunFuture(); } @Override public void onFailure( Throwable error ) { - runCompletedFuture.completeExceptionally( error ); + completeRunFuture(); } @Override @@ -71,6 +71,16 @@ public long resultAvailableAfter() return resultAvailableAfter; } + /** + * Complete the given future with {@code null}. Future is never completed exceptionally because callers are only + * interested in when RUN completes and not how. Async API needs to wait for RUN because it needs to access + * statement keys. + */ + private void completeRunFuture() + { + runCompletedFuture.complete( null ); + } + private static List extractKeys( Map metadata ) { Value keysValue = metadata.get( "fields" ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 06053347f9..4ed7b2f19e 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -34,6 +34,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Session; @@ -88,6 +89,7 @@ public void setUp() { connection = connectionMock(); when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); + when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ) .thenReturn( completedFuture( connection ) ); @@ -247,9 +249,12 @@ public void acquiresNewConnectionWhenUnableToUseCurrentOneForRun() } @Test - public void forceReleasesOpenConnectionUsedForRunWhenSessionIsClosed() + public void releasesOpenConnectionUsedForRunWhenSessionIsClosed() { - session.run( "RETURN 1" ); + String query = "RETURN 1"; + setupSuccessfulPullAll( query ); + + session.run( query ); getBlocking( session.closeAsync() ); @@ -351,11 +356,14 @@ public void updatesBookmarkWhenTxIsClosed() @Test public void releasesConnectionWhenTxIsClosed() { + String query = "RETURN 42"; + setupSuccessfulPullAll( query ); + Transaction tx = session.beginTransaction(); - tx.run( "RETURN 1" ); + tx.run( query ); verify( connectionProvider ).acquireConnection( READ ); - verify( connection ).runAndFlush( eq( "RETURN 1" ), any(), any(), any() ); + verify( connection ).runAndFlush( eq( query ), any(), any(), any() ); tx.close(); verify( connection ).releaseInBackground(); @@ -1022,6 +1030,16 @@ private static void setupFailingBegin( Connection connection, Throwable error ) } ).when( connection ).runAndFlush( eq( "BEGIN" ), any(), any(), any() ); } + private void setupSuccessfulPullAll( String query ) + { + doAnswer( invocation -> + { + ResponseHandler pullAllHandler = invocation.getArgumentAt( 3, ResponseHandler.class ); + pullAllHandler.onSuccess( emptyMap() ); + return null; + } ).when( connection ).runAndFlush( eq( query ), eq( emptyMap() ), any(), any() ); + } + private static class TxWork implements TransactionWork { final int result; diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java new file mode 100644 index 0000000000..ade998a6f6 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeoutException; + +import org.neo4j.driver.internal.util.Futures; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.util.Futures.getBlocking; + +public class ResultCursorsHolderTest +{ + @Test + public void shouldReturnNoErrorWhenNoCursorStages() + { + ResultCursorsHolder holder = new ResultCursorsHolder(); + + Throwable error = getBlocking( holder.retrieveNotConsumedError() ); + assertNull( error ); + } + + @Test + public void shouldFailToAddNullCursorStage() + { + ResultCursorsHolder holder = new ResultCursorsHolder(); + + try + { + holder.add( null ); + fail( "Exception expected" ); + } + catch ( NullPointerException e ) + { + // expected + } + } + + @Test + public void shouldReturnNoErrorWhenCursorStagesHaveNoErrors() + { + ResultCursorsHolder holder = new ResultCursorsHolder(); + + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + + Throwable error = getBlocking( holder.retrieveNotConsumedError() ); + assertNull( error ); + } + + @Test + public void shouldNotReturnStageErrors() + { + ResultCursorsHolder holder = new ResultCursorsHolder(); + + holder.add( Futures.failedFuture( new RuntimeException( "Failed to acquire a connection" ) ) ); + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + holder.add( Futures.failedFuture( new IOException( "Failed to do IO" ) ) ); + + Throwable error = getBlocking( holder.retrieveNotConsumedError() ); + assertNull( error ); + } + + @Test + public void shouldReturnErrorWhenOneCursorFailed() + { + IOException error = new IOException( "IO failed" ); + ResultCursorsHolder holder = new ResultCursorsHolder(); + + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + holder.add( cursorWithError( error ) ); + holder.add( cursorWithoutError() ); + + Throwable retrievedError = getBlocking( holder.retrieveNotConsumedError() ); + assertEquals( error, retrievedError ); + } + + @Test + public void shouldReturnFirstError() + { + RuntimeException error1 = new RuntimeException( "Error 1" ); + IOException error2 = new IOException( "Error 2" ); + TimeoutException error3 = new TimeoutException( "Error 3" ); + ResultCursorsHolder holder = new ResultCursorsHolder(); + + holder.add( cursorWithoutError() ); + holder.add( cursorWithError( error1 ) ); + holder.add( cursorWithError( error2 ) ); + holder.add( cursorWithError( error3 ) ); + + assertEquals( error1, getBlocking( holder.retrieveNotConsumedError() ) ); + } + + private CompletionStage cursorWithoutError() + { + return cursorWithError( null ); + } + + private CompletionStage cursorWithError( Throwable error ) + { + InternalStatementResultCursor cursor = mock( InternalStatementResultCursor.class ); + when( cursor.failureAsync() ).thenReturn( completedFuture( error ) ); + return completedFuture( cursor ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index d7f44729ba..a30eab495a 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -149,9 +149,10 @@ public void shouldRunQueryWithMultipleResults() @Test public void shouldFailForIncorrectQuery() { + StatementResultCursor cursor = await( session.runAsync( "RETURN" ) ); try { - await( session.runAsync( "RETURN" ) ); + await( cursor.nextAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -658,9 +659,10 @@ public void shouldRunAfterRunFailureToAcquireConnection() { neo4j.killDb(); + StatementResultCursor cursor1 = getBlocking( session.runAsync( "RETURN 42" ) ); try { - getBlocking( session.runAsync( "RETURN 42" ) ); + getBlocking( cursor1.nextAsync() ); fail( "Exception expected" ); } catch ( ServiceUnavailableException e ) @@ -670,8 +672,8 @@ public void shouldRunAfterRunFailureToAcquireConnection() neo4j.startDb(); - StatementResultCursor cursor = getBlocking( session.runAsync( "RETURN 42" ) ); - Record record = getBlocking( cursor.singleAsync() ); + StatementResultCursor cursor2 = getBlocking( session.runAsync( "RETURN 42" ) ); + Record record = getBlocking( cursor2.singleAsync() ); assertEquals( 42, record.get( 0 ).asInt() ); } @@ -703,9 +705,10 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() { neo4j.killDb(); + StatementResultCursor cursor1 = await( session.runAsync( "RETURN 42" ) ); try { - getBlocking( session.runAsync( "RETURN 42" ) ); + getBlocking( cursor1.consumeAsync() ); fail( "Exception expected" ); } catch ( ServiceUnavailableException e ) @@ -716,8 +719,8 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() neo4j.startDb(); Transaction tx = getBlocking( session.beginTransactionAsync() ); - StatementResultCursor cursor = getBlocking( tx.runAsync( "RETURN 42" ) ); - Record record = getBlocking( cursor.singleAsync() ); + StatementResultCursor cursor2 = getBlocking( tx.runAsync( "RETURN 42" ) ); + Record record = getBlocking( cursor2.singleAsync() ); assertEquals( 42, record.get( 0 ).asInt() ); assertNull( getBlocking( tx.rollbackAsync() ) ); } @@ -832,6 +835,158 @@ public CompletionStage execute( Transaction tx ) assertEquals( 1, countNodesByLabel( "MyNode" ) ); } + @Test + public void shouldPropagateRunFailureWhenClosed() + { + session.runAsync( "RETURN 10 / 0" ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagateBlockedRunFailureWhenClosed() + { + getBlocking( session.runAsync( "RETURN 10 / 0" ) ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + + @Test + public void shouldPropagatePullAllFailureWhenClosed() + { + session.runAsync( "UNWIND range(20000, 0, -1) AS x RETURN 10 / x" ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagateBlockedPullAllFailureWhenClosed() + { + getBlocking( session.runAsync( "UNWIND range(20000, 0, -1) AS x RETURN 10 / x" ) ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldCloseCleanlyWhenRunErrorConsumed() + { + StatementResultCursor cursor = getBlocking( session.runAsync( "SomeWrongQuery" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), startsWith( "Invalid input" ) ); + } + + assertNull( getBlocking( session.closeAsync() ) ); + } + + @Test + public void shouldCloseCleanlyWhenPullAllErrorConsumed() + { + StatementResultCursor cursor = getBlocking( session.runAsync( "UNWIND range(10, 0, -1) AS x RETURN 1 / x" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + + assertNull( getBlocking( session.closeAsync() ) ); + } + + @Test + public void shouldBePossibleToConsumeResultAfterSessionIsClosed() + { + CompletionStage cursorStage = session.runAsync( "UNWIND range(1, 20000) AS x RETURN x" ); + + getBlocking( session.closeAsync() ); + + StatementResultCursor cursor = getBlocking( cursorStage ); + List ints = getBlocking( cursor.listAsync( record -> record.get( 0 ).asInt() ) ); + assertEquals( 20000, ints.size() ); + } + + @Test + public void shouldPropagateFailureFromSummary() + { + StatementResultCursor cursor = getBlocking( session.runAsync( "RETURN Something" ) ); + + try + { + getBlocking( cursor.summaryAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNotNull( getBlocking( cursor.summaryAsync() ) ); + } + + @Test + public void shouldPropagateFailureInCloseFromPreviousRun() + { + session.runAsync( "CREATE ()" ); + session.runAsync( "CREATE ()" ); + session.runAsync( "CREATE ()" ); + session.runAsync( "RETURN invalid" ); + session.runAsync( "CREATE ()" ); + session.runAsync( "CREATE ()" ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 20a8d71d42..ab0b36d9d3 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -120,8 +120,7 @@ public void tearDown() public void shouldKnowSessionIsClosed() throws Throwable { // Given - driver = newDriver(); - Session session = driver.session(); + Session session = neo4j.driver().session(); // When session.close(); @@ -162,13 +161,11 @@ public void shouldKillLongRunningStatement() throws Throwable { neo4j.ensureProcedures( "longRunningStatement.jar" ); // Given - driver = newDriver(); - int executionTimeout = 10; // 10s final int killTimeout = 1; // 1s long startTime = -1, endTime; - try ( Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { StatementResult result = session.run( "CALL test.driver.longRunningStatement({seconds})", @@ -200,14 +197,12 @@ public void shouldKillLongStreamingResult() throws Throwable { neo4j.ensureProcedures( "longRunningStatement.jar" ); // Given - driver = newDriver(); - int executionTimeout = 10; // 10s final int killTimeout = 1; // 1s long startTime = -1, endTime; int recordCount = 0; - try ( final Session session = driver.session() ) + try ( final Session session = neo4j.driver().session() ) { StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})", parameters( "seconds", executionTimeout ) ); @@ -242,9 +237,8 @@ public void shouldAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable { // Given neo4j.ensureProcedures( "longRunningStatement.jar" ); - driver = newDriver(); - try ( Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { Transaction tx1 = session.beginTransaction(); @@ -273,9 +267,8 @@ public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Thro { // Given neo4j.ensureProcedures( "longRunningStatement.jar" ); - driver = newDriver(); - Session session = driver.session(); + Session session = neo4j.driver().session(); session.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); Thread.sleep( 1000 ); @@ -294,9 +287,8 @@ public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable { // Given neo4j.ensureProcedures( "longRunningStatement.jar" ); - driver = newDriver(); - try ( Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { Transaction tx = session.beginTransaction(); @@ -335,23 +327,19 @@ public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable @SuppressWarnings( "deprecation" ) private void resetSessionAfterTimeout( final Session session, final int timeout ) { - new Thread( new Runnable() + new Thread( () -> { - @Override - public void run() + try { - try - { - Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds - } - catch ( InterruptedException e ) - { - e.printStackTrace(); - } - finally - { - session.reset(); // reset the session after timeout - } + Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds + } + catch ( InterruptedException e ) + { + e.printStackTrace(); + } + finally + { + session.reset(); // reset the session after timeout } } ).start(); } @@ -361,8 +349,7 @@ public void run() public void shouldAllowMoreStatementAfterSessionReset() { // Given - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { session.run( "Return 1" ).consume(); @@ -380,8 +367,7 @@ public void shouldAllowMoreStatementAfterSessionReset() public void shouldAllowMoreTxAfterSessionReset() { // Given - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { try ( Transaction tx = session.beginTransaction() ) { @@ -406,8 +392,7 @@ public void shouldAllowMoreTxAfterSessionReset() public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() { // Given - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { try ( Transaction tx = session.beginTransaction() ) { @@ -429,8 +414,7 @@ public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() public void shouldAllowMoreTxAfterSessionResetInTx() { // Given - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { try ( Transaction tx = session.beginTransaction() ) { @@ -653,14 +637,7 @@ public void readTxCommittedWithoutTxSuccess() assumeBookmarkSupport( driver ); assertNull( session.lastBookmark() ); - long answer = session.readTransaction( new TransactionWork() - { - @Override - public Long execute( Transaction tx ) - { - return tx.run( "RETURN 42" ).single().get( 0 ).asLong(); - } - } ); + long answer = session.readTransaction( tx -> tx.run( "RETURN 42" ).single().get( 0 ).asLong() ); assertEquals( 42, answer ); // bookmark should be not-null after commit @@ -675,14 +652,8 @@ public void writeTxCommittedWithoutTxSuccess() { try ( Session session = driver.session() ) { - long answer = session.writeTransaction( new TransactionWork() - { - @Override - public Long execute( Transaction tx ) - { - return tx.run( "CREATE (:Person {name: 'Thor Odinson'}) RETURN 42" ).single().get( 0 ).asLong(); - } - } ); + long answer = session.writeTransaction( tx -> + tx.run( "CREATE (:Person {name: 'Thor Odinson'}) RETURN 42" ).single().get( 0 ).asLong() ); assertEquals( 42, answer ); } @@ -703,15 +674,11 @@ public void readTxRolledBackWithTxFailure() assumeBookmarkSupport( driver ); assertNull( session.lastBookmark() ); - long answer = session.readTransaction( new TransactionWork() + long answer = session.readTransaction( tx -> { - @Override - public Long execute( Transaction tx ) - { - StatementResult result = tx.run( "RETURN 42" ); - tx.failure(); - return result.single().get( 0 ).asLong(); - } + StatementResult result = tx.run( "RETURN 42" ); + tx.failure(); + return result.single().get( 0 ).asLong(); } ); assertEquals( 42, answer ); @@ -727,15 +694,11 @@ public void writeTxRolledBackWithTxFailure() { try ( Session session = driver.session() ) { - int answer = session.writeTransaction( new TransactionWork() + int answer = session.writeTransaction( tx -> { - @Override - public Integer execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); - tx.failure(); - return 42; - } + tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); + tx.failure(); + return 42; } ); assertEquals( 42, answer ); @@ -760,18 +723,14 @@ public void readTxRolledBackWhenExceptionIsThrown() try { - session.readTransaction( new TransactionWork() + session.readTransaction( tx -> { - @Override - public Long execute( Transaction tx ) + StatementResult result = tx.run( "RETURN 42" ); + if ( result.single().get( 0 ).asLong() == 42 ) { - StatementResult result = tx.run( "RETURN 42" ); - if ( result.single().get( 0 ).asLong() == 42 ) - { - throw new IllegalStateException(); - } - return 1L; + throw new IllegalStateException(); } + return 1L; } ); fail( "Exception expected" ); } @@ -794,14 +753,10 @@ public void writeTxRolledBackWhenExceptionIsThrown() { try { - session.writeTransaction( new TransactionWork() + session.writeTransaction( tx -> { - @Override - public Integer execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Loki Odinson'})" ); - throw new IllegalStateException(); - } + tx.run( "CREATE (:Person {name: 'Loki Odinson'})" ); + throw new IllegalStateException(); } ); fail( "Exception expected" ); } @@ -828,16 +783,12 @@ public void readTxRolledBackWhenMarkedBothSuccessAndFailure() assumeBookmarkSupport( driver ); assertNull( session.lastBookmark() ); - long answer = session.readTransaction( new TransactionWork() + long answer = session.readTransaction( tx -> { - @Override - public Long execute( Transaction tx ) - { - StatementResult result = tx.run( "RETURN 42" ); - tx.success(); - tx.failure(); - return result.single().get( 0 ).asLong(); - } + StatementResult result = tx.run( "RETURN 42" ); + tx.success(); + tx.failure(); + return result.single().get( 0 ).asLong(); } ); assertEquals( 42, answer ); @@ -853,16 +804,12 @@ public void writeTxRolledBackWhenMarkedBothSuccessAndFailure() { try ( Session session = driver.session() ) { - int answer = session.writeTransaction( new TransactionWork() + int answer = session.writeTransaction( tx -> { - @Override - public Integer execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); - tx.success(); - tx.failure(); - return 42; - } + tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); + tx.success(); + tx.failure(); + return 42; } ); assertEquals( 42, answer ); @@ -887,15 +834,11 @@ public void readTxRolledBackWhenMarkedAsSuccessAndThrowsException() try { - session.readTransaction( new TransactionWork() + session.readTransaction( tx -> { - @Override - public Long execute( Transaction tx ) - { - tx.run( "RETURN 42" ); - tx.success(); - throw new IllegalStateException(); - } + tx.run( "RETURN 42" ); + tx.success(); + throw new IllegalStateException(); } ); fail( "Exception expected" ); } @@ -918,15 +861,11 @@ public void writeTxRolledBackWhenMarkedAsSuccessAndThrowsException() { try { - session.writeTransaction( new TransactionWork() + session.writeTransaction( tx -> { - @Override - public Integer execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); - tx.success(); - throw new IllegalStateException(); - } + tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); + tx.success(); + throw new IllegalStateException(); } ); fail( "Exception expected" ); } @@ -996,7 +935,7 @@ public void resetShouldStopWriteTransactionWaitingForALock() throws Exception testResetOfQueryWaitingForLock( new NodeIdUpdater() { @Override - public void performUpdate( Driver driver, final int nodeId, final int newNodeId, + public void performUpdate( Driver driver, int nodeId, int newNodeId, AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception { try ( Session session = driver.session() ) @@ -1004,16 +943,12 @@ public void performUpdate( Driver driver, final int nodeId, final int newNodeId, usedSessionRef.set( session ); latchToWait.await(); - session.writeTransaction( new TransactionWork() + session.writeTransaction( tx -> { - @Override - public Void execute( Transaction tx ) - { - invocationsOfWork.incrementAndGet(); - StatementResult result = updateNodeId( tx, nodeId, newNodeId ); - result.consume(); - return null; - } + invocationsOfWork.incrementAndGet(); + StatementResult result = updateNodeId( tx, nodeId, newNodeId ); + result.consume(); + return null; } ); } } @@ -1036,65 +971,54 @@ public void transactionRunShouldFailOnDeadlocks() throws Exception final CountDownLatch latch1 = new CountDownLatch( 1 ); final CountDownLatch latch2 = new CountDownLatch( 1 ); - try ( final Driver driver = newDriver() ) + Future result1 = executeInDifferentThread( () -> { - Future result1 = executeInDifferentThread( new Callable() + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) { - @Override - public Void call() throws Exception - { - try ( Session session = driver.session(); - Transaction tx = session.beginTransaction() ) - { - // lock first node - updateNodeId( tx, nodeId1, newNodeId1 ).consume(); + // lock first node + updateNodeId( tx, nodeId1, newNodeId1 ).consume(); - latch1.await(); - latch2.countDown(); + latch1.await(); + latch2.countDown(); - // lock second node - updateNodeId( tx, nodeId2, newNodeId1 ).consume(); + // lock second node + updateNodeId( tx, nodeId2, newNodeId1 ).consume(); - tx.success(); - } - return null; - } - } ); + tx.success(); + } + return null; + } ); - Future result2 = executeInDifferentThread( new Callable() + Future result2 = executeInDifferentThread( () -> + { + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) { - @Override - public Void call() throws Exception - { - try ( Session session = driver.session(); - Transaction tx = session.beginTransaction() ) - { - // lock second node - updateNodeId( tx, nodeId2, newNodeId2 ).consume(); + // lock second node + updateNodeId( tx, nodeId2, newNodeId2 ).consume(); - latch1.countDown(); - latch2.await(); + latch1.countDown(); + latch2.await(); - // lock first node - updateNodeId( tx, nodeId1, newNodeId2 ).consume(); + // lock first node + updateNodeId( tx, nodeId1, newNodeId2 ).consume(); - tx.success(); - } - return null; - } - } ); - - boolean firstResultFailed = assertOneOfTwoFuturesFailWithDeadlock( result1, result2 ); - if ( firstResultFailed ) - { - assertEquals( 0, countNodesWithId( newNodeId1 ) ); - assertEquals( 2, countNodesWithId( newNodeId2 ) ); - } - else - { - assertEquals( 2, countNodesWithId( newNodeId1 ) ); - assertEquals( 0, countNodesWithId( newNodeId2 ) ); + tx.success(); } + return null; + } ); + + boolean firstResultFailed = assertOneOfTwoFuturesFailWithDeadlock( result1, result2 ); + if ( firstResultFailed ) + { + assertEquals( 0, countNodesWithId( newNodeId1 ) ); + assertEquals( 2, countNodesWithId( newNodeId2 ) ); + } + else + { + assertEquals( 2, countNodesWithId( newNodeId1 ) ); + assertEquals( 0, countNodesWithId( newNodeId2 ) ); } } @@ -1113,95 +1037,80 @@ public void writeTransactionFunctionShouldRetryDeadlocks() throws Exception final CountDownLatch latch1 = new CountDownLatch( 1 ); final CountDownLatch latch2 = new CountDownLatch( 1 ); - try ( final Driver driver = newDriver() ) + Future result1 = executeInDifferentThread( () -> { - Future result1 = executeInDifferentThread( new Callable() + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) { - @Override - public Void call() throws Exception - { - try ( Session session = driver.session(); - Transaction tx = session.beginTransaction() ) - { - // lock first node - updateNodeId( tx, nodeId1, newNodeId1 ).consume(); + // lock first node + updateNodeId( tx, nodeId1, newNodeId1 ).consume(); - latch1.await(); - latch2.countDown(); + latch1.await(); + latch2.countDown(); - // lock second node - updateNodeId( tx, nodeId2, newNodeId1 ).consume(); + // lock second node + updateNodeId( tx, nodeId2, newNodeId1 ).consume(); - tx.success(); - } - return null; - } - } ); + tx.success(); + } + return null; + } ); - Future result2 = executeInDifferentThread( new Callable() + Future result2 = executeInDifferentThread( () -> + { + try ( Session session = neo4j.driver().session() ) { - @Override - public Void call() throws Exception + session.writeTransaction( tx -> { - try ( Session session = driver.session() ) - { - session.writeTransaction( new TransactionWork() - { - @Override - public Void execute( Transaction tx ) - { - // lock second node - updateNodeId( tx, nodeId2, newNodeId2 ).consume(); + // lock second node + updateNodeId( tx, nodeId2, newNodeId2 ).consume(); - latch1.countDown(); - await( latch2 ); + latch1.countDown(); + await( latch2 ); - // lock first node - updateNodeId( tx, nodeId1, newNodeId2 ).consume(); + // lock first node + updateNodeId( tx, nodeId1, newNodeId2 ).consume(); - createNodeWithId( nodeId3 ); + createNodeWithId( nodeId3 ); - return null; - } - } ); - } return null; - } - } ); - - boolean firstResultFailed = false; - try - { - // first future may: - // 1) succeed, when it's tx was able to grab both locks and tx in other future was - // terminated because of a deadlock - // 2) fail, when it's tx was terminated because of a deadlock - assertNull( result1.get( 20, TimeUnit.SECONDS ) ); - } - catch ( ExecutionException e ) - { - firstResultFailed = true; + } ); } + return null; + } ); - // second future can't fail because deadlocks are retried - assertNull( result2.get( 20, TimeUnit.SECONDS ) ); + boolean firstResultFailed = false; + try + { + // first future may: + // 1) succeed, when it's tx was able to grab both locks and tx in other future was + // terminated because of a deadlock + // 2) fail, when it's tx was terminated because of a deadlock + assertNull( result1.get( 20, TimeUnit.SECONDS ) ); + } + catch ( ExecutionException e ) + { + firstResultFailed = true; + } - if ( firstResultFailed ) - { - // tx with retries was successful and updated ids - assertEquals( 0, countNodesWithId( newNodeId1 ) ); - assertEquals( 2, countNodesWithId( newNodeId2 ) ); - } - else - { - // tx without retries was successful and updated ids - // tx with retries did not manage to find nodes because their ids were updated - assertEquals( 2, countNodesWithId( newNodeId1 ) ); - assertEquals( 0, countNodesWithId( newNodeId2 ) ); - } - // tx with retries was successful and created an additional node - assertEquals( 1, countNodesWithId( nodeId3 ) ); + // second future can't fail because deadlocks are retried + assertNull( result2.get( 20, TimeUnit.SECONDS ) ); + + if ( firstResultFailed ) + { + // tx with retries was successful and updated ids + assertEquals( 0, countNodesWithId( newNodeId1 ) ); + assertEquals( 2, countNodesWithId( newNodeId2 ) ); + } + else + { + // tx without retries was successful and updated ids + // tx with retries did not manage to find nodes because their ids were updated + assertEquals( 2, countNodesWithId( newNodeId1 ) ); + assertEquals( 0, countNodesWithId( newNodeId2 ) ); } + // tx with retries was successful and created an additional node + assertEquals( 1, countNodesWithId( nodeId3 ) ); } @Test @@ -1210,8 +1119,7 @@ public void shouldExecuteTransactionWorkInCallerThread() int maxFailures = 3; Thread callerThread = Thread.currentThread(); - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { String result = session.readTransaction( new TransactionWork() { @@ -1233,6 +1141,119 @@ public String execute( Transaction tx ) } } + @Test + public void shouldPropagateRunFailureWhenClosed() + { + Session session = neo4j.driver().session(); + + session.run( "RETURN 10 / 0" ); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagatePullAllFailureWhenClosed() + { + Session session = neo4j.driver().session(); + + session.run( "UNWIND range(20000, 0, -1) AS x RETURN 10 / x" ); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldBePossibleToConsumeResultAfterSessionIsClosed() + { + StatementResult result; + try ( Session session = neo4j.driver().session() ) + { + result = session.run( "UNWIND range(1, 20000) AS x RETURN x" ); + } + + List ints = result.list( record -> record.get( 0 ).asInt() ); + assertEquals( 20000, ints.size() ); + } + + @Test + public void shouldPropagateFailureFromSummary() + { + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "RETURN Wrong" ); + + try + { + result.summary(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNotNull( result.summary() ); + } + } + + @Test + public void shouldThrowFromCloseWhenPreviousErrorNotConsumed() + { + Session session = neo4j.driver().session(); + + session.run( "CREATE ()" ); + session.run( "CREATE ()" ); + session.run( "RETURN 10 / 0" ); + session.run( "CREATE ()" ); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldCloseCleanlyWhenRunErrorConsumed() + { + Session session = neo4j.driver().session(); + + session.run( "CREATE ()" ); + + try + { + session.run( "RETURN 10 / 0" ).consume(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + session.run( "CREATE ()" ); + + session.close(); + assertFalse( session.isOpen() ); + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); @@ -1254,19 +1275,15 @@ private void testExecuteReadTx( AccessMode sessionMode ) // read previously committed data try ( Session session = driver.session( sessionMode ) ) { - Set names = session.readTransaction( new TransactionWork>() + Set names = session.readTransaction( tx -> { - @Override - public Set execute( Transaction tx ) + List records = tx.run( "MATCH (p:Person) RETURN p.name AS name" ).list(); + Set names1 = new HashSet<>( records.size() ); + for ( Record record : records ) { - List records = tx.run( "MATCH (p:Person) RETURN p.name AS name" ).list(); - Set names = new HashSet<>( records.size() ); - for ( Record record : records ) - { - names.add( record.get( "name" ).asString() ); - } - return names; + names1.add( record.get( "name" ).asString() ); } + return names1; } ); assertThat( names, containsInAnyOrder( "Tony Stark", "Steve Rogers" ) ); @@ -1280,16 +1297,12 @@ private void testExecuteWriteTx( AccessMode sessionMode ) // write some test data try ( Session session = driver.session( sessionMode ) ) { - String material = session.writeTransaction( new TransactionWork() + String material = session.writeTransaction( tx -> { - @Override - public String execute( Transaction tx ) - { - StatementResult result = tx.run( "CREATE (s:Shield {material: 'Vibranium'}) RETURN s" ); - tx.success(); - Record record = result.single(); - return record.get( 0 ).asNode().get( "material" ).asString(); - } + StatementResult result = tx.run( "CREATE (s:Shield {material: 'Vibranium'}) RETURN s" ); + tx.success(); + Record record = result.single(); + return record.get( 0 ).asNode().get( "material" ).asString(); } ); assertEquals( "Vibranium", material ); @@ -1311,17 +1324,13 @@ private void testTxRollbackWhenFunctionThrows( AccessMode sessionMode ) { try { - session.writeTransaction( new TransactionWork() + session.writeTransaction( tx -> { - @Override - public Void execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Thanos'})" ); - // trigger division by zero error: - tx.run( "UNWIND range(0, 1) AS i RETURN 10/i" ); - tx.success(); - return null; - } + tx.run( "CREATE (:Person {name: 'Thanos'})" ); + // trigger division by zero error: + tx.run( "UNWIND range(0, 1) AS i RETURN 10/i" ); + tx.success(); + return null; } ); fail( "Exception expected" ); } @@ -1337,6 +1346,7 @@ public Void execute( Transaction tx ) Record record = session.run( "MATCH (p:Person {name: 'Thanos'}) RETURN count(p)" ).single(); assertEquals( 0, record.get( 0 ).asInt() ); } + } @SuppressWarnings( "deprecation" ) @@ -1351,11 +1361,10 @@ private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throw CountDownLatch nodeLocked = new CountDownLatch( 1 ); AtomicReference otherSessionRef = new AtomicReference<>(); - try ( Driver driver = newDriver(); - Session session = driver.session(); + try ( Session session = neo4j.driver().session(); Transaction tx = session.beginTransaction() ) { - Future txResult = nodeIdUpdater.update( driver, nodeId, newNodeId1, otherSessionRef, nodeLocked ); + Future txResult = nodeIdUpdater.update( nodeId, newNodeId1, otherSessionRef, nodeLocked ); StatementResult result = updateNodeId( tx, nodeId, newNodeId2 ); result.consume(); @@ -1390,11 +1399,6 @@ private Driver newDriverWithFixedRetries( int maxRetriesCount ) return driverFactory.newInstance( neo4j.uri(), auth, routingConf, RetrySettings.DEFAULT, noLoggingConfig() ); } - private Driver newDriver() - { - return GraphDatabase.driver( neo4j.uri(), neo4j.authToken(), noLoggingConfig() ); - } - private Driver newDriverWithLimitedRetries( int maxTxRetryTime, TimeUnit unit ) { Config config = Config.build() @@ -1515,17 +1519,13 @@ private static void await( CountDownLatch latch ) private abstract class NodeIdUpdater { - final Future update( final Driver driver, final int nodeId, final int newNodeId, - final AtomicReference usedSessionRef, final CountDownLatch latchToWait ) + final Future update( int nodeId, int newNodeId, AtomicReference usedSessionRef, + CountDownLatch latchToWait ) { - return executeInDifferentThread( new Callable() + return executeInDifferentThread( () -> { - @Override - public Void call() throws Exception - { - performUpdate( driver, nodeId, newNodeId, usedSessionRef, latchToWait ); - return null; - } + performUpdate( neo4j.driver(), nodeId, newNodeId, usedSessionRef, latchToWait ); + return null; } ); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index ab544dde1f..dc908e7b8c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -227,9 +227,10 @@ public void shouldFailToCommitAfterSingleWrongStatement() { Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor.consumeAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -253,9 +254,10 @@ public void shouldAllowRollbackAfterSingleWrongStatement() { Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor.nextAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -281,9 +283,10 @@ public void shouldFailToCommitAfterCoupleCorrectAndSingleWrongStatement() assertNotNull( record2 ); assertEquals( 42, record2.get( 0 ).asInt() ); + StatementResultCursor cursor3 = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor3.consumeAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -317,9 +320,10 @@ public void shouldAllowRollbackAfterCoupleCorrectAndSingleWrongStatement() assertNotNull( record2 ); assertEquals( 42, record2.get( 0 ).asInt() ); + StatementResultCursor cursor3 = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor3.summaryAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -335,9 +339,10 @@ public void shouldNotAllowNewStatementsAfterAnIncorrectStatement() { Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor.nextAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -954,6 +959,283 @@ public void shouldUpdateSessionBookmarkAfterCommit() assertNotEquals( bookmarkBefore, bookmarkAfter ); } + @Test + public void shouldFailToCommitWhenQueriesFailAndErrorNotConsumed() throws InterruptedException + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "CREATE (:TestNode)" ); + tx.runAsync( "CREATE (:TestNode)" ); + tx.runAsync( "RETURN 10 / 0" ); + tx.runAsync( "CREATE (:TestNode)" ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "/ by zero", e.getMessage() ); + } + } + + @Test + public void shouldPropagateRunFailureFromCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "RETURN ILLEGAL" ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "ILLEGAL" ) ); + } + } + + @Test + public void shouldPropagateBlockedRunFailureFromCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + getBlocking( tx.runAsync( "RETURN 42 / 0" ) ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagateRunFailureFromRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "RETURN ILLEGAL" ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "ILLEGAL" ) ); + } + } + + @Test + public void shouldPropagateBlockedRunFailureFromRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + getBlocking( tx.runAsync( "RETURN 42 / 0" ) ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagatePullAllFailureFromCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "UNWIND [1, 2, 3, 'Hi'] AS x RETURN 10 / x" ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + } + + @Test + public void shouldPropagateBlockedPullAllFailureFromCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + getBlocking( tx.runAsync( "UNWIND [1, 2, 3, 'Hi'] AS x RETURN 10 / x" ) ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + } + + @Test + public void shouldPropagatePullAllFailureFromRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "UNWIND [1, 2, 3, 'Hi'] AS x RETURN 10 / x" ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + } + + @Test + public void shouldPropagateBlockedPullAllFailureFromRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + getBlocking( tx.runAsync( "UNWIND [1, 2, 3, 'Hi'] AS x RETURN 10 / x" ) ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + } + + @Test + public void shouldFailToCommitWhenRunFailureIsConsumed() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + StatementResultCursor cursor = getBlocking( tx.runAsync( "RETURN Wrong" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), startsWith( "Transaction rolled back" ) ); + } + } + + @Test + public void shouldFailToCommitWhenPullAllFailureIsConsumed() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + StatementResultCursor cursor = getBlocking( tx.runAsync( + "FOREACH (value IN [1,2, 'aaa'] | CREATE (:Person {name: 10 / value}))" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), startsWith( "Transaction rolled back" ) ); + } + } + + @Test + public void shouldRollbackWhenRunFailureIsConsumed() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + StatementResultCursor cursor = getBlocking( tx.runAsync( "RETURN Wrong" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNull( getBlocking( tx.rollbackAsync() ) ); + } + + @Test + public void shouldRollbackWhenPullAllFailureIsConsumed() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + StatementResultCursor cursor = getBlocking( tx.runAsync( "UNWIND [1, 0] AS x RETURN 5 / x" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + + assertNull( getBlocking( tx.rollbackAsync() ) ); + } + + @Test + public void shouldPropagateFailureFromSummary() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + StatementResultCursor cursor = getBlocking( tx.runAsync( "RETURN Wrong" ) ); + + try + { + getBlocking( cursor.summaryAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNotNull( getBlocking( cursor.summaryAsync() ) ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index f058485b3a..4f60697008 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.v1.integration; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -35,9 +34,11 @@ import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.util.TestNeo4jSession; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -357,9 +358,29 @@ public void shouldRollBackTxIfErrorWithConsume() throws Throwable StatementResult cursor = tx.run( "RETURN 1" ); int val = cursor.single().get( "1" ).asInt(); - Assert.assertThat( val, equalTo( 1 ) ); + assertThat( val, equalTo( 1 ) ); } } + } + + @Test + public void shouldPropagateFailureFromSummary() + { + try ( Transaction tx = session.beginTransaction() ) + { + StatementResult result = tx.run( "RETURN Wrong" ); + try + { + result.summary(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNotNull( result.summary() ); + } } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java b/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java index cc3bfe03e8..a6dc8f64d5 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java @@ -24,11 +24,15 @@ import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResultCursor; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.Neo4jException; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.neo4j.driver.internal.util.Matchers.syntaxError; public class AsyncWrongQuery extends AbstractAsyncQuery { @@ -42,15 +46,19 @@ public CompletionStage execute( C context ) { Session session = newSession( AccessMode.READ, context ); - return session.runAsync( "RETURN" ).handle( ( cursor, error ) -> - { - session.closeAsync(); + return session.runAsync( "RETURN Wrong" ) + .thenCompose( StatementResultCursor::nextAsync ) + .handle( ( record, error ) -> + { + session.closeAsync(); + assertNull( record ); - assertNull( cursor ); - Throwable cause = Futures.completionErrorCause( error ); - assertThat( cause, is( syntaxError( "Unexpected end of input" ) ) ); + Throwable cause = Futures.completionErrorCause( error ); + assertNotNull( cause ); + assertThat( cause, instanceOf( ClientException.class ) ); + assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) ); - return null; - } ); + return null; + } ); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java b/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java index 917d6aa8b2..ae2ea4f04d 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java @@ -24,12 +24,16 @@ import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResultCursor; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.Neo4jException; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.neo4j.driver.internal.util.Matchers.syntaxError; public class AsyncWrongQueryInTx extends AbstractAsyncQuery { @@ -44,14 +48,19 @@ public CompletionStage execute( C context ) Session session = newSession( AccessMode.READ, context ); return session.beginTransactionAsync() - .thenCompose( tx -> tx.runAsync( "RETURN" ).handle( ( cursor, error ) -> - { - assertNull( cursor ); - Throwable cause = Futures.completionErrorCause( error ); - assertThat( cause, is( syntaxError( "Unexpected end of input" ) ) ); - - return tx; - } ) ) + .thenCompose( tx -> tx.runAsync( "RETURN Wrong" ) + .thenCompose( StatementResultCursor::nextAsync ) + .handle( ( record, error ) -> + { + assertNull( record ); + + Throwable cause = Futures.completionErrorCause( error ); + assertNotNull( cause ); + assertThat( cause, instanceOf( ClientException.class ) ); + assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) ); + + return tx; + } ) ) .thenCompose( Transaction::rollbackAsync ) .whenComplete( ( ignore, error ) -> session.closeAsync() ); }