
Commit 53befac

tnachen authored and Andrew Or committed
[SPARK-5338] [MESOS] Add cluster mode support for Mesos
This patch adds support for running Spark applications in cluster mode on Mesos. It introduces a new Mesos framework dedicated to launching new apps/drivers, which can be used through the spark-submit script by pointing the --master flag at the cluster-mode REST interface instead of the Mesos master. Example:

    ./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master mesos://10.0.0.206:8077 --executor-memory 1G --total-executor-cores 100 examples/target/spark-examples_2.10-1.3.0-SNAPSHOT.jar 30

Part of this patch also abstracts the StandaloneRestServer so that it can have different implementations of the REST endpoints.

Features of the cluster mode in this PR:
- Supports supervise mode, where the scheduler keeps trying to reschedule a job that has exited.
- Adds a new UI for the cluster mode scheduler that shows all running jobs, finished jobs, and supervised jobs waiting to be retried.
- Supports state persistence to ZooKeeper, so that when the cluster scheduler fails over it can pick up all the queued and running jobs.

Author: Timothy Chen <[email protected]>
Author: Luc Bourlier <[email protected]>

Closes apache#5144 from tnachen/mesos_cluster_mode and squashes the following commits:

069e946 [Timothy Chen] Fix rebase.
e24b512 [Timothy Chen] Persist submitted driver.
390c491 [Timothy Chen] Fix zk conf key for mesos zk engine.
e324ac1 [Timothy Chen] Fix merge.
fd5259d [Timothy Chen] Address review comments.
1553230 [Timothy Chen] Address review comments.
c6c6b73 [Timothy Chen] Pass spark properties to mesos cluster tasks.
f7d8046 [Timothy Chen] Change app name to spark cluster.
17f93a2 [Timothy Chen] Fix head of line blocking in scheduling drivers.
6ff8e5c [Timothy Chen] Address comments and add logging.
df355cd [Timothy Chen] Add metrics to mesos cluster scheduler.
20f7284 [Timothy Chen] Address review comments
7252612 [Timothy Chen] Fix tests.
a46ad66 [Timothy Chen] Allow zk cli param override.
920fc4b [Timothy Chen] Fix scala style issues.
862b5b5 [Timothy Chen] Support asking driver status when it's retrying.
7f214c2 [Timothy Chen] Fix RetryState visibility
e0f33f7 [Timothy Chen] Add supervise support and persist retries.
371ce65 [Timothy Chen] Handle cluster mode recovery and state persistence.
3d4dfa1 [Luc Bourlier] Adds support to kill submissions
febfaba [Timothy Chen] Bound the finished drivers in memory
543a98d [Timothy Chen] Schedule multiple jobs
6887e5e [Timothy Chen] Support looking at SPARK_EXECUTOR_URI env variable in schedulers
8ec76bc [Timothy Chen] Fix Mesos dispatcher UI.
d57d77d [Timothy Chen] Add documentation
825afa0 [Luc Bourlier] Supports more spark-submit parameters
b8e7181 [Luc Bourlier] Adds a shutdown latch to keep the deamon running
0fa7780 [Luc Bourlier] Launch task through the mesos scheduler
5b7a12b [Timothy Chen] WIP: Making a cluster mode a mesos framework.
4b2f5ef [Timothy Chen] Specify user jar in command to be replaced with local.
e775001 [Timothy Chen] Support fetching remote uris in driver runner.
7179495 [Timothy Chen] Change Driver page output and add logging
880bc27 [Timothy Chen] Add Mesos Cluster UI to display driver results
9986731 [Timothy Chen] Kill drivers when shutdown
67cbc18 [Timothy Chen] Rename StandaloneRestClient to RestClient and add sbin scripts
e3facdd [Timothy Chen] Add Mesos Cluster dispatcher
1 parent 8009810, commit 53befac

30 files changed: +2147 additions, -493 deletions
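As a companion to the --kill and --status support described in the commit message, here is a minimal sketch (mirroring the SparkSubmit.scala hunk below) of how a cluster-mode submission is killed or queried through the renamed RestSubmissionClient. The master URL and submission id are illustrative values, not outputs of this commit.

    import org.apache.spark.deploy.rest.RestSubmissionClient

    // Illustrative values: any spark:// or mesos:// cluster-mode REST endpoint works.
    val master = "mesos://10.0.0.206:8077"
    val submissionId = "driver-20150420123456-0001"  // hypothetical id returned at submit time

    // SparkSubmit.kill(...) now does the equivalent of:
    new RestSubmissionClient().killSubmission(master, submissionId)

    // SparkSubmit.requestStatus(...) now does the equivalent of:
    new RestSubmissionClient().requestSubmissionStatus(master, submissionId)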

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods

 import org.apache.spark.{Logging, SparkConf, SparkContext}
-import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
+import org.apache.spark.deploy.master.RecoveryState
 import org.apache.spark.util.Utils

 /**

core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala renamed to core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala

Lines changed: 6 additions & 4 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.deploy.master
+package org.apache.spark.deploy

 import scala.collection.JavaConversions._

@@ -25,15 +25,17 @@ import org.apache.zookeeper.KeeperException

 import org.apache.spark.{Logging, SparkConf}

-private[deploy] object SparkCuratorUtil extends Logging {
+private[spark] object SparkCuratorUtil extends Logging {

   private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
   private val ZK_SESSION_TIMEOUT_MILLIS = 60000
   private val RETRY_WAIT_MILLIS = 5000
   private val MAX_RECONNECT_ATTEMPTS = 3

-  def newClient(conf: SparkConf): CuratorFramework = {
-    val ZK_URL = conf.get("spark.deploy.zookeeper.url")
+  def newClient(
+      conf: SparkConf,
+      zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
+    val ZK_URL = conf.get(zkUrlConf)
     val zk = CuratorFrameworkFactory.newClient(ZK_URL,
       ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
       new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
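Usage of the widened newClient signature (not part of this diff): a brief sketch assuming a SparkConf that carries both ZooKeeper URL keys. Standalone callers keep the default key, while the Mesos cluster persistence engine can pass its own key, "spark.mesos.deploy.zookeeper.url", which MesosClusterDispatcher sets further down; the URLs are illustrative.

    import org.apache.curator.framework.CuratorFramework
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkCuratorUtil

    val conf = new SparkConf()
      .set("spark.deploy.zookeeper.url", "zk1:2181")        // standalone master recovery
      .set("spark.mesos.deploy.zookeeper.url", "zk2:2181")  // Mesos dispatcher recovery

    // Standalone code paths keep the default conf key:
    val standaloneZk: CuratorFramework = SparkCuratorUtil.newClient(conf)

    // Mesos cluster mode can point the same helper at its own key:
    val mesosZk: CuratorFramework =
      SparkCuratorUtil.newClient(conf, "spark.mesos.deploy.zookeeper.url")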

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 30 additions & 18 deletions
@@ -36,11 +36,11 @@ import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
-
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

+
 /**
  * Whether to submit, kill, or request the status of an application.
  * The latter two operations are currently supported only for standalone cluster mode.
@@ -114,18 +114,20 @@ object SparkSubmit {
     }
   }

-  /** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
+  /**
+   * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
+   */
   private def kill(args: SparkSubmitArguments): Unit = {
-    new StandaloneRestClient()
+    new RestSubmissionClient()
      .killSubmission(args.master, args.submissionToKill)
   }

   /**
    * Request the status of an existing submission using the REST protocol.
-   * Standalone cluster mode only.
+   * Standalone and Mesos cluster mode only.
    */
   private def requestStatus(args: SparkSubmitArguments): Unit = {
-    new StandaloneRestClient()
+    new RestSubmissionClient()
      .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
   }

@@ -252,6 +254,7 @@
     }

     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+    val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER

     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
@@ -294,8 +297,9 @@

     // The following modes are not supported or applicable
     (clusterManager, deployMode) match {
-      case (MESOS, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+      case (MESOS, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on Mesos clusters.")
       case (STANDALONE, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for python " +
           "applications on standalone clusters.")
@@ -377,15 +381,6 @@
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),

-      // Standalone cluster only
-      // Do not set CL arguments here because there are multiple possibilities for the main class
-      OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
-      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
-      OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
-        sysProp = "spark.driver.supervise"),
-
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
@@ -413,7 +408,15 @@
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
-        sysProp = "spark.files")
+        sysProp = "spark.files"),
+      OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
+      OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.driver.memory"),
+      OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.driver.cores"),
+      OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.driver.supervise"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
     )

     // In client mode, launch the application main class directly
@@ -452,7 +455,7 @@
     // All Spark parameters are expected to be passed to the client through system properties.
     if (args.isStandaloneCluster) {
       if (args.useRest) {
-        childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
+        childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
         childArgs += (args.primaryResource, args.mainClass)
       } else {
         // In legacy standalone cluster mode, use Client as a wrapper around the user class
@@ -496,6 +499,15 @@
       }
     }

+    if (isMesosCluster) {
+      assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
+      childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
+      childArgs += (args.primaryResource, args.mainClass)
+      if (args.childArgs != null) {
+        childArgs ++= args.childArgs
+      }
+    }
+
     // Load any properties specified through --conf and the default properties file
     for ((k, v) <- args.sparkProperties) {
       sysProps.getOrElseUpdate(k, v)
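OptionAssigner itself is defined elsewhere in SparkSubmit.scala and does not appear in this diff; the following is only a rough sketch of the shape the entries above rely on, with field names and defaults inferred from usage rather than taken from this commit. Each entry ties an argument value to the cluster managers and deploy modes it applies to and to the system property it should populate, which is how --jars, --driver-memory, --driver-cores and --supervise now flow through to Mesos cluster mode as well as standalone.

    // Assumed shape of the helper constructed by the OptionAssigner(...) entries above;
    // the real definition lives elsewhere in SparkSubmit.scala and may differ.
    private[deploy] case class OptionAssigner(
        value: String,
        clusterManager: Int,     // bitmask, e.g. STANDALONE | MESOS
        deployMode: Int,         // CLIENT, CLUSTER, or ALL_DEPLOY_MODES
        clOption: String = null, // command-line option to forward, if any
        sysProp: String = null)  // system property to set, if any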

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 7 additions & 4 deletions
@@ -241,18 +241,19 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }

   private def validateKillArguments(): Unit = {
-    if (!master.startsWith("spark://")) {
-      SparkSubmit.printErrorAndExit("Killing submissions is only supported in standalone mode!")
+    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
+      SparkSubmit.printErrorAndExit(
+        "Killing submissions is only supported in standalone or Mesos mode!")
     }
     if (submissionToKill == null) {
       SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
     }
   }

   private def validateStatusRequestArguments(): Unit = {
-    if (!master.startsWith("spark://")) {
+    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
       SparkSubmit.printErrorAndExit(
-        "Requesting submission statuses is only supported in standalone mode!")
+        "Requesting submission statuses is only supported in standalone or Mesos mode!")
     }
     if (submissionToRequestStatusFor == null) {
       SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
@@ -485,6 +486,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |
        | Spark standalone with cluster deploy mode only:
        |  --driver-cores NUM          Cores for driver (Default: 1).
+       |
+       | Spark standalone or Mesos with cluster deploy mode only:
        |  --supervise                 If given, restarts the driver on failure.
        |  --kill SUBMISSION_ID        If given, kills the driver specified.
        |  --status SUBMISSION_ID      If given, requests the status of the driver specified.

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ private[master] class Master(
   private val restServer =
     if (restServerEnabled) {
       val port = conf.getInt("spark.master.rest.port", 6066)
-      Some(new StandaloneRestServer(host, port, self, masterUrl, conf))
+      Some(new StandaloneRestServer(host, port, conf, self, masterUrl))
     } else {
       None
     }

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
+import org.apache.spark.deploy.SparkCuratorUtil

 private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode

 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkCuratorUtil


 private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)

core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.concurrent.CountDownLatch
+
+import org.apache.spark.deploy.mesos.ui.MesosClusterUI
+import org.apache.spark.deploy.rest.mesos.MesosRestServer
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.util.SignalLogger
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
+/*
+ * A dispatcher that is responsible for managing and launching drivers, and is intended to be
+ * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
+ * the cluster independently of Spark applications.
+ * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
+ * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
+ * for resources.
+ *
+ * A typical new driver lifecycle is the following:
+ * - Driver submitted via spark-submit talking to the [[MesosRestServer]]
+ * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
+ * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
+ *
+ * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable
+ * per driver launched.
+ * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
+ * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
+ * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
+ */
+private[mesos] class MesosClusterDispatcher(
+    args: MesosClusterDispatcherArguments,
+    conf: SparkConf)
+  extends Logging {
+
+  private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
+  private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", "NONE").toUpperCase()
+  logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
+
+  private val engineFactory = recoveryMode match {
+    case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
+    case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
+    case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
+  }
+
+  private val scheduler = new MesosClusterScheduler(engineFactory, conf)
+
+  private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
+  private val webUi = new MesosClusterUI(
+    new SecurityManager(conf),
+    args.webUiPort,
+    conf,
+    publicAddress,
+    scheduler)
+
+  private val shutdownLatch = new CountDownLatch(1)
+
+  def start(): Unit = {
+    webUi.bind()
+    scheduler.frameworkUrl = webUi.activeWebUiUrl
+    scheduler.start()
+    server.start()
+  }
+
+  def awaitShutdown(): Unit = {
+    shutdownLatch.await()
+  }
+
+  def stop(): Unit = {
+    webUi.stop()
+    server.stop()
+    scheduler.stop()
+    shutdownLatch.countDown()
+  }
+}
+
+private[mesos] object MesosClusterDispatcher extends Logging {
+  def main(args: Array[String]) {
+    SignalLogger.register(log)
+    val conf = new SparkConf
+    val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
+    conf.setMaster(dispatcherArgs.masterUrl)
+    conf.setAppName(dispatcherArgs.name)
+    dispatcherArgs.zookeeperUrl.foreach { z =>
+      conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
+      conf.set("spark.mesos.deploy.zookeeper.url", z)
+    }
+    val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
+    dispatcher.start()
+    val shutdownHook = new Thread() {
+      override def run() {
+        logInfo("Shutdown hook is shutting down dispatcher")
+        dispatcher.stop()
+        dispatcher.awaitShutdown()
+      }
+    }
+    Runtime.getRuntime.addShutdownHook(shutdownHook)
+    dispatcher.awaitShutdown()
+  }
+}
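MesosClusterDispatcherArguments is referenced throughout the new file but is added in a separate file that is not shown in this excerpt. Below is a minimal sketch of the shape the dispatcher relies on, inferred only from the accesses above (args.host, args.port, args.webUiPort, args.masterUrl, args.name, args.zookeeperUrl); the default values are illustrative assumptions and the command-line flag parsing is omitted.

    package org.apache.spark.deploy.mesos

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    // Sketch only: field names come from how MesosClusterDispatcher uses its args above;
    // defaults are illustrative and the real class also parses flags such as --master.
    private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
      var host: String = Utils.localHostName()  // bind address for the REST server and UI
      var port: Int = 7077                      // REST submission port (assumed default)
      var webUiPort: Int = 8081                 // dispatcher web UI port (assumed default)
      var name: String = "Spark Cluster"        // framework name, per "Change app name to spark cluster"
      var masterUrl: String = null              // mesos:// master URL, required
      var zookeeperUrl: Option[String] = None   // when set, enables ZOOKEEPER recovery in main()
    }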
