Commit cd8f59d

Merge branch 'master' of https://github.com/apache/spark into scalatest
2 parents 046540d + bd67551

14 files changed: +62 -87 lines changed


README.md

Lines changed: 12 additions & 7 deletions
@@ -39,17 +39,22 @@ And run the following command, which should also return 1000:
 ## Example Programs

 Spark also comes with several sample programs in the `examples` directory.
-To run one of them, use `./bin/run-example <class> <params>`. For example:
+To run one of them, use `./bin/run-example <class> [params]`. For example:

-    ./bin/run-example org.apache.spark.examples.SparkLR local[2]
+    ./bin/run-example org.apache.spark.examples.SparkLR

-will run the Logistic Regression example locally on 2 CPUs.
+will run the Logistic Regression example locally.

-Each of the example programs prints usage help if no params are given.
+You can set the MASTER environment variable when running examples to submit
+examples to a cluster. This can be a mesos:// or spark:// URL,
+"yarn-cluster" or "yarn-client" to run on YARN, and "local" to run
+locally with one thread, or "local[N]" to run locally with N threads. You
+can also use an abbreviated class name if the class is in the `examples`
+package. For instance:

-All of the Spark samples take a `<master>` parameter that is the cluster URL
-to connect to. This can be a mesos:// or spark:// URL, or "local" to run
-locally with one thread, or "local[N]" to run locally with N threads.
+    MASTER=spark://host:7077 ./bin/run-example SparkPi
+
+Many of the example programs print usage help if no params are given.

 ## Running Tests

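For context on what the new instructions assume: the example programs no longer take a `<master>` argument, so an example written in this style leaves the master unset and picks up whatever `./bin/run-example` (via spark-submit) or the MASTER variable supplies. A minimal, hypothetical example program in that style (`SimplePi` is not one of the bundled examples):

    import org.apache.spark.{SparkConf, SparkContext}

    object SimplePi {
      def main(args: Array[String]): Unit = {
        // No setMaster() call here: the master comes from spark-submit / MASTER,
        // exactly as the README text above describes.
        val conf = new SparkConf().setAppName("SimplePi")
        val sc = new SparkContext(conf)
        val n = 100000
        val inside = sc.parallelize(1 to n).map { _ =>
          val x = math.random * 2 - 1
          val y = math.random * 2 - 1
          if (x * x + y * y <= 1) 1 else 0
        }.reduce(_ + _)
        println(s"Pi is roughly ${4.0 * inside / n}")
        sc.stop()
      }
    }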

bin/pyspark

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
   ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
   if [[ $? != 0 ]]; then
     echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2
-    echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
+    echo "You need to build Spark before running this program" >&2
     exit 1
   fi
 fi

bin/run-example

Lines changed: 18 additions & 53 deletions
@@ -17,28 +17,10 @@
 # limitations under the License.
 #

-cygwin=false
-case "`uname`" in
-    CYGWIN*) cygwin=true;;
-esac
-
 SCALA_VERSION=2.10

-# Figure out where the Scala framework is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
-
-# Export this as SPARK_HOME
 export SPARK_HOME="$FWDIR"
-
-. $FWDIR/bin/load-spark-env.sh
-
-if [ -z "$1" ]; then
-  echo "Usage: run-example <example-class> [<args>]" >&2
-  exit 1
-fi
-
-# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
-# to avoid the -sources and -doc packages that are built by publish-local.
 EXAMPLES_DIR="$FWDIR"/examples

 if [ -f "$FWDIR/RELEASE" ]; then

@@ -49,46 +31,29 @@ fi

 if [[ -z $SPARK_EXAMPLES_JAR ]]; then
   echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2
-  echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
+  echo "You need to build Spark before running this program" >&2
   exit 1
 fi

+EXAMPLE_MASTER=${MASTER:-"local[*]"}

-# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
-# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
-CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
-CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"
-
-if $cygwin; then
-  CLASSPATH=`cygpath -wp $CLASSPATH`
-  export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
-fi
-
-# Find java binary
-if [ -n "${JAVA_HOME}" ]; then
-  RUNNER="${JAVA_HOME}/bin/java"
-else
-  if [ `command -v java` ]; then
-    RUNNER="java"
-  else
-    echo "JAVA_HOME is not set" >&2
-    exit 1
-  fi
-fi
-
-# Set JAVA_OPTS to be able to load native libraries and to set heap size
-JAVA_OPTS="$SPARK_JAVA_OPTS"
-# Load extra JAVA_OPTS from conf/java-opts, if it exists
-if [ -e "$FWDIR/conf/java-opts" ] ; then
-  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
+if [ -n "$1" ]; then
+  EXAMPLE_CLASS="$1"
+  shift
+else
+  echo "usage: ./bin/run-example <example-class> [example-args]"
+  echo "  - set MASTER=XX to use a specific master"
+  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.MovieLensALS)"
+  echo
+  exit -1
 fi
-export JAVA_OPTS

-if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
-  echo -n "Spark Command: "
-  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
-  echo "========================================"
-  echo
+if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
+  EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
 fi

-exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
+./bin/spark-submit \
+  --master $EXAMPLE_MASTER \
+  --class $EXAMPLE_CLASS \
+  $SPARK_EXAMPLES_JAR \
+  "$@"

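The two behaviours the new usage message advertises (an optional MASTER override and abbreviated class names) both reduce to simple string handling before the spark-submit call. A rough Scala sketch of the same resolution logic, purely illustrative and not part of this commit:

    // Hypothetical helper mirroring what bin/run-example does in shell.
    object RunExampleLogic {
      val ExamplesPackage = "org.apache.spark.examples"

      // MASTER environment variable wins; otherwise default to local[*].
      def resolveMaster(env: Map[String, String]): String =
        env.getOrElse("MASTER", "local[*]")

      // Abbreviated names like "SparkPi" or "mllib.MovieLensALS" get the package prefix.
      def resolveClass(name: String): String =
        if (name.startsWith(ExamplesPackage)) name else s"$ExamplesPackage.$name"
    }

    // e.g. resolveClass("SparkPi") == "org.apache.spark.examples.SparkPi"
    //      resolveMaster(sys.env) picks up MASTER=spark://host:7077 when it is set
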
bin/spark-class

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
   jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar")
   if [ "$num_jars" -eq "0" ]; then
     echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
-    echo "You need to build Spark with 'sbt/sbt assembly' before running this program." >&2
+    echo "You need to build Spark before running this program." >&2
     exit 1
   fi
   if [ "$num_jars" -gt "1" ]; then

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 5 additions & 4 deletions
@@ -160,14 +160,15 @@ object SparkSubmit {
     // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+      OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
       OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraClassPath"),
       OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraJavaOptions"),
       OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraLibraryPath"),
       OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
-      OptionAssigner(args.name, YARN, true, clOption = "--name"),
+      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
       OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
       OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),

@@ -188,8 +189,7 @@
       OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
-      OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name")
+      OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
     )

     // For client mode make any added jars immediately visible on the classpath

@@ -205,7 +205,8 @@
           (clusterManager & opt.clusterManager) != 0) {
         if (opt.clOption != null) {
           childArgs += (opt.clOption, opt.value)
-        } else if (opt.sysProp != null) {
+        }
+        if (opt.sysProp != null) {
           sysProps.put(opt.sysProp, opt.value)
         }
       }
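
The functional change in the last hunk is that an OptionAssigner may now contribute both a launcher argument and a system property, which is what lets `--name` feed `spark.app.name` on YARN as well. A simplified, hypothetical sketch of that dispatch (not the real SparkSubmit code, which also filters by deploy mode and cluster manager):

    import scala.collection.mutable

    // Simplified stand-in for the real OptionAssigner in SparkSubmit.
    case class OptionAssigner(value: String, clOption: String = null, sysProp: String = null)

    def assign(opts: Seq[OptionAssigner],
               childArgs: mutable.ArrayBuffer[String],
               sysProps: mutable.Map[String, String]): Unit = {
      for (opt <- opts if opt.value != null) {
        // Both branches can fire for the same option now that the "else" is gone:
        // e.g. --name on YARN cluster mode emits "--name <value>" AND spark.app.name.
        if (opt.clOption != null) childArgs ++= Seq(opt.clOption, opt.value)
        if (opt.sysProp != null) sysProps.put(opt.sysProp, opt.value)
      }
    }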

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 2 deletions
@@ -330,9 +330,9 @@ abstract class RDD[T: ClassTag](
     if (shuffle) {
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new ShuffledRDD[Int, T, (Int, T)](map(x => (Utils.random.nextInt(), x)),
         new HashPartitioner(numPartitions)),
-        numPartitions).keys
+        numPartitions).values
     } else {
       new CoalescedRDD(this, numPartitions)
     }
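
This change keys each element by a fresh random Int before the shuffle (instead of pairing every element with null), so skewed input spreads evenly across the new partitions. The same idea expressed with public RDD operations rather than the internal ShuffledRDD/CoalescedRDD plumbing, assuming an existing `rdd: RDD[String]` and a target `numPartitions: Int`:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair-RDD operations via implicit conversion
    import scala.util.Random

    val evenlyRepartitioned = rdd
      .map(x => (Random.nextInt(), x))                  // attach a random shuffle key
      .partitionBy(new HashPartitioner(numPartitions))  // hash the random keys into buckets
      .values                                           // drop the synthetic keys again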

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 6 additions & 9 deletions
@@ -57,15 +57,12 @@ private[spark] object ShuffleMapTask {
   }

   def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
-    synchronized {
-      val loader = Thread.currentThread.getContextClassLoader
-      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-      val ser = SparkEnv.get.closureSerializer.newInstance()
-      val objIn = ser.deserializeStream(in)
-      val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
-      (rdd, dep)
-    }
+    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val ser = SparkEnv.get.closureSerializer.newInstance()
+    val objIn = ser.deserializeStream(in)
+    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+    val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
+    (rdd, dep)
   }

   // Since both the JarSet and FileSet have the same format this is used for both.

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 1 addition & 1 deletion
@@ -250,7 +250,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       // Remove the block from the slave's BlockManager.
      // Doesn't actually wait for a confirmation and the message might get lost.
       // If message loss becomes frequent, we should add retry logic here.
-      blockManager.get.slaveActor ! RemoveBlock(blockId)
+      blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout)
     }
   }
 }
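
The switch from `!` to `ask` means the removal request now produces a Future, so a timeout or failure on the slave is at least observable instead of being silently dropped. A minimal sketch of the two call styles (the actor reference, message type, and timeout value here are assumed for illustration):

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._

    case class RemoveBlock(blockId: String)   // stand-in for the real message type

    def fireAndForget(slaveActor: ActorRef, blockId: String): Unit =
      slaveActor ! RemoveBlock(blockId)       // no reply; a lost message is invisible

    def withAcknowledgement(slaveActor: ActorRef, blockId: String): Future[Any] = {
      val timeout = Timeout(30.seconds)       // the real code passes its configured akkaTimeout
      slaveActor.ask(RemoveBlock(blockId))(timeout)   // Future fails on timeout instead of vanishing
    }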

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 6 additions & 4 deletions
@@ -104,7 +104,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
       "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
       "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
       "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6",
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty",
       "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)

@@ -122,16 +122,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     childArgsStr should include ("--num-executors 6")
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
     classpath should have length (0)
-    sysProps should have size (1)
+    sysProps("spark.app.name") should be ("beauty")
+    sysProps("SPARK_SUBMIT") should be ("true")
   }

   test("handles YARN client mode") {
     val clArgs = Seq("--deploy-mode", "client",
       "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
       "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
       "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "thejar.jar",
-      "arg1", "arg2")
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill",
+      "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")

@@ -140,6 +141,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     classpath should contain ("one.jar")
     classpath should contain ("two.jar")
     classpath should contain ("three.jar")
+    sysProps("spark.app.name") should be ("trill")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.executor.cores") should be ("5")
     sysProps("spark.yarn.queue") should be ("thequeue")

docs/building-with-maven.md

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.o

 The ScalaTest plugin also supports running only a specific test suite as follows:

-    $ mvn -Dhadoop.version=... -Dsuites=org.apache.spark.repl.ReplSuite test
+    $ mvn -Dhadoop.version=... -DwildcardSuites=org.apache.spark.repl.ReplSuite test


 ## Continuous Compilation ##
