From 74f2f4751a5fd0d5878b4ceebab8dbd2a18c89ac Mon Sep 17 00:00:00 2001
From: Wei Li
Date: Sun, 8 Mar 2015 22:48:06 -0700
Subject: [PATCH] Extend saveAsHadoopFile into an async operation leveraging
 FutureAction

---
 .../scala/org/apache/spark/FutureAction.scala |  78 ++++++
 .../scala/org/apache/spark/SparkContext.scala |  23 ++
 .../apache/spark/rdd/PairRDDFunctions.scala   | 239 ++++++++++++++++++
 3 files changed, 340 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 3bbf1aab199bd..78d9de140263c 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -24,6 +24,7 @@ import scala.util.Try
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
+import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * :: Experimental ::
@@ -151,6 +152,83 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 }
 
+/**
+ * :: Experimental ::
+ * A [[FutureAction]] holding the result of an action that triggers a single job, plus a
+ * post-complete function invoked once the job finishes (e.g. for cleanup or job accounting).
+ */
+@Experimental
+class SimpleFutureWithPostCompleteAction[T] private[spark](
+    jobWaiter: JobWaiter[_], resultFunc: => T, postCompleteFunc: () => Unit)
+  extends FutureAction[T] {
+
+  override def cancel() {
+    jobWaiter.cancel()
+  }
+
+  override def ready(atMost: Duration)(
+      implicit permit: CanAwait): SimpleFutureWithPostCompleteAction.this.type = {
+    if (!atMost.isFinite()) {
+      awaitResult()
+    } else jobWaiter.synchronized {
+      val finishTime = System.currentTimeMillis() + atMost.toMillis
+      while (!isCompleted) {
+        val time = System.currentTimeMillis()
+        if (time >= finishTime) {
+          executePostCompleteFunc()
+          throw new TimeoutException
+        } else {
+          jobWaiter.wait(finishTime - time)
+        }
+      }
+    }
+    this
+  }
+
+  @throws(classOf[Exception])
+  override def result(atMost: Duration)(implicit permit: CanAwait): T = {
+    ready(atMost)(permit)
+    awaitResult() match {
+      case scala.util.Success(res) => res
+      case scala.util.Failure(e) => throw e
+    }
+  }
+
+  override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
+    executor.execute(new Runnable {
+      override def run() {
+        func(awaitResult())
+      }
+    })
+  }
+
+  override def isCompleted: Boolean = jobWaiter.jobFinished
+
+  override def value: Option[Try[T]] = {
+    if (jobWaiter.jobFinished) {
+      Some(awaitResult())
+    } else {
+      None
+    }
+  }
+
+  private def awaitResult(): Try[T] = {
+    jobWaiter.awaitResult() match {
+      case JobSucceeded =>
+        executePostCompleteFunc()
+        scala.util.Success(resultFunc)
+      case JobFailed(e: Exception) =>
+        executePostCompleteFunc()
+        scala.util.Failure(e)
+    }
+  }
+
+  private[this] def executePostCompleteFunc() {
+    if (!postCompleteExecuted.getAndSet(true)) postCompleteFunc()
+  }
+
+  private[this] val postCompleteExecuted: AtomicBoolean = new AtomicBoolean(false)
+}
 
 /**
  * :: Experimental ::
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 58834e2487992..69bc89b2ce8ae 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1119,6 +1119,29 @@ class SparkContext(config: SparkConf) extends Logging {
     rdd.doCheckpoint()
   }
 
+  def submitSaveAsHadoopFileJob[T, U: ClassTag](
+      rdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      partitions: Seq[Int],
+      allowLocal: Boolean,
+      resultHandler: (Int, U) => Unit,
+      postCompleteFunc: () => Unit): FutureAction[Unit] = {
+    if (dagScheduler == null) {
+      throw new SparkException("SparkContext has been shutdown")
+    }
+    val callSite = getCallSite
+    val cleanedFunc = clean(func)
+    val waiter = dagScheduler.submitJob(
+      rdd,
+      cleanedFunc,
+      partitions,
+      callSite,
+      allowLocal,
+      resultHandler,
+      localProperties.get)
+    new SimpleFutureWithPostCompleteAction(waiter, Unit, postCompleteFunc)
+  }
+
   /**
    * Run a function on a given set of partitions in an RDD and return the results as an array. The
    * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 7cc67ea2278d3..5297c601620be 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1034,6 +1034,245 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     writer.commitJob()
   }
 
+  //////////////////////////////////////
+  // Async saveAsHadoopFile extension //
+  //////////////////////////////////////
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD.
+   */
+  def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](path: String)(
+      implicit fm: ClassTag[F]): FutureAction[Unit] = {
+    saveAsHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress the result with the
+   * supplied codec.
+   */
+  def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](
+      path: String, codec: Class[_ <: CompressionCodec]) (
+      implicit fm: ClassTag[F]): FutureAction[Unit] = {
+    val runtimeClass = fm.runtimeClass
+    saveAsHadoopFileAsync(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+   */
+  def saveAsNewAPIHadoopFileAsync[F <: NewOutputFormat[K, V]](
+      path: String)(
+      implicit fm: ClassTag[F]): FutureAction[Unit] = {
+    saveAsNewAPIHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+   */
+  def saveAsNewAPIHadoopFileAsync(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration = self.context.hadoopConfiguration): FutureAction[Unit] =
+  {
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
+    job.setOutputKeyClass(keyClass)
+    job.setOutputValueClass(valueClass)
+    job.setOutputFormatClass(outputFormatClass)
+    job.getConfiguration.set("mapred.output.dir", path)
+    saveAsNewAPIHadoopDatasetAsync(job.getConfiguration)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
+   */
+  def saveAsHadoopFileAsync(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      codec: Class[_ <: CompressionCodec]): FutureAction[Unit] = {
+    saveAsHadoopFileAsync(path, keyClass, valueClass, outputFormatClass,
+      new JobConf(self.context.hadoopConfiguration), Some(codec))
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD.
+   */
+  def saveAsHadoopFileAsync(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      codec: Option[Class[_ <: CompressionCodec]] = None): FutureAction[Unit] = {
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    hadoopConf.setOutputKeyClass(keyClass)
+    hadoopConf.setOutputValueClass(valueClass)
+    // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // TODO: Should we uncomment this for Scala 2.10?
+    // conf.setOutputFormat(outputFormatClass)
+    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+    for (c <- codec) {
+      hadoopConf.setCompressMapOutput(true)
+      hadoopConf.set("mapred.output.compress", "true")
+      hadoopConf.setMapOutputCompressorClass(c)
+      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
+      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+    }
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(hadoopConf,
+      SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    saveAsHadoopDatasetAsync(hadoopConf)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
+   * Configuration object for that storage system. The Conf should set an OutputFormat and any
+   * output paths required (e.g. a table name to write to) in the same way as it would be
+   * configured for a Hadoop MapReduce job.
+   */
+  def saveAsNewAPIHadoopDatasetAsync(conf: Configuration): FutureAction[Unit] = {
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val jobFormat = outfmt.newInstance
+
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+      // FileOutputFormat ignores the filesystem parameter
+      jobFormat.checkOutputSpecs(job)
+    }
+
+    val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      /* "reduce task" */
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+        attemptNumber)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = outfmt.newInstance
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      try {
+        while (iter.hasNext) {
+          val pair = iter.next()
+          writer.write(pair._1, pair._2)
+        }
+      } finally {
+        writer.close(hadoopContext)
+      }
+      committer.commitTask(hadoopContext)
+      1
+    } : Int
+
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+
+    val postCompleteFunc = () => { jobCommitter.commitJob(jobTaskContext) }
+
+    self.context.submitSaveAsHadoopFileJob(
+      self,
+      writeShard,
+      0 until self.partitions.size,
+      false,
+      (_, _: Int) => Unit,
+      postCompleteFunc
+    )
+
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
+   * that storage system. The JobConf should set an OutputFormat and any output paths required
+   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
+   * MapReduce job.
+   */
+  def saveAsHadoopDatasetAsync(conf: JobConf): FutureAction[Unit] = {
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val outputFormatInstance = hadoopConf.getOutputFormat
+    val keyClass = hadoopConf.getOutputKeyClass
+    val valueClass = hadoopConf.getOutputValueClass
+    if (outputFormatInstance == null) {
+      throw new SparkException("Output format class not set")
+    }
+    if (keyClass == null) {
+      throw new SparkException("Output key class not set")
+    }
+    if (valueClass == null) {
+      throw new SparkException("Output value class not set")
+    }
+    SparkHadoopUtil.get.addCredentials(hadoopConf)
+
+    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
+      valueClass.getSimpleName + ")")
+
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+      // FileOutputFormat ignores the filesystem parameter
+      val ignoredFs = FileSystem.get(hadoopConf)
+      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
+    }
+
+    val writer = new SparkHadoopWriter(hadoopConf)
+    writer.preSetup()
+
+    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+
+      writer.setup(context.stageId, context.partitionId, attemptNumber)
+      writer.open()
+      try {
+        var count = 0
+        while (iter.hasNext) {
+          val record = iter.next()
+          count += 1
+          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+        }
+      } finally {
+        writer.close()
+      }
+      writer.commit()
+    }
+
+    val postCompleteFunc = () => { writer.commitJob() }
+
+    self.context.submitSaveAsHadoopFileJob(
+      self,
+      writeToFile,
+      0 until self.partitions.size,
+      false,
+      (_, _: Unit) => Unit,
+      postCompleteFunc
+    )
+  }
+
+
   /**
    * Return an RDD with the keys of each tuple.
    */
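
A minimal usage sketch of the proposed async save API (illustrative only, not part of the diff
above). It assumes a live SparkContext named sc, a pair RDD of (String, Int), the classic mapred
TextOutputFormat, and a placeholder output path.

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Failure, Success}

  import org.apache.hadoop.mapred.TextOutputFormat

  import org.apache.spark.FutureAction
  import org.apache.spark.SparkContext._  // pair-RDD implicits (not needed on Spark >= 1.3)

  // A pair RDD to write out; keyClass/valueClass are taken from the RDD's ClassTags.
  val pairs = sc.parallelize(1 to 1000).map(i => (i.toString, i))

  // Submits the save job and returns immediately instead of blocking the caller; the
  // job-level commit runs in the post-complete hook once the job finishes.
  val save: FutureAction[Unit] =
    pairs.saveAsHadoopFileAsync[TextOutputFormat[String, Int]]("hdfs:///tmp/pairs-out")

  save.onComplete {
    case Success(_) => println("async save committed")
    case Failure(e) => println("async save failed: " + e.getMessage)
  }

  // The in-flight job can also be cancelled through the handle: save.cancel()

As with the existing countAsync/foreachAsync actions, the caller gets a FutureAction handle back
right away; the post-complete hook is what defers commitJob until the job has actually finished.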