Commit dd15536

Marcelo Vanzin authored and JoshRosen committed
[SPARK-4834] [standalone] Clean up application files after app finishes.
Commit 7aacb7b added support for sharing downloaded files among multiple executors of the same app. That works great in Yarn, since the app's directory is cleaned up after the app is done. But Spark standalone mode didn't do that, so the lock/cache files created by that change were left around and could eventually fill up the disk hosting /tmp.

To solve that, create app-specific directories under the local dirs when launching executors. Multiple executors launched by the same Worker will use the same app directories, so they should be able to share the downloaded files. When the application finishes, a new message is sent to all workers telling them the application has finished; once that message has been received and all executors registered for the application have shut down, those directories will be cleaned up by the Worker.

Note: unit testing this is hard (if even possible), since local-cluster mode doesn't seem to leave the Master/Worker daemons running long enough after `sc.stop()` is called for the cleanup protocol to take effect.

Author: Marcelo Vanzin <[email protected]>

Closes #3705 from vanzin/SPARK-4834 and squashes the following commits:

b430534 [Marcelo Vanzin] Remove seemingly unnecessary synchronization.
50eb4b9 [Marcelo Vanzin] Review feedback.
c0e5ea5 [Marcelo Vanzin] [SPARK-4834] [standalone] Clean up application files after app finishes.
1 parent 2d215ae commit dd15536
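At a glance, the handshake this commit adds works as sketched below. This is a condensed paraphrase of the Master and Worker changes in the diffs that follow, not the verbatim code:

    // Master side: once a finished application is removed, tell every worker about it.
    workers.foreach { w => w.actor ! ApplicationFinished(app.id) }

    // Worker side: remember which apps have finished, and re-run the check whenever an
    // executor exits; the app's local directories are deleted only when both conditions hold.
    def maybeCleanupApplication(id: String): Unit = {
      val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
      if (shouldCleanup) {
        finishedApps -= id
        appDirectories.remove(id).foreach { dirs =>
          dirs.foreach(dir => Utils.deleteRecursively(new File(dir)))
        }
      }
    }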

File tree

7 files changed: +61 -8 lines


core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 3 additions & 0 deletions
@@ -88,6 +88,8 @@ private[deploy] object DeployMessages {
 
   case class KillDriver(driverId: String) extends DeployMessage
 
+  case class ApplicationFinished(id: String)
+
   // Worker internal
 
   case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
@@ -175,4 +177,5 @@ private[deploy] object DeployMessages {
   // Liveness checks in various places
 
   case object SendHeartbeat
+
 }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 5 additions & 0 deletions
@@ -704,6 +704,11 @@ private[spark] class Master(
       }
       persistenceEngine.removeApplication(app)
       schedule()
+
+      // Tell all workers that the application has finished, so they can clean up any app state.
+      workers.foreach { w =>
+        w.actor ! ApplicationFinished(app.id)
+      }
     }
   }
 

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 1 deletion
@@ -47,6 +47,7 @@ private[spark] class ExecutorRunner(
     val executorDir: File,
     val workerUrl: String,
     val conf: SparkConf,
+    val appLocalDirs: Seq[String],
     var state: ExecutorState.Value)
   extends Logging {
 
@@ -77,7 +78,7 @@
   /**
    * Kill executor process, wait for exit and notify worker to update resource status.
    *
-   * @param message the exception message which caused the executor's death
+   * @param message the exception message which caused the executor's death
    */
   private def killProcess(message: Option[String]) {
     var exitCode: Option[Int] = None
@@ -129,6 +130,7 @@
      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
 
      builder.directory(executorDir)
+      builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 33 additions & 3 deletions
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import java.util.{UUID, Date}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
@@ -109,6 +109,8 @@ private[spark] class Worker(
   val finishedExecutors = new HashMap[String, ExecutorRunner]
   val drivers = new HashMap[String, DriverRunner]
   val finishedDrivers = new HashMap[String, DriverRunner]
+  val appDirectories = new HashMap[String, Seq[String]]
+  val finishedApps = new HashSet[String]
 
   // The shuffle service is not actually started unless configured.
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
@@ -294,7 +296,7 @@ private[spark] class Worker(
           val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
           dir.isDirectory && !isAppStillRunning &&
           !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
-        }.foreach { dir =>
+        }.foreach { dir =>
           logInfo(s"Removing directory: ${dir.getPath}")
           Utils.deleteRecursively(dir)
         }
@@ -339,8 +341,19 @@ private[spark] class Worker(
            throw new IOException("Failed to create directory " + executorDir)
          }
 
+          // Create local dirs for the executor. These are passed to the executor via the
+          // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+          // application finishes.
+          val appLocalDirs = appDirectories.get(appId).getOrElse {
+            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
+              Utils.createDirectory(dir).getAbsolutePath()
+            }.toSeq
+          }
+          appDirectories(appId) = appLocalDirs
+
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
+            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
+            ExecutorState.LOADING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
@@ -377,6 +390,7 @@ private[spark] class Worker(
              message.map(" message " + _).getOrElse("") +
              exitStatus.map(" exitStatus " + _).getOrElse(""))
        }
+        maybeCleanupApplication(appId)
      }
 
    case KillExecutor(masterUrl, appId, execId) =>
@@ -446,6 +460,9 @@ private[spark] class Worker(
    case ReregisterWithMaster =>
      reregisterWithMaster()
 
+    case ApplicationFinished(id) =>
+      finishedApps += id
+      maybeCleanupApplication(id)
  }
 
  private def masterDisconnected() {
@@ -454,6 +471,19 @@ private[spark] class Worker(
    registerWithMaster()
  }
 
+  private def maybeCleanupApplication(id: String): Unit = {
+    val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
+    if (shouldCleanup) {
+      finishedApps -= id
+      appDirectories.remove(id).foreach { dirList =>
+        logInfo(s"Cleaning up local directories for application $id")
+        dirList.foreach { dir =>
+          Utils.deleteRecursively(new File(dir))
+        }
+      }
+    }
+  }
+
  def generateWorkerId(): String = {
    "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
  }
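Note the appDirectories.get(appId).getOrElse(...) lookup in the LaunchExecutor hunk above: the first executor launched for an app creates the directories, and every later executor of the same app on this Worker is handed the same Seq, which is what lets them share downloaded files. A purely hypothetical illustration, with the app id and paths invented for the example:

    // Hypothetical worker state after two executors of one app have launched on a
    // worker configured with two local root dirs; both ExecutorRunners got the same Seq.
    appDirectories("app-20141218000000-0000") == Seq(
      "/data1/spark/spark-1b2c3d4e",   // created by Utils.createDirectory on the first launch
      "/data2/spark/spark-5f6a7b8c"
    )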

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 2 deletions
@@ -246,8 +246,11 @@ private[spark] object Utils extends Logging {
     retval
   }
 
-  /** Create a temporary directory inside the given parent directory */
-  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+  /**
+   * Create a directory inside the given parent directory. The directory is guaranteed to be
+   * newly created, and is not marked for automatic deletion.
+   */
+  def createDirectory(root: String): File = {
     var attempts = 0
     val maxAttempts = 10
     var dir: File = null
@@ -265,6 +268,15 @@
      } catch { case e: SecurityException => dir = null; }
    }
 
+    dir
+  }
+
+  /**
+   * Create a temporary directory inside the given parent directory. The directory will be
+   * automatically deleted when the VM shuts down.
+   */
+  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+    val dir = createDirectory(root)
    registerShutdownDeleteDir(dir)
    dir
  }
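The refactoring above gives callers two lifetimes to choose from: createTempDir registers its result for deletion at JVM shutdown, while createDirectory leaves cleanup to the caller, which is what the Worker needs since app directories must outlive any single executor process. A small usage sketch (the parent path is arbitrary, chosen for the example):

    val scratchDir = Utils.createTempDir()             // cleaned up automatically at JVM shutdown
    val appDir = Utils.createDirectory("/tmp/spark")   // caller is responsible for removal
    // use appDir for as long as needed, then delete it explicitly:
    Utils.deleteRecursively(appDir)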

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
       new File("sparkHome"), new File("workDir"), "akka://worker",
-      new SparkConf, ExecutorState.RUNNING)
+      new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }
 
   def createDriverRunner(): DriverRunner = {

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

Lines changed: 2 additions & 1 deletion
@@ -33,7 +33,8 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
+      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
   }
