Commit 79f63a3

Move sparkProps into javaOpts
1 parent 78752f8 commit 79f63a3

11 files changed, +32 -26 lines changed

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 5 additions & 6 deletions
@@ -48,9 +48,6 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
     // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
     // truncate filesystem paths similar to what YARN does. For now, we just require
     // people call `addJar` assuming the jar is in the same directory.
-    val env = sys.env
-    val props = conf.getAll.toMap
-
     val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

     val classPathConf = "spark.driver.extraClassPath"
@@ -63,10 +60,12 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
       cp.split(java.io.File.pathSeparator)
     }

-    val javaOptionsConf = "spark.driver.extraJavaOptions"
-    val javaOpts = sys.props.get(javaOptionsConf)
+    val extraJavaOptsConf = "spark.driver.extraJavaOptions"
+    val extraJavaOpts = sys.props.get(extraJavaOptsConf).toSeq
+    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+    val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-      driverArgs.driverOptions, env, props, classPathEntries, libraryPathEntries, javaOpts)
+      driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)

     val driverDescription = new DriverDescription(
       driverArgs.jarUrl,

core/src/main/scala/org/apache/spark/deploy/Command.scala

Lines changed: 1 addition & 2 deletions
@@ -23,8 +23,7 @@ private[spark] case class Command(
     mainClass: String,
     arguments: Seq[String],
     environment: Map[String, String],
-    sparkProps: Map[String, String],
     classPathEntries: Seq[String],
     libraryPathEntries: Seq[String],
-    extraJavaOptions: Option[String] = None) {
+    javaOpts: Seq[String]) {
 }
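
The sparkProps map and the optional extraJavaOptions string are folded into a single, pre-rendered javaOpts sequence, so callers now hand the deploy layer fully formed JVM flags. A minimal sketch of constructing the new Command (the values are hypothetical, and Command is private[spark], so this only compiles inside Spark's own sources):

    import org.apache.spark.deploy.Command

    // Spark properties arrive already rendered as -D system properties,
    // alongside any user-supplied extra JVM options.
    val javaOpts = Seq("-Dspark.app.name=demo", "-Dspark.master=spark://host:7077", "-verbose:gc")
    val cmd = Command(
      "org.example.Main",       // mainClass (hypothetical)
      Seq("arg1"),              // arguments
      sys.env,                  // environment
      Seq("/opt/extra.jar"),    // classPathEntries
      Seq("/opt/native"),       // libraryPathEntries
      javaOpts)                 // javaOpts replaces sparkProps + extraJavaOptions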

core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala

Lines changed: 3 additions & 3 deletions
@@ -46,11 +46,11 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val conf = new SparkConf
-    val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Map(),
-      Seq(), Seq()), Some("dummy-spark-home"), "ignored")
+      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
+      Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()

core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala

Lines changed: 1 addition & 5 deletions
@@ -47,7 +47,6 @@ object CommandUtils extends Logging {
    */
   def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
-    val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())

     // Exists for backwards compatibility with older Spark versions
     val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
@@ -67,9 +66,6 @@

     val permGenOpt = Seq("-XX:MaxPermSize=128m")

-    // Convert Spark properties to java system properties
-    val sparkOpts = command.sparkProps.map { case (k, v) => s"-D$k=$v" }
-
     // Figure out our classpath with the external compute-classpath script
     val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
     val classPath = Utils.executeAndGetOutput(
@@ -78,7 +74,7 @@
     val userClassPath = command.classPathEntries ++ Seq(classPath)

     Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
-      sparkOpts ++ permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
+      permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
   }

   /** Spawn a thread that will redirect a given stream to a file */

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 1 addition & 2 deletions
@@ -80,10 +80,9 @@ private[spark] class DriverRunner(
       driverDesc.command.mainClass,
       driverDesc.command.arguments.map(substituteVariables),
       driverDesc.command.environment,
-      driverDesc.command.sparkProps,
       classPath,
       driverDesc.command.libraryPathEntries,
-      driverDesc.command.extraJavaOptions)
+      driverDesc.command.javaOpts)
     val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
       sparkHome.getAbsolutePath)
     launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 1 addition & 2 deletions
@@ -119,10 +119,9 @@ private[spark] class ExecutorRunner(
       appDesc.command.mainClass,
       appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
       appDesc.command.environment,
-      appDesc.command.sparkProps,
       appDesc.command.classPathEntries,
       appDesc.command.libraryPathEntries,
-      appDesc.command.extraJavaOptions)
+      appDesc.command.javaOpts)
     CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
   }

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 7 additions & 2 deletions
@@ -45,7 +45,7 @@ private[spark] class SparkDeploySchedulerBackend(
       conf.get("spark.driver.host"), conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
-    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").toSeq
     val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
       cp.split(java.io.File.pathSeparator)
     }
@@ -54,8 +54,13 @@
       cp.split(java.io.File.pathSeparator)
     }

+    // Start executors with a few necessary configs for registering with the scheduler
+    val sparkJavaOpts = Utils.sparkJavaOpts(conf, (key: String) =>
+      key.startsWith("spark.akka") || key.startsWith("spark.auth")
+    )
+    val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-      args, sc.executorEnvs, conf.getAll.toMap, classPathEntries, libraryPathEntries, extraJavaOpts)
+      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
     val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
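
The backend no longer ships the entire SparkConf to executors as sparkProps; only properties the executor needs to register back with the scheduler are converted to -D options, plus whatever the user set in spark.executor.extraJavaOptions. A rough illustration of the filter with made-up values (Utils is private[spark], so this only compiles inside Spark's own sources):

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf(false)
      .set("spark.akka.frameSize", "10")
      .set("spark.authenticate", "true")
      .set("spark.executor.memory", "2g")   // dropped by the filter below

    val sparkJavaOpts = Utils.sparkJavaOpts(conf, (key: String) =>
      key.startsWith("spark.akka") || key.startsWith("spark.auth"))
    // Only the spark.akka.* and spark.auth* entries survive, each rendered as a
    // -Dkey=value JVM option for CoarseGrainedExecutorBackend.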

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 9 additions & 0 deletions
@@ -1306,4 +1306,13 @@ private[spark] object Utils extends Logging {
     s"$className: $desc\n$st"
   }

+  /**
+   * Convert all spark properties set in the given SparkConf to a sequence of java options.
+   */
+  def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = {
+    conf.getAll
+      .filter { case (k, _) => filterKey(k) }
+      .map { case (k, v) => "-D" + k + "=\\\"" + v + "\\\"" }
+  }
+
 }
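
For reference, a tiny usage sketch of the new helper (property values are hypothetical): every conf entry that passes filterKey is rendered as a -Dkey=value system property with the value wrapped in escaped quotes, and the default filter keeps everything.

    val conf = new SparkConf(false).set("spark.master", "spark://host:7077")
    val allOpts = Utils.sparkJavaOpts(conf)                                // default filter: all properties
    val akkaOnly = Utils.sparkJavaOpts(conf, _.startsWith("spark.akka"))   // empty for this conf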

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -88,7 +88,7 @@ class JsonProtocolSuite extends FunSuite {
   }

   def createAppDesc(): ApplicationDescription = {
-    val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Map(), Seq(), Seq())
+    val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
     new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
   }

@@ -101,7 +101,7 @@

   def createDriverCommand() = new Command(
     "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
-    Map(("K1", "V1"), ("K2", "V2")), Map(), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
+    Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
   )

   def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,

core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription}

 class DriverRunnerTest extends FunSuite {
   private def createDriverRunner() = {
-    val command = new Command("mainClass", Seq(), Map(), Map(), Seq(), Seq())
+    val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
     new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
       null, "akka://1.2.3.4/worker/")
