
Commit 48a19a6

aarondav authored and Andrew Or committed
[SPARK-4236] Cleanup removed applications' files in shuffle service
This relies on a hook from whoever is hosting the shuffle service to invoke removeApplication() when the application is completed. Once invoked, we will clean up all the executors' shuffle directories we know about.

Author: Aaron Davidson <[email protected]>

Closes #3126 from aarondav/cleanup and squashes the following commits:

33a64a9 [Aaron Davidson] Missing brace
e6e428f [Aaron Davidson] Address comments
16a0d27 [Aaron Davidson] Cleanup
e4df3e7 [Aaron Davidson] [SPARK-4236] Cleanup removed applications' files in shuffle service
1 parent f165b2b commit 48a19a6
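
The hook described in the commit message is not wired up by this change itself; whatever process embeds the shuffle service (for example a standalone Worker or a YARN auxiliary service) is expected to call the new ExternalShuffleBlockHandler.applicationRemoved() once an application finishes. A minimal host-side sketch of that contract, using only the API added in this commit; the class ShuffleServiceHostSketch and the onApplicationFinished callback are hypothetical names for illustration:

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

// Hypothetical host-side glue, not part of this commit: the hosting process owns the
// handler and forwards application-termination events to it.
public class ShuffleServiceHostSketch {
  private final ExternalShuffleBlockHandler blockHandler;

  public ShuffleServiceHostSketch(ExternalShuffleBlockHandler blockHandler) {
    this.blockHandler = blockHandler;
  }

  // Imagined callback fired by the cluster manager once the application has terminated.
  public void onApplicationFinished(String appId) {
    // true => also delete the executors' local shuffle directories (in a background thread).
    blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */);
  }
}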

8 files changed: +319 -22 lines

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 0 deletions
@@ -755,6 +755,7 @@ private[spark] object Utils extends Logging {
   /**
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
+   * Throws an exception if deletion is unsuccessful.
    */
   def deleteRecursively(file: File) {
     if (file != null) {

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -63,8 +63,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     rdd.count()
     rdd.count()
 
-    // Invalidate the registered executors, disallowing access to their shuffle blocks.
-    rpcHandler.clearRegisteredExecutors()
+    // Invalidate the registered executors, disallowing access to their shuffle blocks (without
+    // deleting the actual shuffle files, so we could access them without the shuffle service).
+    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)
 
     // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry"
     // being set.

network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 59 additions & 0 deletions
@@ -22,16 +22,22 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
+import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import com.google.common.base.Charsets;
 import io.netty.buffer.Unpooled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * General utilities available in the network package. Many of these are sourced from Spark's
+ * own Utils, just accessible within this package.
+ */
 public class JavaUtils {
   private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
 
@@ -93,4 +99,57 @@ public static ByteBuffer stringToBytes(String s) {
   public static String bytesToString(ByteBuffer b) {
     return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
   }
+
+  /*
+   * Delete a file or directory and its contents recursively.
+   * Don't follow directories if they are symlinks.
+   * Throws an exception if deletion is unsuccessful.
+   */
+  public static void deleteRecursively(File file) throws IOException {
+    if (file == null) { return; }
+
+    if (file.isDirectory() && !isSymlink(file)) {
+      IOException savedIOException = null;
+      for (File child : listFilesSafely(file)) {
+        try {
+          deleteRecursively(child);
+        } catch (IOException e) {
+          // In case of multiple exceptions, only last one will be thrown
+          savedIOException = e;
+        }
+      }
+      if (savedIOException != null) {
+        throw savedIOException;
+      }
+    }
+
+    boolean deleted = file.delete();
+    // Delete can also fail if the file simply did not exist.
+    if (!deleted && file.exists()) {
+      throw new IOException("Failed to delete: " + file.getAbsolutePath());
+    }
+  }
+
+  private static File[] listFilesSafely(File file) throws IOException {
+    if (file.exists()) {
+      File[] files = file.listFiles();
+      if (files == null) {
+        throw new IOException("Failed to list files for dir: " + file);
+      }
+      return files;
+    } else {
+      return new File[0];
+    }
+  }
+
+  private static boolean isSymlink(File file) throws IOException {
+    Preconditions.checkNotNull(file);
+    File fileInCanonicalDir = null;
+    if (file.getParent() == null) {
+      fileInCanonicalDir = file;
+    } else {
+      fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
+    }
+    return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
+  }
 }
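
For context, here is a small standalone sketch of how the new JavaUtils.deleteRecursively() behaves from a caller's point of view. It is not part of the commit; the demo class, directory name, and file name are made up for illustration:

import java.io.File;
import java.io.IOException;

import org.apache.spark.network.util.JavaUtils;

// Minimal sketch: build a throwaway directory tree, then remove it with the new helper.
public class DeleteRecursivelyExample {
  public static void main(String[] args) throws IOException {
    File dir = new File(System.getProperty("java.io.tmpdir"), "shuffle-cleanup-demo");
    File subDir = new File(dir, "0f");
    if (!subDir.mkdirs() && !subDir.isDirectory()) {
      throw new IOException("Could not create " + subDir);
    }
    if (!new File(subDir, "shuffle_0_0_0.data").createNewFile()) {
      throw new IOException("Could not create test file");
    }

    // Removes the directory and everything under it; throws IOException on failure,
    // and does not descend into directories that are symlinks.
    JavaUtils.deleteRecursively(dir);
    System.out.println("still exists? " + dir.exists());  // prints: still exists? false
  }
}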

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 6 additions & 4 deletions
@@ -94,9 +94,11 @@ public StreamManager getStreamManager() {
     return streamManager;
   }
 
-  /** For testing, clears all executors registered with "RegisterExecutor". */
-  @VisibleForTesting
-  public void clearRegisteredExecutors() {
-    blockManager.clearRegisteredExecutors();
+  /**
+   * Removes an application (once it has been terminated), and optionally will clean up any
+   * local directories associated with the executors of that application in a separate thread.
+   */
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    blockManager.applicationRemoved(appId, cleanupLocalDirs);
   }
 }

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java

Lines changed: 105 additions & 13 deletions
@@ -21,9 +21,15 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,21 +49,30 @@
 public class ExternalShuffleBlockManager {
   private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
 
-  // Map from "appId-execId" to the executor's configuration.
-  private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
-    new ConcurrentHashMap<String, ExecutorShuffleInfo>();
+  // Map containing all registered executors' metadata.
+  private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
 
-  // Returns an id suitable for a single executor within a single application.
-  private String getAppExecId(String appId, String execId) {
-    return appId + "-" + execId;
+  // Single-threaded Java executor used to perform expensive recursive directory deletion.
+  private final Executor directoryCleaner;
+
+  public ExternalShuffleBlockManager() {
+    // TODO: Give this thread a name.
+    this(Executors.newSingleThreadExecutor());
+  }
+
+  // Allows tests to have more control over when directories are cleaned up.
+  @VisibleForTesting
+  ExternalShuffleBlockManager(Executor directoryCleaner) {
+    this.executors = Maps.newConcurrentMap();
+    this.directoryCleaner = directoryCleaner;
   }
 
   /** Registers a new Executor with all the configuration we need to find its shuffle files. */
   public void registerExecutor(
       String appId,
       String execId,
       ExecutorShuffleInfo executorInfo) {
-    String fullId = getAppExecId(appId, execId);
+    AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
     executors.put(fullId, executorInfo);
   }
@@ -78,7 +93,7 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
     int mapId = Integer.parseInt(blockIdParts[2]);
     int reduceId = Integer.parseInt(blockIdParts[3]);
 
-    ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
@@ -94,6 +109,56 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
     }
   }
 
+  /**
+   * Removes our metadata of all executors registered for the given application, and optionally
+   * also deletes the local directories associated with the executors of that application in a
+   * separate thread.
+   *
+   * It is not valid to call registerExecutor() for an executor with this appId after invoking
+   * this method.
+   */
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+    Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+      AppExecId fullId = entry.getKey();
+      final ExecutorShuffleInfo executor = entry.getValue();
+
+      // Only touch executors associated with the appId that was removed.
+      if (appId.equals(fullId.appId)) {
+        it.remove();
+
+        if (cleanupLocalDirs) {
+          logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+          // Execute the actual deletion in a different thread, as it may take some time.
+          directoryCleaner.execute(new Runnable() {
+            @Override
+            public void run() {
+              deleteExecutorDirs(executor.localDirs);
+            }
+          });
+        }
+      }
+    }
+  }
+
+  /**
+   * Synchronously deletes each directory one at a time.
+   * Should be executed in its own thread, as this may take a long time.
+   */
+  private void deleteExecutorDirs(String[] dirs) {
+    for (String localDir : dirs) {
+      try {
+        JavaUtils.deleteRecursively(new File(localDir));
+        logger.debug("Successfully cleaned up directory: " + localDir);
+      } catch (Exception e) {
+        logger.error("Failed to delete directory: " + localDir, e);
+      }
+    }
+  }
+
   /**
    * Hash-based shuffle data is simply stored as one file per block.
    * This logic is from FileShuffleBlockManager.
@@ -146,9 +211,36 @@ static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename)
     return new File(new File(localDir, String.format("%02x", subDirId)), filename);
   }
 
-  /** For testing, clears all registered executors. */
-  @VisibleForTesting
-  void clearRegisteredExecutors() {
-    executors.clear();
+  /** Simply encodes an executor's full ID, which is appId + execId. */
+  private static class AppExecId {
+    final String appId;
+    final String execId;
+
+    private AppExecId(String appId, String execId) {
+      this.appId = appId;
+      this.execId = execId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppExecId appExecId = (AppExecId) o;
+      return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(appId, execId);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("execId", execId)
+        .toString();
+    }
   }
 }
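
The package-private constructor marked @VisibleForTesting accepts any java.util.concurrent.Executor, so a test can run the directory cleanup inline instead of on the background thread. A rough sketch of that pattern follows; it assumes the class is placed in the org.apache.spark.network.shuffle package (to reach the package-private constructor) and that ExecutorShuffleInfo has a three-argument constructor (local dirs, sub-dirs per local dir, shuffle manager name). The class name and paths are illustrative, not from the commit:

package org.apache.spark.network.shuffle;

import java.io.File;
import java.util.concurrent.Executor;

// Illustrative only: with a same-thread Executor, applicationRemoved() finishes deleting
// the executors' local dirs before it returns, so assertions can follow immediately.
public class SameThreadCleanupSketch {
  public static void main(String[] args) {
    Executor sameThread = new Executor() {
      @Override
      public void execute(Runnable command) {
        command.run();  // run the cleanup task inline rather than handing it to another thread
      }
    };

    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThread);
    String[] localDirs = { "/tmp/spark-demo-local-dir" };  // hypothetical executor local dir
    manager.registerExecutor("app-1", "exec-0",
      new ExecutorShuffleInfo(localDirs, 64, "org.apache.spark.shuffle.hash.HashShuffleManager"));

    // Removes the metadata for app-1 and, because cleanupLocalDirs is true, deletes localDirs
    // synchronously via the same-thread executor above.
    manager.applicationRemoved("app-1", true /* cleanupLocalDirs */);
    System.out.println("cleaned: " + !new File(localDirs[0]).exists());
  }
}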
