Skip to content

Commit c6c6b73

Browse files
committed
Pass spark properties to mesos cluster tasks.
1 parent f7d8046 commit c6c6b73

File tree

4 files changed

+14
-29
lines changed

4 files changed

+14
-29
lines changed

core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.spark.deploy.Command
3333
* @param schedulerProperties Extra properties to pass the Mesos scheduler
3434
*/
3535
private[spark] case class MesosDriverDescription(
36+
val name: String,
3637
val jarUrl: String,
3738
val mem: Int,
3839
val cores: Double,

core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
2727
import org.apache.spark.util.Utils
2828
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
2929

30-
import scala.collection.mutable
31-
3230

3331
/**
3432
* A server that responds to requests submitted by the [[RestClient]].
@@ -87,22 +85,6 @@ private[mesos] class MesosSubmitRequestServlet(
8785
val driverCores = sparkProperties.get("spark.driver.cores")
8886
val appArgs = request.appArgs
8987
val environmentVariables = request.environmentVariables
90-
val schedulerProperties = new mutable.HashMap[String, String]
91-
// Store Spark submit specific arguments here to pass to the scheduler.
92-
schedulerProperties("spark.app.name") = sparkProperties.getOrElse("spark.app.name", mainClass)
93-
94-
sparkProperties.get("spark.executor.memory").foreach { v =>
95-
schedulerProperties("spark.executor.memory") = v
96-
}
97-
sparkProperties.get("spark.cores.max").foreach { v =>
98-
schedulerProperties("spark.cores.max") = v
99-
}
100-
sparkProperties.get("spark.executor.uri").foreach { v =>
101-
schedulerProperties("spark.executor.uri") = v
102-
}
103-
sparkProperties.get("spark.mesos.executor.home").foreach { v =>
104-
schedulerProperties("spark.mesos.executor.home") = v
105-
}
10688

10789
// Construct driver description
10890
val conf = new SparkConf(false)
@@ -119,8 +101,9 @@ private[mesos] class MesosSubmitRequestServlet(
119101
val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
120102

121103
new MesosDriverDescription(
104+
request.sparkProperties.get("spark.app.name").getOrElse(mainClass),
122105
appResource, actualDriverMemory, actualDriverCores,
123-
actualSuperviseDriver, command, schedulerProperties.toMap)
106+
actualSuperviseDriver, command, request.sparkProperties)
124107
}
125108

126109
protected override def handleSubmit(

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,16 +287,19 @@ private[spark] class MesosClusterSchedulerDriver(
287287
desc.command.environment.foreach { case (k, v) =>
288288
envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v).build())
289289
}
290-
builder.setEnvironment(envBuilder.build())
290+
val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v"}.mkString(" ")
291+
// Pass all spark properties to executor.
292+
envBuilder.addVariables(
293+
Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
291294
val cmdOptions = generateCmdOption(desc)
292295
val executorUri = desc.schedulerProperties.get("spark.executor.uri")
293296
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
297+
val appArguments = desc.command.arguments.mkString(" ")
294298
val cmd = if (executorUri.isDefined) {
295299
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
296300
val folderBasename = executorUri.get.split('/').last.split('.').head
297301
val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
298302
val cmdJar = s"../${desc.jarUrl.split("/").last}"
299-
val appArguments = desc.command.arguments.mkString(" ")
300303
s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
301304
} else {
302305
val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
@@ -307,9 +310,10 @@ private[spark] class MesosClusterSchedulerDriver(
307310
}
308311
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
309312
val cmdJar = desc.jarUrl.split("/").last
310-
s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar"
313+
s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
311314
}
312315
builder.setValue(cmd)
316+
builder.setEnvironment(envBuilder.build())
313317
builder.build
314318
}
315319

core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,13 @@ class MesosClusterSchedulerSuite extends FunSuite with LocalSparkContext with Mo
3939
scheduler.recoverState
4040
val response =
4141
scheduler.submitDriver(
42-
new MesosDriverDescription("jar", 1000, 1, true,
42+
new MesosDriverDescription("d1", "jar", 1000, 1, true,
4343
createCommand, Map[String, String]()))
4444
assert(response.success)
45-
4645
val response2 =
4746
scheduler.submitDriver(new MesosDriverDescription(
48-
"jar", 1000, 1, true, createCommand, Map[String, String]()))
47+
"d1", "jar", 1000, 1, true, createCommand, Map[String, String]()))
4948
assert(response2.success)
50-
5149
val state = scheduler.getState()
5250
assert(state.queuedDrivers.exists(d => d.submissionId.get == response.submissionId))
5351
assert(state.queuedDrivers.exists(d => d.submissionId.get == response2.submissionId))
@@ -62,12 +60,11 @@ class MesosClusterSchedulerSuite extends FunSuite with LocalSparkContext with Mo
6260
scheduler.recoverState
6361
val response =
6462
scheduler.submitDriver(
65-
new MesosDriverDescription("jar", 1000, 1, true, createCommand, Map[String, String]()))
63+
new MesosDriverDescription("d1", "jar", 1000, 1, true,
64+
createCommand, Map[String, String]()))
6665
assert(response.success)
67-
6866
val killResponse = scheduler.killDriver(response.submissionId)
6967
assert(killResponse.success)
70-
7168
val state = scheduler.getState()
7269
assert(state.queuedDrivers.isEmpty)
7370
}

0 commit comments

Comments
 (0)