
Commit 7101017

Remove Hadoop object cloning and warn users creating Hadoop RDDs.

The code introduced in apache#359 used Hadoop's WritableUtils.clone() to duplicate objects when reading from Hadoop files. Some users have reported exceptions when cloning data in various file formats, including Avro and at least one custom format. This patch removes that functionality to ensure stability for the 0.9 release. Instead, it adds a clear warning to the documentation that copying may be necessary for Hadoop data sets.
Parent: a1cd185 · Commit: 7101017
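The documentation added by this commit recommends copying records before caching them. Below is a minimal sketch of that workaround, not part of this commit; the `SparkContext` construction, input path, and chosen key/value types are illustrative assumptions.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

// Hypothetical setup: a local SparkContext and a placeholder input path.
val sc = new SparkContext("local", "hadoop-copy-example")
val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://host/path/to/input")

// The RecordReader re-uses one Writable instance per partition, so cache copies
// of each record (here: plain Scala values) rather than the Writable objects.
val copied = raw.map { case (key, value) => (key.get, value.toString) }
copied.cache()
```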

5 files changed: 134 additions, 221 deletions


core/src/main/scala/org/apache/spark/SparkContext.scala (77 additions, 43 deletions)
```diff
@@ -341,7 +341,7 @@ class SparkContext(
    */
   def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minSplits, cloneRecords = false).map(pair => pair._2.toString)
+      minSplits).map(pair => pair._2.toString)
   }
 
   /**
@@ -354,33 +354,37 @@ class SparkContext(
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *                     Most RecordReader implementations reuse wrapper objects across multiple
-   *                     records, and can cause problems in RDD collect or aggregation operations.
-   *                     By default the records are cloned in Spark. However, application
-   *                     programmers can explicitly disable the cloning for better performance.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopRDD[K: ClassTag, V: ClassTag](
+  def hadoopRDD[K, V](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K: ClassTag, V: ClassTag](
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   * */
+  def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -392,8 +396,7 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -403,16 +406,20 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
         vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits,
-        cloneRecords)
+        minSplits)
   }
 
   /**
@@ -421,69 +428,91 @@ class SparkContext(
    * can just write, for example,
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
-   * }}}
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
+    hadoopFile[K, V, F](path, defaultMinSplits)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
-      (path: String, cloneRecords: Boolean = true)
+      (path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
         path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]],
-        cloneRecords = cloneRecords)
+        vm.runtimeClass.asInstanceOf[Class[V]])
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration,
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
      conf: Configuration = hadoopConfiguration,
      fClass: Class[F],
      kClass: Class[K],
-     vClass: Class[V],
-     cloneRecords: Boolean = true): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
+     vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   * */
   def sequenceFile[K: ClassTag, V: ClassTag](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean = true
+      minSplits: Int
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   * */
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V]
+      ): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -500,9 +529,14 @@ class SparkContext(
    * have a parameterized singleton object). We use functions instead to create a new converter
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
+   *
+   * Note: Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def sequenceFile[K, V]
-      (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int = defaultMinSplits)
       (implicit km: ClassTag[K], vm: ClassTag[V],
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
@@ -511,7 +545,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
       kc.writableClass(km).asInstanceOf[Class[Writable]],
-      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
     writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
```
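For context, the cloning removed here relied on Hadoop's WritableUtils.clone(), as stated in the commit message. The following is a rough standalone sketch of that call, an illustration only and not the removed Spark code; the Text value and Configuration are made-up inputs.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, WritableUtils}

// Illustration only: WritableUtils.clone deep-copies a Writable by serializing it
// and deserializing into a fresh instance. This is the call the removed
// cloneRecords path relied on, and the step that users reportedly hit
// exceptions with for some formats (e.g. Avro and custom formats).
val conf = new Configuration()
val original = new Text("record payload")
val copy: Text = WritableUtils.clone(original, conf)

assert(copy == original)     // same contents
assert(!(copy eq original))  // but a distinct object, safe to cache
```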