From c4328b80fdc017ebbfd55315641c1b19755cdbbb Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Tue, 17 Aug 2021 13:12:50 +0100 Subject: [PATCH] Add Async Testkit Backend support This update brings async Testkit backend support that uses async driver for testing and is compatible with the current Testkit protocol. This allows async driver testing using the same Testkit tests transparently. Not all requests are implemented in this PR. For instance, the resolver implementation will be added separately. The current update enables a substantial amount of tests to be executed. In addition, a separate PR is planned to migrate the sync backend to the new Netty based implementation. --- testkit-backend/pom.xml | 4 + .../testkit/backend/AsyncBackendServer.java | 63 +++++++++ .../testkit/backend/AsyncSessionState.java | 39 ++++++ .../org/testkit/backend/BackendServer.java | 75 +++++++++++ .../neo4j/org/testkit/backend/Runner.java | 49 +------ .../org/testkit/backend/TestkitState.java | 5 + .../handler/TestkitMessageInboundHandler.java | 81 ++++++++++++ .../TestkitMessageOutboundHandler.java | 39 ++++++ .../TestkitRequestProcessorHandler.java | 121 ++++++++++++++++++ .../TestkitRequestResponseMapperHandler.java | 66 ++++++++++ .../requests/CheckMultiDBSupport.java | 23 +++- .../DomainNameResolutionCompleted.java | 8 ++ .../messages/requests/DriverClose.java | 17 +++ .../messages/requests/GetFeatures.java | 26 +++- .../messages/requests/GetRoutingTable.java | 9 ++ .../backend/messages/requests/NewDriver.java | 8 ++ .../backend/messages/requests/NewSession.java | 34 ++++- .../requests/ResolverResolutionCompleted.java | 8 ++ .../messages/requests/ResultConsume.java | 38 ++++-- .../backend/messages/requests/ResultNext.java | 20 ++- .../messages/requests/RetryableNegative.java | 22 ++++ .../messages/requests/RetryablePositive.java | 11 ++ .../requests/SessionBeginTransaction.java | 33 +++++ .../messages/requests/SessionClose.java | 17 +++ .../requests/SessionLastBookmarks.java | 16 ++- .../requests/SessionReadTransaction.java | 28 +++- .../backend/messages/requests/SessionRun.java | 23 ++++ .../requests/SessionWriteTransaction.java | 27 ++++ .../backend/messages/requests/StartTest.java | 38 ++++++ .../messages/requests/TestkitRequest.java | 5 + .../messages/requests/TransactionClose.java | 13 +- .../messages/requests/TransactionCommit.java | 16 ++- .../requests/TransactionRollback.java | 16 ++- .../messages/requests/TransactionRun.java | 23 +++- .../messages/requests/VerifyConnectivity.java | 18 +++ .../backend/messages/responses/SkipTest.java | 45 +++++++ testkit-tests/pom.xml | 34 ++++- testkit/backend.py | 2 +- 38 files changed, 1037 insertions(+), 83 deletions(-) create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageInboundHandler.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageOutboundHandler.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/SkipTest.java diff --git a/testkit-backend/pom.xml b/testkit-backend/pom.xml index 37a584d137..e6a7f1a276 100644 --- a/testkit-backend/pom.xml +++ b/testkit-backend/pom.xml @@ -25,6 +25,10 @@ neo4j-java-driver ${project.version} + + io.netty + netty-handler + com.fasterxml.jackson.core jackson-core diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java new file mode 100644 index 0000000000..495624e5a0 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import neo4j.org.testkit.backend.channel.handler.TestkitMessageInboundHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitMessageOutboundHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitRequestProcessorHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitRequestResponseMapperHandler; + +public class AsyncBackendServer +{ + public void run() throws InterruptedException + { + EventLoopGroup group = new NioEventLoopGroup(); + try + { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group( group ) + .channel( NioServerSocketChannel.class ) + .localAddress( 9876 ) + .childHandler( new ChannelInitializer() + { + @Override + protected void initChannel( SocketChannel channel ) + { + channel.pipeline().addLast( new TestkitMessageInboundHandler() ); + channel.pipeline().addLast( new TestkitMessageOutboundHandler() ); + channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() ); + channel.pipeline().addLast( new TestkitRequestProcessorHandler() ); + } + } ); + ChannelFuture server = bootstrap.bind().sync(); + server.channel().closeFuture().sync(); + } + finally + { + group.shutdownGracefully().sync(); + } + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java new file mode 100644 index 0000000000..2cde77bef6 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import lombok.Getter; +import lombok.Setter; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.async.AsyncSession; + +@Getter +@Setter +public class AsyncSessionState +{ + public AsyncSession session; + public CompletableFuture txWorkFuture; + + public AsyncSessionState( AsyncSession session ) + { + this.session = session; + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java new file mode 100644 index 0000000000..d35f955705 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CompletableFuture; + +public class BackendServer +{ + public void run() throws IOException + { + ServerSocket serverSocket = new ServerSocket( 9876 ); + + System.out.println( "Java TestKit Backend Started on port: " + serverSocket.getLocalPort() ); + + while ( true ) + { + final Socket clientSocket = serverSocket.accept(); + CompletableFuture.runAsync( () -> handleClient( clientSocket ) ); + } + } + + private void handleClient( Socket clientSocket ) + { + try + { + System.out.println( "Handling connection from: " + clientSocket.getRemoteSocketAddress() ); + BufferedReader in = new BufferedReader( new InputStreamReader( clientSocket.getInputStream() ) ); + BufferedWriter out = new BufferedWriter( new OutputStreamWriter( clientSocket.getOutputStream() ) ); + CommandProcessor commandProcessor = new CommandProcessor( in, out ); + + boolean cont = true; + while ( cont ) + { + try + { + cont = commandProcessor.process(); + } + catch ( Exception e ) + { + e.printStackTrace(); + clientSocket.close(); + cont = false; + } + } + } + catch ( IOException ex ) + { + throw new UncheckedIOException( ex ); + } + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java index faf6b53df9..2c60fb8cd2 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java @@ -18,58 +18,19 @@ */ package neo4j.org.testkit.backend; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.UncheckedIOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.CompletableFuture; public class Runner { - public static void main( String[] args ) throws IOException + public static void main( String[] args ) throws IOException, InterruptedException { - ServerSocket serverSocket = new ServerSocket( 9876 ); - - System.out.println( "Java TestKit Backend Started on port: " + serverSocket.getLocalPort() ); - - while ( true ) - { - final Socket clientSocket = serverSocket.accept(); - CompletableFuture.runAsync( () -> handleClient( clientSocket ) ); - } - } - - private static void handleClient( Socket clientSocket ) - { - try + if ( args.length > 0 && args[0].equals( "async" ) ) { - System.out.println( "Handling connection from: " + clientSocket.getRemoteSocketAddress() ); - BufferedReader in = new BufferedReader( new InputStreamReader( clientSocket.getInputStream() ) ); - BufferedWriter out = new BufferedWriter( new OutputStreamWriter( clientSocket.getOutputStream() ) ); - CommandProcessor commandProcessor = new CommandProcessor( in, out ); - - boolean cont = true; - while ( cont ) - { - try - { - cont = commandProcessor.process(); - } - catch ( Exception e ) - { - e.printStackTrace(); - clientSocket.close(); - cont = false; - } - } + new AsyncBackendServer().run(); } - catch ( IOException ex ) + else { - throw new UncheckedIOException( ex ); + new BackendServer().run(); } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index b42c72cfd9..2d14850180 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -31,6 +31,8 @@ import org.neo4j.driver.Driver; import org.neo4j.driver.Result; import org.neo4j.driver.Transaction; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.cluster.RoutingTableRegistry; import org.neo4j.driver.net.ServerAddress; @@ -41,8 +43,11 @@ public class TestkitState private final Map drivers = new HashMap<>(); private final Map routingTableRegistry = new HashMap<>(); private final Map sessionStates = new HashMap<>(); + private final Map asyncSessionStates = new HashMap<>(); private final Map results = new HashMap<>(); + private final Map resultCursors = new HashMap<>(); private final Map transactions = new HashMap<>(); + private final Map asyncTransactions = new HashMap<>(); private final Map errors = new HashMap<>(); private int idGenerator = 0; private final Consumer responseWriter; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageInboundHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageInboundHandler.java new file mode 100644 index 0000000000..f1c06a815c --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageInboundHandler.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.channel.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.CharsetUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class TestkitMessageInboundHandler extends SimpleChannelInboundHandler +{ + private final StringBuilder requestBuffer = new StringBuilder(); + + @Override + public void channelRead0( ChannelHandlerContext ctx, ByteBuf byteBuf ) + { + String requestStr = byteBuf.toString( CharsetUtil.UTF_8 ); + requestBuffer.append( requestStr ); + + List testkitMessages = new ArrayList<>(); + Optional testkitMessageOpt = extractTestkitMessage(); + while ( testkitMessageOpt.isPresent() ) + { + testkitMessages.add( testkitMessageOpt.get() ); + testkitMessageOpt = extractTestkitMessage(); + } + + testkitMessages.forEach( ctx::fireChannelRead ); + } + + private Optional extractTestkitMessage() + { + String requestEndMarker = "#request end\n"; + int endMarkerIndex = requestBuffer.indexOf( requestEndMarker ); + if ( endMarkerIndex < 0 ) + { + return Optional.empty(); + } + String requestBeginMarker = "#request begin\n"; + int beginMarkerIndex = requestBuffer.indexOf( requestBeginMarker ); + if ( beginMarkerIndex != 0 ) + { + throw new RuntimeException( "Unexpected data in message buffer" ); + } + // extract Testkit message without markers + String testkitMessage = requestBuffer.substring( requestBeginMarker.length(), endMarkerIndex ); + if ( testkitMessage.contains( requestBeginMarker ) || testkitMessage.contains( requestEndMarker ) ) + { + throw new RuntimeException( "Testkit message contains request markers" ); + } + // remove Testkit message from buffer + requestBuffer.delete( 0, endMarkerIndex + requestEndMarker.length() + 1 ); + return Optional.of( testkitMessage ); + } + + @Override + public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) + { + ctx.close(); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageOutboundHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageOutboundHandler.java new file mode 100644 index 0000000000..905b69f754 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageOutboundHandler.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.channel.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +import java.nio.charset.StandardCharsets; + +public class TestkitMessageOutboundHandler extends ChannelOutboundHandlerAdapter +{ + @Override + public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) + { + String testkitResponseStr = (String) msg; + String testkitMessage = String.format( "#response begin\n%s\n#response end\n", testkitResponseStr ); + ByteBuf byteBuf = Unpooled.copiedBuffer( testkitMessage, StandardCharsets.UTF_8 ); + ctx.writeAndFlush( byteBuf, promise ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java new file mode 100644 index 0000000000..ccac59c4fb --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.channel.handler; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.messages.requests.TestkitRequest; +import neo4j.org.testkit.backend.messages.responses.BackendError; +import neo4j.org.testkit.backend.messages.responses.DriverError; +import neo4j.org.testkit.backend.messages.responses.TestkitResponse; + +import java.util.concurrent.CompletionException; + +import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.exceptions.UntrustedServerException; +import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl; + +public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter +{ + private final TestkitState testkitState = new TestkitState( this::writeAndFlush, () -> true ); + private Channel channel; + + @Override + public void channelRegistered( ChannelHandlerContext ctx ) throws Exception + { + channel = ctx.channel(); + super.channelRegistered( ctx ); + } + + @Override + public void channelRead( ChannelHandlerContext ctx, Object msg ) + { + TestkitRequest testkitRequest = (TestkitRequest) msg; + try + { + testkitRequest.processAsync( testkitState ) + .thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) ) + .exceptionally( throwable -> + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + return null; + } ); + } + catch ( Throwable throwable ) + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + } + } + + private TestkitResponse createErrorResponse( Throwable throwable ) + { + if ( throwable instanceof CompletionException ) + { + throwable = throwable.getCause(); + } + if ( throwable instanceof Neo4jException ) + { + String id = testkitState.newId(); + Neo4jException e = (Neo4jException) throwable; + testkitState.getErrors().put( id, e ); + return DriverError.builder() + .data( DriverError.DriverErrorBody.builder() + .id( id ) + .errorType( e.getClass().getName() ) + .code( e.code() ) + .msg( e.getMessage() ) + .build() ) + .build(); + } + else if ( isConnectionPoolClosedException( throwable ) || throwable instanceof UntrustedServerException ) + { + String id = testkitState.newId(); + return DriverError.builder() + .data( + DriverError.DriverErrorBody.builder() + .id( id ) + .errorType( throwable.getClass().getName() ) + .msg( throwable.getMessage() ) + .build() + ) + .build(); + } + else + { + return BackendError.builder().data( BackendError.BackendErrorBody.builder().msg( throwable.toString() ).build() ).build(); + } + } + + private boolean isConnectionPoolClosedException( Throwable throwable ) + { + return throwable instanceof IllegalStateException && throwable.getMessage() != null && + throwable.getMessage().equals( ConnectionPoolImpl.CONNECTION_POOL_CLOSED_ERROR_MESSAGE ); + } + + private void writeAndFlush( TestkitResponse response ) + { + if ( channel == null ) + { + throw new IllegalStateException( "Called before channel is initialized" ); + } + channel.writeAndFlush( response ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java new file mode 100644 index 0000000000..f627bc546b --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.channel.handler; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import neo4j.org.testkit.backend.messages.TestkitModule; +import neo4j.org.testkit.backend.messages.requests.TestkitRequest; +import neo4j.org.testkit.backend.messages.responses.TestkitResponse; + +public class TestkitRequestResponseMapperHandler extends ChannelDuplexHandler +{ + private final ObjectMapper objectMapper; + + public TestkitRequestResponseMapperHandler() + { + objectMapper = new ObjectMapper(); + TestkitModule testkitModule = new TestkitModule(); + this.objectMapper.registerModule( testkitModule ); + this.objectMapper.disable( DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES ); + } + + @Override + public void channelRead( ChannelHandlerContext ctx, Object msg ) + { + String testkitMessage = (String) msg; + TestkitRequest testkitRequest; + try + { + testkitRequest = objectMapper.readValue( testkitMessage, TestkitRequest.class ); + } + catch ( JsonProcessingException e ) + { + throw new RuntimeException( "Failed to deserialize Testkit message", e ); + } + ctx.fireChannelRead( testkitRequest ); + } + + @Override + public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception + { + TestkitResponse testkitResponse = (TestkitResponse) msg; + String responseStr = objectMapper.writeValueAsString( testkitResponse ); + ctx.writeAndFlush( responseStr, promise ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java index e16baa9523..9e75b62845 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java @@ -25,6 +25,9 @@ import neo4j.org.testkit.backend.messages.responses.MultiDBSupport; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -37,7 +40,25 @@ public TestkitResponse process( TestkitState testkitState ) { String driverId = data.getDriverId(); boolean available = testkitState.getDrivers().get( driverId ).supportsMultiDb(); - return MultiDBSupport.builder().data( MultiDBSupport.MultiDBSupportBody.builder().available( available ).build() ).build(); + return createResponse( available ); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getDrivers().get( data.getDriverId() ) + .supportsMultiDbAsync() + .thenApply( this::createResponse ) + .thenApply( Optional::of ); + } + + private MultiDBSupport createResponse( boolean available ) + { + return MultiDBSupport.builder() + .data( MultiDBSupport.MultiDBSupportBody.builder() + .available( available ) + .build() ) + .build(); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java index b5605a64d4..cde9a111b3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java @@ -27,6 +27,8 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionStage; @Setter @Getter @@ -58,6 +60,12 @@ public TestkitResponse process( TestkitState testkitState ) return null; } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + throw new UnsupportedOperationException(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java index f4906e98c3..16a4a0dfe4 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java @@ -25,6 +25,9 @@ import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -36,6 +39,20 @@ public class DriverClose implements TestkitRequest public TestkitResponse process( TestkitState testkitState ) { testkitState.getDrivers().get( data.getDriverId() ).close(); + return createResponse(); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getDrivers().get( data.getDriverId() ) + .closeAsync() + .thenApply( ignored -> createResponse() ) + .thenApply( Optional::of ); + } + + private Driver createResponse() + { return Driver.builder().data( Driver.DriverBody.builder().id( data.getDriverId() ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index 211f07f665..c25313fb22 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -26,26 +26,46 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; @Setter @Getter @NoArgsConstructor public class GetFeatures implements TestkitRequest { - private static final Set FEATURES = new HashSet<>( Arrays.asList( + private static final Set COMMON_FEATURES = new HashSet<>( Arrays.asList( "AuthorizationExpiredTreatment", "Optimization:PullPipelining", "ConfHint:connection.recv_timeout_seconds", - "Temporary:TransactionClose", "Temporary:DriverFetchSize", "Temporary:DriverMaxTxRetryTime" ) ); + private static final Set SYNC_FEATURES = new HashSet<>( Collections.singletonList( + "Temporary:TransactionClose" + ) ); + @Override public TestkitResponse process( TestkitState testkitState ) { - return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( FEATURES ).build() ).build(); + Set features = new HashSet<>( COMMON_FEATURES ); + features.addAll( SYNC_FEATURES ); + return createResponse( features ); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( Optional.of( createResponse( COMMON_FEATURES ) ) ); + } + + private FeatureList createResponse( Set features ) + { + return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( features ).build() ).build(); } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java index 64457bf42f..9327a53895 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -27,6 +27,9 @@ import java.util.Arrays; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.stream.Collectors; @@ -75,6 +78,12 @@ public TestkitResponse process( TestkitState testkitState ) ).build(); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( Optional.of( process( testkitState ) ) ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 14bca56974..86ab421d19 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -32,6 +32,8 @@ import java.net.URI; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import org.neo4j.driver.AuthToken; @@ -106,6 +108,12 @@ public TestkitResponse process( TestkitState testkitState ) return Driver.builder().data( Driver.DriverBody.builder().id( id ).build() ).build(); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( Optional.of( process( testkitState ) ) ); + } + private ServerAddressResolver callbackResolver( TestkitState testkitState ) { return address -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java index b9bcd26874..4b4089f1e9 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java @@ -21,13 +21,18 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.neo4j.driver.AccessMode; @@ -44,6 +49,19 @@ public class NewSession implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) + { + return createSessionStateAndResponse( testkitState, this::createSessionState, testkitState.getSessionStates() ); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( + Optional.of( createSessionStateAndResponse( testkitState, this::createAsyncSessionState, testkitState.getAsyncSessionStates() ) ) ); + } + + private TestkitResponse createSessionStateAndResponse( TestkitState testkitState, BiFunction sessionStateProducer, + Map sessionStateContainer ) { Driver driver = testkitState.getDrivers().get( data.getDriverId() ); AccessMode formattedAccessMode = data.getAccessMode().equals( "r" ) ? AccessMode.READ : AccessMode.WRITE; @@ -53,7 +71,7 @@ public TestkitResponse process( TestkitState testkitState ) Optional.ofNullable( data.bookmarks ) .map( bookmarks -> bookmarks.stream().map( InternalBookmark::parse ).collect( Collectors.toList() ) ) .ifPresent( builder::withBookmarks ); - + Optional.ofNullable( data.database ).ifPresent( builder::withDatabase ); if ( data.getFetchSize() != 0 ) @@ -61,13 +79,23 @@ public TestkitResponse process( TestkitState testkitState ) builder.withFetchSize( data.getFetchSize() ); } - org.neo4j.driver.Session session = driver.session( builder.build() ); String newId = testkitState.newId(); - testkitState.getSessionStates().put( newId, new SessionState( session ) ); + T sessionState = sessionStateProducer.apply( driver, builder.build() ); + sessionStateContainer.put( newId, sessionState ); return Session.builder().data( Session.SessionBody.builder().id( newId ).build() ).build(); } + private SessionState createSessionState( Driver driver, SessionConfig sessionConfig ) + { + return new SessionState( driver.session( sessionConfig ) ); + } + + private AsyncSessionState createAsyncSessionState( Driver driver, SessionConfig sessionConfig ) + { + return new AsyncSessionState( driver.asyncSession( sessionConfig ) ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java index 9408cea5f4..86810f63f2 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java @@ -26,6 +26,8 @@ import java.util.LinkedHashSet; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import org.neo4j.driver.internal.BoltServerAddress; @@ -45,6 +47,12 @@ public TestkitResponse process( TestkitState testkitState ) return null; } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + throw new UnsupportedOperationException(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java index 6722ebd5ab..fe672b7d4c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java @@ -26,6 +26,9 @@ import neo4j.org.testkit.backend.messages.responses.Summary; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + import org.neo4j.driver.Result; import org.neo4j.driver.exceptions.NoSuchRecordException; @@ -42,17 +45,7 @@ public TestkitResponse process( TestkitState testkitState ) try { Result result = testkitState.getResults().get( data.getResultId() ); - org.neo4j.driver.summary.ResultSummary summary = result.consume(); - Summary.ServerInfo serverInfo = Summary.ServerInfo.builder() - .protocolVersion( summary.server().protocolVersion() ) - .agent( summary.server().agent() ) - .build(); - Summary.SummaryBody data = Summary.SummaryBody.builder() - .serverInfo( serverInfo ) - .build(); - return Summary.builder() - .data( data ) - .build(); + return createResponse( result.consume() ); } catch ( NoSuchRecordException ignored ) { @@ -60,6 +53,29 @@ public TestkitResponse process( TestkitState testkitState ) } } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getResultCursors().get( data.getResultId() ) + .consumeAsync() + .thenApply( this::createResponse ) + .thenApply( Optional::of ); + } + + private Summary createResponse( org.neo4j.driver.summary.ResultSummary summary ) + { + Summary.ServerInfo serverInfo = Summary.ServerInfo.builder() + .protocolVersion( summary.server().protocolVersion() ) + .agent( summary.server().agent() ) + .build(); + Summary.SummaryBody data = Summary.SummaryBody.builder() + .serverInfo( serverInfo ) + .build(); + return Summary.builder() + .data( data ) + .build(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java index ef35fd99f7..cbdaae3070 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java @@ -26,6 +26,9 @@ import neo4j.org.testkit.backend.messages.responses.Record; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + import org.neo4j.driver.Result; import org.neo4j.driver.exceptions.NoSuchRecordException; @@ -42,8 +45,7 @@ public TestkitResponse process( TestkitState testkitState ) try { Result result = testkitState.getResults().get( data.getResultId() ); - org.neo4j.driver.Record record = result.next(); - return Record.builder().data( Record.RecordBody.builder().values( record ).build() ).build(); + return createResponse( result.next() ); } catch ( NoSuchRecordException ignored ) { @@ -51,6 +53,20 @@ public TestkitResponse process( TestkitState testkitState ) } } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getResultCursors().get( data.getResultId() ) + .nextAsync() + .thenApply( record -> record != null ? createResponse( record ) : NullRecord.builder().build() ) + .thenApply( Optional::of ); + } + + private Record createResponse( org.neo4j.driver.Record record ) + { + return Record.builder().data( Record.RecordBody.builder().values( record ).build() ).build(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java index f1b3d692c9..e7c63c6d37 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java @@ -21,10 +21,15 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -45,6 +50,23 @@ public TestkitResponse process( TestkitState testkitState ) return null; } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); + Throwable throwable; + if ( !"".equals( data.getErrorId() ) ) + { + throwable = testkitState.getErrors().get( data.getErrorId() ); + } + else + { + throwable = new RuntimeException( "Error from client in retryable tx" ); + } + sessionState.getTxWorkFuture().completeExceptionally( throwable ); + return CompletableFuture.completedFuture( Optional.empty() ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java index d56f8c21f5..474720251b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java @@ -25,6 +25,10 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -44,6 +48,13 @@ public TestkitResponse process( TestkitState testkitState ) return null; } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + testkitState.getAsyncSessionStates().get( data.getSessionId() ).getTxWorkFuture().complete( null ); + return CompletableFuture.completedFuture( Optional.empty() ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index 93d0811a1d..66227f2b6f 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -21,6 +21,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; @@ -29,8 +30,10 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.async.AsyncSession; @Setter @Getter @@ -62,6 +65,36 @@ public TestkitResponse process( TestkitState testkitState ) .orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); + if ( sessionState != null ) + { + AsyncSession session = sessionState.getSession(); + TransactionConfig.Builder builder = TransactionConfig.builder(); + Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); + + if ( data.getTimeout() != null ) + { + builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); + } + + String txId = testkitState.newId(); + return session.beginTransactionAsync( builder.build() ) + .thenApply( tx -> + { + testkitState.getAsyncTransactions().put( txId, tx ); + return transaction( txId ); + } ) + .thenApply( Optional::of ); + } + else + { + return null; + } + } + private Transaction transaction( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index ec117640ea..ba96189e05 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -25,6 +25,9 @@ import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -36,6 +39,20 @@ public class SessionClose implements TestkitRequest public TestkitResponse process( TestkitState testkitState ) { testkitState.getSessionStates().get( data.getSessionId() ).getSession().close(); + return createResponse(); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession() + .closeAsync() + .thenApply( ignored -> createResponse() ) + .thenApply( Optional::of ); + } + + private Session createResponse() + { return Session.builder().data( Session.SessionBody.builder().id( data.getSessionId() ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java index 51666cc46f..0a69142894 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java @@ -27,6 +27,8 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Bookmark; @@ -45,11 +47,23 @@ public TestkitResponse process( TestkitState testkitState ) .map( session -> { Bookmark bookmark = testkitState.getSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); - return Bookmarks.builder().data( Bookmarks.BookmarksBody.builder().bookmarks( bookmark ).build() ).build(); + return createResponse( bookmark ); } ) .orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + Bookmark bookmark = testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); + return CompletableFuture.completedFuture( Optional.of( createResponse( bookmark ) ) ); + } + + private Bookmarks createResponse( Bookmark bookmark ) + { + return Bookmarks.builder().data( Bookmarks.BookmarksBody.builder().bookmarks( bookmark ).build() ).build(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index b57ff63bcf..605fe247e3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -21,6 +21,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RetryableDone; @@ -28,9 +29,13 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionWork; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.AsyncTransactionWork; @Setter @Getter @@ -42,7 +47,7 @@ public class SessionReadTransaction implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getSessionStates().getOrDefault( data.sessionId, null ) ) + return Optional.ofNullable( testkitState.getSessionStates().getOrDefault( data.getSessionId(), null ) ) .map( sessionState -> { Session session = sessionState.getSession(); @@ -51,6 +56,27 @@ public TestkitResponse process( TestkitState testkitState ) } ).orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); + AsyncSession session = sessionState.getSession(); + + AsyncTransactionWork> workWrapper = tx -> + { + String txId = testkitState.newId(); + testkitState.getAsyncTransactions().put( txId, tx ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture txWorkFuture = new CompletableFuture<>(); + sessionState.setTxWorkFuture( txWorkFuture ); + return txWorkFuture; + }; + + return session.readTransactionAsync( workWrapper ) + .thenApply( nothing -> retryableDone() ) + .thenApply( Optional::of ); + } + private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) { return tx -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index d119e4c552..2d2f0d0807 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -30,10 +30,12 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Query; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.async.AsyncSession; @Setter @Getter @@ -59,6 +61,27 @@ public TestkitResponse process( TestkitState testkitState ) return Result.builder().data( Result.ResultBody.builder().id( newId ).build() ).build(); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSession session = testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession(); + Query query = Optional.ofNullable( data.params ) + .map( params -> new Query( data.cypher, data.params ) ) + .orElseGet( () -> new Query( data.cypher ) ); + TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); + Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); + Optional.ofNullable( data.getTimeout() ).ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); + + return session.runAsync( query, transactionConfig.build() ) + .thenApply( resultCursor -> + { + String newId = testkitState.newId(); + testkitState.getResultCursors().put( newId, resultCursor ); + return Result.builder().data( Result.ResultBody.builder().id( newId ).build() ).build(); + } ) + .thenApply( Optional::of ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index 64c053591c..077a0076ce 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -21,6 +21,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RetryableDone; @@ -29,9 +30,13 @@ import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionWork; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.AsyncTransactionWork; @Setter @Getter @@ -52,6 +57,28 @@ public TestkitResponse process( TestkitState testkitState ) } ).orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); + AsyncSession session = sessionState.getSession(); + + AsyncTransactionWork> workWrapper = + tx -> + { + String txId = testkitState.newId(); + testkitState.getAsyncTransactions().put( txId, tx ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionState.setTxWorkFuture( tryResult ); + return tryResult; + }; + + return session.writeTransactionAsync( workWrapper ) + .thenApply( nothing -> retryableDone() ) + .thenApply( Optional::of ); + } + private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) { return tx -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 2637c3ec7b..af45836492 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -23,13 +23,33 @@ import lombok.Setter; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RunTest; +import neo4j.org.testkit.backend.messages.responses.SkipTest; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor public class StartTest implements TestkitRequest { + private static final Map ASYNC_SKIP_PATTERN_TO_REASON = new HashMap<>(); + + static + { + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_fail_when_driver_closed_using_session_run$", "Does not throw error" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_read_successfully_on_empty_discovery_result_using_session_run$", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_request_rt_from_all_initial_routers_until_successful", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_successfully_acquire_rt_when_router_ip_changes$", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_use_resolver_during_rediscovery_when_existing_routers_fail$", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0", "Does not error as expected" ); + } + private StartTestBody data; @Override @@ -38,6 +58,24 @@ public TestkitResponse process( TestkitState testkitState ) return RunTest.builder().build(); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + TestkitResponse testkitResponse = ASYNC_SKIP_PATTERN_TO_REASON + .entrySet() + .stream() + .filter( entry -> data.getTestName().matches( entry.getKey() ) ) + .findFirst() + .map( entry -> (TestkitResponse) SkipTest.builder() + .data( SkipTest.SkipTestBody.builder() + .reason( entry.getValue() ) + .build() ) + .build() ) + .orElseGet( () -> RunTest.builder().build() ); + + return CompletableFuture.completedFuture( Optional.of( testkitResponse ) ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java index c2a22a7d6e..52cd16590c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java @@ -23,6 +23,9 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "name" ) @JsonSubTypes( { @JsonSubTypes.Type( NewDriver.class ), @JsonSubTypes.Type( NewSession.class ), @@ -41,4 +44,6 @@ public interface TestkitRequest { TestkitResponse process( TestkitState testkitState ); + + CompletionStage> processAsync( TestkitState testkitState ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java index d047e49d19..7bab055f97 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java @@ -26,6 +26,7 @@ import neo4j.org.testkit.backend.messages.responses.Transaction; import java.util.Optional; +import java.util.concurrent.CompletionStage; @Setter @Getter @@ -37,16 +38,22 @@ public class TransactionClose implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.txId ) ) + return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) .map( tx -> { tx.close(); - return transaction( data.txId ); + return createResponse( data.getTxId() ); } ) .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); } - private Transaction transaction( String txId ) + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + throw new UnsupportedOperationException(); + } + + private Transaction createResponse( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java index b1f825fc66..ebe5c18c6a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java @@ -26,6 +26,7 @@ import neo4j.org.testkit.backend.messages.responses.Transaction; import java.util.Optional; +import java.util.concurrent.CompletionStage; @Getter @NoArgsConstructor @@ -37,16 +38,25 @@ public class TransactionCommit implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.txId ) ) + return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) .map( tx -> { tx.commit(); - return transaction( data.txId ); + return createResponse( data.getTxId() ); } ) .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); } - private Transaction transaction( String txId ) + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getAsyncTransactions().get( data.getTxId() ) + .commitAsync() + .thenApply( ignored -> createResponse( data.getTxId() ) ) + .thenApply( Optional::of ); + } + + private Transaction createResponse( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java index 148a515886..75e24e2b57 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java @@ -26,6 +26,7 @@ import neo4j.org.testkit.backend.messages.responses.Transaction; import java.util.Optional; +import java.util.concurrent.CompletionStage; @Getter @NoArgsConstructor @@ -37,16 +38,25 @@ public class TransactionRollback implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.txId ) ) + return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) .map( tx -> { tx.rollback(); - return transaction( data.txId ); + return createResponse( data.getTxId() ); } ) .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); } - private Transaction transaction( String txId ) + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getAsyncTransactions().get( data.getTxId() ) + .rollbackAsync() + .thenApply( ignored -> createResponse( data.getTxId() ) ) + .thenApply( Optional::of ); + } + + private Transaction createResponse( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java index 7d797147d2..2e993e320f 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletionStage; @Setter @Getter @@ -41,19 +42,33 @@ public class TransactionRun implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.txId ) ) + return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) .map( tx -> - tx.run( data.cypher, data.getParams() != null ? data.getParams() : Collections.emptyMap() ) ) + tx.run( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ) ) .map( result -> { String resultId = testkitState.newId(); testkitState.getResults().put( resultId, result ); - return result( resultId ); + return createResponse( resultId ); } ) .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); } - private Result result( String resultId ) + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getAsyncTransactions().get( data.getTxId() ) + .runAsync( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ) + .thenApply( resultCursor -> + { + String resultId = testkitState.newId(); + testkitState.getResultCursors().put( resultId, resultCursor ); + return createResponse( resultId ); + } ) + .thenApply( Optional::of ); + } + + private Result createResponse( String resultId ) { return Result.builder().data( Result.ResultBody.builder().id( resultId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java index e985452acc..acd3907aa4 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java @@ -25,6 +25,9 @@ import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -37,6 +40,21 @@ public TestkitResponse process( TestkitState testkitState ) { String id = data.getDriverId(); testkitState.getDrivers().get( id ).verifyConnectivity(); + return createResponse( id ); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + String id = data.getDriverId(); + return testkitState.getDrivers().get( id ) + .verifyConnectivityAsync() + .thenApply( ignored -> createResponse( id ) ) + .thenApply( Optional::of ); + } + + private Driver createResponse( String id ) + { return Driver.builder().data( Driver.DriverBody.builder().id( id ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/SkipTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/SkipTest.java new file mode 100644 index 0000000000..7f2762c66a --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/SkipTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.responses; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +@Builder +public class SkipTest implements TestkitResponse +{ + private SkipTestBody data; + + @Override + public String testkitName() + { + return "SkipTest"; + } + + @Setter + @Getter + @Builder + public static class SkipTestBody + { + private final String reason; + } +} diff --git a/testkit-tests/pom.xml b/testkit-tests/pom.xml index b027cb045e..83c5386cfc 100644 --- a/testkit-tests/pom.xml +++ b/testkit-tests/pom.xml @@ -23,6 +23,7 @@ --tests TESTKIT_TESTS INTEGRATION_TESTS STUB_TESTS STRESS_TESTS TLS_TESTS 7200000 + %a-async 0.36.1 true @@ -46,6 +47,7 @@ + tklnchr testkit-launcher:%v @@ -54,11 +56,11 @@ - %n + %a 0 @@ -109,11 +111,37 @@ start + + + run-testkit-async + integration-test + + + start + + + + + tklnchr + + ${testkit.async.name.pattern} + + ${project.build.directory}/testkit-async + async + + + ${testkit.async.name.pattern}> + + + + + + remove-testkit-launcher post-integration-test - + stop diff --git a/testkit/backend.py b/testkit/backend.py index beeb957468..ccce69d5cf 100644 --- a/testkit/backend.py +++ b/testkit/backend.py @@ -10,5 +10,5 @@ err = open("/artifacts/backenderr.log", "w") out = open("/artifacts/backendout.log", "w") subprocess.check_call( - ["java", "-jar", "testkit-backend/target/testkit-backend.jar"], stdout=out, stderr=err) + ["java", "-jar", "testkit-backend/target/testkit-backend.jar", os.getenv('TEST_BACKEND_SERVER', '')], stdout=out, stderr=err)