Skip to content

Commit c3b4ff2

Browse files
fix: statement cancel not working with mysql driver
1 parent aba7e6b commit c3b4ff2

File tree

5 files changed

+97
-36
lines changed

5 files changed

+97
-36
lines changed

wrapper/src/main/java/software/amazon/jdbc/ConnectionPluginManager.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Objects;
2728
import java.util.Properties;
2829
import java.util.Set;
29-
import java.util.concurrent.locks.ReentrantLock;
3030
import java.util.logging.Logger;
3131
import org.checkerframework.checker.nullness.qual.NonNull;
3232
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -45,6 +45,7 @@
4545
import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsPlugin;
4646
import software.amazon.jdbc.plugin.strategy.fastestresponse.FastestResponseStrategyPlugin;
4747
import software.amazon.jdbc.profile.ConfigurationProfile;
48+
import software.amazon.jdbc.util.AsynchronousMethodsHelper;
4849
import software.amazon.jdbc.util.Messages;
4950
import software.amazon.jdbc.util.SqlMethodAnalyzer;
5051
import software.amazon.jdbc.util.WrapperUtils;
@@ -91,8 +92,6 @@ public class ConnectionPluginManager implements CanReleaseResources, Wrapper {
9192
private static final String NOTIFY_CONNECTION_CHANGED_METHOD = "notifyConnectionChanged";
9293
private static final String NOTIFY_NODE_LIST_CHANGED_METHOD = "notifyNodeListChanged";
9394
private static final SqlMethodAnalyzer sqlMethodAnalyzer = new SqlMethodAnalyzer();
94-
private final ReentrantLock lock = new ReentrantLock();
95-
9695
protected Properties props = new Properties();
9796
protected List<ConnectionPlugin> plugins;
9897
protected final @NonNull ConnectionProvider defaultConnProvider;
@@ -148,14 +147,6 @@ public ConnectionPluginManager(
148147
this.telemetryFactory = telemetryFactory;
149148
}
150149

151-
public void lock() {
152-
lock.lock();
153-
}
154-
155-
public void unlock() {
156-
lock.unlock();
157-
}
158-
159150
/**
160151
* Initialize a chain of {@link ConnectionPlugin} using their corresponding {@link
161152
* ConnectionPluginFactory}. If {@code PropertyDefinition.PLUGINS} is provided by the user,
@@ -305,13 +296,15 @@ public <T, E extends Exception> T execute(
305296
final Object[] jdbcMethodArgs)
306297
throws E {
307298

308-
final Connection conn = WrapperUtils.getConnectionFromSqlObject(methodInvokeOn);
309-
if (conn != null && conn != this.pluginService.getCurrentConnection()
310-
&& !sqlMethodAnalyzer.isMethodClosingSqlObject(methodName)) {
311-
final SQLException e =
312-
new SQLException(Messages.get("ConnectionPluginManager.methodInvokedAgainstOldConnection",
313-
new Object[] {methodInvokeOn}));
314-
throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, e);
299+
// The target driver may block on Statement.getConnection().
300+
if (!AsynchronousMethodsHelper.ASYNCHRONOUS_METHODS.contains(methodName)) {
301+
final Connection conn = WrapperUtils.getConnectionFromSqlObject(methodInvokeOn);
302+
if (!Objects.equals(conn, this.pluginService.getCurrentConnection())
303+
&& !sqlMethodAnalyzer.isMethodClosingSqlObject(methodName)) {
304+
throw WrapperUtils.wrapExceptionIfNeeded(
305+
exceptionClass,
306+
new SQLException(Messages.get("ConnectionPluginManager.invokedAgainstOldConnection", new Object[] {methodInvokeOn})));
307+
}
315308
}
316309

317310
return executeWithSubscribedPlugins(
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package software.amazon.jdbc.util;
18+
19+
import java.util.Collections;
20+
import java.util.List;
21+
22+
public class AsynchronousMethodsHelper {
23+
public static final List<String> ASYNCHRONOUS_METHODS = Collections.singletonList(
24+
"Statement.cancel"
25+
);
26+
}

wrapper/src/main/java/software/amazon/jdbc/util/WrapperUtils.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.Set;
5151
import java.util.concurrent.ConcurrentHashMap;
5252
import java.util.concurrent.ConcurrentMap;
53+
import java.util.concurrent.locks.ReentrantLock;
5354
import org.checkerframework.checker.nullness.qual.Nullable;
5455
import software.amazon.jdbc.ConnectionPluginManager;
5556
import software.amazon.jdbc.JdbcCallable;
@@ -84,6 +85,8 @@ public class WrapperUtils {
8485
private static final ConcurrentMap<Class<?>, Boolean> isJdbcInterfaceCache =
8586
new ConcurrentHashMap<>();
8687

88+
private static final ReentrantLock lock = new ReentrantLock();
89+
8790
private static final Map<Class<?>, Class<?>> availableWrappers =
8891
new HashMap<Class<?>, Class<?>>() {
8992
{
@@ -182,7 +185,9 @@ public static <T> T executeWithPlugins(
182185
final JdbcCallable<T, RuntimeException> jdbcMethodFunc,
183186
final Object... jdbcMethodArgs) {
184187

185-
pluginManager.lock();
188+
if (!AsynchronousMethodsHelper.ASYNCHRONOUS_METHODS.contains(methodName)) {
189+
lock.lock();
190+
}
186191
TelemetryFactory telemetryFactory = pluginManager.getTelemetryFactory();
187192
TelemetryContext context = null;
188193

@@ -208,7 +213,9 @@ public static <T> T executeWithPlugins(
208213
throw new RuntimeException(e);
209214
}
210215
} finally {
211-
pluginManager.unlock();
216+
if (lock.isHeldByCurrentThread()) {
217+
lock.unlock();
218+
}
212219
if (context != null) {
213220
context.closeContext();
214221
}
@@ -225,7 +232,9 @@ public static <T, E extends Exception> T executeWithPlugins(
225232
final Object... jdbcMethodArgs)
226233
throws E {
227234

228-
pluginManager.lock();
235+
if (!AsynchronousMethodsHelper.ASYNCHRONOUS_METHODS.contains(methodName)) {
236+
lock.lock();
237+
}
229238
TelemetryFactory telemetryFactory = pluginManager.getTelemetryFactory();
230239
TelemetryContext context = null;
231240

@@ -251,7 +260,9 @@ public static <T, E extends Exception> T executeWithPlugins(
251260
}
252261

253262
} finally {
254-
pluginManager.unlock();
263+
if (lock.isHeldByCurrentThread()) {
264+
lock.unlock();
265+
}
255266
if (context != null) {
256267
context.closeContext();
257268
}

wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ ConnectionStringHostListProvider.errorIdentifyConnection=An error occurred while
9090
ConnectionPluginManager.releaseResources=Releasing resources.
9191
ConnectionPluginManager.unknownPluginCode=Unknown plugin code: ''{0}''.
9292
ConnectionPluginManager.unableToLoadPlugin=Unable to load connection plugin factory: ''{0}''.
93-
ConnectionPluginManager.methodInvokedAgainstOldConnection=The internal connection has changed since ''{0}'' was created. This is likely due to failover or read-write splitting functionality. To ensure you are using the updated connection, please re-create Statement and ResultSet objects after failover and/or calling setReadOnly.
93+
ConnectionPluginManager.invokedAgainstOldConnection=The internal connection has changed since ''{0}'' was created. This is likely due to failover or read-write splitting functionality. To ensure you are using the updated connection, please re-create Statement and ResultSet objects after failover and/or calling setReadOnly.
9494

9595
# Connection Provider
9696
ConnectionProvider.noConnection=The target driver did not return a connection.

wrapper/src/test/java/software/amazon/jdbc/util/WrapperUtilsTest.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.jupiter.api.Assertions.fail;
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.ArgumentMatchers.anyString;
23+
import static org.mockito.ArgumentMatchers.argThat;
2324
import static org.mockito.ArgumentMatchers.eq;
2425
import static org.mockito.Mockito.doAnswer;
2526
import static org.mockito.Mockito.mock;
@@ -54,19 +55,9 @@ public class WrapperUtilsTest {
5455
@BeforeEach
5556
@SuppressWarnings("unchecked")
5657
void init() {
57-
final ReentrantLock pluginManagerLock = new ReentrantLock();
5858
final ReentrantLock testLock = new ReentrantLock();
5959
closeable = MockitoAnnotations.openMocks(this);
6060

61-
doAnswer(invocation -> {
62-
pluginManagerLock.lock();
63-
return null;
64-
}).when(pluginManager).lock();
65-
doAnswer(invocation -> {
66-
pluginManagerLock.unlock();
67-
return null;
68-
}).when(pluginManager).unlock();
69-
7061
doAnswer(invocation -> {
7162
boolean lockIsFree = testLock.tryLock();
7263
if (!lockIsFree) {
@@ -79,7 +70,7 @@ void init() {
7970
any(Class.class),
8071
any(Class.class),
8172
any(Object.class),
82-
any(String.class),
73+
argThat(methodName -> !AsynchronousMethodsHelper.ASYNCHRONOUS_METHODS.contains(methodName)),
8374
any(JdbcCallable.class),
8475
any(Object[].class));
8576

@@ -93,23 +84,39 @@ void cleanUp() throws Exception {
9384
closeable.close();
9485
}
9586

87+
Integer callCancelExecuteWithPlugins() {
88+
return callExecuteWithPlugins("Statement.cancel");
89+
}
90+
9691
Integer callExecuteWithPlugins() {
92+
return callExecuteWithPlugins("methodName");
93+
}
94+
95+
Integer callExecuteWithPlugins(String methodName) {
9796
return WrapperUtils.executeWithPlugins(
9897
Integer.class,
9998
pluginManager,
10099
object,
101-
"methodName",
100+
methodName,
102101
() -> 1);
103102
}
104103

104+
Integer callCancelExecuteWithPluginsWithException() {
105+
return callExecuteWithPluginsWithException("Statement.cancel");
106+
}
107+
105108
Integer callExecuteWithPluginsWithException() {
109+
return callExecuteWithPluginsWithException("methodName");
110+
}
111+
112+
Integer callExecuteWithPluginsWithException(String methodName) {
106113
try {
107114
return WrapperUtils.executeWithPlugins(
108115
Integer.class,
109116
SQLException.class,
110117
pluginManager,
111118
object,
112-
"methodName",
119+
methodName,
113120
() -> 1);
114121
} catch (SQLException e) {
115122
fail();
@@ -144,6 +151,30 @@ void testExecutesWithPluginsWithExceptionIsSequential() {
144151
}
145152
}
146153

154+
@Test
155+
void testCancelStatementIsNotBlockedExecute() {
156+
List<CompletableFuture<Integer>> futures = new ArrayList<>();
157+
158+
futures.add(CompletableFuture.supplyAsync(this::callExecuteWithPlugins));
159+
futures.add(CompletableFuture.supplyAsync(this::callCancelExecuteWithPlugins));
160+
161+
for (CompletableFuture<Integer> future : futures) {
162+
future.join();
163+
}
164+
}
165+
166+
@Test
167+
void testCancelStatementIsNotBlockedExecuteWithException() {
168+
List<CompletableFuture<Integer>> futures = new ArrayList<>();
169+
170+
futures.add(CompletableFuture.supplyAsync(this::callExecuteWithPluginsWithException));
171+
futures.add(CompletableFuture.supplyAsync(this::callCancelExecuteWithPluginsWithException));
172+
173+
for (CompletableFuture<Integer> future : futures) {
174+
future.join();
175+
}
176+
}
177+
147178
@Test
148179
void getConnectionFromSqlObjectChecksStatementNotClosed() throws Exception {
149180
final Statement mockClosedStatement = mock(Statement.class);

0 commit comments

Comments
 (0)