Skip to content

Commit fa0f6cc

Browse files
author
Zhen Li
committed
Changed session.run not discard previous run result, but buffer all into memory.
This enables nested session runs.
1 parent 91e0bb4 commit fa0f6cc

13 files changed

+150
-130
lines changed

driver/src/main/java/org/neo4j/driver/StatementResult.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,7 @@ public interface StatementResult extends Iterator<Record>
143143
*
144144
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
145145
*
146-
* If you want to obtain the summary without discard the records, invoke
147-
* {@link StatementResult#list()} before calling this method to buffer all records into memory.
146+
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
148147
*
149148
* @return a summary for the whole query result.
150149
*/

driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.neo4j.driver.Record;
2929
import org.neo4j.driver.Records;
30+
import org.neo4j.driver.StatementResult;
3031
import org.neo4j.driver.exceptions.NoSuchRecordException;
3132
import org.neo4j.driver.summary.ResultSummary;
3233

@@ -74,8 +75,7 @@ public interface StatementResultCursor
7475
* <p>
7576
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
7677
* <p>
77-
* If you want to obtain the summary without discarding the records, invoke {@link #listAsync()}
78-
* to buffer records into memory before calling this method.
78+
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
7979
*
8080
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
8181
* completed exceptionally if query execution fails.

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@
2222

2323
public interface FailableCursor
2424
{
25+
CompletionStage<Throwable> consumeAsync();
2526
CompletionStage<Throwable> failureAsync();
2627
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public CompletionStage<Void> closeAsync()
194194
if ( cursor != null )
195195
{
196196
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
197-
return cursor.failureAsync();
197+
return cursor.consumeAsync();
198198
}
199199
// no result cursor exists so no error exists
200200
return completedWithNull();

driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,6 @@ private static CompletionStage<Throwable> retrieveFailure( CompletionStage<? ext
7474
{
7575
return cursorStage
7676
.exceptionally( cursor -> null )
77-
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.failureAsync() );
77+
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.consumeAsync() );
7878
}
7979
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,18 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
111111
}
112112

113113
@Override
114-
public CompletionStage<Throwable> failureAsync()
114+
public CompletionStage<Throwable> consumeAsync()
115115
{
116116
return pullAllHandler.summaryAsync().handle( ( summary, error ) -> error );
117117
}
118118

119+
@Override
120+
public CompletionStage<Throwable> failureAsync()
121+
{
122+
return pullAllHandler.failureAsync();
123+
}
124+
125+
119126
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
120127
{
121128
CompletionStage<Record> recordFuture = nextAsync();

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,19 @@ public void cancel()
9191
}
9292

9393
@Override
94-
public CompletionStage<Throwable> failureAsync()
94+
public CompletionStage<Throwable> consumeAsync()
9595
{
9696
// calling this method will enforce discarding record stream and finish running cypher query
9797
return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
9898
}
9999

100+
@Override
101+
public CompletionStage<Throwable> failureAsync()
102+
{
103+
// It is safe to discard records as either the streaming has not started at all, or the streaming is fully finished.
104+
return consumeAsync();
105+
}
106+
100107
@Override
101108
public CompletionStage<ResultSummary> summaryAsync()
102109
{

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,7 @@ public interface PullAllResponseHandler extends ResponseHandler
3636

3737
<T> CompletionStage<List<T>> listAsync( Function<Record, T> mapFunction );
3838

39+
CompletionStage<Throwable> failureAsync();
40+
3941
void prePopulateRecords();
4042
}

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@ public synchronized <T> CompletionStage<List<T>> listAsync( Function<Record,T> m
176176
return pullAllAsync().thenApply( summary -> recordsAsList( mapFunction ) );
177177
}
178178

179+
@Override
180+
public synchronized CompletionStage<Throwable> failureAsync()
181+
{
182+
return pullAllAsync().handle( ( ignore, error ) -> error );
183+
}
184+
179185
@Override
180186
public void prePopulateRecords()
181187
{

driver/src/test/java/org/neo4j/driver/internal/async/AsyncStatementResultCursorImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ void shouldReturnFailureWhenExists()
291291
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
292292

293293
ServiceUnavailableException error = new ServiceUnavailableException( "Hi" );
294-
when( pullAllHandler.summaryAsync() ).thenReturn( failedFuture( error ) );
294+
when( pullAllHandler.failureAsync() ).thenReturn( failedFuture( error ) );
295295

296296
AsyncStatementResultCursorImpl cursor = newCursor( pullAllHandler );
297297

@@ -302,7 +302,7 @@ void shouldReturnFailureWhenExists()
302302
void shouldReturnNullFailureWhenDoesNotExist()
303303
{
304304
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
305-
when( pullAllHandler.summaryAsync() ).thenReturn( completedWithNull() );
305+
when( pullAllHandler.failureAsync() ).thenReturn( completedWithNull() );
306306

307307
AsyncStatementResultCursorImpl cursor = newCursor( pullAllHandler );
308308

0 commit comments

Comments
 (0)