diff --git a/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java b/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java index b247fa6a21..953fb77cb1 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java +++ b/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java @@ -24,7 +24,6 @@ import java.net.URI; import java.net.UnknownHostException; -import static java.lang.String.format; import static java.util.Objects.requireNonNull; /** @@ -37,6 +36,7 @@ public class BoltServerAddress private final String host; private final int port; + private final String stringValue; public BoltServerAddress( String address ) { @@ -52,6 +52,7 @@ public BoltServerAddress( String host, int port ) { this.host = requireNonNull( host ); this.port = port; + this.stringValue = String.format( "%s:%d", host, port ); } @Override @@ -78,7 +79,7 @@ public int hashCode() @Override public String toString() { - return format( "%s:%d", host, port ); + return stringValue; } /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 3da5499813..a39e272aaa 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -289,8 +289,7 @@ private CompletionStage run( Statement statement, { ensureCanRunQueries(); CompletionStage cursorStage = - QueryRunner.runInTransaction( connection, statement, - this, waitForRunResponse ); + QueryRunner.runInTransaction( connection, statement, this, waitForRunResponse ); resultCursors.add( cursorStage ); return cursorStage; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java index 4db027d074..a18148fe50 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java @@ -95,9 +95,7 @@ public CompletionStage singleAsync() @Override public CompletionStage consumeAsync() { - return forEachAsync( record -> - { - } ); + return pullAllHandler.consumeAsync(); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolV1Util.java b/driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolV1Util.java index a75c0cfc2c..02dfc017b1 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolV1Util.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolV1Util.java @@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf; import static io.netty.buffer.Unpooled.copyInt; -import static io.netty.buffer.Unpooled.copyShort; import static io.netty.buffer.Unpooled.unreleasableBuffer; public final class BoltProtocolV1Util @@ -41,12 +40,7 @@ public final class BoltProtocolV1Util PROTOCOL_VERSION_1, NO_PROTOCOL_VERSION, NO_PROTOCOL_VERSION, - NO_PROTOCOL_VERSION ) ) - .asReadOnly(); - - private static final ByteBuf MESSAGE_BOUNDARY_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly(); - - private static final ByteBuf CHUNK_HEADER_PLACEHOLDER_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly(); + NO_PROTOCOL_VERSION ) ).asReadOnly(); private BoltProtocolV1Util() { @@ -62,13 +56,18 @@ public static String handshakeString() return "[0x6060B017, 1, 0, 0, 0]"; } - public static ByteBuf messageBoundary() + public static void writeMessageBoundary( ByteBuf buf ) + { + buf.writeShort( 0 ); + } + + public static void writeEmptyChunkHeader( ByteBuf buf ) { - return MESSAGE_BOUNDARY_BUF.duplicate(); + buf.writeShort( 0 ); } - public static ByteBuf chunkHeaderPlaceholder() + public static void writeChunkHeader( ByteBuf buf, int chunkStartIndex, int headerValue ) { - return CHUNK_HEADER_PLACEHOLDER_BUF.duplicate(); + buf.setShort( chunkStartIndex, headerValue ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java index 7b1c5562de..41a608da91 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java @@ -20,12 +20,12 @@ import io.netty.buffer.ByteBuf; +import org.neo4j.driver.internal.async.BoltProtocolV1Util; import org.neo4j.driver.internal.packstream.PackOutput; import static java.util.Objects.requireNonNull; import static org.neo4j.driver.internal.async.BoltProtocolV1Util.CHUNK_HEADER_SIZE_BYTES; import static org.neo4j.driver.internal.async.BoltProtocolV1Util.DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES; -import static org.neo4j.driver.internal.async.BoltProtocolV1Util.chunkHeaderPlaceholder; public class ChunkAwareByteBufOutput implements PackOutput { @@ -138,15 +138,15 @@ private void ensureCanFitInCurrentChunk( int numberOfBytes ) private void startNewChunk( int index ) { currentChunkStartIndex = index; - buf.writeBytes( chunkHeaderPlaceholder() ); + BoltProtocolV1Util.writeEmptyChunkHeader( buf ); currentChunkSize = CHUNK_HEADER_SIZE_BYTES; } private void writeChunkSizeHeader() { - // go to the beginning of the chunk and write 2 byte size header + // go to the beginning of the chunk and write the size header int chunkBodySize = currentChunkSize - CHUNK_HEADER_SIZE_BYTES; - buf.setShort( currentChunkStartIndex, chunkBodySize ); + BoltProtocolV1Util.writeChunkHeader( buf, currentChunkStartIndex, chunkBodySize ); } private int availableBytesInCurrentChunk() diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java index 3f0771e7b8..a5c4a8b94f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java @@ -25,6 +25,7 @@ import java.util.List; +import org.neo4j.driver.internal.async.BoltProtocolV1Util; import org.neo4j.driver.internal.logging.ChannelActivityLogger; import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.messaging.MessageFormat; @@ -32,7 +33,6 @@ import org.neo4j.driver.v1.Logging; import static io.netty.buffer.ByteBufUtil.hexDump; -import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary; public class OutboundMessageHandler extends MessageToMessageEncoder { @@ -95,8 +95,8 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List out log.trace( "C: %s", hexDump( messageBuf ) ); } + BoltProtocolV1Util.writeMessageBoundary( messageBuf ); out.add( messageBuf ); - out.add( messageBoundary() ); } public OutboundMessageHandler withoutByteArraySupport() diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index 97da4e9831..a5a315f973 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -30,6 +30,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.util.Futures; +import org.neo4j.driver.internal.util.Iterables; import org.neo4j.driver.internal.util.MetadataUtil; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; @@ -45,6 +46,8 @@ public abstract class PullAllResponseHandler implements ResponseHandler { + private static final Queue UNINITIALIZED_RECORDS = Iterables.emptyQueue(); + static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 ); static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 ); @@ -52,12 +55,14 @@ public abstract class PullAllResponseHandler implements ResponseHandler private final RunResponseHandler runResponseHandler; protected final Connection connection; - private final Queue records = new ArrayDeque<>(); + // initialized lazily when first record arrives + private Queue records = UNINITIALIZED_RECORDS; private boolean finished; private Throwable failure; private ResultSummary summary; + private boolean ignoreRecords; private CompletableFuture recordFuture; private CompletableFuture failureFuture; @@ -112,9 +117,16 @@ public synchronized void onFailure( Throwable error ) @Override public synchronized void onRecord( Value[] fields ) { - Record record = new InternalRecord( runResponseHandler.statementKeys(), fields ); - enqueueRecord( record ); - completeRecordFuture( record ); + if ( ignoreRecords ) + { + completeRecordFuture( null ); + } + else + { + Record record = new InternalRecord( runResponseHandler.statementKeys(), fields ); + enqueueRecord( record ); + completeRecordFuture( record ); + } } public synchronized CompletionStage peekAsync() @@ -127,7 +139,7 @@ public synchronized CompletionStage peekAsync() return failedFuture( extractFailure() ); } - if ( finished ) + if ( ignoreRecords || finished ) { return completedWithNull(); } @@ -161,6 +173,13 @@ public synchronized CompletionStage summaryAsync() } ); } + public synchronized CompletionStage consumeAsync() + { + ignoreRecords = true; + records.clear(); + return summaryAsync(); + } + public synchronized CompletionStage> listAsync( Function mapFunction ) { return failureAsync().thenApply( error -> @@ -199,6 +218,11 @@ else if ( finished ) private void enqueueRecord( Record record ) { + if ( records == UNINITIALIZED_RECORDS ) + { + records = new ArrayDeque<>(); + } + records.add( record ); boolean shouldBufferAllRecords = failureFuture != null; diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java index ae5691464a..63bf5f4553 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -274,7 +273,7 @@ private void packValue( Value value ) throws IOException packer.packStructHeader( 3, PATH ); // Unique nodes - Map nodeIdx = new LinkedHashMap<>(); + Map nodeIdx = Iterables.newLinkedHashMapWithSize( path.length() + 1 ); for ( Node node : path.nodes() ) { if ( !nodeIdx.containsKey( node ) ) @@ -289,7 +288,7 @@ private void packValue( Value value ) throws IOException } // Unique rels - Map relIdx = new LinkedHashMap<>(); + Map relIdx = Iterables.newLinkedHashMapWithSize( path.length() ); for ( Relationship rel : path.relationships() ) { if ( !relIdx.containsKey( rel ) ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Extract.java b/driver/src/main/java/org/neo4j/driver/internal/util/Extract.java index e7154bc5d4..595401f8b8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Extract.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Extract.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -102,7 +101,7 @@ public static Map map( Map data, Function head = data.entrySet().iterator().next(); return singletonMap( head.getKey(), mapFunction.apply( head.getValue() ) ); } else { - Map map = new LinkedHashMap<>( size ); + Map map = Iterables.newLinkedHashMapWithSize( size ); for ( Map.Entry entry : data.entrySet() ) { map.put( entry.getKey(), mapFunction.apply( entry.getValue() ) ); @@ -124,7 +123,7 @@ public static Map map( Record record, Function mapFunct return singletonMap( record.keys().get( 0 ), mapFunction.apply( record.get( 0 ) ) ); default: - Map map = new LinkedHashMap<>( size ); + Map map = Iterables.newLinkedHashMapWithSize( size ); List keys = record.keys(); for ( int i = 0; i < size; i++ ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java b/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java index f971601821..d9ebee79bc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java @@ -18,17 +18,23 @@ */ package org.neo4j.driver.internal.util; +import java.util.AbstractQueue; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Queue; import org.neo4j.driver.v1.util.Function; public class Iterables { + @SuppressWarnings( "rawtypes" ) + private static final Queue EMPTY_QUEUE = new EmptyQueue(); private static final float DEFAULT_HASH_MAP_LOAD_FACTOR = 0.75F; public static int count( Iterable it ) @@ -110,11 +116,22 @@ public void remove() }; } + @SuppressWarnings( "unchecked" ) + public static Queue emptyQueue() + { + return (Queue) EMPTY_QUEUE; + } + public static HashMap newHashMapWithSize( int expectedSize ) { return new HashMap<>( hashMapCapacity( expectedSize ) ); } + public static LinkedHashMap newLinkedHashMapWithSize( int expectedSize ) + { + return new LinkedHashMap<>( hashMapCapacity( expectedSize ) ); + } + private static int hashMapCapacity( int expectedSize ) { if ( expectedSize < 3 ) @@ -127,4 +144,37 @@ private static int hashMapCapacity( int expectedSize ) } return (int) ((float) expectedSize / DEFAULT_HASH_MAP_LOAD_FACTOR + 1.0F); } + + private static class EmptyQueue extends AbstractQueue + { + @Override + public Iterator iterator() + { + return Collections.emptyIterator(); + } + + @Override + public int size() + { + return 0; + } + + @Override + public boolean offer( T t ) + { + throw new UnsupportedOperationException(); + } + + @Override + public T poll() + { + return null; + } + + @Override + public T peek() + { + return null; + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/ServerVersion.java b/driver/src/main/java/org/neo4j/driver/internal/util/ServerVersion.java index d3f713b8b5..c932fa72ca 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/ServerVersion.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/ServerVersion.java @@ -28,24 +28,27 @@ public class ServerVersion { + public static final ServerVersion v3_2_0 = new ServerVersion( 3, 2, 0 ); + public static final ServerVersion v3_1_0 = new ServerVersion( 3, 1, 0 ); + public static final ServerVersion v3_0_0 = new ServerVersion( 3, 0, 0 ); + public static final ServerVersion vInDev = new ServerVersion( 0, 0, 0 ); + private static final String NEO4J_IN_DEV_VERSION_STRING = "Neo4j/dev"; + private static final Pattern PATTERN = + Pattern.compile( "(Neo4j/)?(\\d+)\\.(\\d+)(?:\\.)?(\\d*)(\\.|-|\\+)?([0-9A-Za-z-.]*)?" ); + private final int major; private final int minor; private final int patch; - - private static final Pattern PATTERN = - Pattern.compile("(Neo4j/)?(\\d+)\\.(\\d+)(?:\\.)?(\\d*)(\\.|-|\\+)?([0-9A-Za-z-.]*)?"); + private final String stringValue; private ServerVersion( int major, int minor, int patch ) { this.major = major; this.minor = minor; this.patch = patch; + this.stringValue = stringValue( major, minor, patch ); } - public static final ServerVersion v3_2_0 = new ServerVersion(3, 2, 0); - public static final ServerVersion v3_1_0 = new ServerVersion(3, 1, 0); - public static final ServerVersion v3_0_0 = new ServerVersion(3, 0, 0); - public static final ServerVersion vInDev = new ServerVersion(0, 0, 0); public static ServerVersion version( Driver driver ) { @@ -60,7 +63,7 @@ public static ServerVersion version( String server ) { if ( server == null ) { - return new ServerVersion( 3, 0, 0 ); + return v3_0_0; } else { @@ -152,8 +155,15 @@ private int compareTo( ServerVersion o ) @Override public String toString() { - return this == vInDev - ? NEO4J_IN_DEV_VERSION_STRING - : String.format( "Neo4j/%s.%s.%s", major, minor, patch ); + return stringValue; + } + + private static String stringValue( int major, int minor, int patch ) + { + if ( major == 0 && minor == 0 && patch == 0 ) + { + return NEO4J_IN_DEV_VERSION_STRING; + } + return String.format( "Neo4j/%s.%s.%s", major, minor, patch ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java index 7a28dce0fc..bfc033c896 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java @@ -27,11 +27,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.neo4j.driver.internal.util.Extract; +import org.neo4j.driver.internal.util.Iterables; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.util.Function; import org.neo4j.driver.v1.util.Pair; @@ -126,7 +126,7 @@ public void testMapValues() throws Exception public void testShouldPreserveMapOrderMapValues() throws Exception { // GIVEN - Map map = new LinkedHashMap<>(); + Map map = Iterables.newLinkedHashMapWithSize( 2 ); map.put( "k2", value( 43 ) ); map.put( "k1", value( 42 ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java index 4543c2db77..b865063fe5 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java @@ -197,60 +197,6 @@ public void shouldFailWhenAskedForSingleRecordButResultContainsMore() } } - @Test - public void shouldConsumeAsyncWhenResultContainsMultipleRecords() - { - PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - - Record record1 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 1, 1 ) ); - Record record2 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 2, 2, 2 ) ); - Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 3, 3 ) ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) ) - .thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) ) - .thenReturn( completedWithNull() ); - - ResultSummary summary = mock( ResultSummary.class ); - when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); - - InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - - assertEquals( summary, await( cursor.consumeAsync() ) ); - verify( pullAllHandler, times( 4 ) ).nextAsync(); - } - - @Test - public void shouldConsumeAsyncWhenResultContainsOneRecords() - { - PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - - Record record = new InternalRecord( asList( "key1", "key2" ), values( 1, 1 ) ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record ) ) - .thenReturn( completedWithNull() ); - - ResultSummary summary = mock( ResultSummary.class ); - when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); - - InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - - assertEquals( summary, await( cursor.consumeAsync() ) ); - verify( pullAllHandler, times( 2 ) ).nextAsync(); - } - - @Test - public void shouldConsumeAsyncWhenResultContainsNoRecords() - { - PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.nextAsync() ).thenReturn( completedWithNull() ); - - ResultSummary summary = mock( ResultSummary.class ); - when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); - - InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - - assertEquals( summary, await( cursor.consumeAsync() ) ); - verify( pullAllHandler ).nextAsync(); - } - @Test public void shouldForEachAsyncWhenResultContainsMultipleRecords() { @@ -455,6 +401,38 @@ public void shouldPropagateFailureFromListAsyncWithMapFunction() verify( pullAllHandler ).listAsync( mapFunction ); } + @Test + public void shouldConsumeAsync() + { + PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); + ResultSummary summary = mock( ResultSummary.class ); + when( pullAllHandler.consumeAsync() ).thenReturn( completedFuture( summary ) ); + + InternalStatementResultCursor cursor = newCursor( pullAllHandler ); + + assertEquals( summary, await( cursor.consumeAsync() ) ); + } + + @Test + public void shouldPropagateFailureInConsumeAsync() + { + PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); + RuntimeException error = new RuntimeException( "Hi" ); + when( pullAllHandler.consumeAsync() ).thenReturn( failedFuture( error ) ); + + InternalStatementResultCursor cursor = newCursor( pullAllHandler ); + + try + { + await( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + } + private static InternalStatementResultCursor newCursor( PullAllResponseHandler pullAllHandler ) { return new InternalStatementResultCursor( new RunResponseHandler( new CompletableFuture<>() ), pullAllHandler ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/BoltProtocolV1UtilTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/BoltProtocolV1UtilTest.java new file mode 100644 index 0000000000..3de4653d83 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/BoltProtocolV1UtilTest.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.neo4j.driver.internal.async.BoltProtocolV1Util.BOLT_MAGIC_PREAMBLE; +import static org.neo4j.driver.internal.async.BoltProtocolV1Util.NO_PROTOCOL_VERSION; +import static org.neo4j.driver.internal.async.BoltProtocolV1Util.PROTOCOL_VERSION_1; +import static org.neo4j.driver.internal.async.BoltProtocolV1Util.handshakeBuf; +import static org.neo4j.driver.internal.async.BoltProtocolV1Util.handshakeString; +import static org.neo4j.driver.internal.async.BoltProtocolV1Util.writeChunkHeader; +import static org.neo4j.driver.internal.async.BoltProtocolV1Util.writeEmptyChunkHeader; +import static org.neo4j.driver.internal.async.BoltProtocolV1Util.writeMessageBoundary; +import static org.neo4j.driver.v1.util.TestUtil.assertByteBufContains; + +public class BoltProtocolV1UtilTest +{ + @Test + public void shouldReturnHandshakeBuf() + { + assertByteBufContains( + handshakeBuf(), + BOLT_MAGIC_PREAMBLE, PROTOCOL_VERSION_1, NO_PROTOCOL_VERSION, NO_PROTOCOL_VERSION, NO_PROTOCOL_VERSION + ); + } + + @Test + public void shouldReturnHandshakeString() + { + assertEquals( "[0x6060B017, 1, 0, 0, 0]", handshakeString() ); + } + + @Test + public void shouldWriteMessageBoundary() + { + ByteBuf buf = Unpooled.buffer(); + + buf.writeInt( 1 ); + buf.writeInt( 2 ); + buf.writeInt( 3 ); + writeMessageBoundary( buf ); + + assertByteBufContains( buf, 1, 2, 3, (byte) 0, (byte) 0 ); + } + + @Test + public void shouldWriteEmptyChunkHeader() + { + ByteBuf buf = Unpooled.buffer(); + + writeEmptyChunkHeader( buf ); + buf.writeInt( 1 ); + buf.writeInt( 2 ); + buf.writeInt( 3 ); + + assertByteBufContains( buf, (byte) 0, (byte) 0, 1, 2, 3 ); + } + + @Test + public void shouldWriteChunkHeader() + { + ByteBuf buf = Unpooled.buffer(); + + writeEmptyChunkHeader( buf ); + buf.writeInt( 1 ); + buf.writeInt( 2 ); + buf.writeInt( 3 ); + writeChunkHeader( buf, 0, 42 ); + + assertByteBufContains( buf, (short) 42, 1, 2, 3 ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java index 8c7fd046dd..89b493604f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java @@ -50,13 +50,11 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.messaging.MessageFormat.Writer; import static org.neo4j.driver.internal.messaging.PullAllMessage.PULL_ALL; import static org.neo4j.driver.v1.Values.value; import static org.neo4j.driver.v1.util.TestUtil.assertByteBufContains; -import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals; public class OutboundMessageHandlerTest { @@ -85,13 +83,14 @@ public void shouldOutputByteBufAsWrittenByWriterAndMessageBoundary() assertTrue( channel.writeOutbound( PULL_ALL ) ); assertTrue( channel.finish() ); - assertEquals( 2, channel.outboundMessages().size() ); + assertEquals( 1, channel.outboundMessages().size() ); - ByteBuf buf1 = channel.readOutbound(); - assertByteBufContains( buf1, (short) 5, (byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5 ); - - ByteBuf buf2 = channel.readOutbound(); - assertByteBufEquals( messageBoundary(), buf2 ); + ByteBuf buf = channel.readOutbound(); + assertByteBufContains( + buf, + (short) 5, (byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5, // message body + (byte) 0, (byte) 0 // message boundary + ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java index 65997201fa..656afffb22 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java @@ -893,7 +893,7 @@ public void shouldFailListAsyncWhenTransformationFunctionThrows() } @Test - public void shouldReturnEmptyListInListAsync() + public void shouldReturnEmptyListInListAsyncAfterSuccess() { PullAllResponseHandler handler = newHandler(); @@ -902,6 +902,19 @@ public void shouldReturnEmptyListInListAsync() assertEquals( emptyList(), await( handler.listAsync( Functions.identity() ) ) ); } + @Test + public void shouldReturnEmptyListInListAsyncAfterFailure() + { + PullAllResponseHandler handler = newHandler(); + + RuntimeException error = new RuntimeException( "Hi" ); + handler.onFailure( error ); + + // consume the error + assertEquals( error, await( handler.failureAsync() ) ); + assertEquals( emptyList(), await( handler.listAsync( Functions.identity() ) ) ); + } + @Test public void shouldReturnTransformedListInListAsync() { @@ -943,6 +956,206 @@ public void shouldReturnNotTransformedListInListAsync() assertEquals( expectedRecords, list ); } + @Test + public void shouldConsumeAfterSuccessWithRecords() + { + PullAllResponseHandler handler = newHandler( singletonList( "key1" ) ); + handler.onRecord( values( 1 ) ); + handler.onRecord( values( 2 ) ); + handler.onSuccess( emptyMap() ); + + assertNotNull( await( handler.consumeAsync() ) ); + + assertNoRecordsCanBeFetched( handler ); + } + + @Test + public void shouldConsumeAfterSuccessWithoutRecords() + { + PullAllResponseHandler handler = newHandler(); + handler.onSuccess( emptyMap() ); + + assertNotNull( await( handler.consumeAsync() ) ); + + assertNoRecordsCanBeFetched( handler ); + } + + @Test + public void shouldConsumeAfterFailureWithRecords() + { + PullAllResponseHandler handler = newHandler( singletonList( "key1" ) ); + handler.onRecord( values( 1 ) ); + handler.onRecord( values( 2 ) ); + RuntimeException error = new RuntimeException( "Hi" ); + handler.onFailure( error ); + + try + { + await( handler.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + + assertNoRecordsCanBeFetched( handler ); + } + + @Test + public void shouldConsumeAfterFailureWithoutRecords() + { + PullAllResponseHandler handler = newHandler(); + RuntimeException error = new RuntimeException( "Hi" ); + handler.onFailure( error ); + + try + { + await( handler.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + + assertNoRecordsCanBeFetched( handler ); + } + + @Test + public void shouldConsumeAfterProcessedFailureWithRecords() + { + PullAllResponseHandler handler = newHandler( singletonList( "key1" ) ); + handler.onRecord( values( 1 ) ); + handler.onRecord( values( 2 ) ); + RuntimeException error = new RuntimeException( "Hi" ); + handler.onFailure( error ); + + // process failure + assertEquals( error, await( handler.failureAsync() ) ); + // consume all buffered records + assertNotNull( await( handler.consumeAsync() ) ); + + assertNoRecordsCanBeFetched( handler ); + } + + @Test + public void shouldConsumeAfterProcessedFailureWithoutRecords() + { + PullAllResponseHandler handler = newHandler(); + RuntimeException error = new RuntimeException( "Hi" ); + handler.onFailure( error ); + + // process failure + assertEquals( error, await( handler.failureAsync() ) ); + // consume all buffered records + assertNotNull( await( handler.consumeAsync() ) ); + + assertNoRecordsCanBeFetched( handler ); + } + + @Test + public void shouldConsumeUntilSuccess() + { + PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ) ); + handler.onRecord( values( 1, 2 ) ); + handler.onRecord( values( 3, 4 ) ); + + CompletableFuture consumeFuture = handler.consumeAsync().toCompletableFuture(); + assertFalse( consumeFuture.isDone() ); + + handler.onRecord( values( 5, 6 ) ); + handler.onRecord( values( 7, 8 ) ); + assertFalse( consumeFuture.isDone() ); + + handler.onSuccess( emptyMap() ); + + assertTrue( consumeFuture.isDone() ); + assertNotNull( await( consumeFuture ) ); + + assertNoRecordsCanBeFetched( handler ); + } + + @Test + public void shouldConsumeUntilFailure() + { + PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ) ); + handler.onRecord( values( 1, 2 ) ); + handler.onRecord( values( 3, 4 ) ); + + CompletableFuture consumeFuture = handler.consumeAsync().toCompletableFuture(); + assertFalse( consumeFuture.isDone() ); + + handler.onRecord( values( 5, 6 ) ); + handler.onRecord( values( 7, 8 ) ); + assertFalse( consumeFuture.isDone() ); + + RuntimeException error = new RuntimeException( "Hi" ); + handler.onFailure( error ); + + assertTrue( consumeFuture.isDone() ); + assertTrue( consumeFuture.isCompletedExceptionally() ); + try + { + await( consumeFuture ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + + assertNoRecordsCanBeFetched( handler ); + } + + @Test + public void shouldReturnNoRecordsWhenConsumed() + { + PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ) ); + handler.onRecord( values( 1, 2 ) ); + handler.onRecord( values( 3, 4 ) ); + + CompletableFuture consumeFuture = handler.consumeAsync().toCompletableFuture(); + assertFalse( consumeFuture.isDone() ); + + CompletionStage peekStage1 = handler.peekAsync(); + CompletionStage nextStage1 = handler.nextAsync(); + + handler.onRecord( values( 5, 6 ) ); + handler.onRecord( values( 7, 8 ) ); + + CompletionStage peekStage2 = handler.peekAsync(); + CompletionStage nextStage2 = handler.nextAsync(); + assertFalse( consumeFuture.isDone() ); + + handler.onSuccess( emptyMap() ); + + assertNull( await( peekStage1 ) ); + assertNull( await( nextStage1 ) ); + assertNull( await( peekStage2 ) ); + assertNull( await( nextStage2 ) ); + + assertTrue( consumeFuture.isDone() ); + assertNotNull( await( consumeFuture ) ); + } + + @Test + public void shouldReceiveSummaryAfterConsume() + { + Statement statement = new Statement( "RETURN 'Hello!'" ); + PullAllResponseHandler handler = newHandler( statement, singletonList( "key" ) ); + handler.onRecord( values( "Hi!" ) ); + handler.onSuccess( singletonMap( "type", value( "rw" ) ) ); + + ResultSummary summary1 = await( handler.consumeAsync() ); + assertEquals( statement.text(), summary1.statement().text() ); + assertEquals( StatementType.READ_WRITE, summary1.statementType() ); + + ResultSummary summary2 = await( handler.summaryAsync() ); + assertEquals( statement.text(), summary2.statement().text() ); + assertEquals( StatementType.READ_WRITE, summary2.statementType() ); + } + private static PullAllResponseHandler newHandler() { return newHandler( new Statement( "RETURN 1" ) ); @@ -950,7 +1163,7 @@ private static PullAllResponseHandler newHandler() private static PullAllResponseHandler newHandler( Statement statement ) { - return newHandler( statement, emptyList(), connectionMock() ); + return newHandler( statement, emptyList() ); } private static PullAllResponseHandler newHandler( List statementKeys ) @@ -958,6 +1171,11 @@ private static PullAllResponseHandler newHandler( List statementKeys ) return newHandler( new Statement( "RETURN 1" ), statementKeys, connectionMock() ); } + private static PullAllResponseHandler newHandler( Statement statement, List statementKeys ) + { + return newHandler( statement, statementKeys, connectionMock() ); + } + private static PullAllResponseHandler newHandler( List statementKeys, Connection connection ) { return newHandler( new Statement( "RETURN 1" ), statementKeys, connection ); @@ -979,6 +1197,13 @@ private static Connection connectionMock() return connection; } + private static void assertNoRecordsCanBeFetched( PullAllResponseHandler handler ) + { + assertNull( await( handler.peekAsync() ) ); + assertNull( await( handler.nextAsync() ) ); + assertEquals( emptyList(), await( handler.listAsync( Functions.identity() ) ) ); + } + private static class TestPullAllResponseHandler extends PullAllResponseHandler { TestPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/MessageFormatTest.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/MessageFormatTest.java index 03118b959b..51a403b331 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/MessageFormatTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/MessageFormatTest.java @@ -32,6 +32,7 @@ import org.neo4j.driver.internal.InternalNode; import org.neo4j.driver.internal.InternalPath; import org.neo4j.driver.internal.InternalRelationship; +import org.neo4j.driver.internal.async.BoltProtocolV1Util; import org.neo4j.driver.internal.async.ChannelPipelineBuilderImpl; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.async.outbound.ChunkAwareByteBufOutput; @@ -45,7 +46,6 @@ import static org.junit.Assert.assertTrue; import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher; import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; -import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.v1.Values.EmptyMap; import static org.neo4j.driver.v1.Values.ofValue; @@ -121,6 +121,7 @@ public void shouldGiveHelpfulErrorOnMalformedNodeStruct() throws Throwable packer.packStructHeader( 0, PackStreamMessageFormatV1.NODE ); output.stop(); + BoltProtocolV1Util.writeMessageBoundary( buf ); // Expect exception.expect( ClientException.class ); @@ -129,7 +130,7 @@ public void shouldGiveHelpfulErrorOnMalformedNodeStruct() throws Throwable "received NODE structure has 0 fields." ) ); // When - unpack( Unpooled.wrappedBuffer( buf, messageBoundary() ), newEmbeddedChannel() ); + unpack( buf, newEmbeddedChannel() ); } private void assertSerializesValue( Value value ) throws Throwable diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/BoltServerAddressTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/BoltServerAddressTest.java index 2b9b2732ca..136a12987b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/BoltServerAddressTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/BoltServerAddressTest.java @@ -25,6 +25,7 @@ import org.neo4j.driver.internal.BoltServerAddress; import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertThat; import static org.neo4j.driver.internal.BoltServerAddress.DEFAULT_PORT; @@ -53,4 +54,11 @@ public void shouldAlwaysResolveAddress() assertNotSame( socketAddress1, socketAddress2 ); } + + @Test + public void shouldHaveCorrectToString() + { + assertEquals( "localhost:4242", new BoltServerAddress( "localhost", 4242 ).toString() ); + assertEquals( "127.0.0.1:8888", new BoltServerAddress( "127.0.0.1", 8888 ).toString() ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/packstream/PackStreamTest.java b/driver/src/test/java/org/neo4j/driver/internal/packstream/PackStreamTest.java index e8f55f6f01..6758d38063 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/packstream/PackStreamTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/packstream/PackStreamTest.java @@ -30,9 +30,10 @@ import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; +import org.neo4j.driver.internal.util.Iterables; + import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; @@ -49,7 +50,7 @@ public class PackStreamTest public static Map asMap( Object... keysAndValues ) { - Map map = new LinkedHashMap<>( keysAndValues.length / 2 ); + Map map = Iterables.newLinkedHashMapWithSize( keysAndValues.length / 2 ); String key = null; for ( Object keyOrValue : keysAndValues ) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/IterablesTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/IterablesTest.java index c717b79d42..52a1ab803d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/IterablesTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/IterablesTest.java @@ -20,7 +20,13 @@ import org.junit.Test; +import java.util.Queue; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class IterablesTest @@ -31,6 +37,12 @@ public void shouldCreateHashMapWithExpectedSize() assertNotNull( Iterables.newHashMapWithSize( 42 ) ); } + @Test + public void shouldCreateLinkedHashMapWithExpectedSize() + { + assertNotNull( Iterables.newLinkedHashMapWithSize( 42 ) ); + } + @Test public void shouldThrowWhenNegativeHashMapSizeGiven() { @@ -43,4 +55,51 @@ public void shouldThrowWhenNegativeHashMapSizeGiven() { } } + + @Test + public void shouldThrowWhenNegativeLinkedHashMapSizeGiven() + { + try + { + Iterables.newLinkedHashMapWithSize( -42 ); + fail( "Exception expected" ); + } + catch ( IllegalArgumentException ignore ) + { + } + } + + @Test + public void shouldReturnEmptyQueue() + { + Queue queue = Iterables.emptyQueue(); + assertEquals( 0, queue.size() ); + assertTrue( queue.isEmpty() ); + assertNull( queue.peek() ); + assertNull( queue.poll() ); + + try + { + queue.add( "Hello" ); + fail( "Exception expected" ); + } + catch ( UnsupportedOperationException ignore ) + { + } + + try + { + queue.offer( "World" ); + fail( "Exception expected" ); + } + catch ( UnsupportedOperationException ignore ) + { + } + } + + @Test + public void shouldReturnSameEmptyQueue() + { + assertSame( Iterables.emptyQueue(), Iterables.emptyQueue() ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ServerVersionTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/ServerVersionTest.java index a9858fd773..f50b29e6c9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ServerVersionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ServerVersionTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class ServerVersionTest @@ -41,4 +42,13 @@ public void versionShouldThrowExceptionIfServerVersionCantBeParsed() throws Exce fail( "Should have failed to parse version" ); } + @Test + public void shouldHaveCorrectToString() + { + assertEquals( "Neo4j/dev", ServerVersion.vInDev.toString() ); + assertEquals( "Neo4j/3.0.0", ServerVersion.v3_0_0.toString() ); + assertEquals( "Neo4j/3.1.0", ServerVersion.v3_1_0.toString() ); + assertEquals( "Neo4j/3.2.0", ServerVersion.v3_2_0.toString() ); + assertEquals( "Neo4j/3.5.7", ServerVersion.version( "Neo4j/3.5.7" ).toString() ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index bf94886e39..e0f1601cc5 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -60,6 +60,7 @@ import org.neo4j.driver.v1.util.TestNeo4j; import static java.util.Collections.emptyIterator; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.hamcrest.Matchers.containsString; @@ -1167,6 +1168,75 @@ public void shouldAllowReturningNullFromAsyncTransactionFunction() assertNull( await( writeResult ) ); } + @Test + public void shouldReturnNoRecordsWhenConsumed() + { + String query = "UNWIND range(1, 5) AS x RETURN x"; + CompletionStage summaryAndRecordStage = session.runAsync( query ) + .thenCompose( cursor -> + { + CompletionStage consumeStage = cursor.consumeAsync(); + CompletionStage recordStage = cursor.nextAsync(); + return consumeStage.thenCombine( recordStage, SummaryAndRecords::new ); + } ); + + SummaryAndRecords result = await( summaryAndRecordStage ); + + assertEquals( query, result.summary.statement().text() ); + assertEquals( StatementType.READ_ONLY, result.summary.statementType() ); + + assertEquals( 1, result.records.size() ); + assertNull( result.records.get( 0 ) ); + } + + @Test + public void shouldStopReturningRecordsAfterConsumed() + { + String query = "UNWIND range(1, 5) AS x RETURN x"; + CompletionStage summaryAndRecordsStage = session.runAsync( query ) + .thenCompose( cursor -> cursor.nextAsync() // fetch just a single record + .thenCompose( record1 -> + { + // then consume rest + CompletionStage consumeStage = cursor.consumeAsync(); + // and try to fetch another record + CompletionStage record2Stage = cursor.nextAsync(); + return consumeStage.thenCombine( record2Stage, + ( summary, record2 ) -> new SummaryAndRecords( summary, record1, record2 ) ); + } ) ); + + SummaryAndRecords result = await( summaryAndRecordsStage ); + + assertEquals( query, result.summary.statement().text() ); + assertEquals( StatementType.READ_ONLY, result.summary.statementType() ); + + assertEquals( 2, result.records.size() ); + Record record1 = result.records.get( 0 ); + assertNotNull( record1 ); + assertEquals( 1, record1.get( 0 ).asInt() ); + Record record2 = result.records.get( 1 ); + assertNull( record2 ); + } + + @Test + public void shouldReturnEmptyListOfRecordsWhenConsumed() + { + String query = "UNWIND range(1, 5) AS x RETURN x"; + CompletionStage summaryAndRecordsStage = session.runAsync( query ) + .thenCompose( cursor -> + { + CompletionStage consumeStage = cursor.consumeAsync(); + CompletionStage> recordsStage = cursor.listAsync(); + return consumeStage.thenCombine( recordsStage, SummaryAndRecords::new ); + } ); + + SummaryAndRecords result = await( summaryAndRecordsStage ); + + assertEquals( query, result.summary.statement().text() ); + assertEquals( StatementType.READ_ONLY, result.summary.statementType() ); + assertEquals( emptyList(), result.records ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); @@ -1362,6 +1432,12 @@ private static class SummaryAndRecords final ResultSummary summary; final List records; + SummaryAndRecords( ResultSummary summary, Record... records ) + { + this.summary = summary; + this.records = Arrays.asList( records ); + } + SummaryAndRecords( ResultSummary summary, List records ) { this.summary = summary; diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 9ff8981aae..ab2ca6bbbb 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -68,6 +69,7 @@ import org.neo4j.driver.v1.util.TestUtil; import static java.lang.String.format; +import static java.util.Collections.emptyList; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -1539,6 +1541,89 @@ public void shouldAllowReturningNullFromTransactionFunction() } } + @Test + public void shouldAllowIteratingOverEmptyResult() + { + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "UNWIND [] AS x RETURN x" ); + assertFalse( result.hasNext() ); + try + { + result.next(); + fail( "Exception expected" ); + } + catch ( NoSuchElementException ignore ) + { + } + } + } + + @Test + public void shouldAllowConsumingEmptyResult() + { + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "UNWIND [] AS x RETURN x" ); + ResultSummary summary = result.consume(); + assertNotNull( summary ); + assertEquals( StatementType.READ_ONLY, summary.statementType() ); + } + } + + @Test + public void shouldAllowListEmptyResult() + { + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "UNWIND [] AS x RETURN x" ); + assertEquals( emptyList(), result.list() ); + } + } + + @Test + public void shouldConsume() + { + try ( Session session = neo4j.driver().session() ) + { + String query = "UNWIND [1, 2, 3, 4, 5] AS x RETURN x"; + StatementResult result = session.run( query ); + + ResultSummary summary = result.consume(); + assertEquals( query, summary.statement().text() ); + assertEquals( StatementType.READ_ONLY, summary.statementType() ); + + assertFalse( result.hasNext() ); + assertEquals( emptyList(), result.list() ); + } + } + + @Test + public void shouldConsumeWithFailure() + { + try ( Session session = neo4j.driver().session() ) + { + String query = "UNWIND [1, 2, 3, 4, 0] AS x RETURN 10 / x"; + StatementResult result = session.run( query ); + + try + { + result.consume(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e, is( arithmeticError() ) ); + } + + assertFalse( result.hasNext() ); + assertEquals( emptyList(), result.list() ); + + ResultSummary summary = result.summary(); + assertEquals( query, summary.statement().text() ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );