
Commit 0028241

[SPARK-4236] Cleanup removed applications' files in shuffle service
This relies on a hook from whoever is hosting the shuffle service to invoke removeApplication() when the application is completed. Once invoked, we will clean up all the executors' shuffle directories we know about.
1 parent 5f13759 commit 0028241
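
For illustration, a minimal sketch of the hook described above, assuming a hypothetical host class that is notified when an application finishes. Only removeApplication(appId, cleanupLocalDirs) on ExternalShuffleBlockHandler comes from this commit; the surrounding class and method names are illustrative.

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

// Hypothetical host of the shuffle service; not part of this commit.
public class ShuffleServiceHost {
  private final ExternalShuffleBlockHandler rpcHandler;

  public ShuffleServiceHost(ExternalShuffleBlockHandler rpcHandler) {
    this.rpcHandler = rpcHandler;
  }

  // Hook invoked by the cluster manager once the application has completed.
  public void onApplicationFinished(String appId) {
    // Drop the executors' metadata and delete their local shuffle directories.
    rpcHandler.removeApplication(appId, true /* cleanupLocalDirs */);
  }
}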

8 files changed, +317 -22 lines changed


core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 0 deletions
@@ -754,6 +754,7 @@ private[spark] object Utils extends Logging {
   /**
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
+   * Throws an exception if deletion is unsuccessful.
    */
   def deleteRecursively(file: File) {
     if (file != null) {

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -63,8 +63,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     rdd.count()
     rdd.count()

-    // Invalidate the registered executors, disallowing access to their shuffle blocks.
-    rpcHandler.clearRegisteredExecutors()
+    // Invalidate the registered executors, disallowing access to their shuffle blocks (without
+    // deleting the actual shuffle files, so we could access them without the shuffle service).
+    rpcHandler.removeApplication(sc.conf.getAppId, false /* cleanupLocalDirs */)

     // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry"
     // being set.

network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 60 additions & 0 deletions
@@ -20,14 +20,20 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;

+import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+/**
+ * General utilities available in the network package. Many of these are sourced from Spark's
+ * own Utils, just accessible within this package.
+ */
 public class JavaUtils {
   private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);

@@ -73,4 +79,58 @@ public static int nonNegativeHash(Object obj) {
     int hash = obj.hashCode();
     return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
   }
+
+  /**
+   * Delete a file or directory and its contents recursively.
+   * Don't follow directories if they are symlinks.
+   * Throws an exception if deletion is unsuccessful.
+   */
+  public static void deleteRecursively(File file) throws IOException {
+    if (file == null) { return; }
+
+    if (file.isDirectory() && !isSymlink(file)) {
+      IOException savedIOException = null;
+      for (File child : listFilesSafely(file)) {
+        try {
+          deleteRecursively(child);
+        } catch (IOException e) {
+          // In case of multiple exceptions, only last one will be thrown
+          savedIOException = e;
+        }
+      }
+      if (savedIOException != null) {
+        throw savedIOException;
+      }
+    }
+
+    if (!file.delete()) {
+      // Delete can also fail if the file simply did not exist
+      if (file.exists()) {
+        throw new IOException("Failed to delete: " + file.getAbsolutePath());
+      }
+    }
+  }
+
+  private static File[] listFilesSafely(File file) throws IOException {
+    if (file.exists()) {
+      File[] files = file.listFiles();
+      if (files == null) {
+        throw new IOException("Failed to list files for dir: " + file);
+      }
+      return files;
+    } else {
+      return new File[0];
+    }
+  }
+
+  private static boolean isSymlink(File file) throws IOException {
+    Preconditions.checkNotNull(file);
+    File fileInCanonicalDir = null;
+    if (file.getParent() == null) {
+      fileInCanonicalDir = file;
+    } else {
+      fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
+    }
+    return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
+  }
 }
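
A minimal usage sketch of the new helper, showing the semantics added here (throws IOException on failure, while deleting a nonexistent path is not an error). The wrapper class and main method are illustrative and not part of this commit.

import java.io.File;
import java.io.IOException;

import org.apache.spark.network.util.JavaUtils;

public class DeleteRecursivelyExample {
  public static void main(String[] args) {
    File dir = new File(System.getProperty("java.io.tmpdir"), "shuffle-cleanup-demo");
    dir.mkdirs();
    try {
      // Removes the directory and its contents recursively; throws IOException if
      // deletion fails, but a path that does not exist is simply ignored.
      JavaUtils.deleteRecursively(dir);
    } catch (IOException e) {
      System.err.println("Could not delete " + dir + ": " + e.getMessage());
    }
  }
}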

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 6 additions & 4 deletions
@@ -94,9 +94,11 @@ public StreamManager getStreamManager() {
     return streamManager;
   }

-  /** For testing, clears all executors registered with "RegisterExecutor". */
-  @VisibleForTesting
-  public void clearRegisteredExecutors() {
-    blockManager.clearRegisteredExecutors();
+  /**
+   * Removes an application (once it has been terminated), and optionally will clean up any
+   * local directories associated with the executors of that application in a separate thread.
+   */
+  public void removeApplication(String appId, boolean cleanupLocalDirs) {
+    blockManager.removeApplication(appId, cleanupLocalDirs);
   }
 }

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java

Lines changed: 106 additions & 13 deletions
@@ -21,9 +21,16 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;

 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -43,21 +50,30 @@
 public class ExternalShuffleBlockManager {
   private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);

-  // Map from "appId-execId" to the executor's configuration.
-  private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
-    new ConcurrentHashMap<String, ExecutorShuffleInfo>();
+  // Map containing all registered executors' metadata.
+  private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;

-  // Returns an id suitable for a single executor within a single application.
-  private String getAppExecId(String appId, String execId) {
-    return appId + "-" + execId;
+  // Single-threaded executor used to perform expensive recursive directory deletion.
+  private final Executor directoryCleanupExecutor;
+
+  public ExternalShuffleBlockManager() {
+    // TODO: Give this thread a name.
+    this(Executors.newSingleThreadExecutor());
+  }
+
+  // Allows tests to have more control over when directories are cleaned up.
+  @VisibleForTesting
+  ExternalShuffleBlockManager(Executor directoryCleanupExecutor) {
+    this.executors = Maps.newConcurrentMap();
+    this.directoryCleanupExecutor = directoryCleanupExecutor;
   }

   /** Registers a new Executor with all the configuration we need to find its shuffle files. */
   public void registerExecutor(
       String appId,
       String execId,
       ExecutorShuffleInfo executorInfo) {
-    String fullId = getAppExecId(appId, execId);
+    AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
     executors.put(fullId, executorInfo);
   }

@@ -78,7 +94,7 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
     int mapId = Integer.parseInt(blockIdParts[2]);
     int reduceId = Integer.parseInt(blockIdParts[3]);

-    ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
           String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));

@@ -94,6 +110,56 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
     }
   }

+  /**
+   * Removes our metadata of all executors registered for the given application, and optionally
+   * also deletes the local directories associated with the executors of that application in a
+   * separate thread.
+   *
+   * It is not valid to call registerExecutor() for an executor with this appId after invoking
+   * this method.
+   */
+  public void removeApplication(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+    Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+      AppExecId fullId = entry.getKey();
+      final ExecutorShuffleInfo executor = entry.getValue();
+
+      // Only touch executors associated with the appId that was removed.
+      if (appId.equals(fullId.appId)) {
+        it.remove();
+
+        if (cleanupLocalDirs) {
+          logger.debug("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+          // Execute the actual deletion in a different thread, as it may take some time.
+          directoryCleanupExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+              deleteExecutorDirs(executor.localDirs);
+            }
+          });
+        }
+      }
+    }
+  }
+
+  /**
+   * Synchronously deletes each directory one at a time.
+   * Should be executed in its own thread, as this may take a long time.
+   */
+  private void deleteExecutorDirs(String[] dirs) {
+    for (String localDir : dirs) {
+      try {
+        JavaUtils.deleteRecursively(new File(localDir));
+        logger.info("Successfully cleaned up directory: " + localDir);
+      } catch (Exception e) {
+        logger.error("Failed to delete directory: " + localDir, e);
+      }
+    }
+  }
+
   /**
    * Hash-based shuffle data is simply stored as one file per block.
    * This logic is from FileShuffleBlockManager.

@@ -146,9 +212,36 @@ static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename)
     return new File(new File(localDir, String.format("%02x", subDirId)), filename);
   }

-  /** For testing, clears all registered executors. */
-  @VisibleForTesting
-  void clearRegisteredExecutors() {
-    executors.clear();
+  /** Simply encodes an executor's full ID, which is appId + execId. */
+  private static class AppExecId {
+    final String appId;
+    final String execId;
+
+    private AppExecId(String appId, String execId) {
+      this.appId = appId;
+      this.execId = execId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppExecId appExecId = (AppExecId) o;
+      return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(appId, execId);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("execId", execId)
+        .toString();
+    }
   }
 }
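
The package-private constructor above exists so tests can control when directory cleanup happens. A sketch of how that might be used, assuming the caller lives in org.apache.spark.network.shuffle (the constructor is not public) and that executors for "app-1" were registered beforehand; everything other than the constructor, registerExecutor(), and removeApplication() is illustrative.

package org.apache.spark.network.shuffle;

import java.util.concurrent.Executor;

public class SynchronousCleanupSketch {
  public static void main(String[] args) {
    // Run cleanup on the calling thread so its effects are visible as soon as
    // removeApplication() returns, rather than on the default single-threaded executor.
    Executor sameThread = new Executor() {
      @Override
      public void execute(Runnable command) {
        command.run();
      }
    };

    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThread);
    // ... registerExecutor("app-1", execId, executorInfo) for one or more executors ...
    manager.removeApplication("app-1", true /* cleanupLocalDirs */);
    // By this point the registered executors' local dirs for "app-1" have been deleted.
  }
}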
