Skip to content

Commit d637500

Browse files
author
Zhen Li
committed
Fixed a bug found with ClusterStressIT regarding how we send run and pull messages.
When creating the async result, we write a RUN message, followed by a PULL message. The RUN and PULL messages shall be flushed together. If RUN and PULL are flushed separately, the following scenario may happen: C: RUN "RETURN Wrong" {} {mode="r"} S: FAILURE Neo.ClientError.Statement.SyntaxError "Variable `Wrong` not defined (line 1, column 8 (offset: 7)) C: RESET C: PULL {n=1000} S: SUCCESS {} S: FAILURE Neo.ClientError.Request.Invalid "Message 'PULL Map{n -> Long(1000)}' cannot be handled by a session in the READY state."
1 parent 36eb748 commit d637500

16 files changed

+67
-79
lines changed

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public AsyncStatementResultCursorOnlyFactory( Connection connection, Message run
6060
public CompletionStage<AsyncStatementResultCursor> asyncResult()
6161
{
6262
// only write and flush messages when async result is wanted.
63-
connection.writeAndFlush( runMessage, runHandler );
63+
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6464
pullAllHandler.prePopulateRecords();
6565

6666
if ( waitForRunResponse )

driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactoryImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public StatementResultCursorFactoryImpl( Connection connection, Message runMessa
6363
public CompletionStage<AsyncStatementResultCursor> asyncResult()
6464
{
6565
// only write and flush messages when async result is wanted.
66-
connection.writeAndFlush( runMessage, runHandler );
66+
connection.write( runMessage, runHandler );
6767
pullAllHandler.prePopulateRecords();
6868

6969
if ( waitForRunResponse )

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,11 @@
4848
public class AutoPullResponseHandler extends BasicPullResponseHandler implements PullAllResponseHandler
4949
{
5050
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
51-
52-
private static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 );
53-
private static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 );
5451
private final long fetchSize;
5552

5653
// initialized lazily when first record arrives
5754
private Queue<Record> records = UNINITIALIZED_RECORDS;
5855

59-
// private boolean autoReadManagementEnabled = true;
6056
private ResultSummary summary;
6157
private Throwable failure;
6258

@@ -113,18 +109,6 @@ private void handleFailure( Throwable error )
113109
failure = error;
114110
}
115111
}
116-
//
117-
// @Override
118-
// public boolean canManageAutoRead()
119-
// {
120-
// return true;
121-
// }
122-
123-
// @Override
124-
// public synchronized void disableAutoReadManagement()
125-
// {
126-
// autoReadManagementEnabled = false;
127-
// }
128112

129113
public synchronized CompletionStage<Record> peekAsync()
130114
{
@@ -198,7 +182,6 @@ private synchronized CompletionStage<ResultSummary> pullAllAsync()
198182
}
199183
else
200184
{
201-
// enableAutoRead();
202185
request( UNLIMITED_FETCH_SIZE );
203186
if ( summaryFuture == null )
204187
{
@@ -217,32 +200,11 @@ private void enqueueRecord( Record record )
217200
}
218201

219202
records.add( record );
220-
221-
// boolean shouldBufferAllRecords = failureFuture != null;
222-
// // when failure is requested we have to buffer all remaining records and then return the error
223-
// // do not disable auto-read in this case, otherwise records will not be consumed and trailing
224-
// // SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for the error
225-
// if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK )
226-
// {
227-
// // more than high watermark records are already queued, tell connection to stop auto-reading from network
228-
// // this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
229-
// // fetched from network faster than consumed
230-
// disableAutoRead();
231-
// }
232203
}
233204

234205
private Record dequeueRecord()
235206
{
236-
Record record = records.poll();
237-
238-
// if ( records.size() < RECORD_BUFFER_LOW_WATERMARK )
239-
// {
240-
// // less than low watermark records are now available in the buffer, tell connection to pre-fetch more
241-
// // and populate queue with new records from network
242-
// enableAutoRead();
243-
// }
244-
245-
return record;
207+
return records.poll();
246208
}
247209

248210
private <T> List<T> recordsAsList( Function<Record,T> mapFunction )
@@ -332,20 +294,4 @@ else if ( value == null )
332294
return completedFuture( value );
333295
}
334296
}
335-
336-
// private void enableAutoRead()
337-
// {
338-
// if ( autoReadManagementEnabled )
339-
// {
340-
// connection.enableAutoRead();
341-
// }
342-
// }
343-
//
344-
// private void disableAutoRead()
345-
// {
346-
// if ( autoReadManagementEnabled )
347-
// {
348-
// connection.disableAutoRead();
349-
// }
350-
// }
351297
}

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,26 @@ void shouldChangeFetchSize() throws Exception
303303
}
304304
}
305305

