Skip to content

Optimized #consumeAsync() and couple other small things #452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.net.URI;
import java.net.UnknownHostException;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -37,6 +36,7 @@ public class BoltServerAddress

private final String host;
private final int port;
private final String stringValue;

public BoltServerAddress( String address )
{
Expand All @@ -52,6 +52,7 @@ public BoltServerAddress( String host, int port )
{
this.host = requireNonNull( host );
this.port = port;
this.stringValue = String.format( "%s:%d", host, port );
}

@Override
Expand All @@ -78,7 +79,7 @@ public int hashCode()
@Override
public String toString()
{
return format( "%s:%d", host, port );
return stringValue;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,7 @@ private CompletionStage<InternalStatementResultCursor> run( Statement statement,
{
ensureCanRunQueries();
CompletionStage<InternalStatementResultCursor> cursorStage =
QueryRunner.runInTransaction( connection, statement,
this, waitForRunResponse );
QueryRunner.runInTransaction( connection, statement, this, waitForRunResponse );
resultCursors.add( cursorStage );
return cursorStage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ public CompletionStage<Record> singleAsync()
@Override
public CompletionStage<ResultSummary> consumeAsync()
{
return forEachAsync( record ->
{
} );
return pullAllHandler.consumeAsync();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.netty.buffer.ByteBuf;

import static io.netty.buffer.Unpooled.copyInt;
import static io.netty.buffer.Unpooled.copyShort;
import static io.netty.buffer.Unpooled.unreleasableBuffer;

public final class BoltProtocolV1Util
Expand All @@ -41,12 +40,7 @@ public final class BoltProtocolV1Util
PROTOCOL_VERSION_1,
NO_PROTOCOL_VERSION,
NO_PROTOCOL_VERSION,
NO_PROTOCOL_VERSION ) )
.asReadOnly();

private static final ByteBuf MESSAGE_BOUNDARY_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly();

private static final ByteBuf CHUNK_HEADER_PLACEHOLDER_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly();
NO_PROTOCOL_VERSION ) ).asReadOnly();

private BoltProtocolV1Util()
{
Expand All @@ -62,13 +56,18 @@ public static String handshakeString()
return "[0x6060B017, 1, 0, 0, 0]";
}

public static ByteBuf messageBoundary()
public static void writeMessageBoundary( ByteBuf buf )
{
buf.writeShort( 0 );
}

public static void writeEmptyChunkHeader( ByteBuf buf )
{
return MESSAGE_BOUNDARY_BUF.duplicate();
buf.writeShort( 0 );
}

public static ByteBuf chunkHeaderPlaceholder()
public static void writeChunkHeader( ByteBuf buf, int chunkStartIndex, int headerValue )
{
return CHUNK_HEADER_PLACEHOLDER_BUF.duplicate();
buf.setShort( chunkStartIndex, headerValue );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import io.netty.buffer.ByteBuf;

import org.neo4j.driver.internal.async.BoltProtocolV1Util;
import org.neo4j.driver.internal.packstream.PackOutput;

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.CHUNK_HEADER_SIZE_BYTES;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.chunkHeaderPlaceholder;

public class ChunkAwareByteBufOutput implements PackOutput
{
Expand Down Expand Up @@ -138,15 +138,15 @@ private void ensureCanFitInCurrentChunk( int numberOfBytes )
private void startNewChunk( int index )
{
currentChunkStartIndex = index;
buf.writeBytes( chunkHeaderPlaceholder() );
BoltProtocolV1Util.writeEmptyChunkHeader( buf );
currentChunkSize = CHUNK_HEADER_SIZE_BYTES;
}

private void writeChunkSizeHeader()
{
// go to the beginning of the chunk and write 2 byte size header
// go to the beginning of the chunk and write the size header
int chunkBodySize = currentChunkSize - CHUNK_HEADER_SIZE_BYTES;
buf.setShort( currentChunkStartIndex, chunkBodySize );
BoltProtocolV1Util.writeChunkHeader( buf, currentChunkStartIndex, chunkBodySize );
}

private int availableBytesInCurrentChunk()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@

import java.util.List;

import org.neo4j.driver.internal.async.BoltProtocolV1Util;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;

import static io.netty.buffer.ByteBufUtil.hexDump;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary;

public class OutboundMessageHandler extends MessageToMessageEncoder<Message>
{
Expand Down Expand Up @@ -95,8 +95,8 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
log.trace( "C: %s", hexDump( messageBuf ) );
}

BoltProtocolV1Util.writeMessageBoundary( messageBuf );
out.add( messageBuf );
out.add( messageBoundary() );
}

public OutboundMessageHandler withoutByteArraySupport()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.util.MetadataUtil;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
Expand All @@ -45,19 +46,23 @@

public abstract class PullAllResponseHandler implements ResponseHandler
{
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();

static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 );
static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 );

private final Statement statement;
private final RunResponseHandler runResponseHandler;
protected final Connection connection;

private final Queue<Record> records = new ArrayDeque<>();
// initialized lazily when first record arrives
private Queue<Record> records = UNINITIALIZED_RECORDS;

private boolean finished;
private Throwable failure;
private ResultSummary summary;

private boolean ignoreRecords;
private CompletableFuture<Record> recordFuture;
private CompletableFuture<Throwable> failureFuture;

Expand Down Expand Up @@ -112,9 +117,16 @@ public synchronized void onFailure( Throwable error )
@Override
public synchronized void onRecord( Value[] fields )
{
Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
enqueueRecord( record );
completeRecordFuture( record );
if ( ignoreRecords )
{
completeRecordFuture( null );
}
else
{
Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
enqueueRecord( record );
completeRecordFuture( record );
}
}

public synchronized CompletionStage<Record> peekAsync()
Expand All @@ -127,7 +139,7 @@ public synchronized CompletionStage<Record> peekAsync()
return failedFuture( extractFailure() );
}

