
Commit 8380064

Merge branch 'master' of github.com:apache/spark into streaming

Conflicts:
    python/docs/modules.rst
    python/run-tests

2 parents 52c535b + 69c3f44 commit 8380064


146 files changed: +3269 -1514 lines changed


bin/compute-classpath.cmd

Lines changed: 7 additions & 1 deletion
@@ -36,7 +36,13 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
 if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 
 rem Build up classpath
-set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
+set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%
+
+if not "x%SPARK_CONF_DIR%"=="x" (
+  set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
+) else (
+  set CLASSPATH=%CLASSPATH%;%FWDIR%conf
+)
 
 if exist "%FWDIR%RELEASE" (
   for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (

bin/compute-classpath.sh

Lines changed: 7 additions & 1 deletion
@@ -27,8 +27,14 @@ FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 
 . "$FWDIR"/bin/load-spark-env.sh
 
+CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"
+
 # Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"
+if [ -n "$SPARK_CONF_DIR" ]; then
+  CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR"
+else
+  CLASSPATH="$CLASSPATH:$FWDIR/conf"
+fi
 
 ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"
 

bin/pyspark

Lines changed: 12 additions & 12 deletions
@@ -52,10 +52,20 @@ fi
 
 # Figure out which Python executable to use
 if [[ -z "$PYSPARK_PYTHON" ]]; then
-  PYSPARK_PYTHON="python"
+  if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
+    # for backward compatibility
+    PYSPARK_PYTHON="ipython"
+  else
+    PYSPARK_PYTHON="python"
+  fi
 fi
 export PYSPARK_PYTHON
 
+if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
+  # for backward compatibility
+  PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
+fi
+
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
 export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
@@ -64,11 +74,6 @@ export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
 export PYTHONSTARTUP="$FWDIR/python/pyspark/shell.py"
 
-# If IPython options are specified, assume user wants to run IPython
-if [[ -n "$IPYTHON_OPTS" ]]; then
-  IPYTHON=1
-fi
-
 # Build up arguments list manually to preserve quotes and backslashes.
 # We export Spark submit arguments as an environment variable because shell.py must run as a
 # PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
@@ -106,10 +111,5 @@ if [[ "$1" =~ \.py$ ]]; then
 else
   # PySpark shell requires special handling downstream
   export PYSPARK_SHELL=1
-  # Only use ipython if no command line arguments were provided [SPARK-1134]
-  if [[ "$IPYTHON" = "1" ]]; then
-    exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS
-  else
-    exec "$PYSPARK_PYTHON"
-  fi
+  exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
 fi

bin/pyspark2.cmd

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*
 )
 if [%FOUND_JAR%] == [0] (
   echo Failed to find Spark assembly JAR.
-  echo You need to build Spark with sbt\sbt assembly before running this program.
+  echo You need to build Spark before running this program.
   goto exit
 )
 :skip_build_test

bin/run-example2.cmd

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ if exist "%FWDIR%RELEASE" (
 )
 if "x%SPARK_EXAMPLES_JAR%"=="x" (
   echo Failed to find Spark examples assembly JAR.
-  echo You need to build Spark with sbt\sbt assembly before running this program.
+  echo You need to build Spark before running this program.
   goto exit
 )
 

bin/spark-class

Lines changed: 1 addition & 1 deletion
@@ -146,7 +146,7 @@ fi
 if [[ "$1" =~ org.apache.spark.tools.* ]]; then
   if test -z "$SPARK_TOOLS_JAR"; then
     echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2
-    echo "You need to build spark before running $1." 1>&2
+    echo "You need to build Spark before running $1." 1>&2
     exit 1
   fi
   CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"

bin/spark-class2.cmd

Lines changed: 1 addition & 1 deletion
@@ -104,7 +104,7 @@ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*
 )
 if "%FOUND_JAR%"=="0" (
   echo Failed to find Spark assembly JAR.
-  echo You need to build Spark with sbt\sbt assembly before running this program.
+  echo You need to build Spark before running this program.
   goto exit
 )
 :skip_build_test

bin/utils.sh

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# Gather all all spark-submit options into SUBMISSION_OPTS
+# Gather all spark-submit options into SUBMISSION_OPTS
 function gatherSparkSubmitOpts() {
 
   if [ -z "$SUBMIT_USAGE_FUNCTION" ]; then

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 2 additions & 3 deletions
@@ -103,10 +103,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * and a Server, so for a particular connection is has to determine what to do.
  * A ConnectionId was added to be able to track connections and is used to
  * match up incoming messages with connections waiting for authentication.
- * If its acting as a client and trying to send a message to another ConnectionManager,
- * it blocks the thread calling sendMessage until the SASL negotiation has occurred.
  * The ConnectionManager tracks all the sendingConnections using the ConnectionId
- * and waits for the response from the server and does the handshake.
+ * and waits for the response from the server and does the handshake before sending
+ * the real message.
  *
  * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
  *   can be used. Yarn requires a specific AmIpFilter be installed for security to work

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 42 additions & 22 deletions
@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
 
+  private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
+  private[spark] val eventLogDir: Option[String] = {
+    if (isEventLogEnabled) {
+      Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+    } else {
+      None
+    }
+  }
+
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
   val tachyonFolderName = "spark-" + randomUUID.toString()
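
The two new values above are driven entirely by existing configuration keys ("spark.eventLog.enabled" and "spark.eventLog.dir"). As a minimal, illustrative sketch (not part of this commit; the local master and the /tmp/spark-events directory are assumptions), a driver program opts in like this:

    import org.apache.spark.{SparkConf, SparkContext}

    object EventLogConfigExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("event-log-example")
          // read by SparkContext into isEventLogEnabled
          .set("spark.eventLog.enabled", "true")
          // read into eventLogDir; a trailing "/" is removed by stripSuffix("/")
          .set("spark.eventLog.dir", "/tmp/spark-events/")

        val sc = new SparkContext(conf)
        sc.parallelize(1 to 100).count() // run something so events are logged
        sc.stop()                        // stop() flushes and closes the event log
      }
    }
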
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val listenerBus = new LiveListenerBus
 
   // Create the Spark execution environment (cache, map output tracker, etc)
+  conf.set("spark.executor.id", "driver")
   private[spark] val env = SparkEnv.create(
     conf,
     "<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
-
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
   // constructor
   taskScheduler.start()
 
+  val applicationId: String = taskScheduler.applicationId()
+  conf.set("spark.app.id", applicationId)
+
+  val metricsSystem = env.metricsSystem
+
+  // The metrics system for Driver need to be set spark.app.id to app ID.
+  // So it should start after we get app ID from the task scheduler and set spark.app.id.
+  metricsSystem.start()
+
+  // Optionally log Spark events
+  private[spark] val eventLogger: Option[EventLoggingListener] = {
+    if (isEventLogEnabled) {
+      val logger =
+        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+      logger.start()
+      listenerBus.addListener(logger)
+      Some(logger)
+    } else None
+  }
+
+  // At this point, all relevant SparkListeners have been registered, so begin releasing events
+  listenerBus.start()
+
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
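
Because the application ID obtained from the task scheduler is now written into the driver's SparkConf before the metrics system and event logger start, it can also be read back once the context is constructed. A small sketch under that assumption (the master and app name are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object AppIdExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("app-id-example"))
        // "spark.app.id" is set from taskScheduler.applicationId() during construction
        println(sc.getConf.get("spark.app.id"))
        sc.stop()
      }
    }
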
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // Post init
   taskScheduler.postStartHook()
 
-  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
-  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
 
   private def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -759,20 +779,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumuable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
    */
-  def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param)
 
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
    * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
    * access the accumuable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
    */
-  def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param, Some(name))
 
   /**
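
Swapping the type parameters lines the signature up with AccumulableParam[R, T], where R is the accumulated result type and T is the element type that tasks add. A minimal sketch of the corrected ordering (not part of this commit; the set-of-strings accumulable is purely illustrative):

    import org.apache.spark.{AccumulableParam, SparkConf, SparkContext}

    // R = Set[String] (the result), T = String (what tasks add)
    object StringSetParam extends AccumulableParam[Set[String], String] {
      def addAccumulator(acc: Set[String], elem: String): Set[String] = acc + elem
      def addInPlace(acc1: Set[String], acc2: Set[String]): Set[String] = acc1 ++ acc2
      def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
    }

    object AccumulableExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("accumulable-example"))
        val seen = sc.accumulable(Set.empty[String])(StringSetParam) // accumulable[R, T]
        sc.parallelize(Seq("a", "b", "a")).foreach(word => seen += word)
        println(seen.value) // only the driver may read the value, e.g. Set(a, b)
        sc.stop()
      }
    }
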
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
-    listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
       startTime, sparkUser))
   }
 