
[SPARK-1198] Allow pipes tasks to run in different sub-directories #128

Closed · wants to merge 6 commits

64 changes: 61 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,6 +17,9 @@

package org.apache.spark.rdd

import java.io.File
import java.io.FilenameFilter
import java.io.IOException
import java.io.PrintWriter
import java.util.StringTokenizer

@@ -27,6 +30,7 @@ import scala.io.Source
import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.util.Utils


/**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
command: Seq[String],
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit)
printRDDElement: (T, String => Unit) => Unit,
separateWorkingDir: Boolean)
extends RDD[String](prev) {

// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag](
command: String,
envVars: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
separateWorkingDir)


override def getPartitions: Array[Partition] = firstParent[T].partitions

/**
* A FilenameFilter that accepts anything that isn't equal to the name passed in.
* @param filterName name of the file or directory to leave out
*/
class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
def accept(dir: File, name: String): Boolean = {
!name.equals(filterName)
}
}

override def compute(split: Partition, context: TaskContext): Iterator[String] = {
val pb = new ProcessBuilder(command)
// Add the environmental variables to the process.
@@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag](
currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
}

// When the spark.worker.separated.working.directory option is turned on, each
// task will be run in a separate working directory. This should resolve
// file access conflict issues.
val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
var workInTaskDirectory = false
logDebug("taskDirectory = " + taskDirectory)
if (separateWorkingDir) {
val currentDir = new File(".")
logDebug("currentDir = " + currentDir.getAbsolutePath())
val taskDirFile = new File(taskDirectory)
taskDirFile.mkdirs()

try {
val tasksDirFilter = new NotEqualsFileNameFilter("tasks")

// Need to add symlinks to jars, files, and directories. On Yarn we could have
// directories and other files not known to the SparkContext that were added via the
// Hadoop distributed cache. We also don't want to symlink to the /tasks directories we
// are creating here.
for (file <- currentDir.list(tasksDirFilter)) {
val fileWithDir = new File(currentDir, file)
Utils.symlink(new File(fileWithDir.getAbsolutePath()),
new File(taskDirectory + "/" + fileWithDir.getName()))
}
pb.directory(taskDirFile)
workInTaskDirectory = true
} catch {
case e: Exception => logError("Unable to setup task working directory: " + e.getMessage +
" (" + taskDirectory + ")")
}
}

val proc = pb.start()
val env = SparkEnv.get

@@ -112,6 +161,15 @@ class PipedRDD[T: ClassTag](
if (exitStatus != 0) {
throw new Exception("Subprocess exited with status " + exitStatus)
}

// Clean up the task working directory if one was used
if (workInTaskDirectory) {
scala.util.control.Exception.ignoring(classOf[IOException]) {
Utils.deleteRecursively(new File(taskDirectory))
}
logDebug("Removed task working directory " + taskDirectory)
}

false
}
}
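
For readers unfamiliar with the mechanism, the change above boils down to: create a unique ./tasks/<uuid> directory, symlink the contents of the current working directory into it, and point ProcessBuilder at it before starting the piped command. A minimal standalone sketch of that flow (file and command names are placeholders, not taken from this patch):

    import java.io.File
    import java.util.UUID
    import scala.io.Source
    import scala.sys.process.Process

    object TaskDirSketch {
      def main(args: Array[String]): Unit = {
        // Create a unique per-task directory, mirroring the "./tasks/<uuid>" scheme above.
        val taskDir = new File("tasks/" + UUID.randomUUID.toString)
        taskDir.mkdirs()

        // Symlink a file from the current directory into the task directory
        // ("data.txt" is a placeholder for whatever the external command expects to find).
        val src = new File("data.txt").getAbsolutePath
        Process(Seq("ln", "-sf", src, new File(taskDir, "data.txt").getPath)).!

        // Start the external command with the task directory as its working directory,
        // as PipedRDD does via pb.directory(taskDirFile).
        val pb = new ProcessBuilder("ls")
        pb.directory(taskDir)
        val proc = pb.start()
        Source.fromInputStream(proc.getInputStream).getLines().foreach(println)
        proc.waitFor()
      }
    }

Because the task directory holds only symlinks, deleting it afterwards must not follow those links, which is why deleteRecursively in Utils.scala below learns to skip symlinked directories.
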
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -481,16 +481,19 @@ abstract class RDD[T: ClassTag](
* instead of constructing a huge String to concat all the elements:
* def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
* for (e <- record._2){f(e)}
* @param separateWorkingDir Use separate working directories for each task.
* @return the result RDD
*/
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false): RDD[String] = {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null)
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
separateWorkingDir)
}

/**
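
From the user's side, the feature is opt-in through the new pipe parameter. A small usage sketch, assuming a local SparkContext (the app name and data here are illustrative only):

    import org.apache.spark.{SparkConf, SparkContext}

    object PipeSeparateDirExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("pipe-example").setMaster("local[2]"))
        val nums = sc.makeRDD(1 to 4, 2)

        // With separateWorkingDir = true each task runs the external command in its own
        // ./tasks/<uuid> directory, so tasks that write scratch files no longer collide.
        val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
        piped.collect().foreach(println)

        sc.stop()
      }
    }

The default remains false, so existing callers of pipe are unaffected.
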
45 changes: 44 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.SortedSet
import scala.io.Source
import scala.reflect.ClassTag

@@ -43,6 +44,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
*/
private[spark] object Utils extends Logging {

val osName = System.getProperty("os.name")

/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -521,9 +524,10 @@

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
*/
def deleteRecursively(file: File) {
if (file.isDirectory) {
if ((file.isDirectory) && !isSymlink(file)) {
for (child <- listFilesSafely(file)) {
deleteRecursively(child)
}
@@ -536,6 +540,25 @@
}
}

/**
* Check to see if file is a symbolic link.
*/
def isSymlink(file: File): Boolean = {
if (file == null) throw new NullPointerException("File must not be null")
if (osName.startsWith("Windows")) return false
val fileInCanonicalDir = if (file.getParent() == null) {
file
} else {
new File(file.getParentFile().getCanonicalFile(), file.getName())
}

!fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
}

/**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
*/
@@ -898,6 +921,26 @@
count
}

/**
* Creates a symlink. Note that JDK 1.7 has Files.createSymbolicLink, but it is not used here
* to retain JDK 1.6 support. Windows is handled by copying; everything else uses "ln -sf".
* @param src absolute path to the source
* @param dst relative path for the destination
*/
def symlink(src: File, dst: File) {
if (!src.isAbsolute()) {
throw new IOException("Source must be absolute")
}
if (dst.isAbsolute()) {
throw new IOException("Destination must be relative")
}
val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf"
import scala.sys.process._
(linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
(logInfo(line)))
}


/** Return the class name of the given object, removing all dollar signs */
def getFormattedClassName(obj: AnyRef) = {
obj.getClass.getSimpleName.replace("$", "")
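
As the scaladoc on symlink notes, these helpers shell out to "ln -sf" (or copy on Windows) only because JDK 1.6 must still be supported. For comparison, a rough JDK 1.7+ equivalent using java.nio.file would look like this (a sketch, not part of the patch):

    import java.io.File
    import java.nio.file.Files

    object NioSymlinkSketch {
      // Roughly what Utils.symlink does, via the JDK 1.7 API instead of an external command.
      def symlink(src: File, dst: File): Unit = {
        Files.createSymbolicLink(dst.toPath, src.toPath)
      }

      // Roughly what Utils.isSymlink infers from the canonical-vs-absolute path comparison.
      def isSymlink(file: File): Boolean = {
        Files.isSymbolicLink(file.toPath)
      }
    }
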
28 changes: 27 additions & 1 deletion core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -17,8 +17,11 @@

package org.apache.spark

import org.scalatest.FunSuite
import java.io.File

import com.google.common.io.Files

import org.scalatest.FunSuite

import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit}
@@ -126,6 +129,29 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
}
}

test("basic pipe with separate working directory") {
if (testCommandAvailable("cat")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
val c = piped.collect()
assert(c.size === 4)
assert(c(0) === "1")
assert(c(1) === "2")
assert(c(2) === "3")
assert(c(3) === "4")
val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
val collectPwd = pipedPwd.collect()
assert(collectPwd(0).contains("tasks/"))
val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
// make sure symlinks were created
assert(pipedLs.length > 0)
// clean up top level tasks directory
new File("tasks").delete()
} else {
assert(true)
}
}

test("test pipe exports map_input_file") {
testExportInputFile("map_input_file")
}