Skip to content

Commit 6a68686

Browse files
committed
Close Executor Service
1 parent 7670846 commit 6a68686

File tree

4 files changed

+40
-3
lines changed

4 files changed

+40
-3
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@
110110
import org.apache.hadoop.fs.azurebfs.services.AuthType;
111111
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
112112
import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
113+
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManager;
114+
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManagerV1;
115+
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManagerV2;
113116
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
114117
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
115118
import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
@@ -324,6 +327,10 @@ public void close() throws IOException {
324327
HadoopExecutors.shutdown(boundedThreadPool, LOG,
325328
30, TimeUnit.SECONDS);
326329
boundedThreadPool = null;
330+
ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
331+
ReadBufferManagerV1 bufferManagerV1 = ReadBufferManagerV1.getBufferManager();
332+
bufferManagerV1.close();
333+
bufferManagerV2.close();
327334
} catch (InterruptedException e) {
328335
LOG.error("Interrupted freeing leases", e);
329336
Thread.currentThread().interrupt();

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ abstract void doneReading(ReadBuffer buffer,
119119
*/
120120
abstract void purgeBuffersForStream(AbfsInputStream stream);
121121

122+
public void close() {
123+
closeReadBufferManager();
124+
}
125+
126+
abstract void closeReadBufferManager();
127+
122128

123129
// Following Methods are for testing purposes only and should not be used in production code.
124130

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* The Read Buffer Manager for Rest AbfsClient.
3434
* V1 implementation of ReadBufferManager.
3535
*/
36-
final class ReadBufferManagerV1 extends ReadBufferManager {
36+
public final class ReadBufferManagerV1 extends ReadBufferManager {
3737

3838
private static final int NUM_BUFFERS = 16;
3939
private static final int NUM_THREADS = 8;
@@ -66,7 +66,7 @@ static void setReadBufferManagerConfigs(int readAheadBlockSize) {
6666
* Returns the singleton instance of ReadBufferManagerV1.
6767
* @return the singleton instance of ReadBufferManagerV1
6868
*/
69-
static ReadBufferManagerV1 getBufferManager() {
69+
public static ReadBufferManagerV1 getBufferManager() {
7070
if (bufferManager == null) {
7171
LOCK.lock();
7272
try {
@@ -542,6 +542,13 @@ private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
542542
}
543543
}
544544

545+
@Override
546+
public void closeReadBufferManager() {
547+
// no-op, as this is a singleton and should not be closed
548+
// the buffers will be cleaned up when the JVM exits
549+
LOGGER.debug("ReadBufferManagerV1 close called, but no action taken as it is a singleton.");
550+
}
551+
545552
/**
546553
* {@inheritDoc}
547554
*/

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.lang.management.MemoryMXBean;
3030
import java.lang.management.MemoryUsage;
3131
import java.util.ArrayList;
32+
import java.util.Arrays;
3233
import java.util.Collection;
3334
import java.util.Iterator;
3435
import java.util.LinkedList;
@@ -47,14 +48,15 @@
4748

4849
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
4950
import org.apache.hadoop.classification.VisibleForTesting;
51+
import org.apache.hadoop.util.concurrent.HadoopExecutors;
5052

5153
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_HUNDRED;
5254
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
5355

5456
/**
5557
* The Improved Read Buffer Manager for Rest AbfsClient.
5658
*/
57-
final class ReadBufferManagerV2 extends ReadBufferManager {
59+
public final class ReadBufferManagerV2 extends ReadBufferManager {
5860
// Internal constants
5961
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManagerV2.class);
6062
private static final ReentrantLock LOCK = new ReentrantLock();
@@ -350,6 +352,21 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
350352
purgeList(stream, getCompletedReadList());
351353
}
352354

355+
@Override
356+
public void closeReadBufferManager() {
357+
printTraceLog("Closing ReadBufferManagerV2");
358+
HadoopExecutors.shutdown(workerPool, LOGGER,
359+
30, TimeUnit.SECONDS);
360+
workerPool = null;
361+
if (bufferPool != null) {
362+
// help GC
363+
Arrays.fill(bufferPool, null);
364+
bufferPool = null;
365+
}
366+
setBufferManager(null); // reset the singleton instance
367+
printTraceLog("ReadBufferManagerV2 closed");
368+
}
369+
353370
private boolean isAlreadyQueued(final String eTag, final long requestedOffset) {
354371
// returns true if any part of the buffer is already queued
355372
return (isInList(getReadAheadQueue(), eTag, requestedOffset)

0 commit comments

Comments
 (0)