Commit a0626ed

Author: Davies Liu (committed)
Merge branch 'master' of github.com:apache/spark into fix_hashcode
2 parents 89c2432 + 43c7ec6, commit a0626ed

File tree: 157 files changed (+2046, -560 lines)


LICENSE

Lines changed: 1 addition & 0 deletions

@@ -950,3 +950,4 @@ The following components are provided under the MIT License. See project link fo
 (MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
 (The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
 (MIT License) jquery (https://jquery.org/license/)
+(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 1 addition & 1 deletion

@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
-    ow.setConf(new Configuration())
+    ow.setConf(new Configuration(false))
     ow.readFields(in)
     t = ow.get().asInstanceOf[T]
   }
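
The only functional change here is the false flag: Hadoop's Configuration(boolean loadDefaults) constructor skips loading the default resources (core-default.xml, core-site.xml) when the flag is false, which is unnecessary work during deserialization because readFields(in) immediately repopulates the object. A small standalone illustration of the two constructors (not part of the patch):

    import org.apache.hadoop.conf.Configuration

    object ConfigurationDefaultsDemo {
      def main(args: Array[String]): Unit = {
        val withDefaults = new Configuration()       // loads core-default.xml / core-site.xml from the classpath
        val empty        = new Configuration(false)  // starts empty; a later readFields(in) fills it in
        println(s"with defaults: ${withDefaults.size()} entries, empty: ${empty.size()} entries")
      }
    }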

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion

@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,
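
Across these files the broadcast value changes from SerializableWritable(hadoopConfiguration) to a dedicated SerializableConfiguration wrapper, so the configuration no longer round-trips through ObjectWritable. A minimal sketch of how such a wrapper can be written, relying only on Configuration being a Hadoop Writable; the class introduced by this commit may differ in detail:

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import org.apache.hadoop.conf.Configuration

    // Sketch only, not copied from the patch.
    class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()
        value.write(out)                  // Configuration is a Writable: dump its entries
      }

      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        value = new Configuration(false)  // empty conf; do not reload defaults
        value.readFields(in)              // restore the broadcast entries
      }
    }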

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 2 additions & 1 deletion

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
+import org.apache.spark.util.SerializableJobConf

 /**
  * Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   with Serializable {

   private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
+  private val conf = new SerializableJobConf(jobConf)

   private var jobID = 0
   private var splitID = 0
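
SparkHadoopWriter holds a JobConf rather than a bare Configuration, so it gets its own wrapper type. Under the same assumptions as the sketch above, a JobConf variant can look like this (illustrative only; JobConf(false) likewise skips loading the default resources):

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import org.apache.hadoop.mapred.JobConf

    // Sketch only, not copied from the patch.
    class SerializableJobConf(@transient var value: JobConf) extends Serializable {
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()
        value.write(out)
      }

      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        value = new JobConf(false)  // empty JobConf; readFields restores the entries
        value.readFields(in)
      }
    }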

core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 3 additions & 3 deletions

@@ -19,8 +19,8 @@ package org.apache.spark.api.python

 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 import scala.util.{Failure, Success, Try}
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableConfiguration]) extends Converter[Any, Any] {

   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 6 additions & 6 deletions

@@ -36,7 +36,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}

 import scala.util.control.NonFatal

@@ -445,7 +445,7 @@ private[spark] object PythonRDD extends Logging {
     val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
     val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -471,7 +471,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -497,7 +497,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -540,7 +540,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -566,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
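
Each of these call sites follows the same pattern: wrap the Hadoop configuration once on the driver, broadcast it, and unwrap it inside the task. A hedged usage example is below; the names follow the wrapper sketched above and are not taken verbatim from this commit:

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastConfExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("broadcast-conf").setMaster("local[2]"))
        // Broadcast the (possibly ~10 KB) Hadoop configuration once instead of
        // serializing it into every task closure.
        val confBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

        val lines = sc.parallelize(1 to 4, 2).mapPartitions { iter =>
          val hadoopConf = confBroadcast.value.value  // unwrap the Broadcast, then the wrapper
          iter.map(i => s"element $i sees ${hadoopConf.size()} conf entries")
        }
        lines.collect().foreach(println)
        sc.stop()
      }
    }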

core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala

Lines changed: 5 additions & 6 deletions

@@ -21,13 +21,12 @@ import java.io.IOException

 import scala.reflect.ClassTag

-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}

 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}

@@ -38,7 +37,7 @@ private[spark]
 class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
   extends RDD[T](sc, Nil) {

-  val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+  val broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)

@@ -87,7 +86,7 @@ private[spark] object CheckpointRDD extends Logging {

   def writeToFile[T: ClassTag](
       path: String,
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      broadcastedConf: Broadcast[SerializableConfiguration],
       blockSize: Int = -1
     )(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
@@ -135,7 +134,7 @@ private[spark] object CheckpointRDD extends Logging {

   def readFromFile[T](
       path: Path,
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      broadcastedConf: Broadcast[SerializableConfiguration],
       context: TaskContext
     ): Iterator[T] = {
     val env = SparkEnv.get
@@ -164,7 +163,7 @@ private[spark] object CheckpointRDD extends Logging {
     val path = new Path(hdfsPath, "temp")
     val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
     val fs = path.getFileSystem(conf)
-    val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+    val broadcastedConf = sc.broadcast(new SerializableConfiguration(conf))
     sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
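
On the checkpoint read/write path, the broadcast configuration is unwrapped inside each task to obtain a FileSystem for the checkpoint directory. A sketch of that executor-side half, matching the writeToFile signature above and assuming the wrapper sketched earlier (the real body also deals with serializers, block sizes, and temp-file renames):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.TaskContext
    import org.apache.spark.broadcast.Broadcast

    object CheckpointWriteSketch {
      // Illustrative skeleton only; not the body shipped in this commit.
      def writeToFile[T](
          path: String,
          broadcastedConf: Broadcast[SerializableConfiguration],
          blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]): Unit = {
        val hadoopConf = broadcastedConf.value.value      // per-task unwrap
        val fs = new Path(path).getFileSystem(hadoopConf) // checkpoint FileSystem
        // ... open a part file for this partition under `path` on fs and serialize the iterator ...
      }
    }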

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 4 additions & 4 deletions

@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.util.{SerializableConfiguration, NextIterator, Utils}
 import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
 import org.apache.spark.storage.StorageLevel

@@ -100,7 +100,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 @DeveloperApi
 class HadoopRDD[K, V](
     @transient sc: SparkContext,
-    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    broadcastedConf: Broadcast[SerializableConfiguration],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
@@ -121,8 +121,8 @@ class HadoopRDD[K, V](
       minPartitions: Int) = {
     this(
       sc,
-      sc.broadcast(new SerializableWritable(conf))
-        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      sc.broadcast(new SerializableConfiguration(conf))
+        .asInstanceOf[Broadcast[SerializableConfiguration]],
       None /* initLocalJobConfFuncOpt */,
       inputFormatClass,
       keyClass,

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 2 additions & 2 deletions

@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.StorageLevel

@@ -74,7 +74,7 @@ class NewHadoopRDD[K, V](
   with Logging {

   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
   // private val serializableConf = new SerializableWritable(conf)

   private val jobTrackerId: String = {

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 3 additions & 3 deletions

@@ -44,7 +44,7 @@ import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.CompactBuffer
 import org.apache.spark.util.random.StratifiedSamplingUtils

@@ -1002,7 +1002,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val wrappedConf = new SerializableConfiguration(job.getConfiguration)
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance

@@ -1065,7 +1065,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
-    val wrappedConf = new SerializableWritable(hadoopConf)
+    val wrappedConf = new SerializableConfiguration(hadoopConf)
     val outputFormatInstance = hadoopConf.getOutputFormat
     val keyClass = hadoopConf.getOutputKeyClass
     val valueClass = hadoopConf.getOutputValueClass
