Commit d19c4da

Author: Andrew Or
Merge branch 'master' of github.com:apache/spark into viz2
2 parents 7ef957c + 28b1af7 · commit d19c4da

111 files changed: +5539 / -891 lines

assembly/pom.xml

Lines changed: 0 additions & 1 deletion
@@ -194,7 +194,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
-        <version>2.4</version>
         <executions>
           <execution>
             <id>dist</id>

bin/spark-class2.cmd

Lines changed: 4 additions & 1 deletion
@@ -61,7 +61,10 @@ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
 
 rem The launcher library prints the command to be executed in a single line suitable for being
 rem executed by the batch interpreter. So read all the output of the launcher into a variable.
-for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
+set LAUNCHER_OUTPUT=%temp%\spark-class-launcher-output-%RANDOM%.txt
+"%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %* > %LAUNCHER_OUTPUT%
+for /f "tokens=*" %%i in (%LAUNCHER_OUTPUT%) do (
   set SPARK_CMD=%%i
 )
+del %LAUNCHER_OUTPUT%
 %SPARK_CMD%

conf/spark-env.sh.template

Lines changed: 2 additions & 1 deletion
@@ -3,7 +3,7 @@
 # This file is sourced when running various Spark programs.
 # Copy it as spark-env.sh and edit that to configure Spark for your site.
 
-# Options read when launching programs locally with 
+# Options read when launching programs locally with
 # ./bin/run-example or ./bin/spark-submit
 # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
 # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
@@ -39,6 +39,7 @@
 # - SPARK_WORKER_DIR, to set the working directory of worker processes
 # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
 # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
+# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
 # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
 # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
 

core/pom.xml

Lines changed: 0 additions & 1 deletion
@@ -478,7 +478,6 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>
-        <version>1.3.2</version>
         <executions>
           <execution>
             <id>sparkr-pkg</id>

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 17 additions & 9 deletions
@@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
 
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
-  private val timeoutCheckingThread =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
+  // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
+  // block the thread for a long time.
+  private val eventLoopThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
 
   private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
 
   override def onStart(): Unit = {
-    timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
+    timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = Utils.tryLogNonFatalError {
         Option(self).foreach(_.send(ExpireDeadHosts))
       }
@@ -99,11 +101,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
       if (scheduler != null) {
-        val unknownExecutor = !scheduler.executorHeartbeatReceived(
-          executorId, taskMetrics, blockManagerId)
-        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
         executorLastSeen(executorId) = System.currentTimeMillis()
-        context.reply(response)
+        eventLoopThread.submit(new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            val unknownExecutor = !scheduler.executorHeartbeatReceived(
+              executorId, taskMetrics, blockManagerId)
+            val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+            context.reply(response)
+          }
+        })
       } else {
         // Because Executor will sleep several seconds before sending the first "Heartbeat", this
         // case rarely happens. However, if it really happens, log it and ask the executor to
@@ -125,7 +131,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
       if (sc.supportDynamicAllocation) {
         // Asynchronously kill the executor to avoid blocking the current thread
         killExecutorThread.submit(new Runnable {
-          override def run(): Unit = sc.killExecutor(executorId)
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            sc.killExecutor(executorId)
+          }
         })
       }
       executorLastSeen.remove(executorId)
@@ -137,7 +145,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
     if (timeoutCheckingTask != null) {
      timeoutCheckingTask.cancel(true)
     }
-    timeoutCheckingThread.shutdownNow()
+    eventLoopThread.shutdownNow()
     killExecutorThread.shutdownNow()
   }
 }
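
The hunks above route the potentially slow scheduler.executorHeartbeatReceived call onto a dedicated single-threaded event loop so the RPC endpoint thread is never blocked. As a minimal, self-contained sketch of that pattern (editorial illustration only, not part of the commit, and using plain java.util.concurrent rather than Spark's internal ThreadUtils):

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// Sketch: one daemon "event loop" thread runs both the periodic timeout check
// and the heartbeat replies, so the caller's own thread never blocks.
object EventLoopSketch {
  private val eventLoop = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "event-loop-sketch")
      t.setDaemon(true)
      t
    }
  })

  def main(args: Array[String]): Unit = {
    // Periodic task, analogous to scheduling ExpireDeadHosts.
    eventLoop.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = println("checking for expired hosts")
    }, 0, 1, TimeUnit.SECONDS)

    // One-off work submitted from a message handler, analogous to replying to
    // a Heartbeat off the RPC thread.
    eventLoop.submit(new Runnable {
      override def run(): Unit = println("processing heartbeat reply")
    })

    Thread.sleep(3000) // let the sketch run briefly before shutting down
    eventLoop.shutdownNow()
  }
}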

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 89 additions & 1 deletion
@@ -211,7 +211,74 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     Utils.timeStringAsMs(get(key, defaultValue))
   }
 
+  /**
+   * Get a size parameter as bytes; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then bytes are assumed.
+   * @throws NoSuchElementException
+   */
+  def getSizeAsBytes(key: String): Long = {
+    Utils.byteStringAsBytes(get(key))
+  }
+
+  /**
+   * Get a size parameter as bytes, falling back to a default if not set. If no
+   * suffix is provided then bytes are assumed.
+   */
+  def getSizeAsBytes(key: String, defaultValue: String): Long = {
+    Utils.byteStringAsBytes(get(key, defaultValue))
+  }
+
+  /**
+   * Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then Kibibytes are assumed.
+   * @throws NoSuchElementException
+   */
+  def getSizeAsKb(key: String): Long = {
+    Utils.byteStringAsKb(get(key))
+  }
+
+  /**
+   * Get a size parameter as Kibibytes, falling back to a default if not set. If no
+   * suffix is provided then Kibibytes are assumed.
+   */
+  def getSizeAsKb(key: String, defaultValue: String): Long = {
+    Utils.byteStringAsKb(get(key, defaultValue))
+  }
+
+  /**
+   * Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then Mebibytes are assumed.
+   * @throws NoSuchElementException
+   */
+  def getSizeAsMb(key: String): Long = {
+    Utils.byteStringAsMb(get(key))
+  }
+
+  /**
+   * Get a size parameter as Mebibytes, falling back to a default if not set. If no
+   * suffix is provided then Mebibytes are assumed.
+   */
+  def getSizeAsMb(key: String, defaultValue: String): Long = {
+    Utils.byteStringAsMb(get(key, defaultValue))
+  }
+
+  /**
+   * Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then Gibibytes are assumed.
+   * @throws NoSuchElementException
+   */
+  def getSizeAsGb(key: String): Long = {
+    Utils.byteStringAsGb(get(key))
+  }
 
+  /**
+   * Get a size parameter as Gibibytes, falling back to a default if not set. If no
+   * suffix is provided then Gibibytes are assumed.
+   */
+  def getSizeAsGb(key: String, defaultValue: String): Long = {
+    Utils.byteStringAsGb(get(key, defaultValue))
+  }
+
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
     Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
@@ -407,7 +474,13 @@ private[spark] object SparkConf extends Logging {
       "The spark.cache.class property is no longer being used! Specify storage levels using " +
       "the RDD.persist() method instead."),
     DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
-      "Please use spark.{driver,executor}.userClassPathFirst instead."))
+      "Please use spark.{driver,executor}.userClassPathFirst instead."),
+    DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
+      "Please use spark.kryoserializer.buffer instead. The default value for " +
+      "spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
+      "are no longer accepted. To specify the equivalent now, one may use '64k'.")
+  )
+
   Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
 }
 
@@ -432,6 +505,21 @@ private[spark] object SparkConf extends Logging {
     AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
       // Translate old value to a duration, with 10s wait time per try.
       translation = s => s"${s.toLong * 10}s")),
+    "spark.reducer.maxSizeInFlight" -> Seq(
+      AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
+    "spark.kryoserializer.buffer" ->
+      Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
+        translation = s => s"${s.toDouble * 1000}k")),
+    "spark.kryoserializer.buffer.max" -> Seq(
+      AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
+    "spark.shuffle.file.buffer" -> Seq(
+      AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
+    "spark.executor.logs.rolling.maxSize" -> Seq(
+      AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
+    "spark.io.compression.snappy.blockSize" -> Seq(
+      AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
+    "spark.io.compression.lz4.blockSize" -> Seq(
+      AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
     "spark.rpc.numRetries" -> Seq(
       AlternateConfig("spark.akka.num.retries", "1.4")),
     "spark.rpc.retry.wait" -> Seq(

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 0 deletions
@@ -1413,6 +1413,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Register an RDD to be persisted in memory and/or disk storage
    */
   private[spark] def persistRDD(rdd: RDD[_]) {
+    _executorAllocationManager.foreach { _ =>
+      logWarning(
+        s"Dynamic allocation currently does not support cached RDDs. Cached data for RDD " +
+        s"${rdd.id} will be lost when executors are removed.")
+    }
     persistentRdds(rdd.id) = rdd
   }
 

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 2 additions & 1 deletion
@@ -74,7 +74,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     } else {
       None
     }
-    blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
+    // Note: use getSizeAsKb (not bytes) to maintain compatiblity if no units are provided
+    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
   }
   setConf(SparkEnv.get.conf)
 
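
The comment in the hunk above explains the choice of getSizeAsKb: a legacy unit-less value such as "4096" keeps its old meaning of 4096 KiB, while the new default "4m" resolves to the same size. A short editorial check, for example in a Scala REPL with Spark on the classpath (not part of the commit):

import org.apache.spark.SparkConf

// Sketch: both the legacy unit-less value and the new suffixed value
// resolve to 4096 KiB, so existing configurations keep their old behaviour.
val conf = new SparkConf(false)

conf.set("spark.broadcast.blockSize", "4096") // legacy style, no units
assert(conf.getSizeAsKb("spark.broadcast.blockSize", "4m") == 4096L)

conf.set("spark.broadcast.blockSize", "4m")   // new style, explicit suffix
assert(conf.getSizeAsKb("spark.broadcast.blockSize", "4m") == 4096L)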

core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
(renamed from core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala)

Lines changed: 52 additions & 7 deletions
@@ -15,14 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy.worker
+package org.apache.spark.deploy
+
+import java.util.concurrent.CountDownLatch
 
 import org.apache.spark.{Logging, SparkConf, SecurityManager}
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.sasl.SaslRpcHandler
 import org.apache.spark.network.server.TransportServer
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.util.Utils
 
 /**
  * Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -31,8 +34,8 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
  *
  * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
  */
-private[worker]
-class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+private[deploy]
+class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
   extends Logging {
 
   private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
@@ -51,16 +54,58 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   /** Starts the external shuffle service if the user has configured us to. */
   def startIfEnabled() {
     if (enabled) {
-      require(server == null, "Shuffle server already started")
-      logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
-      server = transportContext.createServer(port)
+      start()
     }
   }
 
+  /** Start the external shuffle service */
+  def start() {
+    require(server == null, "Shuffle server already started")
+    logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+    server = transportContext.createServer(port)
+  }
+
   def stop() {
-    if (enabled && server != null) {
+    if (server != null) {
       server.close()
       server = null
     }
   }
 }
+
+/**
+ * A main class for running the external shuffle service.
+ */
+object ExternalShuffleService extends Logging {
+  @volatile
+  private var server: ExternalShuffleService = _
+
+  private val barrier = new CountDownLatch(1)
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf
+    Utils.loadDefaultSparkProperties(sparkConf)
+    val securityManager = new SecurityManager(sparkConf)
+
+    // we override this value since this service is started from the command line
+    // and we assume the user really wants it to be running
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+    server = new ExternalShuffleService(sparkConf, securityManager)
+    server.start()
+
+    installShutdownHook()
+
+    // keep running until the process is terminated
+    barrier.await()
+  }
+
+  private def installShutdownHook(): Unit = {
+    Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
+      override def run() {
+        logInfo("Shutting down shuffle service.")
+        server.stop()
+        barrier.countDown()
+      }
+    })
+  }
+}

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
-import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
+import org.apache.spark.deploy.master.RecoveryState
 import org.apache.spark.util.Utils
 
 /**
