diff --git a/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java b/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java index 28ab4c097d..b247fa6a21 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java +++ b/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java @@ -25,6 +25,7 @@ import java.net.UnknownHostException; import static java.lang.String.format; +import static java.util.Objects.requireNonNull; /** * Holds a host and port pair that denotes a Bolt server address. @@ -49,23 +50,23 @@ public BoltServerAddress( URI uri ) public BoltServerAddress( String host, int port ) { - this.host = host; + this.host = requireNonNull( host ); this.port = port; } @Override - public boolean equals( Object obj ) + public boolean equals( Object o ) { - if ( this == obj ) + if ( this == o ) { return true; } - if ( !(obj instanceof BoltServerAddress) ) + if ( o == null || getClass() != o.getClass() ) { return false; } - BoltServerAddress address = (BoltServerAddress) obj; - return host.equals( address.host ) && port == address.port; + BoltServerAddress that = (BoltServerAddress) o; + return port == that.port && host.equals( that.host ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/Bookmark.java b/driver/src/main/java/org/neo4j/driver/internal/Bookmark.java index 4c88203999..2c35af86cc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/Bookmark.java +++ b/driver/src/main/java/org/neo4j/driver/internal/Bookmark.java @@ -19,7 +19,6 @@ package org.neo4j.driver.internal; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Objects; @@ -28,6 +27,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; +import static org.neo4j.driver.internal.util.Iterables.newHashMapWithSize; import static org.neo4j.driver.v1.Values.value; public final class Bookmark @@ -93,7 +93,7 @@ public Map asBeginTransactionParameters() // {bookmarks: ["one", "two", "max"]} for backwards compatibility reasons. Old servers can only accept single // bookmark that is why driver has to parse and compare given list of bookmarks. This functionality will // eventually be removed. - Map parameters = new HashMap<>( 4 ); + Map parameters = newHashMapWithSize( 2 ); parameters.put( BOOKMARK_KEY, value( maxValue ) ); parameters.put( BOOKMARKS_KEY, value( values ) ); return parameters; diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index a76890f580..3da5499813 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -47,6 +47,7 @@ import static java.util.Collections.emptyMap; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.Values.value; @@ -163,7 +164,7 @@ else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == Stat } else { - return completedFuture( null ); + return completedWithNull(); } } @@ -172,7 +173,7 @@ public CompletionStage commitAsync() { if ( state == State.COMMITTED ) { - return completedFuture( null ); + return completedWithNull(); } else if ( state == State.ROLLED_BACK ) { @@ -200,13 +201,13 @@ public CompletionStage rollbackAsync() } else if ( state == State.ROLLED_BACK ) { - return completedFuture( null ); + return completedWithNull(); } else if ( state == State.TERMINATED ) { // transaction has been terminated by RESET and should be rolled back by the database state = State.ROLLED_BACK; - return completedFuture( null ); + return completedWithNull(); } else { diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java index fbb5143803..0f45464b42 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java @@ -29,7 +29,7 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Session; -import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; public class InternalDriver implements Driver { @@ -116,7 +116,7 @@ public CompletionStage closeAsync() log.info( "Closing driver instance %s", this ); return sessionFactory.close(); } - return completedFuture( null ); + return completedWithNull(); } public CompletionStage verifyConnectivity() diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java index 6e2bd0d7a2..4db027d074 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.internal; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -118,9 +117,7 @@ public CompletionStage> listAsync() @Override public CompletionStage> listAsync( Function mapFunction ) { - CompletableFuture> resultFuture = new CompletableFuture<>(); - internalListAsync( new ArrayList<>(), resultFuture, mapFunction ); - return resultFuture; + return pullAllHandler.listAsync( mapFunction ); } public CompletionStage failureAsync() @@ -160,40 +157,4 @@ else if ( record != null ) } } ); } - - private void internalListAsync( List result, CompletableFuture> resultFuture, - Function mapFunction ) - { - CompletionStage recordFuture = nextAsync(); - - // use async completion listener because of recursion, otherwise it is possible for - // the caller thread to get StackOverflowError when result is large and buffered - recordFuture.whenCompleteAsync( ( record, completionError ) -> - { - Throwable error = Futures.completionExceptionCause( completionError ); - if ( error != null ) - { - resultFuture.completeExceptionally( error ); - } - else if ( record != null ) - { - T value; - try - { - value = mapFunction.apply( record ); - } - catch ( Throwable mapError ) - { - resultFuture.completeExceptionally( mapError ); - return; - } - result.add( value ); - internalListAsync( result, resultFuture, mapFunction ); - } - else - { - resultFuture.complete( result ); - } - } ); - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 292dc19f43..e13f0e56d7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -46,7 +46,7 @@ import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.types.TypeSystem; -import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.Values.value; @@ -60,9 +60,9 @@ public class NetworkSession implements Session protected final Logger logger; private volatile Bookmark bookmark = Bookmark.empty(); - private volatile CompletionStage transactionStage = completedFuture( null ); - private volatile CompletionStage connectionStage = completedFuture( null ); - private volatile CompletionStage resultCursorStage = completedFuture( null ); + private volatile CompletionStage transactionStage = completedWithNull(); + private volatile CompletionStage connectionStage = completedWithNull(); + private volatile CompletionStage resultCursorStage = completedWithNull(); private final AtomicBoolean open = new AtomicBoolean( true ); @@ -168,7 +168,7 @@ public CompletionStage closeAsync() { if ( cursor == null ) { - return completedFuture( null ); + return completedWithNull(); } return cursor.failureAsync(); } ).thenCompose( error -> releaseResources().thenApply( ignore -> @@ -186,7 +186,7 @@ public CompletionStage closeAsync() } } ) ); } - return completedFuture( null ); + return completedWithNull(); } @Override @@ -274,10 +274,6 @@ public TypeSystem typeSystem() CompletionStage currentConnectionIsOpen() { - if ( connectionStage == null ) - { - return completedFuture( false ); - } return connectionStage.handle( ( connection, error ) -> error == null && // no acquisition error connection != null && // some connection has actually been acquired @@ -363,7 +359,7 @@ private CompletionStage safeExecuteWork( ExplicitTransaction tx, Transact CompletionStage result = work.execute( tx ); // protect from given transaction function returning null - return result == null ? completedFuture( null ) : result; + return result == null ? completedWithNull() : result; } catch ( Throwable workError ) { @@ -459,7 +455,7 @@ private CompletionStage acquireConnection( AccessMode mode ) { if ( cursor == null ) { - return completedFuture( null ); + return completedWithNull(); } // make sure previous result is fully consumed and connection is released back to the pool return cursor.failureAsync(); @@ -508,7 +504,7 @@ private CompletionStage rollbackTransaction() { return tx.rollbackAsync(); } - return completedFuture( null ); + return completedWithNull(); } ).exceptionally( error -> { Throwable cause = Futures.completionExceptionCause( error ); @@ -519,13 +515,13 @@ private CompletionStage rollbackTransaction() private CompletionStage releaseConnection() { - return existingConnectionOrNull().thenCompose( connection -> + return connectionStage.thenCompose( connection -> { if ( connection != null ) { return connection.release(); } - return completedFuture( null ); + return completedWithNull(); } ); } @@ -574,15 +570,10 @@ private CompletionStage ensureNoOpenTx( String errorMessage ) private CompletionStage existingTransactionOrNull() { return transactionStage - .exceptionally( error -> null ) // handle previous acquisition failures + .exceptionally( error -> null ) // handle previous connection acquisition and tx begin failures .thenApply( tx -> tx != null && tx.isOpen() ? tx : null ); } - private CompletionStage existingConnectionOrNull() - { - return connectionStage.exceptionally( error -> null ); // handle previous acquisition failures - } - private void ensureSessionIsOpen() { if ( !open.get() ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java index 62ae386ebb..f7b6516002 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java @@ -26,6 +26,7 @@ import org.neo4j.driver.internal.InternalStatementResultCursor; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; public class ResultCursorsHolder { @@ -41,14 +42,14 @@ public CompletionStage retrieveNotConsumedError() { return cursorStages.stream() .map( this::retrieveFailure ) - .reduce( completedFuture( null ), this::nonNullFailureFromEither ); + .reduce( completedWithNull(), this::nonNullFailureFromEither ); } private CompletionStage retrieveFailure( CompletionStage cursorStage ) { return cursorStage .exceptionally( cursor -> null ) - .thenCompose( cursor -> cursor == null ? completedFuture( null ) : cursor.failureAsync() ); + .thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.failureAsync() ); } private CompletionStage nonNullFailureFromEither( CompletionStage stage1, diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java index 81741b45a0..64953bc8d8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java @@ -20,10 +20,10 @@ import io.netty.channel.Channel; import io.netty.channel.pool.ChannelPoolHandler; -import io.netty.util.internal.ConcurrentSet; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.v1.Logger; @@ -33,12 +33,11 @@ public class ActiveChannelTracker implements ChannelPoolHandler { - private final ConcurrentMap> addressToActiveChannelCount; + private final Map addressToActiveChannelCount = new ConcurrentHashMap<>(); private final Logger log; public ActiveChannelTracker( Logging logging ) { - this.addressToActiveChannelCount = new ConcurrentHashMap<>(); this.log = logging.getLog( getClass().getSimpleName() ); } @@ -65,52 +64,25 @@ public void channelCreated( Channel channel ) public int activeChannelCount( BoltServerAddress address ) { - ConcurrentSet activeChannels = addressToActiveChannelCount.get( address ); - return activeChannels == null ? 0 : activeChannels.size(); - } - - public void purge( BoltServerAddress address ) - { - ConcurrentSet activeChannels = addressToActiveChannelCount.remove( address ); - if ( activeChannels != null ) - { - for ( Channel channel : activeChannels ) - { - channel.close(); - } - } + AtomicInteger count = addressToActiveChannelCount.get( address ); + return count == null ? 0 : count.get(); } private void channelActive( Channel channel ) { BoltServerAddress address = serverAddress( channel ); - ConcurrentSet activeChannels = addressToActiveChannelCount.get( address ); - if ( activeChannels == null ) - { - ConcurrentSet newActiveChannels = new ConcurrentSet<>(); - ConcurrentSet existingActiveChannels = addressToActiveChannelCount.putIfAbsent( address, - newActiveChannels ); - if ( existingActiveChannels == null ) - { - activeChannels = newActiveChannels; - } - else - { - activeChannels = existingActiveChannels; - } - } - - activeChannels.add( channel ); + AtomicInteger count = addressToActiveChannelCount.computeIfAbsent( address, k -> new AtomicInteger() ); + count.incrementAndGet(); } private void channelInactive( Channel channel ) { BoltServerAddress address = serverAddress( channel ); - ConcurrentSet activeChannels = addressToActiveChannelCount.get( address ); - if ( activeChannels == null ) + AtomicInteger count = addressToActiveChannelCount.get( address ); + if ( count == null ) { - throw new IllegalStateException( "No channels exist for address '" + address + "'" ); + throw new IllegalStateException( "No count exist for address '" + address + "'" ); } - activeChannels.remove( channel ); + count.decrementAndGet(); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java index af296ea674..9ce4337f5f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java @@ -38,6 +38,7 @@ import static java.lang.String.format; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; public class Rediscovery { @@ -178,7 +179,7 @@ private CompletionStage lookupOnKnownRouters( RoutingTable r { BoltServerAddress[] addresses = routingTable.routers().toArray(); - CompletableFuture result = completedFuture( null ); + CompletableFuture result = completedWithNull(); for ( BoltServerAddress address : addresses ) { result = result.thenCompose( composition -> @@ -203,7 +204,7 @@ private CompletionStage lookupOnInitialRouter( RoutingTable Set addresses = hostNameResolver.resolve( initialRouter ); addresses.removeAll( seenServers ); - CompletableFuture result = completedFuture( null ); + CompletableFuture result = completedWithNull(); for ( BoltServerAddress address : addresses ) { result = result.thenCompose( composition -> diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index 14aefcabc5..97da4e9831 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -19,6 +19,8 @@ package org.neo4j.driver.internal.handlers; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; @@ -27,15 +29,18 @@ import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.util.MetadataUtil; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.util.Function; import static java.util.Collections.emptyMap; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; public abstract class PullAllResponseHandler implements ResponseHandler @@ -54,7 +59,6 @@ public abstract class PullAllResponseHandler implements ResponseHandler private ResultSummary summary; private CompletableFuture recordFuture; - private CompletableFuture summaryFuture; private CompletableFuture failureFuture; public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection ) @@ -73,7 +77,6 @@ public synchronized void onSuccess( Map metadata ) afterSuccess(); completeRecordFuture( null ); - completeSummaryFuture( summary ); completeFailureFuture( null ); } @@ -90,26 +93,16 @@ public synchronized void onFailure( Throwable error ) boolean failedRecordFuture = failRecordFuture( error ); if ( failedRecordFuture ) { - // error propagated through record future, complete other two - completeSummaryFuture( summary ); + // error propagated through the record future completeFailureFuture( null ); } else { - boolean failedSummaryFuture = failSummaryFuture( error ); - if ( failedSummaryFuture ) + boolean completedFailureFuture = completeFailureFuture( error ); + if ( !completedFailureFuture ) { - // error propagated through summary future, complete other one - completeFailureFuture( null ); - } - else - { - boolean completedFailureFuture = completeFailureFuture( error ); - if ( !completedFailureFuture ) - { - // error has not been propagated to the user, remember it - failure = error; - } + // error has not been propagated to the user, remember it + failure = error; } } } @@ -120,7 +113,7 @@ public synchronized void onFailure( Throwable error ) public synchronized void onRecord( Value[] fields ) { Record record = new InternalRecord( runResponseHandler.statementKeys(), fields ); - queueRecord( record ); + enqueueRecord( record ); completeRecordFuture( record ); } @@ -136,7 +129,7 @@ public synchronized CompletionStage peekAsync() if ( finished ) { - return completedFuture( null ); + return completedWithNull(); } if ( recordFuture == null ) @@ -158,26 +151,26 @@ public synchronized CompletionStage nextAsync() public synchronized CompletionStage summaryAsync() { - if ( failure != null ) - { - return failedFuture( extractFailure() ); - } - else if ( summary != null ) + return failureAsync().thenApply( error -> { - return completedFuture( summary ); - } - else + if ( error != null ) + { + throw Futures.asCompletionException( error ); + } + return summary; + } ); + } + + public synchronized CompletionStage> listAsync( Function mapFunction ) + { + return failureAsync().thenApply( error -> { - if ( summaryFuture == null ) + if ( error != null ) { - // neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives - // future will be completed with summary on SUCCESS and completed exceptionally on FAILURE - // enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed - connection.enableAutoRead(); - summaryFuture = new CompletableFuture<>(); + throw Futures.asCompletionException( error ); } - return summaryFuture; - } + return recordsAsList( mapFunction ); + } ); } public synchronized CompletionStage failureAsync() @@ -188,7 +181,7 @@ public synchronized CompletionStage failureAsync() } else if ( finished ) { - return completedFuture( null ); + return completedWithNull(); } else { @@ -204,14 +197,14 @@ else if ( finished ) } } - private void queueRecord( Record record ) + private void enqueueRecord( Record record ) { records.add( record ); - boolean shouldBufferAllRecords = summaryFuture != null || failureFuture != null; - // when summary or failure is requested we have to buffer all remaining records and then return summary/failure + boolean shouldBufferAllRecords = failureFuture != null; + // when failure is requested we have to buffer all remaining records and then return the error // do not disable auto-read in this case, otherwise records will not be consumed and trailing - // SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for summary/failure + // SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for the error if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK ) { // more than high watermark records are already queued, tell connection to stop auto-reading from network @@ -235,6 +228,22 @@ private Record dequeueRecord() return record; } + private List recordsAsList( Function mapFunction ) + { + if ( !finished ) + { + throw new IllegalStateException( "Can't get records as list because SUCCESS or FAILURE did not arrive" ); + } + + List result = new ArrayList<>( records.size() ); + while ( !records.isEmpty() ) + { + Record record = records.poll(); + result.add( mapFunction.apply( record ) ); + } + return result; + } + private Throwable extractFailure() { if ( failure == null ) @@ -269,28 +278,6 @@ private boolean failRecordFuture( Throwable error ) return false; } - private void completeSummaryFuture( ResultSummary summary ) - { - if ( summaryFuture != null ) - { - CompletableFuture future = summaryFuture; - summaryFuture = null; - future.complete( summary ); - } - } - - private boolean failSummaryFuture( Throwable error ) - { - if ( summaryFuture != null ) - { - CompletableFuture future = summaryFuture; - summaryFuture = null; - future.completeExceptionally( error ); - return true; - } - return false; - } - private boolean completeFailureFuture( Throwable error ) { if ( failureFuture != null ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java index 5baa35cf1c..ae5691464a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -73,8 +73,6 @@ public class PackStreamMessageFormatV1 implements MessageFormat public static final int NODE_FIELDS = 3; - private static final Map EMPTY_STRING_VALUE_MAP = new HashMap<>( 0 ); - @Override public MessageFormat.Writer newWriter( PackOutput output, boolean byteArraySupportEnabled ) { @@ -543,7 +541,7 @@ private InternalNode unpackNode() throws IOException labels.add( unpacker.unpackString() ); } int numProps = (int) unpacker.unpackMapHeader(); - Map props = new HashMap<>(); + Map props = Iterables.newHashMapWithSize( numProps ); for ( int j = 0; j < numProps; j++ ) { String key = unpacker.unpackString(); @@ -636,9 +634,9 @@ private Map unpackMap() throws IOException int size = (int) unpacker.unpackMapHeader(); if ( size == 0 ) { - return EMPTY_STRING_VALUE_MAP; + return Collections.emptyMap(); } - Map map = new HashMap<>( size ); + Map map = Iterables.newHashMapWithSize( size ); for ( int i = 0; i < size; i++ ) { String key = unpacker.unpackString(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java index 7f4c44efda..d9fa927a1a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java @@ -30,12 +30,22 @@ import org.neo4j.driver.internal.async.EventLoopGroupFactory; +import static java.util.concurrent.CompletableFuture.completedFuture; + public final class Futures { + private static final CompletableFuture COMPLETED_WITH_NULL = completedFuture( null ); + private Futures() { } + @SuppressWarnings( "unchecked" ) + public static CompletableFuture completedWithNull() + { + return (CompletableFuture) COMPLETED_WITH_NULL; + } + public static CompletionStage asCompletionStage( io.netty.util.concurrent.Future future ) { CompletableFuture result = new CompletableFuture<>(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java b/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java index 37a673ecee..f971601821 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java @@ -29,6 +29,8 @@ public class Iterables { + private static final float DEFAULT_HASH_MAP_LOAD_FACTOR = 0.75F; + public static int count( Iterable it ) { if ( it instanceof Collection ) { return ((Collection) it).size(); } @@ -68,7 +70,7 @@ public static T single( Iterable it ) public static Map map( String ... alternatingKeyValue ) { - Map out = new HashMap<>(); + Map out = newHashMapWithSize( alternatingKeyValue.length / 2 ); for ( int i = 0; i < alternatingKeyValue.length; i+=2 ) { out.put( alternatingKeyValue[i], alternatingKeyValue[i+1] ); @@ -107,4 +109,22 @@ public void remove() } }; } + + public static HashMap newHashMapWithSize( int expectedSize ) + { + return new HashMap<>( hashMapCapacity( expectedSize ) ); + } + + private static int hashMapCapacity( int expectedSize ) + { + if ( expectedSize < 3 ) + { + if ( expectedSize < 0 ) + { + throw new IllegalArgumentException( "Illegal map size: " + expectedSize ); + } + return expectedSize + 1; + } + return (int) ((float) expectedSize / DEFAULT_HASH_MAP_LOAD_FACTOR + 1.0F); + } } diff --git a/driver/src/main/java/org/neo4j/driver/v1/Statement.java b/driver/src/main/java/org/neo4j/driver/v1/Statement.java index 6d51a31ba6..471185da45 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Statement.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Statement.java @@ -18,15 +18,15 @@ */ package org.neo4j.driver.v1; -import java.util.HashMap; import java.util.Map; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.util.Immutable; import static java.lang.String.format; -import static org.neo4j.driver.v1.Values.value; +import static org.neo4j.driver.internal.util.Iterables.newHashMapWithSize; import static org.neo4j.driver.v1.Values.ofValue; +import static org.neo4j.driver.v1.Values.value; /** * An executable statement, i.e. the statements' text and its parameters. @@ -136,7 +136,7 @@ public Statement withUpdatedParameters( Value updates ) } else { - Map newParameters = new HashMap<>( Math.max( parameters.size(), updates.size() ) ); + Map newParameters = newHashMapWithSize( Math.max( parameters.size(), updates.size() ) ); newParameters.putAll( parameters.asMap( ofValue() ) ); for ( Map.Entry entry : updates.asMap( ofValue() ).entrySet() ) { diff --git a/driver/src/main/java/org/neo4j/driver/v1/Values.java b/driver/src/main/java/org/neo4j/driver/v1/Values.java index 0719719204..dfa469736a 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Values.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Values.java @@ -44,6 +44,8 @@ import org.neo4j.driver.v1.types.TypeSystem; import org.neo4j.driver.v1.util.Function; +import static org.neo4j.driver.internal.util.Iterables.newHashMapWithSize; + /** * Utility for wrapping regular Java types and exposing them as {@link Value} * objects, and vice versa. @@ -247,7 +249,7 @@ public static Value value( final boolean val ) public static Value value( final Map val ) { - Map asValues = new HashMap<>( val.size() ); + Map asValues = newHashMapWithSize( val.size() ); for ( Map.Entry entry : val.entrySet() ) { asValues.put( entry.getKey(), value( entry.getValue() ) ); @@ -284,7 +286,7 @@ public static Value parameters( Object... keysAndValues ) "alternating key and value. Arguments were: " + Arrays.toString( keysAndValues ) + "." ); } - HashMap map = new HashMap<>( keysAndValues.length / 2 ); + HashMap map = newHashMapWithSize( keysAndValues.length / 2 ); for ( int i = 0; i < keysAndValues.length; i += 2 ) { Object value = keysAndValues[i + 1]; diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java index 098cd5679c..abaf02a103 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java @@ -56,6 +56,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.Config.defaultConfig; @@ -143,8 +144,8 @@ public void usesLeakLoggingSessionFactoryWhenConfigured() public void shouldVerifyConnectivity() { SessionFactory sessionFactory = mock( SessionFactory.class ); - when( sessionFactory.verifyConnectivity() ).thenReturn( completedFuture( null ) ); - when( sessionFactory.close() ).thenReturn( completedFuture( null ) ); + when( sessionFactory.verifyConnectivity() ).thenReturn( completedWithNull() ); + when( sessionFactory.close() ).thenReturn( completedWithNull() ); DriverFactoryWithSessions driverFactory = new DriverFactoryWithSessions( sessionFactory ); try ( Driver driver = createDriver( driverFactory ) ) @@ -160,7 +161,7 @@ public void shouldThrowWhenUnableToVerifyConnectivity() SessionFactory sessionFactory = mock( SessionFactory.class ); ServiceUnavailableException error = new ServiceUnavailableException( "Hello" ); when( sessionFactory.verifyConnectivity() ).thenReturn( failedFuture( error ) ); - when( sessionFactory.close() ).thenReturn( completedFuture( null ) ); + when( sessionFactory.close() ).thenReturn( completedWithNull() ); DriverFactoryWithSessions driverFactory = new DriverFactoryWithSessions( sessionFactory ); try @@ -191,7 +192,7 @@ private static ConnectionPool connectionPoolMock() ConnectionPool pool = mock( ConnectionPool.class ); Connection connection = mock( Connection.class ); when( pool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( completedFuture( connection ) ); - when( pool.close() ).thenReturn( completedFuture( null ) ); + when( pool.close() ).thenReturn( completedWithNull() ); return pool; } @@ -234,7 +235,7 @@ private static class SessionFactoryCapturingDriverFactory extends DriverFactory protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config ) { InternalDriver driver = mock( InternalDriver.class ); - when( driver.verifyConnectivity() ).thenReturn( completedFuture( null ) ); + when( driver.verifyConnectivity() ).thenReturn( completedWithNull() ); return driver; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java index f62ab78dac..37a93c9909 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java @@ -24,13 +24,13 @@ import org.neo4j.driver.internal.security.SecurityPlan; -import static java.util.concurrent.CompletableFuture.completedFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.v1.util.TestUtil.await; public class InternalDriverTest @@ -62,7 +62,7 @@ public void shouldNotCloseSessionFactoryMultipleTimes() public void shouldVerifyConnectivity() { SessionFactory sessionFactory = sessionFactoryMock(); - CompletableFuture connectivityStage = completedFuture( null ); + CompletableFuture connectivityStage = completedWithNull(); when( sessionFactory.verifyConnectivity() ).thenReturn( connectivityStage ); InternalDriver driver = newDriver( sessionFactory ); @@ -78,7 +78,7 @@ private static InternalDriver newDriver( SessionFactory sessionFactory ) private static SessionFactory sessionFactoryMock() { SessionFactory sessionFactory = mock( SessionFactory.class ); - when( sessionFactory.close() ).thenReturn( completedFuture( null ) ); + when( sessionFactory.close() ).thenReturn( completedWithNull() ); return sessionFactory; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java index a47a5356f9..4543c2db77 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java @@ -38,6 +38,8 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.summary.StatementType; +import org.neo4j.driver.v1.util.Function; +import org.neo4j.driver.v1.util.Functions; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; @@ -53,6 +55,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; +import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.Values.value; import static org.neo4j.driver.v1.Values.values; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -106,7 +110,7 @@ public void shouldReturnNextExistingRecord() public void shouldReturnNextNonExistingRecord() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( null ) ); + when( pullAllHandler.nextAsync() ).thenReturn( completedWithNull() ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); @@ -130,7 +134,7 @@ public void shouldPeekExistingRecord() public void shouldPeekNonExistingRecord() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.peekAsync() ).thenReturn( completedFuture( null ) ); + when( pullAllHandler.peekAsync() ).thenReturn( completedWithNull() ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); @@ -144,7 +148,7 @@ public void shouldReturnSingleRecord() Record record = new InternalRecord( asList( "key1", "key2" ), values( 42, 42 ) ); when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record ) ) - .thenReturn( completedFuture( null ) ); + .thenReturn( completedWithNull() ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); @@ -155,7 +159,7 @@ public void shouldReturnSingleRecord() public void shouldFailWhenAskedForSingleRecordButResultIsEmpty() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( null ) ); + when( pullAllHandler.nextAsync() ).thenReturn( completedWithNull() ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); @@ -203,7 +207,7 @@ public void shouldConsumeAsyncWhenResultContainsMultipleRecords() Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 3, 3 ) ); when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) ) .thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) ) - .thenReturn( completedFuture( null ) ); + .thenReturn( completedWithNull() ); ResultSummary summary = mock( ResultSummary.class ); when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); @@ -221,7 +225,7 @@ public void shouldConsumeAsyncWhenResultContainsOneRecords() Record record = new InternalRecord( asList( "key1", "key2" ), values( 1, 1 ) ); when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record ) ) - .thenReturn( completedFuture( null ) ); + .thenReturn( completedWithNull() ); ResultSummary summary = mock( ResultSummary.class ); when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); @@ -236,7 +240,7 @@ public void shouldConsumeAsyncWhenResultContainsOneRecords() public void shouldConsumeAsyncWhenResultContainsNoRecords() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( null ) ); + when( pullAllHandler.nextAsync() ).thenReturn( completedWithNull() ); ResultSummary summary = mock( ResultSummary.class ); when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); @@ -257,7 +261,7 @@ public void shouldForEachAsyncWhenResultContainsMultipleRecords() Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 3, 3 ) ); when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) ) .thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) ) - .thenReturn( completedFuture( null ) ); + .thenReturn( completedWithNull() ); ResultSummary summary = mock( ResultSummary.class ); when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); @@ -278,7 +282,7 @@ public void shouldForEachAsyncWhenResultContainsOneRecords() Record record = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 1, 1 ) ); when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record ) ) - .thenReturn( completedFuture( null ) ); + .thenReturn( completedWithNull() ); ResultSummary summary = mock( ResultSummary.class ); when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); @@ -296,7 +300,7 @@ public void shouldForEachAsyncWhenResultContainsOneRecords() public void shouldForEachAsyncWhenResultContainsNoRecords() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( null ) ); + when( pullAllHandler.nextAsync() ).thenReturn( completedWithNull() ); ResultSummary summary = mock( ResultSummary.class ); when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) ); @@ -320,7 +324,7 @@ public void shouldFailForEachWhenGivenActionThrows() Record record3 = new InternalRecord( asList( "key1", "key2" ), values( 3, 3 ) ); when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) ) .thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) ) - .thenReturn( completedFuture( null ) ); + .thenReturn( completedWithNull() ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); @@ -353,159 +357,102 @@ public void shouldFailForEachWhenGivenActionThrows() } @Test - public void shouldListAsyncWhenResultContainsMultipleRecords() + public void shouldReturnFailureWhenExists() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - Record record1 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 1, 1 ) ); - Record record2 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 2, 2, 2 ) ); - Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 3, 3 ) ); - Record record4 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 4, 4, 4 ) ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) ) - .thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) ) - .thenReturn( completedFuture( record4 ) ).thenReturn( completedFuture( null ) ); + ServiceUnavailableException error = new ServiceUnavailableException( "Hi" ); + when( pullAllHandler.failureAsync() ).thenReturn( completedFuture( error ) ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - assertEquals( asList( record1, record2, record3, record4 ), await( cursor.listAsync() ) ); + assertEquals( error, await( cursor.failureAsync() ) ); } @Test - public void shouldListAsyncWhenResultContainsOneRecords() + public void shouldReturnNullFailureWhenDoesNotExist() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - - Record record = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 1, 1 ) ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record ) ) - .thenReturn( completedFuture( null ) ); + when( pullAllHandler.failureAsync() ).thenReturn( completedWithNull() ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - assertEquals( singletonList( record ), await( cursor.listAsync() ) ); + assertNull( await( cursor.failureAsync() ) ); } @Test - public void shouldListAsyncWhenResultContainsNoRecords() + public void shouldListAsyncWithoutMapFunction() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( null ) ); - - InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - - assertEquals( 0, await( cursor.listAsync() ).size() ); - } - @Test - public void shouldListAsyncWithFunctionWhenResultContainsMultipleRecords() - { - PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); + Record record1 = new InternalRecord( asList( "key1", "key2" ), values( 1, 1 ) ); + Record record2 = new InternalRecord( asList( "key1", "key2" ), values( 2, 2 ) ); + List records = asList( record1, record2 ); - Record record1 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 11, 111 ) ); - Record record2 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 2, 22, 222 ) ); - Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 33, 333 ) ); - Record record4 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 4, 44, 444 ) ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) ) - .thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) ) - .thenReturn( completedFuture( record4 ) ).thenReturn( completedFuture( null ) ); + when( pullAllHandler.listAsync( Functions.identity() ) ).thenReturn( completedFuture( records ) ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - List values = await( cursor.listAsync( record -> record.get( "key2" ).asInt() ) ); - assertEquals( asList( 11, 22, 33, 44 ), values ); + assertEquals( records, await( cursor.listAsync() ) ); + verify( pullAllHandler ).listAsync( Functions.identity() ); } @Test - public void shouldListAsyncWithFunctionWhenResultContainsOneRecords() + public void shouldListAsyncWithMapFunction() { + Function mapFunction = record -> record.get( 0 ).asString(); PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - Record singleRecord = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 11, 111 ) ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( singleRecord ) ) - .thenReturn( completedFuture( null ) ); + List values = asList( "a", "b", "c", "d", "e" ); + when( pullAllHandler.listAsync( mapFunction ) ).thenReturn( completedFuture( values ) ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - List values = await( cursor.listAsync( record -> record.get( "key3" ).asLong() ) ); - assertEquals( singletonList( 111L ), values ); + assertEquals( values, await( cursor.listAsync( mapFunction ) ) ); + verify( pullAllHandler ).listAsync( mapFunction ); } @Test - public void shouldListAsyncWithFunctionWhenResultContainsNoRecords() + public void shouldPropagateFailureFromListAsyncWithoutMapFunction() { PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( null ) ); - - InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - - List values = await( cursor.listAsync( record -> record.get( "key42" ).asString() ) ); - assertEquals( 0, values.size() ); - } - - @Test - public void shouldFailListAsyncWhenGivenFunctionThrows() - { - PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - - Record record1 = new InternalRecord( asList( "key1", "key2" ), values( 1, 1 ) ); - Record record2 = new InternalRecord( asList( "key1", "key2" ), values( 2, 2 ) ); - Record record3 = new InternalRecord( asList( "key1", "key2" ), values( 3, 3 ) ); - when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) ) - .thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) ) - .thenReturn( completedFuture( null ) ); + RuntimeException error = new RuntimeException( "Hi" ); + when( pullAllHandler.listAsync( Functions.identity() ) ).thenReturn( failedFuture( error ) ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - AtomicInteger recordsProcessed = new AtomicInteger(); - RuntimeException error = new RuntimeException( "Hello" ); - - CompletionStage> stage = cursor.listAsync( record -> - { - if ( record.get( "key1" ).asInt() == 2 ) - { - throw error; - } - else - { - recordsProcessed.incrementAndGet(); - return record.get( "key1" ).asInt(); - } - } ); - try { - await( stage ); + await( cursor.listAsync() ); fail( "Exception expected" ); } catch ( RuntimeException e ) { assertEquals( error, e ); } - assertEquals( 1, recordsProcessed.get() ); - verify( pullAllHandler, times( 2 ) ).nextAsync(); + verify( pullAllHandler ).listAsync( Functions.identity() ); } @Test - public void shouldReturnFailureWhenExists() + public void shouldPropagateFailureFromListAsyncWithMapFunction() { + Function mapFunction = record -> record.get( 0 ).asString(); PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - - ServiceUnavailableException error = new ServiceUnavailableException( "Hi" ); - when( pullAllHandler.failureAsync() ).thenReturn( completedFuture( error ) ); + RuntimeException error = new RuntimeException( "Hi" ); + when( pullAllHandler.listAsync( mapFunction ) ).thenReturn( failedFuture( error ) ); InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - assertEquals( error, await( cursor.failureAsync() ) ); - } - - @Test - public void shouldReturnNullFailureWhenDoesNotExist() - { - PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class ); - when( pullAllHandler.failureAsync() ).thenReturn( completedFuture( null ) ); - - InternalStatementResultCursor cursor = newCursor( pullAllHandler ); - - assertNull( await( cursor.failureAsync() ) ); + try + { + await( cursor.listAsync( mapFunction ) ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + verify( pullAllHandler ).listAsync( mapFunction ); } private static InternalStatementResultCursor newCursor( PullAllResponseHandler pullAllHandler ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 1791520704..65b843bf50 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -70,6 +70,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.AccessMode.WRITE; @@ -89,7 +90,7 @@ public class NetworkSessionTest public void setUp() { connection = connectionMock(); - when( connection.release() ).thenReturn( completedFuture( null ) ); + when( connection.release() ).thenReturn( completedWithNull() ); when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); @@ -672,7 +673,7 @@ public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() { // verify that tx is not open when connection is released assertFalse( tx.isOpen() ); - return completedFuture( null ); + return completedWithNull(); } ); session.reset(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java index 3780f311c9..d599367f34 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java @@ -63,7 +63,7 @@ public class ChannelConnectorImplTest { private final TestNeo4j neo4j = new TestNeo4j(); @Rule - public final RuleChain ruleChain = RuleChain.outerRule( Timeout.seconds( 20 ) ).around( neo4j ); + public final RuleChain ruleChain = RuleChain.outerRule( Timeout.seconds( 60 ) ).around( neo4j ); private Bootstrap bootstrap; diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java index 1cc874534e..ea22d317ee 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java @@ -26,12 +26,10 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; -import static org.neo4j.driver.v1.util.TestUtil.await; public class ActiveChannelTrackerTest { @@ -106,35 +104,6 @@ public void shouldReturnZeroActiveCountForUnknownAddress() assertEquals( 0, tracker.activeChannelCount( address ) ); } - @Test - public void shouldPruneForMissingAddress() - { - assertEquals( 0, tracker.activeChannelCount( address ) ); - tracker.purge( address ); - assertEquals( 0, tracker.activeChannelCount( address ) ); - } - - @Test - public void shouldPruneForExistingAddress() - { - Channel channel1 = newChannel(); - Channel channel2 = newChannel(); - Channel channel3 = newChannel(); - - tracker.channelAcquired( channel1 ); - tracker.channelAcquired( channel2 ); - tracker.channelAcquired( channel3 ); - - assertEquals( 3, tracker.activeChannelCount( address ) ); - - tracker.purge( address ); - - assertEquals( 0, tracker.activeChannelCount( address ) ); - assertNull( await( channel1.closeFuture() ) ); - assertNull( await( channel2.closeFuture() ) ); - assertNull( await( channel3.closeFuture() ) ); - } - private Channel newChannel() { EmbeddedChannel channel = new EmbeddedChannel(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java index 10bc68459a..92ce06e722 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java @@ -45,6 +45,7 @@ import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_ROUTING_TABLE; import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_ROUTING_TABLE_PARAM; import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_SERVERS; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.internal.util.ServerVersion.version; import static org.neo4j.driver.v1.Values.parameters; @@ -183,7 +184,7 @@ public void shouldPropagateReleaseError() private static CompletionStage connectionStage( String serverVersion ) { - return connectionStage( serverVersion, completedFuture( null ) ); + return connectionStage( serverVersion, completedWithNull() ); } private static CompletionStage connectionStage( String serverVersion, diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java index 7a4a8c7273..65997201fa 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java @@ -27,18 +27,22 @@ import java.util.concurrent.CompletionStage; import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.summary.StatementType; +import org.neo4j.driver.v1.util.Functions; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -819,6 +823,126 @@ public void shouldEnableAutoReadOnConnectionWhenSummaryRequestedButNotAvailable( assertNotNull( summaryFuture.get() ); } + @Test + public void shouldPropagateFailureFromListAsync() + { + PullAllResponseHandler handler = newHandler(); + RuntimeException error = new RuntimeException( "Hi!" ); + handler.onFailure( error ); + + try + { + await( handler.listAsync( Functions.identity() ) ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + } + + @Test + public void shouldPropagateFailureAfterRecordFromListAsync() + { + PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ) ); + + handler.onRecord( values( "a", "b" ) ); + + RuntimeException error = new RuntimeException( "Hi!" ); + handler.onFailure( error ); + + try + { + await( handler.listAsync( Functions.identity() ) ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + } + + @Test + public void shouldFailListAsyncWhenTransformationFunctionThrows() + { + PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ) ); + handler.onRecord( values( 1, 2 ) ); + handler.onRecord( values( 3, 4 ) ); + handler.onSuccess( emptyMap() ); + + RuntimeException error = new RuntimeException( "Hi!" ); + + CompletionStage> stage = handler.listAsync( record -> + { + if ( record.get( 1 ).asInt() == 4 ) + { + throw error; + } + return 42; + } ); + + try + { + await( stage ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + } + + @Test + public void shouldReturnEmptyListInListAsync() + { + PullAllResponseHandler handler = newHandler(); + + handler.onSuccess( emptyMap() ); + + assertEquals( emptyList(), await( handler.listAsync( Functions.identity() ) ) ); + } + + @Test + public void shouldReturnTransformedListInListAsync() + { + PullAllResponseHandler handler = newHandler( singletonList( "key1" ) ); + + handler.onRecord( values( 1 ) ); + handler.onRecord( values( 2 ) ); + handler.onRecord( values( 3 ) ); + handler.onRecord( values( 4 ) ); + handler.onSuccess( emptyMap() ); + + List transformedList = await( handler.listAsync( record -> record.get( 0 ).asInt() * 2 ) ); + + assertEquals( asList( 2, 4, 6, 8 ), transformedList ); + } + + @Test + public void shouldReturnNotTransformedListInListAsync() + { + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys ); + + Value[] fields1 = values( "a", "b" ); + Value[] fields2 = values( "c", "d" ); + Value[] fields3 = values( "e", "f" ); + + handler.onRecord( fields1 ); + handler.onRecord( fields2 ); + handler.onRecord( fields3 ); + handler.onSuccess( emptyMap() ); + + List list = await( handler.listAsync( Functions.identity() ) ); + + List expectedRecords = asList( + new InternalRecord( keys, fields1 ), + new InternalRecord( keys, fields2 ), + new InternalRecord( keys, fields3 ) ); + + assertEquals( expectedRecords, list ); + } + private static PullAllResponseHandler newHandler() { return newHandler( new Statement( "RETURN 1" ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/IterablesTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/IterablesTest.java new file mode 100644 index 0000000000..c717b79d42 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/IterablesTest.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class IterablesTest +{ + @Test + public void shouldCreateHashMapWithExpectedSize() + { + assertNotNull( Iterables.newHashMapWithSize( 42 ) ); + } + + @Test + public void shouldThrowWhenNegativeHashMapSizeGiven() + { + try + { + Iterables.newHashMapWithSize( -42 ); + fail( "Exception expected" ); + } + catch ( IllegalArgumentException ignore ) + { + } + } +}