306+
@Test
307+
void shouldAllowPullAll() throws Exception
308+
{
309+
StubServer server = StubServer.start( "streaming_records_v4_all.script", 9001 );
310+
try
311+
{
312+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", insecureBuilder().withFetchSize( -1 ).build() ) )
313+
{
314+
Session session = driver.session();
315+
StatementResult result = session.run( "MATCH (n) RETURN n.name" );
316+
List<String> list = result.list( record -> record.get( "n.name" ).asString() );
317+
assertEquals( list, asList( "Bob", "Alice", "Tina" ) );
318+
}
319+
}
320+
finally
321+
{
322+
assertEquals( 0, server.exitStatus() );
323+
}
324+
}
325+
306326
@Test
307327
void shouldThrowCommitErrorWhenTransactionCommit() throws Exception
308328
{

driver/src/test/java/org/neo4j/driver/internal/async/ExplicitTransactionTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@
5252
import static org.neo4j.driver.util.TestUtil.await;
5353
import static org.neo4j.driver.util.TestUtil.connectionMock;
5454
import static org.neo4j.driver.util.TestUtil.runMessageWithStatementMatcher;
55-
import static org.neo4j.driver.util.TestUtil.setupSuccessfulRun;
55+
import static org.neo4j.driver.util.TestUtil.setupSuccessfulRunRx;
5656
import static org.neo4j.driver.util.TestUtil.setupSuccessfulRunAndPull;
57-
import static org.neo4j.driver.util.TestUtil.verifyRun;
57+
import static org.neo4j.driver.util.TestUtil.verifyRunRx;
5858
import static org.neo4j.driver.util.TestUtil.verifyRunAndPull;
5959

6060
class ExplicitTransactionTest
@@ -81,13 +81,13 @@ void shouldFlushOnRunRx()
8181
// Given
8282
Connection connection = connectionMock( BoltProtocolV4.INSTANCE );
8383
ExplicitTransaction tx = beginTx( connection );
84-
setupSuccessfulRun( connection );
84+
setupSuccessfulRunRx( connection );
8585

8686
// When
8787
await( tx.runRx( new Statement( "RETURN 1" ) ) );
8888

8989
// Then
90-
verifyRun( connection, "RETURN 1" );
90+
verifyRunRx( connection, "RETURN 1" );
9191
}
9292

9393
@Test

driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@
6565
import static org.neo4j.driver.util.TestUtil.connectionMock;
6666
import static org.neo4j.driver.util.TestUtil.newSession;
6767
import static org.neo4j.driver.util.TestUtil.setupFailingBegin;
68-
import static org.neo4j.driver.util.TestUtil.setupSuccessfulRun;
68+
import static org.neo4j.driver.util.TestUtil.setupSuccessfulRunRx;
6969
import static org.neo4j.driver.util.TestUtil.setupSuccessfulRunAndPull;
7070
import static org.neo4j.driver.util.TestUtil.verifyBeginTx;
7171
import static org.neo4j.driver.util.TestUtil.verifyRollbackTx;
72-
import static org.neo4j.driver.util.TestUtil.verifyRun;
72+
import static org.neo4j.driver.util.TestUtil.verifyRunRx;
7373
import static org.neo4j.driver.util.TestUtil.verifyRunAndPull;
7474

