diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 58834e2487992..db9ad4bc34d24 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1230,6 +1230,34 @@ class SparkContext(config: SparkConf) extends Logging {
     new SimpleFutureAction(waiter, resultFunc)
   }
 
+  /**
+   * :: Experimental ::
+   * Submit a job for execution, explicitly passing the TaskContext to the function
+   * that processes each partition, and return a FutureAction holding the result.
+   */
+  @Experimental
+  def submitJobWithTaskContext[T, U, R](
+      rdd: RDD[T],
+      processPartition: (TaskContext, Iterator[T]) => U,
+      partitions: Seq[Int],
+      resultHandler: (Int, U) => Unit,
+      resultFunc: => R): SimpleFutureAction[R] =
+  {
+    val cleanF = clean(processPartition)
+    val callSite = getCallSite
+
+    val waiter = dagScheduler.submitJob(
+      rdd,
+      (context: TaskContext, iter: Iterator[T]) => cleanF(context, iter),
+      partitions,
+      callSite,
+      allowLocal = false,
+      resultHandler,
+      localProperties.get)
+
+    new SimpleFutureAction(waiter, resultFunc)
+  }
+
   /**
    * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
    * for more information.
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 7cc67ea2278d3..2f90350da3458 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -46,6 +46,8 @@ import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.CompactBuffer
 import org.apache.spark.util.random.StratifiedSamplingUtils
 
+import scala.concurrent.ExecutionContext
+
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
  * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
@@ -1034,6 +1036,245 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     writer.commitJob()
   }
 
+
+  //////////////////////////////////////////////////
+  // CSD async saveAs(New)HadoopFile extensions   //
+  // code is duplicated from the synchronous      //
+  // versions to maintain binary compatibility    //
+  //////////////////////////////////////////////////
+
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD.
+   */
+  def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](
+      path: String)(
+      implicit fm: ClassTag[F], executor: ExecutionContext): FutureAction[Unit] = {
+    saveAsHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress the result with the
+   * supplied codec.
+   */
+  def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](
+      path: String, codec: Class[_ <: CompressionCodec])(
+      implicit fm: ClassTag[F], executor: ExecutionContext): FutureAction[Unit] = {
+    val runtimeClass = fm.runtimeClass
+    saveAsHadoopFileAsync(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
+   */
+  def saveAsHadoopFileAsync(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      codec: Class[_ <: CompressionCodec])(
+      implicit executor: ExecutionContext): FutureAction[Unit] = {
+    saveAsHadoopFileAsync(path, keyClass, valueClass, outputFormatClass,
+      new JobConf(self.context.hadoopConfiguration), Some(codec))
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD.
+   */
+  def saveAsHadoopFileAsync(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      codec: Option[Class[_ <: CompressionCodec]] = None)(
+      implicit executor: ExecutionContext): FutureAction[Unit] = {
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    hadoopConf.setOutputKeyClass(keyClass)
+    hadoopConf.setOutputValueClass(valueClass)
+    // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // TODO: Should we uncomment this for Scala 2.10?
+    // conf.setOutputFormat(outputFormatClass)
+    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+    for (c <- codec) {
+      hadoopConf.setCompressMapOutput(true)
+      hadoopConf.set("mapred.output.compress", "true")
+      hadoopConf.setMapOutputCompressorClass(c)
+      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
+      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+    }
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(hadoopConf,
+      SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    saveAsHadoopDatasetAsync(hadoopConf)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
+   * that storage system. The JobConf should set an OutputFormat and any output paths required
+   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
+   * MapReduce job.
+   */
+  def saveAsHadoopDatasetAsync(conf: JobConf)(
+      implicit executor: ExecutionContext): FutureAction[Unit] = {
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val outputFormatInstance = hadoopConf.getOutputFormat
+    val keyClass = hadoopConf.getOutputKeyClass
+    val valueClass = hadoopConf.getOutputValueClass
+    if (outputFormatInstance == null) {
+      throw new SparkException("Output format class not set")
+    }
+    if (keyClass == null) {
+      throw new SparkException("Output key class not set")
+    }
+    if (valueClass == null) {
+      throw new SparkException("Output value class not set")
+    }
+    SparkHadoopUtil.get.addCredentials(hadoopConf)
+
+    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
+      valueClass.getSimpleName + ")")
+
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+      // FileOutputFormat ignores the filesystem parameter
+      val ignoredFs = FileSystem.get(hadoopConf)
+      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
+    }
+
+    val writer = new SparkHadoopWriter(hadoopConf)
+    writer.preSetup()
+
+    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+
+      writer.setup(context.stageId, context.partitionId, attemptNumber)
+      writer.open()
+      try {
+        var count = 0
+        while (iter.hasNext) {
+          val record = iter.next()
+          count += 1
+          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+        }
+      } finally {
+        writer.close()
+      }
+      writer.commit()
+    }
+
+
+    self.context.submitJobWithTaskContext(
+      self,
+      writeToFile,
+      0 until self.partitions.size,
+      (_, _: Unit) => {},
+      { writer.commitJob() }
+    )
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+   */
+  def saveAsNewAPIHadoopFileAsync[F <: NewOutputFormat[K, V]](path: String)(
+      implicit fm: ClassTag[F]): FutureAction[Unit] = {
+    saveAsNewAPIHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+   */
+  def saveAsNewAPIHadoopFileAsync(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration = self.context.hadoopConfiguration): FutureAction[Unit] =
+  {
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
+    job.setOutputKeyClass(keyClass)
+    job.setOutputValueClass(valueClass)
+    job.setOutputFormatClass(outputFormatClass)
+    job.getConfiguration.set("mapred.output.dir", path)
+    saveAsNewAPIHadoopDatasetAsync(job.getConfiguration)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
+   * Configuration object for that storage system. The Conf should set an OutputFormat and any
+   * output paths required (e.g. a table name to write to) in the same way as it would be
+   * configured for a Hadoop MapReduce job.
+   */
+  def saveAsNewAPIHadoopDatasetAsync(conf: Configuration): FutureAction[Unit] = {
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val jobFormat = outfmt.newInstance
+
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+      // FileOutputFormat ignores the filesystem parameter
+      jobFormat.checkOutputSpecs(job)
+    }
+
+    val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      /* "reduce task" */
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+        attemptNumber)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = outfmt.newInstance
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      try {
+        while (iter.hasNext) {
+          val pair = iter.next()
+          writer.write(pair._1, pair._2)
+        }
+      } finally {
+        writer.close(hadoopContext)
+      }
+      committer.commitTask(hadoopContext)
+      1
+    } : Int
+
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+
+    self.context.submitJobWithTaskContext(
+      self,
+      writeShard,
+      0 until self.partitions.size,
+      (_, _:Int) => {},
+      { jobCommitter.commitJob(jobTaskContext) }
+    )
+  }
+
+
   /**
    * Return an RDD with the keys of each tuple.
    */
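
Below is a minimal driver-side usage sketch of the new async save API, added here for review only; it is not part of the patch. It assumes the patch is applied to a Spark 1.x / Scala 2.10 build, and the application name, object name, and output path are illustrative.

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

object AsyncSaveExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("async-save-example"))
    val pairs = sc.parallelize(1 to 100).map(i => (i.toString, (i * i).toString))

    // Kicks off the Hadoop save job without blocking the calling thread; the imported
    // global ExecutionContext satisfies saveAsHadoopFileAsync's implicit executor.
    // The output path is illustrative; any writable Hadoop-supported URI works.
    val save = pairs.saveAsHadoopFileAsync[TextOutputFormat[String, String]]("/tmp/async-save-example")

    // Other driver-side work can run here while the write is in flight.

    // Block until the job, including the final writer.commitJob(), has completed.
    Await.ready(save, Duration.Inf)
    sc.stop()
  }
}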
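
The lower-level SparkContext.submitJobWithTaskContext primitive can also be exercised directly; a sketch follows, again not part of the patch. The object name, the per-partition function, and the result handler are hypothetical, chosen only to show how the TaskContext reaches the partition function.

import org.apache.spark.{FutureAction, SparkContext, TaskContext}

object SubmitJobWithTaskContextExample {
  // Hypothetical helper: sum each partition and report each result as it arrives.
  def sumPartitionsAsync(sc: SparkContext): FutureAction[Unit] = {
    val rdd = sc.parallelize(1 to 1000, numSlices = 4)
    sc.submitJobWithTaskContext(
      rdd,
      // Unlike SparkContext.submitJob, the partition function also receives the TaskContext.
      (ctx: TaskContext, it: Iterator[Int]) => (ctx.partitionId, it.sum),
      0 until rdd.partitions.size,
      // resultHandler runs on the driver as each partition completes.
      (index: Int, result: (Int, Int)) => println("partition " + index + " sum = " + result._2),
      // resultFunc: the value the returned SimpleFutureAction yields once all partitions finish.
      ()
    )
  }
}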