if ( finished )
if ( ignoreRecords || finished )
{
return completedWithNull();
}
Expand Down Expand Up @@ -161,6 +173,13 @@ public synchronized CompletionStage<ResultSummary> summaryAsync()
} );
}

public synchronized CompletionStage<ResultSummary> consumeAsync()
{
ignoreRecords = true;
records.clear();
return summaryAsync();
}

public synchronized <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
{
return failureAsync().thenApply( error ->
Expand Down Expand Up @@ -199,6 +218,11 @@ else if ( finished )

private void enqueueRecord( Record record )
{
if ( records == UNINITIALIZED_RECORDS )
{
records = new ArrayDeque<>();
}

records.add( record );

boolean shouldBufferAllRecords = failureFuture != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -274,7 +273,7 @@ private void packValue( Value value ) throws IOException
packer.packStructHeader( 3, PATH );

// Unique nodes
Map<Node, Integer> nodeIdx = new LinkedHashMap<>();
Map<Node,Integer> nodeIdx = Iterables.newLinkedHashMapWithSize( path.length() + 1 );
for ( Node node : path.nodes() )
{
if ( !nodeIdx.containsKey( node ) )
Expand All @@ -289,7 +288,7 @@ private void packValue( Value value ) throws IOException
}

// Unique rels
Map<Relationship, Integer> relIdx = new LinkedHashMap<>();
Map<Relationship,Integer> relIdx = Iterables.newLinkedHashMapWithSize( path.length() );
for ( Relationship rel : path.relationships() )
{
if ( !relIdx.containsKey( rel ) )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -102,7 +101,7 @@ public static <T> Map<String, T> map( Map<String, Value> data, Function<Value, T
Map.Entry<String, Value> head = data.entrySet().iterator().next();
return singletonMap( head.getKey(), mapFunction.apply( head.getValue() ) );
} else {
Map<String, T> map = new LinkedHashMap<>( size );
Map<String,T> map = Iterables.newLinkedHashMapWithSize( size );
for ( Map.Entry<String, Value> entry : data.entrySet() )
{
map.put( entry.getKey(), mapFunction.apply( entry.getValue() ) );
Expand All @@ -124,7 +123,7 @@ public static <T> Map<String, T> map( Record record, Function<Value, T> mapFunct
return singletonMap( record.keys().get( 0 ), mapFunction.apply( record.get( 0 ) ) );

default:
Map<String, T> map = new LinkedHashMap<>( size );
Map<String,T> map = Iterables.newLinkedHashMapWithSize( size );
List<String> keys = record.keys();
for ( int i = 0; i < size; i++ )
{
Expand Down
50 changes: 50 additions & 0 deletions driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
*/
package org.neo4j.driver.internal.util;

import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import org.neo4j.driver.v1.util.Function;

public class Iterables
{
@SuppressWarnings( "rawtypes" )
private static final Queue EMPTY_QUEUE = new EmptyQueue();
private static final float DEFAULT_HASH_MAP_LOAD_FACTOR = 0.75F;

public static int count( Iterable<?> it )
Expand Down Expand Up @@ -110,11 +116,22 @@ public void remove()
};
}

@SuppressWarnings( "unchecked" )
public static <T> Queue<T> emptyQueue()
{
return (Queue<T>) EMPTY_QUEUE;
}

public static <K, V> HashMap<K,V> newHashMapWithSize( int expectedSize )
{
return new HashMap<>( hashMapCapacity( expectedSize ) );
}

public static <K, V> LinkedHashMap<K,V> newLinkedHashMapWithSize( int expectedSize )
{
return new LinkedHashMap<>( hashMapCapacity( expectedSize ) );
}

private static int hashMapCapacity( int expectedSize )
{
if ( expectedSize < 3 )
Expand All @@ -127,4 +144,37 @@ private static int hashMapCapacity( int expectedSize )
}
return (int) ((float) expectedSize / DEFAULT_HASH_MAP_LOAD_FACTOR + 1.0F);
}

private static class EmptyQueue<T> extends AbstractQueue<T>
{
@Override
public Iterator<T> iterator()
{
return Collections.emptyIterator();
}

@Override
public int size()
{
return 0;
}

@Override
public boolean offer( T t )
{
throw new UnsupportedOperationException();
}

@Override
public T poll()
{
return null;
}

@Override
public T peek()
{
return null;
}
}
}
Loading