Skip to content

Commit 2aaa75b

Browse files
authored
Merge pull request #414 from lutovich/1.5-driver-close-async
Expose `Driver#closeAsync()`
2 parents 1fd39df + f3a8f92 commit 2aaa75b

File tree

13 files changed

+131
-40
lines changed

13 files changed

+131
-40
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.concurrent.CompletionStage;
2222

2323
import org.neo4j.driver.internal.async.AsyncConnection;
24-
import org.neo4j.driver.internal.async.Futures;
2524
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
2625
import org.neo4j.driver.internal.net.BoltServerAddress;
2726
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -61,10 +60,18 @@ public CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode
6160
}
6261

6362
@Override
64-
public void close() throws Exception
63+
public CompletionStage<Void> close()
6564
{
66-
pool.close();
67-
Futures.getBlocking( asyncPool.closeAsync() );
65+
// todo: remove this try-catch when blocking API works on top of async
66+
try
67+
{
68+
pool.close();
69+
}
70+
catch ( Exception e )
71+
{
72+
throw new RuntimeException( e );
73+
}
74+
return asyncPool.close();
6875
}
6976

7077
public BoltServerAddress getAddress()

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
9292
try
9393
{
9494
connectionPool.close();
95-
Futures.getBlocking( asyncConnectionPool.closeAsync() );
95+
Futures.getBlocking( asyncConnectionPool.close() );
9696
}
9797
catch ( Throwable closeError )
9898
{

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.concurrent.CompletionStage;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223

24+
import org.neo4j.driver.internal.async.Futures;
2325
import org.neo4j.driver.internal.security.SecurityPlan;
2426
import org.neo4j.driver.v1.AccessMode;
2527
import org.neo4j.driver.v1.Driver;
2628
import org.neo4j.driver.v1.Logger;
2729
import org.neo4j.driver.v1.Logging;
2830
import org.neo4j.driver.v1.Session;
2931

30-
import static java.lang.String.format;
32+
import static java.util.concurrent.CompletableFuture.completedFuture;
3133

3234
public class InternalDriver implements Driver
3335
{
@@ -95,23 +97,26 @@ private Session newSession( AccessMode mode, Bookmark bookmark )
9597
Session session = sessionFactory.newInstance( mode, bookmark );
9698
if ( closed.get() )
9799
{
98-
// the driver is already closed and we either 1. obtain this session from the old session pool
99-
// or 2. we obtain this session from a new session pool
100-
// For 1. this closeResources will take no effect as everything is already closed.
101-
// For 2. this closeResources will close the new connection pool just created to ensure no resource leak.
102-
closeResources();
100+
// session does not immediately acquire connection, it is fine to just throw
103101
throw driverCloseException();
104102
}
105103
return session;
106104
}
107105

108106
@Override
109107
public final void close()
108+
{
109+
Futures.getBlocking( closeAsync() );
110+
}
111+
112+
@Override
113+
public CompletionStage<Void> closeAsync()
110114
{
111115
if ( closed.compareAndSet( false, true ) )
112116
{
113-
closeResources();
117+
return sessionFactory.close();
114118
}
119+
return completedFuture( null );
115120
}
116121

117122
/**
@@ -126,18 +131,6 @@ public final SessionFactory getSessionFactory()
126131
return sessionFactory;
127132
}
128133

129-
private void closeResources()
130-
{
131-
try
132-
{
133-
sessionFactory.close();
134-
}
135-
catch ( Exception ex )
136-
{
137-
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
138-
}
139-
}
140-
141134
private void assertOpen()
142135
{
143136
if ( closed.get() )

driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.concurrent.CompletionStage;
22+
2123
import org.neo4j.driver.v1.AccessMode;
2224
import org.neo4j.driver.v1.Session;
2325

24-
public interface SessionFactory extends AutoCloseable
26+
public interface SessionFactory
2527
{
2628
Session newInstance( AccessMode mode, Bookmark bookmark );
29+
30+
CompletionStage<Void> close();
2731
}

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.concurrent.CompletionStage;
22+
2123
import org.neo4j.driver.internal.retry.RetryLogic;
2224
import org.neo4j.driver.internal.spi.ConnectionProvider;
2325
import org.neo4j.driver.v1.AccessMode;
@@ -57,9 +59,9 @@ protected NetworkSession createSession( ConnectionProvider connectionProvider, R
5759
}
5860

5961
@Override
60-
public final void close() throws Exception
62+
public final CompletionStage<Void> close()
6163
{
62-
connectionProvider.close();
64+
return connectionProvider.close();
6365
}
6466

6567
/**

driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ public interface AsyncConnectionPool
3333

3434
int activeConnections( BoltServerAddress address );
3535

36-
CompletionStage<?> closeAsync();
36+
CompletionStage<Void> close();
3737
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public int activeConnections( BoltServerAddress address )
109109
}
110110

111111
@Override
112-
public CompletionStage<?> closeAsync()
112+
public CompletionStage<Void> close()
113113
{
114114
if ( closed.compareAndSet( false, true ) )
115115
{
@@ -128,7 +128,8 @@ public CompletionStage<?> closeAsync()
128128
eventLoopGroup().shutdownGracefully();
129129
}
130130
}
131-
return Futures.asCompletionStage( eventLoopGroup().terminationFuture() );
131+
return Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
132+
.thenApply( ignore -> null );
132133
}
133134

134135
private ChannelPool getOrCreatePool( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import org.neo4j.driver.internal.RoutingErrorHandler;
2828
import org.neo4j.driver.internal.async.AsyncConnection;
29-
import org.neo4j.driver.internal.async.Futures;
3029
import org.neo4j.driver.internal.async.RoutingAsyncConnection;
3130
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
3231
import org.neo4j.driver.internal.cluster.AddressSet;
@@ -52,7 +51,7 @@
5251

5352
import static java.util.concurrent.CompletableFuture.completedFuture;
5453

55-
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
54+
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler
5655
{
5756
private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer";
5857

@@ -131,10 +130,18 @@ public void onWriteFailure( BoltServerAddress address )
131130
}
132131

133132
@Override
134-
public void close() throws Exception
133+
public CompletionStage<Void> close()
135134
{
136-
connections.close();
137-
Futures.getBlocking( asyncConnectionPool.closeAsync() );
135+
try
136+
{
137+
connections.close();
138+
}
139+
catch ( Exception e )
140+
{
141+
throw new RuntimeException( e );
142+
}
143+
144+
return asyncConnectionPool.close();
138145
}
139146

140147
private PooledConnection acquireConnection( AccessMode mode, AddressSet servers )

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* Interface defines a layer used by the driver to obtain connections. It is meant to be the only component that
2828
* differs between "direct" and "routing" driver.
2929
*/
30-
public interface ConnectionProvider extends AutoCloseable
30+
public interface ConnectionProvider
3131
{
3232
/**
3333
* Acquire new {@link PooledConnection pooled connection} for the given {@link AccessMode mode}.
@@ -38,4 +38,6 @@ public interface ConnectionProvider extends AutoCloseable
3838
PooledConnection acquireConnection( AccessMode mode );
3939

4040
CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode );
41+
42+
CompletionStage<Void> close();
4143
}

driver/src/main/java/org/neo4j/driver/v1/Driver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.v1;
2020

21+
import java.util.concurrent.CompletionStage;
22+
2123
/**
2224
* Accessor for a specific Neo4j graph database.
2325
* <p>
@@ -140,4 +142,6 @@ public interface Driver extends AutoCloseable
140142
* Close all the resources assigned to this driver, including any open connections.
141143
*/
142144
void close();
145+
146+
CompletionStage<Void> closeAsync();
143147
}

0 commit comments

Comments
 (0)