
Commit cf1d32e

mccheah authored and aarondav committed
[SPARK-1860] More conservative app directory cleanup.
First contribution to the project, so I apologize for any significant errors.

This PR addresses [SPARK-1860]. The application directories are now cleaned up in a more conservative manner. Previously, app-* directories were cleaned up if the directory's timestamp was older than a given time. However, the timestamp on a directory does not reflect the modification times of the files in that directory. Therefore, app-* directories were wiped out even if the files inside them were created recently and possibly being used by Executor tasks. The solution is to change the cleanup logic to inspect all files within the app-* directory and only eliminate the app-* directory if all files in the directory are stale.

Author: mcheah <[email protected]>

Closes apache#2609 from mccheah/worker-better-app-dir-cleanup and squashes the following commits:

87b5d03 [mcheah] [SPARK-1860] Using more string interpolation. Better error logging.
802473e [mcheah] [SPARK-1860] Cleaning up the logs generated when cleaning directories.
e0a1f2e [mcheah] [SPARK-1860] Fixing broken unit test.
77a9de0 [mcheah] [SPARK-1860] More conservative app directory cleanup.
1 parent 79e45c9 · commit cf1d32e
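The heart of the change is easiest to see in isolation: an app-* directory becomes eligible for deletion only when nothing anywhere beneath it is newer than the retention cutoff. Below is a minimal sketch of that recursive test, written with plain java.io.File recursion rather than the commons-io walker the patch itself adopts (see Utils.doesDirectoryContainAnyNewFiles in the diff); containsAnyNewFiles is a hypothetical name for illustration:

import java.io.File

// Sketch only: the patch builds this on commons-io's listFilesAndDirs.
// A file is "new" if modified within the cutoff; a directory is "new"
// if it, or anything below it, is new.
def containsAnyNewFiles(dir: File, cutoffSecs: Long): Boolean = {
  val cutoffMillis = System.currentTimeMillis - cutoffSecs * 1000
  def isNew(f: File): Boolean =
    f.lastModified > cutoffMillis ||
      (f.isDirectory && Option(f.listFiles).getOrElse(Array.empty[File]).exists(isNew))
  isNew(dir)
}

Only when this test fails for an app-* directory, and no executor of that application is still registered with the worker, is the directory removed.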

File tree

4 files changed: +62 -27 lines changed


core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 1 addition & 7 deletions
@@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val sparkHome: File,
-    val workDir: File,
+    val executorDir: File,
     val workerUrl: String,
     val conf: SparkConf,
     var state: ExecutorState.Value)
@@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
    */
  def fetchAndRunExecutor() {
    try {
-      // Create the executor's working directory
-      val executorDir = new File(workDir, appId + "/" + execId)
-      if (!executorDir.mkdirs()) {
-        throw new IOException("Failed to create directory " + executorDir)
-      }
-
      // Launch the process
      val command = getCommandSeq
      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 31 additions & 6 deletions
@@ -18,15 +18,18 @@
 package org.apache.spark.deploy.worker

 import java.io.File
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.Date

+import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 import scala.language.postfixOps

 import akka.actor._
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.commons.io.FileUtils

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -191,6 +194,7 @@ private[spark] class Worker(
     changeMaster(masterUrl, masterWebUiUrl)
     context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
     if (CLEANUP_ENABLED) {
+      logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
       context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
         CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
     }
@@ -201,10 +205,23 @@ private[spark] class Worker(
     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
       val cleanupFuture = concurrent.future {
-        logInfo("Cleaning up oldest application directories in " + workDir + " ...")
-        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
-          .foreach(Utils.deleteRecursively)
+        val appDirs = workDir.listFiles()
+        if (appDirs == null) {
+          throw new IOException("ERROR: Failed to list files in " + workDir)
+        }
+        appDirs.filter { dir =>
+          // the directory is used by an application - check that the application is not running
+          // when cleaning up
+          val appIdFromDir = dir.getName
+          val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+          dir.isDirectory && !isAppStillRunning &&
+            !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+        }.foreach { dir =>
+          logInfo(s"Removing directory: ${dir.getPath}")
+          Utils.deleteRecursively(dir)
+        }
       }
+
       cleanupFuture onFailure {
         case e: Throwable =>
           logError("App dir cleanup failed: " + e.getMessage, e)
@@ -233,21 +250,29 @@ private[spark] class Worker(
     } else {
       try {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+
+        // Create the executor's working directory
+        val executorDir = new File(workDir, appId + "/" + execId)
+        if (!executorDir.mkdirs()) {
+          throw new IOException("Failed to create directory " + executorDir)
+        }
+
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
+          self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_
         memoryUsed += memory_
         master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
       } catch {
         case e: Exception => {
-          logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+          logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
           if (executors.contains(appId + "/" + execId)) {
             executors(appId + "/" + execId).kill()
             executors -= appId + "/" + execId
           }
-          master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+          master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
+            Some(e.toString), None)
         }
       }
     }
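For context, CLEANUP_ENABLED, CLEANUP_INTERVAL_MILLIS, and APP_DATA_RETENTION_SECS in the code above come from the worker's SparkConf. Paraphrased from memory of Worker.scala in this era (property names as documented for Spark's standalone mode; defaults are off, 30 minutes, and 7 days), they read roughly:

val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
// How often the worker cleans up old app folders, in seconds
val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; directories with only older files are removed
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

These are typically set per worker, e.g. via SPARK_WORKER_OPTS in conf/spark-env.sh.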

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 13 additions & 8 deletions
@@ -35,6 +35,8 @@ import scala.util.control.{ControlThrowable, NonFatal}

 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.TrueFileFilter
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.log4j.PropertyConfigurator
@@ -705,17 +707,20 @@ private[spark] object Utils extends Logging {
   }

   /**
-   * Finds all the files in a directory whose last modified time is older than cutoff seconds.
-   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
-   * @param cutoff measured in seconds. Files older than this are returned.
+   * Determines if a directory contains any files newer than cutoff seconds.
+   *
+   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+   * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
    */
-  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+  def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
     val currentTimeMillis = System.currentTimeMillis
-    if (dir.isDirectory) {
-      val files = listFilesSafely(dir)
-      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+    if (!dir.isDirectory) {
+      throw new IllegalArgumentException(dir + " is not a directory!")
     } else {
-      throw new IllegalArgumentException(dir + " is not a directory!")
+      val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
+      val cutoffTimeInMillis = currentTimeMillis - (cutoff * 1000)
+      val newFiles = files.filter { _.lastModified > cutoffTimeInMillis }
+      newFiles.nonEmpty
     }
   }
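To make the new helper's contract concrete, here is a small usage sketch. The path is hypothetical, and since Utils is private[spark], code like this would have to live inside Spark itself:

import java.io.File
import org.apache.spark.util.Utils

// Hypothetical app directory beneath a worker's work dir.
val appDir = new File("/tmp/spark-work/app-20141010120000-0001")

// Keep the directory if anything under it changed in the last 7 days;
// delete it only when every file and subdirectory is stale.
val retentionSecs = 7 * 24 * 3600L
if (!Utils.doesDirectoryContainAnyNewFiles(appDir, retentionSecs)) {
  Utils.deleteRecursively(appDir)
}

Note that commons-io's listFilesAndDirs includes the starting directory itself in its results, so a freshly modified but empty app directory also counts as new and is kept.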

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 17 additions & 6 deletions
@@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
     assert(Utils.getIteratorSize(iterator) === 5L)
   }

-  test("findOldFiles") {
+  test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()
     val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
     val child2: File = Utils.createTempDir(parent.getCanonicalPath)
-    // set the last modified time of child1 to 10 secs old
-    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+    val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+    // set the last modified time of child1 to 30 secs old
+    child1.setLastModified(System.currentTimeMillis() - (1000 * 30))

-    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
-    assert(result.size.equals(1))
-    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+    // although child1 is old, child2 is still new, so return true
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+    // although parent and its immediate children are now old, child3 is still new;
+    // we expect a full recursive search for new files.
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
   }

   test("resolveURI") {
