
Commit 5b061ae: merge master
2 parents: 444e750 + 0154587

File tree: 170 files changed (+3250 / -740 lines)


.gitignore

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 sbt/*.jar
 .settings
 .cache
-.generated-mima-excludes
+.generated-mima*
 /build/
 work/
 out/

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -80,7 +80,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts
   test("large number of iterations") {
     // This tests whether jobs with a large number of iterations finish in a reasonable time,
     // because non-memoized recursion in RDD or DAGScheduler used to cause them to hang
-    failAfter(10 seconds) {
+    failAfter(30 seconds) {
       sc = new SparkContext("local", "test")
       val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
       val msgs = sc.parallelize(Array[(String, TestMessage)]())
@@ -101,7 +101,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts
       sc = new SparkContext("local", "test")
       val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
       val msgs = sc.parallelize(Array[(String, TestMessage)]())
-      val numSupersteps = 50
+      val numSupersteps = 20
       val result =
         Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
          (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
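
For reference, failAfter comes from ScalaTest's Timeouts trait (with SpanSugar providing the `30 seconds` syntax), which this suite mixes in. A minimal, self-contained sketch of the same pattern; the suite name and test body here are hypothetical, not part of this commit:

import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

class TimeoutSketchSuite extends FunSuite with Timeouts {
  test("finishes within the time limit") {
    // failAfter aborts the enclosing block if it runs longer than the given span,
    // which is why loosening 10 seconds to 30 seconds makes the test less flaky.
    failAfter(30 seconds) {
      val sum = (1L to 1000000L).sum
      assert(sum == 500000500000L)
    }
  }
}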

bin/compute-classpath.sh

Lines changed: 25 additions & 9 deletions
@@ -38,8 +38,10 @@ else
   JAR_CMD="jar"
 fi

-# First check if we have a dependencies jar. If so, include binary classes with the deps jar
-if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
+# A developer option to prepend more recently compiled Spark classes
+if [ -n "$SPARK_PREPEND_CLASSES" ]; then
+  echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
+    "classes ahead of assembly." >&2
   CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
@@ -51,17 +53,31 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
   CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
+fi

-  ASSEMBLY_JAR=$(ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar 2>/dev/null)
+# Use spark-assembly jar from either RELEASE or assembly directory
+if [ -f "$FWDIR/RELEASE" ]; then
+  assembly_folder="$FWDIR"/lib
 else
-  # Else use spark-assembly jar from either RELEASE or assembly directory
-  if [ -f "$FWDIR/RELEASE" ]; then
-    ASSEMBLY_JAR=$(ls "$FWDIR"/lib/spark-assembly*hadoop*.jar 2>/dev/null)
-  else
-    ASSEMBLY_JAR=$(ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*.jar 2>/dev/null)
-  fi
+  assembly_folder="$ASSEMBLY_DIR"
 fi

+num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)
+if [ "$num_jars" -eq "0" ]; then
+  echo "Failed to find Spark assembly in $assembly_folder"
+  echo "You need to build Spark before running this program."
+  exit 1
+fi
+if [ "$num_jars" -gt "1" ]; then
+  jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
+  echo "Found multiple Spark assembly jars in $assembly_folder:"
+  echo "$jars_list"
+  echo "Please remove all but one jar."
+  exit 1
+fi
+
+ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)
+
 # Verify that versions of java used to build the jars and run Spark are compatible
 jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
 if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then

bin/pyspark

Lines changed: 12 additions & 2 deletions
@@ -45,7 +45,7 @@ fi
 . $FWDIR/bin/load-spark-env.sh

 # Figure out which Python executable to use
-if [ -z "$PYSPARK_PYTHON" ] ; then
+if [[ -z "$PYSPARK_PYTHON" ]]; then
   PYSPARK_PYTHON="python"
 fi
 export PYSPARK_PYTHON
@@ -59,7 +59,7 @@ export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py

 # If IPython options are specified, assume user wants to run IPython
-if [ -n "$IPYTHON_OPTS" ]; then
+if [[ -n "$IPYTHON_OPTS" ]]; then
   IPYTHON=1
 fi

@@ -76,6 +76,16 @@ for i in "$@"; do
 done
 export PYSPARK_SUBMIT_ARGS

+# For pyspark tests
+if [[ -n "$SPARK_TESTING" ]]; then
+  if [[ -n "$PYSPARK_DOC_TEST" ]]; then
+    exec "$PYSPARK_PYTHON" -m doctest $1
+  else
+    exec "$PYSPARK_PYTHON" $1
+  fi
+  exit
+fi
+
 # If a python file is provided, directly run spark-submit.
 if [[ "$1" =~ \.py$ ]]; then
   echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2

bin/spark-class

Lines changed: 0 additions & 17 deletions
@@ -108,23 +108,6 @@ fi
 export JAVA_OPTS
 # Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!

-if [ ! -f "$FWDIR/RELEASE" ]; then
-  # Exit if the user hasn't compiled Spark
-  num_jars=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar" | wc -l)
-  jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar")
-  if [ "$num_jars" -eq "0" ]; then
-    echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
-    echo "You need to build Spark before running this program." >&2
-    exit 1
-  fi
-  if [ "$num_jars" -gt "1" ]; then
-    echo "Found multiple Spark assembly jars in $FWDIR/assembly/target/scala-$SCALA_VERSION:" >&2
-    echo "$jars_list"
-    echo "Please remove all but one jar."
-    exit 1
-  fi
-fi
-
 TOOLS_DIR="$FWDIR"/tools
 SPARK_TOOLS_JAR=""
 if [ -e "$TOOLS_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar ]; then

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }

   /** Register a ShuffleDependency for cleanup when it is garbage collected. */
-  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _]) {
+  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]) {
     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
   }

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 9 additions & 3 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleHandle

 /**
  * :: DeveloperApi ::
@@ -50,19 +51,24 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
+ * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
  *                   the default serializer, as specified by `spark.serializer` config option, will
  *                   be used.
  */
 @DeveloperApi
-class ShuffleDependency[K, V](
+class ShuffleDependency[K, V, C](
     @transient rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
-    val serializer: Serializer = null)
+    val serializer: Option[Serializer] = None,
+    val keyOrdering: Option[Ordering[K]] = None,
+    val aggregator: Option[Aggregator[K, V, C]] = None)
   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

   val shuffleId: Int = rdd.context.newShuffleId()

+  val shuffleHandle: ShuffleHandle = rdd.context.env.shuffleManager.registerShuffle(
+    shuffleId, rdd.partitions.size, this)
+
   rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
 }
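
To illustrate the widened signature, here is a hedged sketch that constructs a ShuffleDependency[K, V, C] by hand against a local SparkContext. The RDD, partitioner and aggregator values are illustrative, and it assumes the hash-based shuffle manager wired up in SparkEnv below is available as the default so that registerShuffle succeeds at construction time:

import org.apache.spark.{Aggregator, HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

object ShuffleDependencySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("dep-sketch"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // C = Int here: combiners are running sums of the Int values.
    val agg = new Aggregator[String, Int, Int](
      (v: Int) => v,                  // createCombiner: seed a combiner from one value
      (c: Int, v: Int) => c + v,      // mergeValue: fold a value into a combiner
      (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners: merge combiners across partitions

    // serializer, keyOrdering and aggregator are now Options; None means "use the default".
    val dep = new ShuffleDependency[String, Int, Int](
      pairs, new HashPartitioner(2), serializer = None, aggregator = Some(agg))

    println(s"shuffle ${dep.shuffleId} registered, handle = ${dep.shuffleHandle}")
    sc.stop()
  }
}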

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 19 additions & 3 deletions
@@ -290,14 +290,17 @@ class SparkContext(config: SparkConf) extends Logging {
       value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
       executorEnvs(envKey) = value
     }
+    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
+      executorEnvs("SPARK_PREPEND_CLASSES") = v
+    }
     // The Mesos scheduler backend relies on this environment variable to set executor memory.
     // TODO: Set this only in the Mesos scheduler.
     executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
     executorEnvs ++= conf.getExecutorEnv

     // Set SPARK_USER for user who is running SparkContext.
     val sparkUser = Option {
-      Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
+      Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
     }.getOrElse {
       SparkContext.SPARK_UNKNOWN_USER
     }
@@ -431,12 +434,21 @@ class SparkContext(config: SparkConf) extends Logging {

   // Methods for creating RDDs

-  /** Distribute a local Scala collection to form an RDD. */
+  /** Distribute a local Scala collection to form an RDD.
+   *
+   * @note Parallelize acts lazily. If `seq` is a mutable collection and is
+   * altered after the call to parallelize and before the first action on the
+   * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
+   * the argument to avoid this.
+   */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }

-  /** Distribute a local Scala collection to form an RDD. */
+  /** Distribute a local Scala collection to form an RDD.
+   *
+   * This method is identical to `parallelize`.
+   */
   def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     parallelize(seq, numSlices)
   }
@@ -823,9 +835,11 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   /**
+   * :: DeveloperApi ::
    * Return information about what RDDs are cached, if they are in mem or on disk, how much space
    * they take, etc.
    */
+  @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
     StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
   }
@@ -837,8 +851,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

   /**
+   * :: DeveloperApi ::
    * Return information about blocks stored in all of the slaves
    */
+  @DeveloperApi
   def getExecutorStorageStatus: Array[StorageStatus] = {
     env.blockManager.master.getStorageStatus
   }
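
The new @note on parallelize is easy to demonstrate. A small hedged sketch with a local master and illustrative data, not taken from this commit:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeLazinessSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("parallelize-note"))

    val data = ArrayBuffer(1, 2, 3)
    val rdd = sc.parallelize(data)            // no copy of `data` is taken here
    data += 4                                 // mutate before the first action

    // The mutation is visible because the RDD still references the live buffer.
    println(rdd.collect().mkString(","))      // 1,2,3,4

    val safeRdd = sc.parallelize(data.toList) // pass an immutable copy to avoid this
    data += 5
    println(safeRdd.collect().mkString(","))  // still 1,2,3,4

    sc.stop()
  }
}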

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 18 additions & 10 deletions
@@ -34,6 +34,7 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.ConnectionManager
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.util.{AkkaUtils, Utils}

@@ -56,7 +57,7 @@ class SparkEnv (
     val closureSerializer: Serializer,
     val cacheManager: CacheManager,
     val mapOutputTracker: MapOutputTracker,
-    val shuffleFetcher: ShuffleFetcher,
+    val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
     val blockManager: BlockManager,
     val connectionManager: ConnectionManager,
@@ -80,7 +81,7 @@
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     httpFileServer.stop()
     mapOutputTracker.stop()
-    shuffleFetcher.stop()
+    shuffleManager.stop()
     broadcastManager.stop()
     blockManager.stop()
     blockManager.master.stop()
@@ -163,13 +164,20 @@ object SparkEnv extends Logging {
    def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
      val name = conf.get(propertyName, defaultClassName)
      val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
-      // First try with the constructor that takes SparkConf. If we can't find one,
-      // use a no-arg constructor instead.
+      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
+      // SparkConf, then one taking no arguments
      try {
-        cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
+        cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
+          .newInstance(conf, new java.lang.Boolean(isDriver))
+          .asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
-          cls.getConstructor().newInstance().asInstanceOf[T]
+          try {
+            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
+          } catch {
+            case _: NoSuchMethodException =>
+              cls.getConstructor().newInstance().asInstanceOf[T]
+          }
      }
    }

@@ -219,9 +227,6 @@

    val cacheManager = new CacheManager(blockManager)

-    val shuffleFetcher = instantiateClass[ShuffleFetcher](
-      "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")
-
    val httpFileServer = new HttpFileServer(securityManager)
    httpFileServer.initialize()
    conf.set("spark.fileserver.uri", httpFileServer.serverUri)
@@ -242,6 +247,9 @@
      "."
    }

+    val shuffleManager = instantiateClass[ShuffleManager](
+      "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+
    // Warn about deprecated spark.cache.class property
    if (conf.contains("spark.cache.class")) {
      logWarning("The spark.cache.class property is no longer being used! Specify storage " +
@@ -255,7 +263,7 @@
      closureSerializer,
      cacheManager,
      mapOutputTracker,
-      shuffleFetcher,
+      shuffleManager,
      broadcastManager,
      blockManager,
      connectionManager,
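
The constructor lookup in instantiateClass is plain Java reflection with two fallbacks: try a (SparkConf, boolean isDriver) constructor, then (SparkConf), then no-arg. A standalone sketch of the same pattern, using stand-in Conf and plugin classes rather than Spark's own types:

// Stand-in types; not Spark classes.
final class Conf(val settings: Map[String, String] = Map.empty)

class DriverAwarePlugin(conf: Conf, isDriver: Boolean) {
  override def toString = s"DriverAwarePlugin(isDriver=$isDriver)"
}

object ConstructorFallbackSketch {
  def instantiate[T](className: String, conf: Conf, isDriver: Boolean): T = {
    val cls = Class.forName(className)
    try {
      // Preferred: a (Conf, boolean) constructor, mirroring (SparkConf, isDriver).
      cls.getConstructor(classOf[Conf], java.lang.Boolean.TYPE)
        .newInstance(conf, java.lang.Boolean.valueOf(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          // Fallback 1: a constructor taking only the configuration.
          cls.getConstructor(classOf[Conf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            // Fallback 2: a no-arg constructor.
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val plugin = instantiate[AnyRef](classOf[DriverAwarePlugin].getName, new Conf(), isDriver = true)
    println(plugin)
  }
}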

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 44 additions & 0 deletions
@@ -228,6 +228,50 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     : PartialResult[java.util.Map[K, BoundedDouble]] =
     rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)

+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U],
+      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue, partitioner)(seqFunc, combFunc))
+  }
+
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U],
+      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
+  }
+
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's.
+   * The former operation is used for merging values within a partition, and the latter is used for
+   * merging values between partitions. To avoid memory allocation, both of these functions are
+   * allowed to modify and return their first argument instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]):
+      JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
+  }
+
   /**
    * Merge the values for each key using an associative function and a neutral "zero value" which
    * may be added to the result an arbitrary number of times, and must not change the result
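
These Java overloads delegate to the Scala aggregateByKey on the underlying pair RDD. A brief, hedged usage sketch of that Scala API; the data and functions are illustrative, and it assumes the pair-RDD implicits from SparkContext._ as was required at this point in Spark's history:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD functions such as aggregateByKey

object AggregateByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("aggregate-by-key"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), 2)

    // zeroValue = empty set; seqOp folds a value into a partition-local set,
    // combOp merges sets across partitions. Both functions are allowed to
    // modify and return their first argument; an immutable Set is used here
    // only for brevity.
    val valuesPerKey = pairs.aggregateByKey(Set.empty[Int])(
      (acc, v) => acc + v,
      (acc1, acc2) => acc1 ++ acc2)

    valuesPerKey.collect().foreach { case (k, vs) => println(s"$k -> $vs") }
    sc.stop()
  }
}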