7575
class NetworkSessionTest
@@ -101,10 +101,10 @@ void shouldFlushOnRunAsync( boolean waitForResponse )
101101
@Test
102102
void shouldFlushOnRunRx()
103103
{
104-
setupSuccessfulRun( connection );
104+
setupSuccessfulRunRx( connection );
105105
await( session.runRx( new Statement( "RETURN 1" ), TransactionConfig.empty() ) );
106106

107-
verifyRun( connection, "RETURN 1" );
107+
verifyRunRx( connection, "RETURN 1" );
108108
}
109109

110110
@Test
@@ -195,7 +195,7 @@ void releasesOpenConnectionUsedForRunWhenSessionIsClosed()
195195
close( session );
196196

197197
InOrder inOrder = inOrder( connection );
198-
inOrder.verify( connection ).writeAndFlush( any( RunWithMetadataMessage.class ), any() );
198+
inOrder.verify( connection ).write( any( RunWithMetadataMessage.class ), any() );
199199
inOrder.verify( connection ).writeAndFlush( any( PullMessage.class ), any() );
200200
inOrder.verify( connection, atLeastOnce() ).release();
201201
}

driver/src/test/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactoryTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import static org.mockito.ArgumentMatchers.any;
4343
import static org.mockito.Mockito.mock;
4444
import static org.mockito.Mockito.verify;
45-
import static org.mockito.Mockito.verifyNoMoreInteractions;
4645
import static org.mockito.Mockito.when;
4746
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4847
import static org.neo4j.driver.internal.util.Futures.failedFuture;
@@ -174,7 +173,7 @@ private AsyncStatementResultCursorOnlyFactory newResultCursorFactory( Completabl
174173

175174
private void verifyRunCompleted( Connection connection, CompletionStage<AsyncStatementResultCursor> cursorFuture )
176175
{
177-
verify( connection ).writeAndFlush( any( Message.class ), any( RunResponseHandler.class ) );
176+
verify( connection ).write( any( Message.class ), any( RunResponseHandler.class ) );
178177
assertThat( getNow( cursorFuture ), instanceOf( AsyncStatementResultCursorImpl.class ) );
179178
}
180179
}

driver/src/test/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactoryImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ private StatementResultCursorFactoryImpl newResultCursorFactory( CompletableFutu
221221

222222
private void verifyRunCompleted( Connection connection, CompletionStage<AsyncStatementResultCursor> cursorFuture )
223223
{
224-
verify( connection ).writeAndFlush( any( Message.class ), any( RunResponseHandler.class ) );
224+
verify( connection ).write( any( Message.class ), any( RunResponseHandler.class ) );
225225
assertThat( getNow( cursorFuture ), instanceOf( AsyncStatementResultCursorImpl.class ) );
226226
}
227227

driver/src/test/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1Test.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ private static ResponseHandler verifyRunInvoked( Connection connection )
369369
ArgumentCaptor<ResponseHandler> runHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class );
370370
ArgumentCaptor<ResponseHandler> pullAllHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class );
371371

372-
verify( connection ).writeAndFlush( eq( new RunMessage( QUERY, PARAMS ) ), runHandlerCaptor.capture() );
372+
verify( connection ).write( eq( new RunMessage( QUERY, PARAMS ) ), runHandlerCaptor.capture() );
373373
verify( connection ).writeAndFlush( eq( PullAllMessage.PULL_ALL ), pullAllHandlerCaptor.capture() );
374374

375375
assertThat( runHandlerCaptor.getValue(), instanceOf( RunResponseHandler.class ) );

driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ private static ResponseHandlers verifyRunInvoked( Connection connection, boolean
473473
expectedMessage = RunWithMetadataMessage.explicitTxRunMessage( STATEMENT );
474474
}
475475

476-
verify( connection ).writeAndFlush( eq( expectedMessage ), runHandlerCaptor.capture() );
476+
verify( connection ).write( eq( expectedMessage ), runHandlerCaptor.capture() );
477477
verify( connection ).writeAndFlush( eq( PullAllMessage.PULL_ALL ), pullAllHandlerCaptor.capture() );
478478

479479
assertThat( runHandlerCaptor.getValue(), instanceOf( RunResponseHandler.class ) );

0 commit comments

Comments
 (0)