Skip to content

Commit e3facdd

Browse files
committed
Add Mesos Cluster dispatcher
1 parent ae036d0 commit e3facdd

File tree

5 files changed

+816
-297
lines changed

5 files changed

+816
-297
lines changed
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.mesos
19+
20+
import akka.actor.{Props, ActorSystem, Actor}
21+
22+
import scala.collection.mutable.{ArrayBuffer, HashMap}
23+
import scala.Some
24+
25+
import org.apache.spark.deploy.rest.{ErrorResponse, SubmitRestProtocolResponse, MesosRestServer}
26+
import org.apache.spark.{Logging, SecurityManager, SparkConf}
27+
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, IntParam, Utils}
28+
29+
import org.apache.spark.deploy.DeployMessages._
30+
31+
import org.apache.spark.deploy.DriverDescription
32+
import org.apache.spark.deploy.master.DriverInfo
33+
import org.apache.spark.deploy.master.DriverState.DriverState
34+
import org.apache.spark.deploy.master.DriverState
35+
import org.apache.spark.deploy.worker.DriverRunner
36+
37+
import java.io.File
38+
import java.util.Date
39+
import java.text.SimpleDateFormat
40+
41+
/*
42+
* A dispatcher actor that is responsible for managing drivers, that is intended to
43+
* used for Mesos cluster mode.
44+
* This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
45+
* a daemon to launch drivers as Mesos frameworks upon request.
46+
*/
47+
class MesosClusterDispatcher(
48+
host: String,
49+
serverPort: Int,
50+
actorPort: Int,
51+
systemName: String,
52+
actorName: String,
53+
conf: SparkConf,
54+
masterUrl: String,
55+
workDirPath: Option[String] = None) extends Actor with ActorLogReceive with Logging {
56+
val server = new MesosRestServer(host, serverPort, self, conf, masterUrl)
57+
58+
val runners = new HashMap[String, DriverRunner]
59+
val drivers = new HashMap[String, DriverInfo]
60+
val completedDrivers = new ArrayBuffer[DriverInfo]
61+
val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
62+
var nextDriverNumber = 0
63+
64+
var workDir: File = null
65+
66+
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
67+
68+
def createWorkDir() {
69+
workDir = workDirPath.map(new File(_)).getOrElse(new File(sparkHome, "work"))
70+
try {
71+
// This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
72+
// So attempting to create and then check if directory was created or not.
73+
workDir.mkdirs()
74+
if (!workDir.exists() || !workDir.isDirectory) {
75+
logError("Failed to create work directory " + workDir)
76+
System.exit(1)
77+
}
78+
assert (workDir.isDirectory)
79+
} catch {
80+
case e: Exception =>
81+
logError("Failed to create work directory " + workDir, e)
82+
System.exit(1)
83+
}
84+
}
85+
86+
val sparkHome =
87+
new File(sys.env.get("SPARK_HOME").getOrElse("."))
88+
89+
val akkaUrl = AkkaUtils.address(
90+
AkkaUtils.protocol(context.system),
91+
systemName,
92+
host,
93+
actorPort,
94+
actorName)
95+
96+
def newDriverId(submitDate: Date): String = {
97+
val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
98+
nextDriverNumber += 1
99+
appId
100+
}
101+
102+
def createDriver(desc: DriverDescription): DriverInfo = {
103+
val now = System.currentTimeMillis()
104+
val date = new Date(now)
105+
new DriverInfo(now, newDriverId(date), desc, date)
106+
}
107+
108+
override def preStart() {
109+
createWorkDir()
110+
server.start()
111+
}
112+
113+
override def postStop() {
114+
server.stop()
115+
}
116+
117+
override def receiveWithLogging = {
118+
case RequestSubmitDriver(driverDescription) => {
119+
val driverInfo = createDriver(driverDescription)
120+
val runner = new DriverRunner(conf, driverInfo.id, workDir,
121+
sparkHome, driverDescription, self, akkaUrl)
122+
runners(driverInfo.id) = runner
123+
drivers(driverInfo.id) = driverInfo
124+
runner.start()
125+
sender ! SubmitDriverResponse(true, Option(driverInfo.id), "")
126+
}
127+
128+
case RequestKillDriver(driverId) => {
129+
if (!drivers.contains(driverId)) {
130+
if (completedDrivers.exists(_.id == driverId)) {
131+
sender ! KillDriverResponse(driverId, false, "Driver already completed")
132+
} else {
133+
sender ! KillDriverResponse(driverId, false, "Unknown driver")
134+
}
135+
} else {
136+
runners(driverId).kill()
137+
sender ! KillDriverResponse(driverId, true, "")
138+
}
139+
}
140+
141+
case RequestDriverStatus(driverId) => {
142+
drivers.get(driverId).orElse(completedDrivers.find(_.id == driverId)) match {
143+
case Some(driver) =>
144+
sender ! DriverStatusResponse(found = true, Some(driver.state),
145+
None, None, driver.exception)
146+
case None =>
147+
sender ! DriverStatusResponse(found = false, None, None, None, None)
148+
}
149+
}
150+
151+
case DriverStateChanged(driverId, state, exception) => {
152+
state match {
153+
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
154+
removeDriver(driverId, state, exception)
155+
case _ =>
156+
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
157+
}
158+
}
159+
}
160+
161+
def removeDriver(driverId: String, state: DriverState, exception: Option[Exception]) {
162+
if (completedDrivers.size >= RETAINED_DRIVERS) {
163+
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
164+
completedDrivers.trimStart(toRemove)
165+
}
166+
val driverInfo = drivers.remove(driverId).get
167+
driverInfo.exception = exception
168+
driverInfo.state = state
169+
completedDrivers += driverInfo
170+
}
171+
}
172+
173+
object MesosClusterDispatcher {
174+
def main(args: Array[String]) {
175+
val conf = new SparkConf
176+
val clusterArgs = new ClusterDispatcherArguments(args, conf)
177+
val actorSystem = startSystemAndActor(clusterArgs)
178+
actorSystem.awaitTermination()
179+
}
180+
181+
def startSystemAndActor(
182+
args: ClusterDispatcherArguments): ActorSystem = {
183+
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
184+
val conf = new SparkConf
185+
val systemName = "spark-mesos-cluster"
186+
val actorName = "MesosClusterDispatcher"
187+
val securityMgr = new SecurityManager(conf)
188+
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
189+
systemName, args.host, 0, conf, securityMgr)
190+
actorSystem.actorOf(
191+
Props(classOf[MesosClusterDispatcher],
192+
args.host,
193+
args.port,
194+
boundPort,
195+
systemName,
196+
actorName,
197+
conf,
198+
args.masterUrl),
199+
name = actorName)
200+
actorSystem
201+
}
202+
203+
class ClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
204+
var host = Utils.localHostName()
205+
var port = 7077
206+
var masterUrl: String = null
207+
208+
parse(args.toList)
209+
210+
def parse(args: List[String]): Unit = args match {
211+
case ("--host" | "-h") :: value :: tail =>
212+
Utils.checkHost(value, "Please use hostname " + value)
213+
host = value
214+
parse(tail)
215+
216+
case ("--port" | "-p") :: IntParam(value) :: tail =>
217+
port = value
218+
parse(tail)
219+
220+
case ("--master" | "-m") :: value :: tail =>
221+
if (!value.startsWith("mesos://")) {
222+
System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
223+
System.exit(1)
224+
}
225+
masterUrl = value
226+
parse(tail)
227+
228+
case ("--help") :: tail =>
229+
printUsageAndExit(0)
230+
231+
case Nil => {
232+
if (masterUrl == null) {
233+
System.err.println("--master is required")
234+
System.exit(1)
235+
}
236+
}
237+
238+
case _ =>
239+
printUsageAndExit(1)
240+
}
241+
242+
/**
243+
* Print usage and exit JVM with the given exit code.
244+
*/
245+
def printUsageAndExit(exitCode: Int) {
246+
System.err.println(
247+
"Usage: MesosClusterDispatcher [options]\n" +
248+
"\n" +
249+
"Options:\n" +
250+
" -h HOST, --host HOST Hostname to listen on\n" +
251+
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
252+
" -m --master MASTER URI for connecting to Mesos master\n")
253+
System.exit(exitCode)
254+
}
255+
}
256+
}

0 commit comments

Comments
 (0)