Extended to make the saveAs(New)HadoopFile an async operation #43

Merged · 1 commit · Mar 13, 2015
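
This PR adds asynchronous variants of the saveAs(New)HadoopFile and saveAsHadoopDataset actions that return a FutureAction instead of blocking the calling thread. As an illustrative sketch of the intended usage (the data and output path are made up; an implicit ExecutionContext, e.g. the Scala global one, must be in scope):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2))).mapValues(_.toString)
// Returns immediately; the Hadoop write runs as a background Spark job.
val save = pairs.saveAsHadoopFileAsync[TextOutputFormat[String, String]]("hdfs:///tmp/pairs")
save.onComplete {
  case Success(_) => println("save finished")
  case Failure(e) => println("save failed: " + e)
}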
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1230,6 +1230,34 @@ class SparkContext(config: SparkConf) extends Logging {
new SimpleFutureAction(waiter, resultFunc)
}

/**
* :: Experimental ::
 * Submit a job for execution, explicitly passing the TaskContext to the
 * partition-processing function, and return a SimpleFutureAction holding the result.
*/
@Experimental
def submitJobWithTaskContext[T, U, R](
rdd: RDD[T],
processPartition: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
resultFunc: => R): SimpleFutureAction[R] =
{
val cleanF = clean(processPartition)
val callSite = getCallSite

val waiter = dagScheduler.submitJob(
rdd,
(context: TaskContext, iter: Iterator[T]) => cleanF(context, iter),
partitions,
callSite,
allowLocal = false,
resultHandler,
localProperties.get)

new SimpleFutureAction(waiter, resultFunc)
}

/**
* Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
* for more information.
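The async save methods in PairRDDFunctions below are built on this new primitive. A minimal sketch of calling it directly (the per-partition function and result handling here are invented for illustration):

val rdd = sc.parallelize(1 to 1000, 4)
val counts = new Array[Long](rdd.partitions.size)
val action: SimpleFutureAction[Long] = sc.submitJobWithTaskContext(
  rdd,
  // Unlike submitJob, the processPartition function also receives the TaskContext.
  (ctx: TaskContext, it: Iterator[Int]) => (ctx.partitionId, it.size.toLong),
  0 until rdd.partitions.size,
  (_: Int, result: (Int, Long)) => counts(result._1) = result._2,
  counts.sum) // resultFunc: evaluated on the driver once all partitions complete
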
241 changes: 241 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -46,6 +46,8 @@ import org.apache.spark.util.Utils
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils

import scala.concurrent.ExecutionContext

/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
@@ -1034,6 +1036,245 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.commitJob()
}


//////////////////////////////////////////////////
// CSD async saveAs(New)HadoopFile extensions   //
// Code is duplicated from the synchronous      //
// versions to maintain binary compatibility.   //
//////////////////////////////////////////////////


/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](
path: String)(
implicit fm: ClassTag[F], executor: ExecutionContext): FutureAction[Unit] = {
saveAsHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}

/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD. Compress the result with the
* supplied codec.
*/
def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](
path: String, codec: Class[_ <: CompressionCodec])(
implicit fm: ClassTag[F], executor: ExecutionContext): FutureAction[Unit] = {
val runtimeClass = fm.runtimeClass
saveAsHadoopFileAsync(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}
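  // Illustrative: compress the output with a codec from hadoop-common, e.g.
  //   pairs.saveAsHadoopFileAsync[TextOutputFormat[String, String]](
  //     "hdfs:///tmp/out", classOf[org.apache.hadoop.io.compress.GzipCodec])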

/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD. Compress with the supplied codec.
*/
def saveAsHadoopFileAsync(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
codec: Class[_ <: CompressionCodec])(
implicit executor: ExecutionContext): FutureAction[Unit] = {
saveAsHadoopFileAsync(path, keyClass, valueClass, outputFormatClass,
new JobConf(self.context.hadoopConfiguration), Some(codec))
}

/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
def saveAsHadoopFileAsync(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None)(
implicit executor: ExecutionContext): FutureAction[Unit] = {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
hadoopConf.setOutputValueClass(valueClass)
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
hadoopConf.setMapOutputCompressorClass(c)
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDatasetAsync(hadoopConf)
}

/**
* Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
* that storage system. The JobConf should set an OutputFormat and any output paths required
* (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
* MapReduce job.
*/
def saveAsHadoopDatasetAsync(conf: JobConf)(
implicit executor: ExecutionContext): FutureAction[Unit] = {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
if (outputFormatInstance == null) {
throw new SparkException("Output format class not set")
}
if (keyClass == null) {
throw new SparkException("Output key class not set")
}
if (valueClass == null) {
throw new SparkException("Output value class not set")
}
SparkHadoopUtil.get.addCredentials(hadoopConf)

logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")

if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(hadoopConf)
hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
}

val writer = new SparkHadoopWriter(hadoopConf)
writer.preSetup()

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt

writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
try {
var count = 0
while (iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
} finally {
writer.close()
}
writer.commit()
}


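    // Submit the write job asynchronously: unlike the blocking runJob used by the
    // synchronous saveAsHadoopDataset, this returns a future immediately, and
    // commitJob() runs on the driver (as the resultFunc) after all partitions finish.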
self.context.submitJobWithTaskContext(
self,
writeToFile,
0 until self.partitions.size,
(_, _: Unit) => {},
{ writer.commitJob() }
)
}
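  // Illustrative use (hypothetical JobConf; an implicit ExecutionContext in scope):
  //   val conf = new JobConf(sc.hadoopConfiguration)
  //   conf.setOutputKeyClass(classOf[Text])
  //   conf.setOutputValueClass(classOf[IntWritable])
  //   conf.set("mapred.output.format.class",
  //     classOf[TextOutputFormat[Text, IntWritable]].getName)
  //   FileOutputFormat.setOutputPath(conf, new Path("hdfs:///tmp/out"))
  //   val f = pairs.saveAsHadoopDatasetAsync(conf)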

/**
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
def saveAsNewAPIHadoopFileAsync[F <: NewOutputFormat[K, V]](path: String)(
implicit fm: ClassTag[F]): FutureAction[Unit] = {
saveAsNewAPIHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}

/**
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
def saveAsNewAPIHadoopFileAsync(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = self.context.hadoopConfiguration): FutureAction[Unit] =
{
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val job = new NewAPIHadoopJob(hadoopConf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
job.getConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDatasetAsync(job.getConfiguration)
}

/**
* Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
* Configuration object for that storage system. The Conf should set an OutputFormat and any
* output paths required (e.g. a table name to write to) in the same way as it would be
* configured for a Hadoop MapReduce job.
*/
def saveAsNewAPIHadoopDatasetAsync(conf: Configuration): FutureAction[Unit] = {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val job = new NewAPIHadoopJob(hadoopConf)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
val wrappedConf = new SerializableWritable(job.getConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}

val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outfmt.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)
}
} finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
1
} : Int

val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)

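    // As in saveAsHadoopDatasetAsync: submit the write without blocking; the Hadoop
    // job is committed from the driver once every partition has written its shard.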
self.context.submitJobWithTaskContext(
self,
writeShard,
0 until self.partitions.size,
(_, _: Int) => {},
{ jobCommitter.commitJob(jobTaskContext) }
)
}
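  // Illustrative use with the new Hadoop API (mapreduce.*; names are examples only):
  //   val job = new NewAPIHadoopJob(sc.hadoopConfiguration)
  //   job.setOutputKeyClass(classOf[Text])
  //   job.setOutputValueClass(classOf[IntWritable])
  //   job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
  //   FileOutputFormat.setOutputPath(job, new Path("hdfs:///tmp/out"))
  //   val f = pairs.saveAsNewAPIHadoopDatasetAsync(job.getConfiguration)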


/**
* Return an RDD with the keys of each tuple.
*/