
Commit 7aacb7b

li-zhihui authored and Andrew Or committed
[SPARK-2713] Executors of the same application on the same host should only download files & jars once
If Spark launched multiple executors on one host for one application, every executor would independently download the application's dependent files and jars (unless a local: URL was used). This can add significant latency: in my case, downloading the dependent jars (about 17 MB) took 20 seconds when I launched 32 executors on each of 4 hosts. This patch caches downloaded files and jars so that executors on the same host download them only once, reducing network traffic and download latency. In my case, latency dropped from 20 seconds to under 1 second.

Author: Li Zhihui <[email protected]>
Author: li-zhihui <[email protected]>

Closes apache#1616 from li-zhihui/cachefiles and squashes the following commits:

36940df [Li Zhihui] Close cache for local mode
935fed6 [Li Zhihui] Clean code.
f9330d4 [Li Zhihui] Clean code again
7050d46 [Li Zhihui] Clean code
074a422 [Li Zhihui] Fix: deal with spark.files.overwrite
03ed3a8 [li-zhihui] Rename cache file name as XXXXXXXXX_cache
2766055 [li-zhihui] Use url.hashCode + timestamp as cachedFileName
76a7b66 [Li Zhihui] Clean code & use application work directory as cache directory
3510eb0 [Li Zhihui] Keep fetchFile private
2ffd742 [Li Zhihui] Add comment for FileLock
e0ebd48 [Li Zhihui] Try and finally lock.release
7fb7c0b [Li Zhihui] Release lock before copy files
6b997bf [Li Zhihui] Executors of same application in same host should only download files & jars once
1 parent 6a40a76 commit 7aacb7b
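The core of the change is a per-host download cache guarded by an inter-process file lock: the first executor to request a dependency downloads it into a shared local directory, and every later executor on that host copies it from the cache. A minimal standalone sketch of that scheme, assuming a caller-supplied download function (fetchOnce and its parameters are illustrative, not Spark API; the real logic is in Utils.fetchFile in the diff below):

    import java.io.{File, RandomAccessFile}

    object FetchOnceSketch {
      // Sketch of the "download once per host" scheme described above.
      def fetchOnce(url: String, timestamp: Long, cacheDir: File)
                   (download: File => Unit): File = {
        val stem = s"${url.hashCode}$timestamp"
        val cachedFile = new File(cacheDir, s"${stem}_cache")
        val lockFile = new File(cacheDir, s"${stem}_lock")
        val raf = new RandomAccessFile(lockFile, "rw")
        // FileChannel.lock() blocks until the OS-level lock is acquired, so
        // only the first executor on this host performs the actual download.
        val lock = raf.getChannel.lock()
        try {
          if (!cachedFile.exists()) {
            download(cachedFile) // later executors find the cached copy instead
          }
        } finally {
          lock.release() // release before callers copy, so others can proceed
          raf.close()
        }
        cachedFile
      }
    }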

File tree: 3 files changed, +82 -20 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 2 deletions
@@ -837,11 +837,12 @@ class SparkContext(config: SparkConf) extends Logging {
       case "local" => "file:" + uri.getPath
       case _ => path
     }
-    addedFiles(key) = System.currentTimeMillis
+    val timestamp = System.currentTimeMillis
+    addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
     Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration)
+      hadoopConfiguration, timestamp, useCache = false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
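From the user's point of view, driver-side behavior is unchanged: addFile still records a timestamp per file and fetches with useCache = false, since the cache only pays off when many executors on one host request the same file. A hypothetical call (the path is illustrative):

    sc.addFile("hdfs:///deps/settings.conf")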

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 6 additions & 4 deletions
@@ -322,14 +322,16 @@ private[spark] class Executor(
     // Fetch missing dependencies
     for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
       logInfo("Fetching " + name + " with timestamp " + timestamp)
-      Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-        hadoopConf)
+      // Fetch file with useCache mode, close cache for local mode.
+      Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+        env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
       currentFiles(name) = timestamp
     }
     for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
       logInfo("Fetching " + name + " with timestamp " + timestamp)
-      Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-        hadoopConf)
+      // Fetch file with useCache mode, close cache for local mode.
+      Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+        env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
       currentJars(name) = timestamp
       // Add it to our class loader
       val localName = name.split("/").last
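Both loops keep the same staleness test: a dependency is fetched only if the driver re-added it with a newer timestamp than the copy the executor already has. A minimal model of that check, outside of Spark (names are illustrative):

    import scala.collection.mutable

    // Map from dependency name to the timestamp of the copy already fetched.
    val currentFiles = mutable.HashMap[String, Long]()

    // Fetch only when the driver's copy is newer; -1L means never fetched.
    def needsFetch(name: String, timestamp: Long): Boolean =
      currentFiles.getOrElse(name, -1L) < timestamp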

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 73 additions & 14 deletions
@@ -347,15 +347,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * Download a file to target directory. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
+   * across executors running the same application. `useCache` is used mainly for
+   * the executors, and not in local mode.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+        }
+      } finally {
+        lock.release()
+      }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
+        if (conf.getBoolean("spark.files.overwrite", false)) {
+          targetFile.delete()
+          logInfo((s"File $targetFile exists and does not match contents of $url, " +
+            s"replacing it with $url"))
+        } else {
+          throw new SparkException(s"File $targetFile exists and does not match contents of $url")
+        }
+      }
+      Files.copy(cachedFile, targetFile)
+    } else {
+      doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
+    }
+
+    // Decompress the file if it's a .tar or .tar.gz
+    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+    } else if (fileName.endsWith(".tar")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+    }
+    // Make the file executable - That's necessary for scripts
+    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
+  }
+
+  /**
+   * Download a file to target directory. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-      hadoopConf: Configuration) {
-    val filename = url.split("/").last
+  private def doFetchFile(
+      url: String,
+      targetDir: File,
+      filename: String,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration) {
     val tempDir = getLocalDir(conf)
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)

@@ -443,16 +512,6 @@ private[spark] object Utils extends Logging {
       }
       Files.move(tempFile, targetFile)
     }
-    // Decompress the file if it's a .tar or .tar.gz
-    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
-    } else if (filename.endsWith(".tar")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xf", filename), targetDir)
-    }
-    // Make the file executable - That's necessary for scripts
-    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
 
   /**
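For reference, a hypothetical executor-side call against the new fetchFile signature; the URL and surrounding values are illustrative:

    Utils.fetchFile(
      "http://repo.example.com/libs/dep.jar", // url (illustrative)
      new File(SparkFiles.getRootDirectory),  // targetDir
      conf,                                   // SparkConf
      env.securityManager,                    // SecurityManager
      hadoopConf,                             // Hadoop Configuration
      timestamp,                              // set by the driver's addFile/addJar
      useCache = !isLocal)                    // shared cache off in local mode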
