Commit 4ce92cc

andrewor14 authored and pwendell committed
[SPARK-2260] Fix standalone-cluster mode, which was broken
The main thing was that spark configs were not propagated to the driver, and so applications that do not specify `master` or `appName` automatically failed. This PR fixes that and a couple of miscellaneous things that are related.

One thing that may or may not be an issue is that the jars must be available on the driver node. In `standalone-cluster` mode, this effectively means these jars must be available on all the worker machines, since the driver is launched on one of them. The semantics here are not the same as `yarn-cluster` mode, where all the relevant jars are uploaded to a distributed cache automatically and shipped to the containers. This is probably not a concern, but still worth a mention.

Author: Andrew Or <[email protected]>

Closes #1538 from andrewor14/standalone-cluster and squashes the following commits:

8c11a0d [Andrew Or] Clean up imports / comments (minor)
2678d13 [Andrew Or] Handle extraJavaOpts properly
7660547 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
6f64a9b [Andrew Or] Revert changes in YARN
2f2908b [Andrew Or] Fix tests
ed01491 [Andrew Or] Don't go overboard with escaping
8e105e1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
b890949 [Andrew Or] Abstract usages of converting spark opts to java opts
79f63a3 [Andrew Or] Move sparkProps into javaOpts
78752f8 [Andrew Or] Fix tests
5a9c6c7 [Andrew Or] Fix line too long
c141a00 [Andrew Or] Don't display "unknown app" on driver log pages
d7e2728 [Andrew Or] Avoid deprecation warning in standalone Client
6ceb14f [Andrew Or] Allow relevant configs to propagate to standalone Driver
7f854bc [Andrew Or] Fix test
855256e [Andrew Or] Fix standalone-cluster mode
fd9da51 [Andrew Or] Formatting changes (minor)
1 parent 077f633 commit 4ce92cc
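In standalone-cluster mode the driver runs in a JVM launched by a Worker rather than in the submitting process, so configs set by spark-submit must be carried over explicitly on the driver's launch command. The diffs below do this by rendering SparkConf entries as -D JVM options (via Utils.sparkJavaOpts). A minimal sketch of that idea, with illustrative names and values that are not part of this commit (assumes Spark on the classpath):

import org.apache.spark.SparkConf

// Illustrative sketch, not the PR's exact code: render every config entry as a
// -Dkey=value JVM option so a separately launched driver JVM can reconstruct the conf.
object ConfToJavaOptsSketch {
  def sparkJavaOpts(conf: SparkConf): Seq[String] =
    conf.getAll.map { case (k, v) => s"-D$k=$v" }.toSeq

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .setMaster("spark://host:7077")
      .setAppName("example")
    // Prints e.g. -Dspark.master=spark://host:7077 and -Dspark.app.name=example.
    sparkJavaOpts(conf).foreach(println)
  }
}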

File tree: 16 files changed (+93, −51 lines)

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 21 additions & 1 deletion

@@ -40,6 +40,8 @@ import scala.collection.mutable.HashMap
  */
 class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
+  import SparkConf._
+
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
 
@@ -198,7 +200,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     *
     * E.g. spark.akka.option.x.y.x = "value"
     */
-    getAll.filter {case (k, v) => k.startsWith("akka.")}
+    getAll.filter { case (k, _) => isAkkaConf(k) }
 
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
@@ -292,3 +294,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
 }
+
+private[spark] object SparkConf {
+  /**
+   * Return whether the given config is an akka config (e.g. akka.actor.provider).
+   * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
+   */
+  def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
+
+  /**
+   * Return whether the given config should be passed to an executor on start-up.
+   *
+   * Certain akka and authentication configs are required of the executor when it connects to
+   * the scheduler, while the rest of the spark configs can be inherited from the driver later.
+   */
+  def isExecutorStartupConf(name: String): Boolean = {
+    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+  }
+}
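A hypothetical call site for the two helpers added above; since the SparkConf companion object is private[spark], the example must live in the org.apache.spark package. It only illustrates which keys isExecutorStartupConf would keep at executor start-up:

package org.apache.spark

// Hypothetical example, not part of the commit.
object ExecutorStartupConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.akka.frameSize", "10")    // needed before registering with the scheduler
      .set("spark.authenticate", "true")    // needed before registering with the scheduler
      .set("spark.executor.memory", "2g")   // can be inherited from the driver later

    val startupConfs = conf.getAll.filter { case (k, _) =>
      SparkConf.isExecutorStartupConf(k)
    }
    // Keeps spark.akka.frameSize and spark.authenticate, drops spark.executor.memory.
    startupConfs.foreach { case (k, v) => println(s"$k=$v") }
  }
}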

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 11 additions & 10 deletions

@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
 import scala.concurrent._
 
 import akka.actor._
@@ -50,9 +48,6 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
         // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
         // truncate filesystem paths similar to what YARN does. For now, we just require
         // people call `addJar` assuming the jar is in the same directory.
-        val env = Map[String, String]()
-        System.getenv().foreach{case (k, v) => env(k) = v}
-
         val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
 
         val classPathConf = "spark.driver.extraClassPath"
@@ -65,10 +60,13 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
           cp.split(java.io.File.pathSeparator)
         }
 
-        val javaOptionsConf = "spark.driver.extraJavaOptions"
-        val javaOpts = sys.props.get(javaOptionsConf)
+        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
+        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
+          .map(Utils.splitCommandString).getOrElse(Seq.empty)
+        val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+        val javaOpts = sparkJavaOpts ++ extraJavaOpts
         val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-          driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
+          driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
 
         val driverDescription = new DriverDescription(
           driverArgs.jarUrl,
@@ -109,6 +107,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
         // Exception, if present
         statusResponse.exception.map { e =>
           println(s"Exception from cluster was: $e")
+          e.printStackTrace()
           System.exit(-1)
         }
         System.exit(0)
@@ -141,8 +140,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
  */
 object Client {
   def main(args: Array[String]) {
-    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
-    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+    if (!sys.props.contains("SPARK_SUBMIT")) {
+      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
+      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+    }
 
     val conf = new SparkConf()
     val driverArgs = new ClientArguments(args)
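For context on the opts change above: spark.driver.extraJavaOptions arrives as a single string, while Command.javaOpts is now a Seq[String], so the string must be tokenized before being concatenated with the -Dspark.* options derived from the conf. A self-contained sketch of that assembly (a naive whitespace split and hard-coded -D values stand in for Utils.splitCommandString and Utils.sparkJavaOpts, which are the calls actually used in the diff):

// Illustrative only; values and the whitespace split are stand-ins.
object DriverJavaOptsSketch {
  def main(args: Array[String]): Unit = {
    val extraJavaOpts = sys.props.get("spark.driver.extraJavaOptions")
      .map(_.trim.split("\\s+").toSeq)   // real code uses the quote-aware Utils.splitCommandString
      .getOrElse(Seq.empty)
    val sparkJavaOpts = Seq(             // real code uses Utils.sparkJavaOpts(conf)
      "-Dspark.master=spark://host:7077",
      "-Dspark.app.name=example")
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    println(javaOpts.mkString(" "))
  }
}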

core/src/main/scala/org/apache/spark/deploy/Command.scala

Lines changed: 1 addition & 1 deletion

@@ -25,5 +25,5 @@ private[spark] case class Command(
     environment: Map[String, String],
     classPathEntries: Seq[String],
     libraryPathEntries: Seq[String],
-    extraJavaOptions: Option[String] = None) {
+    javaOpts: Seq[String]) {
 }

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 5 additions & 7 deletions

@@ -136,8 +136,6 @@ object SparkSubmit {
     (clusterManager, deployMode) match {
       case (MESOS, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
-      case (STANDALONE, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
       case (_, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -170,9 +168,9 @@
     val options = List[OptionAssigner](
 
       // All cluster managers
-      OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
-      OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
+      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
 
       // Standalone cluster only
       OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
@@ -203,9 +201,9 @@
         sysProp = "spark.driver.extraJavaOptions"),
       OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraLibraryPath"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
-      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.files")

core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala

Lines changed: 3 additions & 3 deletions

@@ -46,11 +46,11 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val conf = new SparkConf
-    val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
-        Seq()), Some("dummy-spark-home"), "ignored")
+      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
+        Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()

core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala

Lines changed: 3 additions & 4 deletions

@@ -47,7 +47,6 @@ object CommandUtils extends Logging {
    */
   def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
-    val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())
 
     // Exists for backwards compatibility with older Spark versions
     val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
@@ -62,7 +61,7 @@
       val joined = command.libraryPathEntries.mkString(File.pathSeparator)
       Seq(s"-Djava.library.path=$joined")
     } else {
-       Seq()
+      Seq()
     }
 
     val permGenOpt = Seq("-XX:MaxPermSize=128m")
@@ -71,11 +70,11 @@
     val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
     val classPath = Utils.executeAndGetOutput(
       Seq(sparkHome + "/bin/compute-classpath" + ext),
-      extraEnvironment=command.environment)
+      extraEnvironment = command.environment)
     val userClassPath = command.classPathEntries ++ Seq(classPath)
 
     Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
-      permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
+      permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
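To visualize the last hunk: the extra driver/executor JVM options now travel only through command.javaOpts, and buildJavaOpts concatenates the pieces in the order shown above. A sketch with purely illustrative values:

// Illustrative values only, ordered as buildJavaOpts concatenates them.
object BuildJavaOptsShape {
  def main(args: Array[String]): Unit = {
    val javaOpts = Seq(
      "-cp", "/path/to/app.jar:/output/of/compute-classpath.sh", // user class path ++ computed class path
      "-XX:MaxPermSize=128m",                                    // permGenOpt
      "-Djava.library.path=/opt/native",                         // libraryOpts
      "-Dlegacy.opt=set-via-SPARK_JAVA_OPTS",                    // workerLocalOpts (backwards compatibility)
      "-Dspark.master=spark://host:7077", "-XX:+UseG1GC",        // command.javaOpts: spark confs + extraJavaOptions
      "-Xms512M", "-Xmx512M")                                    // memoryOpts
    println(javaOpts.mkString(" "))
  }
}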

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 2 additions & 1 deletion

@@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
 
 /**
  * Manages the execution of one driver, including automatically restarting the driver on failure.
+ * This is currently only used in standalone cluster deploy mode.
  */
 private[spark] class DriverRunner(
     val driverId: String,
@@ -81,7 +82,7 @@
         driverDesc.command.environment,
         classPath,
         driverDesc.command.libraryPathEntries,
-        driverDesc.command.extraJavaOptions)
+        driverDesc.command.javaOpts)
       val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
         sparkHome.getAbsolutePath)
       launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 9 additions & 5 deletions

@@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender
 
 /**
  * Manages the execution of one executor process.
+ * This is currently only used in standalone mode.
  */
 private[spark] class ExecutorRunner(
     val appId: String,
@@ -72,7 +73,7 @@
   }
 
   /**
-   * kill executor process, wait for exit and notify worker to update resource status
+   * Kill executor process, wait for exit and notify worker to update resource status.
    *
    * @param message the exception message which caused the executor's death
    */
@@ -114,10 +115,13 @@
   }
 
   def getCommandSeq = {
-    val command = Command(appDesc.command.mainClass,
-      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
-      appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
-      appDesc.command.extraJavaOptions)
+    val command = Command(
+      appDesc.command.mainClass,
+      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+      appDesc.command.environment,
+      appDesc.command.classPathEntries,
+      appDesc.command.libraryPathEntries,
+      appDesc.command.javaOpts)
     CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
   }
 

core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala

Lines changed: 5 additions & 6 deletions

@@ -17,15 +17,14 @@
 
 package org.apache.spark.deploy.worker.ui
 
-import java.io.File
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.Logging
-import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
+import org.apache.spark.util.logging.RollingFileAppender
 
 private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
   private val worker = parent.worker
@@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
 
-    val (logDir, params) = (appId, executorId, driverId) match {
+    val (logDir, params, pageName) = (appId, executorId, driverId) match {
       case (Some(a), Some(e), None) =>
-        (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
+        (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e")
       case (None, None, Some(d)) =>
-        (s"${workDir.getPath}/$d/", s"driverId=$d")
+        (s"${workDir.getPath}/$d/", s"driverId=$d", d)
       case _ =>
         throw new Exception("Request must specify either application or driver identifiers")
     }
@@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
         </div>
       </body>
     </html>
-    UIUtils.basicSparkPage(content, logType + " log page for " + appId.getOrElse("unknown app"))
+    UIUtils.basicSparkPage(content, logType + " log page for " + pageName)
   }
 
   /** Get the part of the log files given the offset and desired length of bytes */

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 2 deletions

@@ -98,8 +98,13 @@ private[spark] class CoarseGrainedExecutorBackend(
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
-  def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
-    workerUrl: Option[String]) {
+
+  private def run(
+      driverUrl: String,
+      executorId: String,
+      hostname: String,
+      cores: Int,
+      workerUrl: Option[String]) {
 
     SignalLogger.register(log)
 