[SPARK-5338][MESOS] Add cluster mode support for Mesos #5144

Closed · wants to merge 36 commits

Changes from all commits · 36 commits
e3facdd  Add Mesos Cluster dispatcher (tnachen, Feb 20, 2015)
67cbc18  Rename StandaloneRestClient to RestClient and add sbin scripts (tnachen, Feb 24, 2015)
9986731  Kill drivers when shutdown (tnachen, Feb 24, 2015)
880bc27  Add Mesos Cluster UI to display driver results (tnachen, Feb 26, 2015)
7179495  Change Driver page output and add logging (tnachen, Feb 27, 2015)
e775001  Support fetching remote uris in driver runner. (tnachen, Feb 27, 2015)
4b2f5ef  Specify user jar in command to be replaced with local. (tnachen, Feb 28, 2015)
5b7a12b  WIP: Making a cluster mode a mesos framework. (tnachen, Mar 10, 2015)
0fa7780  Launch task through the mesos scheduler (Mar 10, 2015)
b8e7181  Adds a shutdown latch to keep the deamon running (Mar 11, 2015)
825afa0  Supports more spark-submit parameters (Mar 11, 2015)
d57d77d  Add documentation (tnachen, Mar 11, 2015)
8ec76bc  Fix Mesos dispatcher UI. (tnachen, Mar 11, 2015)
6887e5e  Support looking at SPARK_EXECUTOR_URI env variable in schedulers (tnachen, Mar 12, 2015)
543a98d  Schedule multiple jobs (tnachen, Mar 12, 2015)
febfaba  Bound the finished drivers in memory (tnachen, Mar 12, 2015)
3d4dfa1  Adds support to kill submissions (Mar 11, 2015)
371ce65  Handle cluster mode recovery and state persistence. (tnachen, Mar 18, 2015)
e0f33f7  Add supervise support and persist retries. (tnachen, Mar 23, 2015)
7f214c2  Fix RetryState visibility (tnachen, Mar 23, 2015)
862b5b5  Support asking driver status when it's retrying. (tnachen, Mar 24, 2015)
920fc4b  Fix scala style issues. (tnachen, Mar 24, 2015)
a46ad66  Allow zk cli param override. (tnachen, Mar 24, 2015)
7252612  Fix tests. (tnachen, Mar 25, 2015)
20f7284  Address review comments (tnachen, Mar 28, 2015)
df355cd  Add metrics to mesos cluster scheduler. (tnachen, Apr 1, 2015)
6ff8e5c  Address comments and add logging. (tnachen, Apr 3, 2015)
17f93a2  Fix head of line blocking in scheduling drivers. (tnachen, Apr 5, 2015)
f7d8046  Change app name to spark cluster. (tnachen, Apr 6, 2015)
c6c6b73  Pass spark properties to mesos cluster tasks. (tnachen, Apr 6, 2015)
1553230  Address review comments. (tnachen, Apr 11, 2015)
fd5259d  Address review comments. (tnachen, Apr 17, 2015)
e324ac1  Fix merge. (tnachen, Apr 17, 2015)
390c491  Fix zk conf key for mesos zk engine. (tnachen, Apr 19, 2015)
e24b512  Persist submitted driver. (tnachen, Apr 19, 2015)
069e946  Fix rebase. (tnachen, Apr 22, 2015)

@@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.util.Utils

/**
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.master
package org.apache.spark.deploy

import scala.collection.JavaConversions._

@@ -25,15 +25,17 @@ import org.apache.zookeeper.KeeperException

import org.apache.spark.{Logging, SparkConf}

private[deploy] object SparkCuratorUtil extends Logging {
private[spark] object SparkCuratorUtil extends Logging {

private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
private val ZK_SESSION_TIMEOUT_MILLIS = 60000
private val RETRY_WAIT_MILLIS = 5000
private val MAX_RECONNECT_ATTEMPTS = 3

def newClient(conf: SparkConf): CuratorFramework = {
val ZK_URL = conf.get("spark.deploy.zookeeper.url")
def newClient(
conf: SparkConf,
zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
val ZK_URL = conf.get(zkUrlConf)
val zk = CuratorFrameworkFactory.newClient(ZK_URL,
ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
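
The hunk above widens SparkCuratorUtil to private[spark] and lets callers choose which configuration key holds the ZooKeeper URL. Below is a minimal sketch of how a Mesos-side caller might use the new parameter; the wrapper object is hypothetical, and only the conf key spark.mesos.deploy.zookeeper.url (set later in this PR by MesosClusterDispatcher) is taken from the diff:

import org.apache.curator.framework.CuratorFramework

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil

// Illustrative only: build a Curator client for the dispatcher's ZooKeeper-backed
// persistence, reading the URL from a Mesos-specific key instead of the
// standalone default "spark.deploy.zookeeper.url".
object MesosCuratorExample {
  def connect(conf: SparkConf): CuratorFramework = {
    SparkCuratorUtil.newClient(conf, zkUrlConf = "spark.mesos.deploy.zookeeper.url")
  }
}
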
48 changes: 30 additions & 18 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -36,11 +36,11 @@ import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}


/**
* Whether to submit, kill, or request the status of an application.
* The latter two operations are currently supported only for standalone cluster mode.
@@ -114,18 +114,20 @@ object SparkSubmit {
}
}

/** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
/**
* Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
*/
private def kill(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
new RestSubmissionClient()
.killSubmission(args.master, args.submissionToKill)
}

/**
* Request the status of an existing submission using the REST protocol.
* Standalone cluster mode only.
* Standalone and Mesos cluster mode only.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
new RestSubmissionClient()
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
}

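With this change the kill and status paths go through the renamed RestSubmissionClient for both standalone and Mesos cluster masters. Below is a minimal sketch of the calls SparkSubmit now makes; the master URL and submission ID are placeholders, and only killSubmission and requestSubmissionStatus come from the diff:

import org.apache.spark.deploy.rest.RestSubmissionClient

// Illustrative only: manage a previously submitted driver through the
// dispatcher's REST endpoint (placeholder host, port and submission ID).
object SubmissionOpsSketch {
  private val master = "mesos://dispatcher.example.com:7077"
  private val submissionId = "driver-20150501000000-0001"

  def killDriver(): Unit = {
    new RestSubmissionClient().killSubmission(master, submissionId)
  }

  def driverStatus(): Unit = {
    new RestSubmissionClient().requestSubmissionStatus(master, submissionId)
  }
}
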
@@ -252,6 +254,7 @@ object SparkSubmit {
}

val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
@@ -294,8 +297,9 @@

// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
case (MESOS, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on Mesos clusters.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@@ -377,15 +381,6 @@ object SparkSubmit {
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),

// Standalone cluster only
// Do not set CL arguments here because there are multiple possibilities for the main class
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
sysProp = "spark.driver.supervise"),

// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
@@ -413,7 +408,15 @@ object SparkSubmit {
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files")
sysProp = "spark.files"),
OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
)

// In client mode, launch the application main class directly
@@ -452,7 +455,7 @@ object SparkSubmit {
// All Spark parameters are expected to be passed to the client through system properties.
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
@@ -496,6 +499,15 @@ object SparkSubmit {
}
}

if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
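
The OptionAssigner entries added above route --jars, --driver-memory, --driver-cores and --supervise to Spark properties for both standalone and Mesos cluster deployments. Below is a simplified, self-contained sketch of that routing pattern; the flag values and the trimmed OptionAssigner case class are illustrative, not the real SparkSubmit internals:

// Illustrative only: each assigner carries a cluster-manager bitmask and a
// deploy-mode bitmask, and its option becomes a system property only when
// both masks match the current submission.
object OptionRoutingSketch {
  val STANDALONE = 1; val MESOS = 2; val YARN = 4
  val CLIENT = 1; val CLUSTER = 2

  case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int, sysProp: String)

  def resolve(options: Seq[OptionAssigner], clusterManager: Int, deployMode: Int): Map[String, String] = {
    options.collect {
      case OptionAssigner(v, cm, dm, prop)
          if v != null && (cm & clusterManager) != 0 && (dm & deployMode) != 0 =>
        prop -> v
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val options = Seq(
      OptionAssigner("2g", STANDALONE | MESOS, CLUSTER, "spark.driver.memory"),
      OptionAssigner("true", STANDALONE | MESOS, CLUSTER, "spark.driver.supervise"))
    // A Mesos cluster submission picks up both properties.
    println(resolve(options, MESOS, CLUSTER))
  }
}
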
@@ -231,18 +231,19 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}

private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://")) {
SparkSubmit.printErrorAndExit("Killing submissions is only supported in standalone mode!")
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Killing submissions is only supported in standalone or Mesos mode!")
}
if (submissionToKill == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
}
}

private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://")) {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
Review comment (Contributor): technically not needed since we already do another level of validation in the RestSubmissionClient, but OK to keep

SparkSubmit.printErrorAndExit(
"Requesting submission statuses is only supported in standalone mode!")
"Requesting submission statuses is only supported in standalone or Mesos mode!")
}
if (submissionToRequestStatusFor == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
@@ -475,6 +476,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
|
| Spark standalone with cluster deploy mode only:
| --driver-cores NUM Cores for driver (Default: 1).
|
| Spark standalone or Mesos with cluster deploy mode only:
| --supervise If given, restarts the driver on failure.
| --kill SUBMISSION_ID If given, kills the driver specified.
| --status SUBMISSION_ID If given, requests the status of the driver specified.
@@ -130,7 +130,7 @@ private[master] class Master(
private val restServer =
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
Some(new StandaloneRestServer(host, port, self, masterUrl, conf))
Some(new StandaloneRestServer(host, port, conf, self, masterUrl))
} else {
None
}
@@ -23,6 +23,7 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
import org.apache.spark.deploy.SparkCuratorUtil

private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkCuratorUtil


private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.mesos

import java.util.concurrent.CountDownLatch

import org.apache.spark.deploy.mesos.ui.MesosClusterUI
import org.apache.spark.deploy.rest.mesos.MesosRestServer
import org.apache.spark.scheduler.cluster.mesos._
import org.apache.spark.util.SignalLogger
import org.apache.spark.{Logging, SecurityManager, SparkConf}

/*
* A dispatcher that is responsible for managing and launching drivers, and is intended to be
* used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
* the cluster independently of Spark applications.
* It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
* [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
* for resources.
*
* A typical new driver lifecycle is the following:
* - Driver submitted via spark-submit talking to the [[MesosRestServer]]
* - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
* - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
*
* This dispatcher supports both Mesos fine-grained and coarse-grained modes, as the mode is
* configurable per driver launched.
* This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
* a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
* stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
*/
private[mesos] class MesosClusterDispatcher(
args: MesosClusterDispatcherArguments,
conf: SparkConf)
extends Logging {

private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", "NONE").toUpperCase()
logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)

private val engineFactory = recoveryMode match {
case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)

Review comment (Contributor): This will cause a compilation warning complaining that the match is not exhaustive. If the user provides a random string then this will fail at run time with a bad message. Did you mean to make BlackHole the default? Or should we throw an informative exception?

case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
}

private val scheduler = new MesosClusterScheduler(engineFactory, conf)

private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
private val webUi = new MesosClusterUI(
new SecurityManager(conf),
args.webUiPort,
conf,
publicAddress,
scheduler)

private val shutdownLatch = new CountDownLatch(1)

def start(): Unit = {
webUi.bind()
scheduler.frameworkUrl = webUi.activeWebUiUrl
scheduler.start()
server.start()
}

def awaitShutdown(): Unit = {
shutdownLatch.await()
}

def stop(): Unit = {
webUi.stop()
server.stop()
scheduler.stop()
shutdownLatch.countDown()
}
}

private[mesos] object MesosClusterDispatcher extends Logging {
def main(args: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
conf.setMaster(dispatcherArgs.masterUrl)
conf.setAppName(dispatcherArgs.name)
dispatcherArgs.zookeeperUrl.foreach { z =>
conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
conf.set("spark.mesos.deploy.zookeeper.url", z)
}
val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
dispatcher.start()
val shutdownHook = new Thread() {
override def run() {
logInfo("Shutdown hook is shutting down dispatcher")
dispatcher.stop()
dispatcher.awaitShutdown()
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
dispatcher.awaitShutdown()
}
}
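
The dispatcher's recovery behavior is driven entirely by the configuration read above: spark.mesos.deploy.recoveryMode selects the persistence engine, and for ZooKeeper recovery spark.mesos.deploy.zookeeper.url points at the ensemble (main() sets both when a ZooKeeper URL is supplied). Below is a minimal sketch of the two configurations, with placeholder host names:

import org.apache.spark.SparkConf

// Illustrative only: "NONE" keeps driver state in memory, while "ZOOKEEPER"
// persists submissions so a restarted dispatcher can recover them.
object DispatcherRecoveryConfSketch {
  def zookeeperRecovery(): SparkConf = new SparkConf()
    .set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
    .set("spark.mesos.deploy.zookeeper.url", "zk1.example.com:2181,zk2.example.com:2181")

  def inMemoryOnly(): SparkConf = new SparkConf()
    .set("spark.mesos.deploy.recoveryMode", "NONE")
}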