Skip to content

Commit 83c8231

Browse files
Pipe: Fix connection leak caused by clients not closed after task dropped (2 situations) (#15910) (#15929)
(cherry picked from commit bf8329b)
1 parent ea8ee3a commit 83c8231

File tree

9 files changed

+76
-23
lines changed

9 files changed

+76
-23
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private void doTransferWrapper(
138138

139139
private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
140140
throws PipeException {
141-
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();
141+
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = getClientManager().getClient();
142142

143143
final TPipeTransferResp resp;
144144
try {
@@ -164,7 +164,7 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri
164164
final TSStatus status = resp.getStatus();
165165
// Send handshake req and then re-transfer the event
166166
if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
167-
clientManager.sendHandshakeReq(clientAndStatus);
167+
getClientManager().sendHandshakeReq(clientAndStatus);
168168
}
169169
// Only handle the failed statuses to avoid string format performance overhead
170170
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -203,7 +203,7 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
203203
final long creationTime = snapshotEvent.getCreationTime();
204204
final File snapshotFile = snapshotEvent.getSnapshotFile();
205205
final File templateFile = snapshotEvent.getTemplateFile();
206-
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();
206+
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = getClientManager().getClient();
207207

208208
// 1. Transfer snapshotFile, and template File if exists
209209
transferFilePieces(
@@ -250,7 +250,7 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
250250
final TSStatus status = resp.getStatus();
251251
// Send handshake req and then re-transfer the event
252252
if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
253-
clientManager.sendHandshakeReq(clientAndStatus);
253+
getClientManager().sendHandshakeReq(clientAndStatus);
254254
}
255255
// Only handle the failed statuses to avoid string format performance overhead
256256
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,11 @@ protected boolean executeOnce() {
164164
}
165165

166166
private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
167+
// DO NOT call heartbeat or transfer after closed, or will cause connection leak
168+
if (isClosed.get()) {
169+
return;
170+
}
171+
167172
try {
168173
outputPipeConnector.heartbeat();
169174
outputPipeConnector.transfer(event);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,18 @@ public void onError(final Exception e) {
319319
client.resetMethodStateIfStopped();
320320
throw e;
321321
} finally {
322+
if (isClosed) {
323+
try {
324+
client.close();
325+
client.invalidateAll();
326+
} catch (final Exception e) {
327+
LOGGER.warn(
328+
"Failed to close client {}:{} after handshake failure when the manager is closed.",
329+
targetNodeUrl.getIp(),
330+
targetNodeUrl.getPort(),
331+
e);
332+
}
333+
}
322334
client.setShouldReturnSelf(true);
323335
client.returnSelf();
324336
}
@@ -372,8 +384,14 @@ public void close() {
372384
if (clientManager != null) {
373385
try {
374386
clientManager.close();
387+
LOGGER.info(
388+
"Closed AsyncPipeDataTransferServiceClientManager for receiver attributes: {}",
389+
receiverAttributes);
375390
} catch (final Exception e) {
376-
LOGGER.warn("Failed to close client manager.", e);
391+
LOGGER.warn(
392+
"Failed to close AsyncPipeDataTransferServiceClientManager for receiver attributes: {}",
393+
receiverAttributes,
394+
e);
377395
}
378396
}
379397

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ public synchronized void handshake() throws Exception {
180180

181181
@Override
182182
public void heartbeat() throws Exception {
183-
syncConnector.heartbeat();
183+
if (!isClosed()) {
184+
syncConnector.heartbeat();
185+
}
184186
}
185187

186188
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ protected boolean tryTransfer(
8282
if (connector.isClosed()) {
8383
clearEventsReferenceCount();
8484
connector.eliminateHandler(this);
85+
client.setShouldReturnSelf(true);
86+
client.returnSelf();
8587
return false;
8688
}
8789
doTransfer(client, req);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -402,11 +402,25 @@ protected void onErrorInternal(final Exception exception) {
402402
}
403403

404404
private void returnClientIfNecessary() {
405-
if (client != null) {
406-
client.setShouldReturnSelf(true);
407-
client.returnSelf();
408-
client = null;
405+
if (client == null) {
406+
return;
407+
}
408+
409+
if (connector.isClosed()) {
410+
try {
411+
client.close();
412+
client.invalidateAll();
413+
} catch (final Exception e) {
414+
LOGGER.warn(
415+
"Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
416+
client,
417+
e.getMessage(),
418+
e);
419+
}
409420
}
421+
client.setShouldReturnSelf(true);
422+
client.returnSelf();
423+
client = null;
410424
}
411425

412426
@Override

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,19 @@ public V borrowClient(K node) throws ClientManagerException {
6464
* return of a client is automatic whenever a particular client is used.
6565
*/
6666
public void returnClient(K node, V client) {
67-
Optional.ofNullable(node)
68-
.ifPresent(
69-
x -> {
70-
try {
71-
pool.returnObject(node, client);
72-
} catch (Exception e) {
73-
LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e);
74-
}
75-
});
67+
if (node != null) {
68+
try {
69+
pool.returnObject(node, client);
70+
} catch (Exception e) {
71+
LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e);
72+
}
73+
} else if (client instanceof ThriftClient) {
74+
((ThriftClient) client).invalidateAll();
75+
LOGGER.warn(
76+
"Return client {} to pool failed because the node is null. "
77+
+ "This may cause resource leak, please check your code.",
78+
client);
79+
}
7680
}
7781

7882
@Override

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void setTimeoutDynamically(final int timeout) {
127127
}
128128
}
129129

130-
private void close() {
130+
public void close() {
131131
___transport.close();
132132
___currentMethod = null;
133133
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {
6262

6363
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSslSyncConnector.class);
6464

65-
protected IoTDBSyncClientManager clientManager;
65+
private volatile IoTDBSyncClientManager clientManager;
66+
67+
protected IoTDBSyncClientManager getClientManager() {
68+
if (clientManager == null) {
69+
throw new IllegalStateException("IoTDB sync client manager has been closed");
70+
}
71+
return clientManager;
72+
}
6673

6774
@Override
6875
public void validate(final PipeParameterValidator validator) throws Exception {
@@ -147,7 +154,7 @@ protected abstract IoTDBSyncClientManager constructClient(
147154

148155
@Override
149156
public void handshake() throws Exception {
150-
clientManager.checkClientStatusAndTryReconstructIfNecessary();
157+
getClientManager().checkClientStatusAndTryReconstructIfNecessary();
151158
}
152159

153160
@Override
@@ -222,7 +229,7 @@ protected void transferFilePieces(
222229
// Send handshake req and then re-transfer the event
223230
if (status.getCode()
224231
== TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
225-
clientManager.sendHandshakeReq(clientAndStatus);
232+
getClientManager().sendHandshakeReq(clientAndStatus);
226233
}
227234
// Only handle the failed statuses to avoid string format performance overhead
228235
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -246,6 +253,7 @@ protected abstract PipeTransferFilePieceReq getTransferMultiFilePieceReq(
246253
public void close() {
247254
if (clientManager != null) {
248255
clientManager.close();
256+
clientManager = null;
249257
}
250258

251259
super.close();

0 commit comments

Comments
 (0)