diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1a0bee4e3aea9..e2f5caf4e473b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,7 +57,7 @@ import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, SimrSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseGrainedMesosSchedulerBackend, FineGrainedMesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
@@ -2244,9 +2244,9 @@ object SparkContext extends Logging {
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
- new CoarseMesosSchedulerBackend(scheduler, sc, url)
+ new CoarseGrainedMesosSchedulerBackend(scheduler, sc, url)
} else {
- new MesosSchedulerBackend(scheduler, sc, url)
+ new FineGrainedMesosSchedulerBackend(scheduler, sc, url)
}
scheduler.initialize(backend)
(backend, scheduler)
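
The hunk above only swaps class names: `SparkContext.createTaskScheduler` still picks the backend from the `spark.mesos.coarse` flag, with fine-grained mode as the default. A minimal sketch of how an application lands on one backend or the other (the app name and master URL are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// spark.mesos.coarse = true selects CoarseGrainedMesosSchedulerBackend;
// leaving it unset (false) selects FineGrainedMesosSchedulerBackend.
val conf = new SparkConf()
  .setMaster("mesos://localhost:5050") // zk:// URLs are also accepted
  .setAppName("backend-selection-sketch")
  .set("spark.mesos.coarse", "true")
val sc = new SparkContext(conf)
```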
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index cfd672e1d8a97..c4849be3d432a 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -54,7 +54,7 @@ private[spark] class MesosExecutorBackend
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
- // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+ // Get num cores for this task from ExecutorInfo, created in FineGrainedMesosSchedulerBackend.
val cpusPerTask = executorInfo.getResourcesList
.find(_.getName == "cpus")
.map(_.getScalar.getValue.toInt)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6f77fa32ce37b..0324e47bfe6a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -75,7 +75,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
override protected def log = CoarseGrainedSchedulerBackend.this.log
private val addressToExecutorId = new HashMap[Address, String]
- override def preStart() {
+ override def preStart(): Unit = {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -163,21 +163,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
// Make fake resource offers on all executors
- def makeOffers() {
+ def makeOffers(): Unit = {
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq))
}
// Make fake resource offers on just one executor
- def makeOffers(executorId: String) {
+ def makeOffers(executorId: String): Unit = {
val executorData = executorDataMap(executorId)
launchTasks(scheduler.resourceOffers(
Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
}
// Launch tasks returned by a set of resource offers
- def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+ def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
for (task <- tasks.flatten) {
val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
@@ -227,7 +227,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
var driverActor: ActorRef = null
val taskIdsOnSlave = new HashMap[String, HashSet[String]]
- override def start() {
+ override def start(): Unit = {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
@@ -239,7 +239,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
- def stopExecutors() {
+ def stopExecutors(): Unit = {
try {
if (driverActor != null) {
logInfo("Shutting down all executors")
@@ -252,7 +252,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
}
- override def stop() {
+ override def stop(): Unit = {
stopExecutors()
try {
if (driverActor != null) {
@@ -265,11 +265,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
}
- override def reviveOffers() {
+ override def reviveOffers(): Unit = {
driverActor ! ReviveOffers
}
- override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+ override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
driverActor ! KillTask(taskId, executorId, interruptThread)
}
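
The hunks above in CoarseGrainedSchedulerBackend.scala mostly replace Scala's procedure syntax (a method body with no `=`, deprecated in later Scala versions) with an explicit `: Unit =` result type. A two-line illustration, with hypothetical method names:

```scala
// Procedure syntax: the '=' is omitted and a Unit result type is implied.
def stopOld() { println("stopping") }

// Explicit form used throughout this patch: the result type is visible,
// and the intent not to return a value is stated rather than inferred.
def stopNew(): Unit = { println("stopping") }
```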
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackend.scala
new file mode 100644
index 0000000000000..04cb3fbd18884
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackend.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{List => JList}
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+
+import com.google.common.collect.HashBiMap
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap => MutableHashMap, HashSet => MutableHashSet}
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
+import org.apache.spark.executor.CoarseGrainedExecutorBackend
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.{Utils, AkkaUtils}
+
+/**
+ * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
+ * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
+ * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
+ * CoarseGrainedSchedulerBackend mechanism. This backend is useful when lower and more
+ * predictable latency is desired.
+ *
+ * Unfortunately, there is some duplication with FineGrainedMesosSchedulerBackend
+ * that is hard to remove.
+ */
+private[spark] class CoarseGrainedMesosSchedulerBackend(
+ val scheduler: TaskSchedulerImpl,
+ val sparkContext: SparkContext,
+ val master: String)
+ extends CoarseGrainedSchedulerBackend(scheduler, sparkContext.env.actorSystem)
+ with CommonMesosSchedulerBackend
+ with MScheduler
+ with Logging {
+
+ val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
+
+ // Maximum number of cores to acquire (TODO: we'll need more flexible controls here.)
+ private[mesos] val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+
+ /** Cores we have acquired with each Mesos task ID */
+ private[mesos] val coresByTaskId = MutableHashMap.empty[Int, Int]
+
+ private[mesos] var totalCoresAcquired = 0
+
+ // Number of times tasks on each slave have failed.
+ private[mesos] val failuresBySlaveId = MutableHashMap.empty[String, Int]
+
+ private[mesos] val pendingRemovedSlaveIds = MutableHashSet.empty[String]
+
+ // The executor class to launch on slaves. The command built in createCommandInfo
+ // runs this class via bin/spark-class, so it must be CoarseGrainedExecutorBackend,
+ // not this scheduler class.
+ protected val executorBackend = classOf[CoarseGrainedExecutorBackend]
+
+ val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
+
+ var nextMesosTaskId = 0
+
+ /** Return a new task id for coarse-grained mode. */
+ def newMesosTaskId(): Int = {
+ val id = nextMesosTaskId
+ nextMesosTaskId += 1
+ id
+ }
+
+ // ==== Definitions for start():
+
+ // Nothing to do
+ protected def preStart(): Unit = {}
+
+ // Nothing to do
+ protected def postStart(): Unit = {}
+
+ /** @see CommonMesosSchedulerBackend.doStart() */
+ override def start(): Unit = {
+ super.start()
+ doStart()
+ }
+
+ /** @see CommonMesosSchedulerBackend.doStop() */
+ override def stop(): Unit = {
+ super.stop()
+ doStop()
+ }
+
+ def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+ val extraCommandArguments =
+ s" --driver-url $driverUrl" +
+ s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --hostname ${offer.getHostname}" +
+ s" --cores $numCores" +
+ s" --app-id $appId"
+ createCommandInfo(extraCommandArguments)
+ }
+
+ protected def driverUrl: String = {
+ AkkaUtils.address(
+ AkkaUtils.protocol(sparkContext.env.actorSystem),
+ SparkEnv.driverActorSystemName,
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port"),
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ }
+
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {
+ doRegistered(d, frameworkId, masterInfo)
+ }
+
+ /**
+ * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+ * unless we've already launched more than we wanted to.
+ */
+ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]): Unit = {
+ stateLock.synchronized {
+ val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
+
+ for (offer <- offers) {
+ val slaveId = offer.getSlaveId.getValue
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus").toInt
+ if (taskIdToSlaveId.size < executorLimit &&
+ totalCoresAcquired < maxCores &&
+ mem >= MemoryUtils.calculateTotalMemory(sparkContext) &&
+ cpus >= 1 &&
+ failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+ !slaveIdsWithExecutors.contains(slaveId)) {
+ // Launch an executor on the slave
+ val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+ totalCoresAcquired += cpusToUse
+ val taskId = newMesosTaskId()
+ taskIdToSlaveId(taskId) = slaveId
+ slaveIdsWithExecutors += slaveId
+ coresByTaskId(taskId) = cpusToUse
+ val task = MesosTaskInfo.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+ .setSlaveId(offer.getSlaveId)
+ .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
+ .setName("Task " + taskId)
+ .addResources(createResource("cpus", cpusToUse))
+ .addResources(createResource("mem",
+ MemoryUtils.calculateTotalMemory(sparkContext)))
+ .build()
+ d.launchTasks(
+ Collections.singleton(offer.getId), Collections.singletonList(task), filters)
+ } else {
+ // Filter it out
+ driver.declineOffer(offer.getId)
+ }
+ }
+ }
+ }
+
+ /** Build a Mesos resource protobuf object */
+ private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+ Resource.newBuilder()
+ .setName(resourceName)
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+ .build()
+ }
+
+ override def statusUpdate(d: SchedulerDriver, status: TaskStatus): Unit = {
+ val taskId = status.getTaskId.getValue.toInt
+ val state = status.getState
+ logInfo("Mesos task " + taskId + " is now " + state)
+ stateLock.synchronized {
+ if (isFinished(state)) {
+ val slaveId = taskIdToSlaveId(taskId)
+ slaveIdsWithExecutors -= slaveId
+ taskIdToSlaveId -= taskId
+ // Remove the cores we have remembered for this task, if it's in the hashmap
+ for (cores <- coresByTaskId.get(taskId)) {
+ totalCoresAcquired -= cores
+ coresByTaskId -= taskId
+ }
+ // If it was a failure, mark the slave as failed for blacklisting purposes
+ if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
+ failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
+ if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+ logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+ "is Spark installed on it?")
+ }
+ }
+ executorTerminated(d, slaveId, s"Executor finished with state $state")
+ driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+ }
+ }
+ }
+
+ override def error(d: SchedulerDriver, message: String): Unit = doError(d, message)
+
+ /**
+ * Called when a slave is lost or a Mesos task finishes. Updates the local view of
+ * which tasks are running, removes the terminated slave from the set of pending
+ * slave IDs that we may have asked to be killed, and notifies the driver that an
+ * executor was removed.
+ */
+ private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+ stateLock.synchronized {
+ if (slaveIdsWithExecutors.contains(slaveId)) {
+ val slaveIdToTaskId = taskIdToSlaveId.inverse()
+ if (slaveIdToTaskId.contains(slaveId)) {
+ val taskId: Long = slaveIdToTaskId.get(slaveId)
+ taskIdToSlaveId.remove(taskId)
+ removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+ }
+ pendingRemovedSlaveIds -= slaveId
+ slaveIdsWithExecutors -= slaveId
+ }
+ }
+ }
+
+ private def sparkExecutorId(slaveId: String, taskId: String) = "%s/%s".format(slaveId, taskId)
+
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
+ val sid = slaveId.getValue
+ logInfo("Mesos slave lost: " + sid)
+ executorTerminated(d, sid, "Mesos slave lost: " + sid)
+ }
+
+ override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
+ logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
+ slaveLost(d, s)
+ }
+
+ override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ // We don't truly know whether we can fulfill the full number of executors,
+ // since in coarse-grained mode it depends on how many slaves are available.
+ logInfo("Capping the total number of executors to " + requestedTotal)
+ executorLimit = requestedTotal
+ true
+ }
+
+ override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ if (driver == null) {
+ logWarning("Asked to kill executors before the executor was started.")
+ return false
+ }
+
+ val slaveIdToTaskId = taskIdToSlaveId.inverse()
+ for (executorId <- executorIds) {
+ val slaveId = executorId.split("/")(0)
+ if (slaveIdToTaskId.contains(slaveId)) {
+ driver.killTask(
+ TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build)
+ pendingRemovedSlaveIds += slaveId
+ } else {
+ logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+ }
+ }
+
+ assert(pendingRemovedSlaveIds.size <= taskIdToSlaveId.size)
+
+ // We cannot simply decrement the existing executor limit, because we may not have
+ // been able to launch as many executors as the limit allows. Instead, we assume a
+ // request to kill executors means the scheduler wants a limit below the number of
+ // executors currently launched, so the new limit is the number of executors
+ // launched minus the number killed.
+ executorLimit = taskIdToSlaveId.size - pendingRemovedSlaveIds.size
+ true
+ }
+}
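
The offer-acceptance condition in `resourceOffers` above caps total cores at `spark.cores.max` via `math.min(cpus, maxCores - totalCoresAcquired)`. A REPL-style worked sketch of that accounting, with illustrative numbers:

```scala
// Suppose spark.cores.max = 10 and Mesos sends two offers with 8 CPUs each.
val maxCores = 10
var totalCoresAcquired = 0

def coresToUse(offeredCpus: Int): Int =
  math.min(offeredCpus, maxCores - totalCoresAcquired)

val first = coresToUse(8)  // 8: the whole offer fits under the cap
totalCoresAcquired += first
val second = coresToUse(8) // 2: only two cores remain under the cap
totalCoresAcquired += second
// A third offer fails the totalCoresAcquired < maxCores check and is declined.
```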
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
deleted file mode 100644
index 90dfe14352a8e..0000000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.io.File
-import java.util.{List => JList}
-import java.util.Collections
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, HashSet}
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{Utils, AkkaUtils}
-
-/**
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
- * latency.
- *
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
- * remove this.
- */
-private[spark] class CoarseMesosSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext,
- master: String)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
- with MScheduler
- with Logging {
-
- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
-
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
-
- // Cores we have acquired with each Mesos task ID
- val coresByTaskId = new HashMap[Int, Int]
- var totalCoresAcquired = 0
-
- val slaveIdsWithExecutors = new HashSet[String]
-
- val taskIdToSlaveId = new HashMap[Int, String]
- val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
-
-
- val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
-
- var nextMesosTaskId = 0
-
- @volatile var appId: String = _
-
- def newMesosTaskId(): Int = {
- val id = nextMesosTaskId
- nextMesosTaskId += 1
- id
- }
-
- override def start() {
- super.start()
-
- synchronized {
- new Thread("CoarseMesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = CoarseMesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try { {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- }
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
- }
-
- def createCommand(offer: Offer, numCores: Int): CommandInfo = {
- val executorSparkHome = conf.getOption("spark.mesos.executor.home")
- .orElse(sc.getSparkHome())
- .getOrElse {
- throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
- }
- val environment = Environment.newBuilder()
- val extraClassPath = conf.getOption("spark.executor.extraClassPath")
- extraClassPath.foreach { cp =>
- environment.addVariables(
- Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
- }
- val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
-
- // Set the environment variable through a command prefix
- // to append to the existing value of the variable
- val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
- Utils.libraryPathEnvPrefix(Seq(p))
- }.getOrElse("")
-
- environment.addVariables(
- Environment.Variable.newBuilder()
- .setName("SPARK_EXECUTOR_OPTS")
- .setValue(extraJavaOpts)
- .build())
-
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val driverUrl = AkkaUtils.address(
- AkkaUtils.protocol(sc.env.actorSystem),
- SparkEnv.driverActorSystemName,
- conf.get("spark.driver.host"),
- conf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- val uri = conf.get("spark.executor.uri", null)
- if (uri == null) {
- val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
- command.setValue(
- "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
- .format(prefixEnv, runScript) +
- s" --driver-url $driverUrl" +
- s" --executor-id ${offer.getSlaveId.getValue}" +
- s" --hostname ${offer.getHostname}" +
- s" --cores $numCores" +
- s" --app-id $appId")
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
- command.setValue(
- s"cd $basename*; $prefixEnv " +
- "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
- s" --driver-url $driverUrl" +
- s" --executor-id ${offer.getSlaveId.getValue}" +
- s" --hostname ${offer.getHostname}" +
- s" --cores $numCores" +
- s" --app-id $appId")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
- }
- command.build()
- }
-
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- appId = frameworkId.getValue
- logInfo("Registered as framework ID " + appId)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
- }
-
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
- /**
- * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
- * unless we've already launched more than we wanted to.
- */
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- synchronized {
- val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
-
- for (offer <- offers) {
- val slaveId = offer.getSlaveId.toString
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores &&
- mem >= MemoryUtils.calculateTotalMemory(sc) &&
- cpus >= 1 &&
- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
- !slaveIdsWithExecutors.contains(slaveId)) {
- // Launch an executor on the slave
- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
- totalCoresAcquired += cpusToUse
- val taskId = newMesosTaskId()
- taskIdToSlaveId(taskId) = slaveId
- slaveIdsWithExecutors += slaveId
- coresByTaskId(taskId) = cpusToUse
- val task = MesosTaskInfo.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
- .setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
- .setName("Task " + taskId)
- .addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem",
- MemoryUtils.calculateTotalMemory(sc)))
- .build()
- d.launchTasks(
- Collections.singleton(offer.getId), Collections.singletonList(task), filters)
- } else {
- // Filter it out
- d.launchTasks(
- Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
- }
- }
- }
- }
-
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- private def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- 0
- }
-
- /** Build a Mesos resource protobuf object */
- private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
- Resource.newBuilder()
- .setName(resourceName)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
- .build()
- }
-
- /** Check whether a Mesos task state represents a finished task */
- private def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val taskId = status.getTaskId.getValue.toInt
- val state = status.getState
- logInfo("Mesos task " + taskId + " is now " + state)
- synchronized {
- if (isFinished(state)) {
- val slaveId = taskIdToSlaveId(taskId)
- slaveIdsWithExecutors -= slaveId
- taskIdToSlaveId -= taskId
- // Remove the cores we have remembered for this task, if it's in the hashmap
- for (cores <- coresByTaskId.get(taskId)) {
- totalCoresAcquired -= cores
- coresByTaskId -= taskId
- }
- // If it was a failure, mark the slave as failed for blacklisting purposes
- if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
- failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
- if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
- logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
- "is Spark installed on it?")
- }
- }
- driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
- }
- }
- }
-
- override def error(d: SchedulerDriver, message: String) {
- logError("Mesos error: " + message)
- scheduler.error(message)
- }
-
- override def stop() {
- super.stop()
- if (driver != null) {
- driver.stop()
- }
- }
-
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
- // Note that the slave ID corresponds to the executor ID on that slave
- slaveIdsWithExecutors -= slaveId.getValue
- removeExecutor(slaveId.getValue, "Mesos slave lost")
- }
- }
- }
-
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
- slaveLost(d, s)
- }
-
- override def applicationId(): String =
- Option(appId).getOrElse {
- logWarning("Application ID is not initialized yet.")
- super.applicationId
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CommonMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CommonMesosSchedulerBackend.scala
new file mode 100644
index 0000000000000..6635ca6f7546f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CommonMesosSchedulerBackend.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{List => JList}
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+
+import com.google.common.collect.HashBiMap
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap => MutableHashMap, HashSet => MutableHashSet}
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
+import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.{Utils, AkkaUtils}
+
+/**
+ * Shared code between {@link FineGrainedMesosSchedulerBackend} and
+ * {@link CoarseGrainedMesosSchedulerBackend}.
+ */
+trait CommonMesosSchedulerBackend extends SchedulerBackend {
+
+ self: MScheduler with Logging =>
+
+ // TODO Move these declarations somewhere else?
+ def resourceOffers(d: SchedulerDriver, offers: JList[Offer]): Unit
+ def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit
+ def statusUpdate(d: SchedulerDriver, status: TaskStatus): Unit
+ def requestExecutors(numAdditionalExecutors: Int): Boolean
+ def requestTotalExecutors(requestedTotal: Int): Boolean
+ def doKillExecutors(executorIds: Seq[String]): Boolean
+
+ val scheduler: TaskSchedulerImpl
+ val sparkContext: SparkContext
+ val master: String
+
+ /** Driver for talking to Mesos */
+ var driver: SchedulerDriver = null
+
+ // The total number of executors we aim to have.
+ private[mesos] var executorLimit = Int.MaxValue
+
+ /**
+ * Return the current executor limit, which may be [[Int.MaxValue]].
+ */
+ def getExecutorLimit: Int = executorLimit
+
+ protected val executorBackend: Class[_]
+
+ private[mesos] val taskIdToSlaveId = HashBiMap.create[Long, String]
+
+ private[mesos] val slaveIdsWithExecutors = MutableHashSet.empty[String]
+
+ def slaveHasExecutor(slaveId: String): Boolean = {
+ slaveIdsWithExecutors.contains(slaveId)
+ }
+
+ private def executorBackendName: String = executorBackend.getName
+ private def executorSimpleBackendName: String = executorBackend.getSimpleName
+
+ @volatile var appId: String = _
+
+ // Lock used to wait for scheduler to be registered
+ private var isRegistered = false
+ private val registeredLock = new Object()
+
+ // Lock protecting other mutable state. Using the intrinsic lock (synchronizing
+ // on this) may lead to deadlocks, since the superclass may also lock on it.
+ protected val stateLock = new ReentrantLock
+
+ // ==== Declarations for doStart():
+
+ protected def preStart(): Unit
+ protected def postStart(): Unit
+
+ /**
+ * We would like to override start here, and we almost can, except that
+ * CoarseGrainedMesosSchedulerBackend.start must call super.start to invoke
+ * the concrete CoarseGrainedSchedulerBackend.start, while
+ * FineGrainedMesosSchedulerBackend _can't_ call super.start, because
+ * SchedulerBackend.start is abstract. So all the common logic lives in this
+ * helper method, and each concrete class overrides start itself.
+ */
+ protected def doStart(): Unit = {
+ preStart()
+
+ stateLock.synchronized {
+ new Thread(s"$executorSimpleBackendName driver") {
+ setDaemon(true)
+ override def run(): Unit = {
+ // i.e., val scheduler = CoarseGrainedMesosSchedulerBackend.this
+ val scheduler = self
+ val fwInfo = FrameworkInfo.newBuilder().
+ setUser(sparkContext.sparkUser).setName(sparkContext.appName).build()
+ driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+ try {
+ val ret = driver.run()
+ logInfo("driver.run() returned with code " + ret)
+ } catch {
+ case e: Exception => logError("driver.run() failed", e)
+ }
+ }
+ }.start()
+
+ waitForRegister()
+ postStart()
+ }
+ }
+
+ /** As with start, each concrete class overrides stop and delegates to this helper. */
+ protected def doStop(): Unit = {
+ if (driver != null) {
+ driver.stop()
+ }
+ }
+
+ def createCommandInfo(extraCommandArguments: String): CommandInfo = {
+ val executorSparkHome = sparkContext.conf.getOption("spark.mesos.executor.home")
+ .orElse(sparkContext.getSparkHome())
+ .getOrElse {
+ throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+ }
+ val environment = Environment.newBuilder()
+ sparkContext.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
+ environment.addVariables(
+ Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
+ }
+ val extraJavaOpts = sparkContext.conf.get("spark.executor.extraJavaOptions", "")
+
+ // Set the environment variable through a command prefix
+ // to append to the existing value of the variable
+ val prefixEnv = sparkContext.conf.getOption("spark.executor.extraLibraryPath").map { p =>
+ Utils.libraryPathEnvPrefix(Seq(p))
+ }.getOrElse("")
+
+ environment.addVariables(
+ Environment.Variable.newBuilder()
+ .setName("SPARK_EXECUTOR_OPTS")
+ .setValue(extraJavaOpts)
+ .build())
+
+ sparkContext.executorEnvs.foreach { case (key, value) =>
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(value)
+ .build())
+ }
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+
+ val uri = sparkContext.conf.get("spark.executor.uri", null)
+ if (uri == null) {
+ val executorPath = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
+ command.setValue("%s \"%s\" %s %s".format(
+ prefixEnv, executorPath, executorBackendName, extraCommandArguments))
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue("cd %s*; %s \"%s\" %s %s".format(
+ basename, prefixEnv, "./bin/spark-class", executorBackendName, extraCommandArguments))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
+ command.build()
+ }
+
+ /** Offers are consumed synchronously within resourceOffers, so a rescinded offer needs no action. */
+ override def offerRescinded(d: SchedulerDriver, o: OfferID): Unit = {}
+
+ /**
+ * Implements registered for the coarse-grained backend; the fine-grained
+ * backend wraps this call in its separate class loader.
+ */
+ protected def doRegistered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {
+ appId = frameworkId.getValue
+ logInfo("Registered as framework ID " + appId)
+ registeredLock.synchronized {
+ isRegistered = true
+ registeredLock.notifyAll()
+ }
+ }
+
+ /** Block until the Mesos scheduler driver has registered with the master. */
+ def waitForRegister(): Unit = {
+ registeredLock.synchronized {
+ while (!isRegistered) {
+ registeredLock.wait()
+ }
+ }
+ }
+
+ override def disconnected(d: SchedulerDriver): Unit = {}
+
+ override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo): Unit = {}
+
+ /** Helper function to pull out a resource from a Mesos Resources protobuf */
+ protected def getResource(res: JList[Resource], name: String): Double = {
+ for (r <- res if r.getName == name) {
+ return r.getScalar.getValue
+ }
+ 0
+ }
+
+ /** Check whether a Mesos task state represents a finished task */
+ protected def isFinished(state: MesosTaskState): Boolean = {
+ state == MesosTaskState.TASK_FINISHED ||
+ state == MesosTaskState.TASK_FAILED ||
+ state == MesosTaskState.TASK_KILLED ||
+ state == MesosTaskState.TASK_LOST
+ }
+
+ /**
+ * Implements error for the coarse-grained backend; the fine-grained
+ * backend wraps this call in its separate class loader.
+ */
+ protected def doError(d: SchedulerDriver, message: String): Unit = {
+ logError("Mesos error: " + message)
+ scheduler.error(message)
+ }
+
+ override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]): Unit = {}
+}
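
The trait is structured as a template method: `doStart`/`doStop` own the shared `MesosSchedulerDriver` lifecycle, while `preStart`/`postStart` are hooks the concrete backends fill in, and each backend keeps its own `start` because of the `super.start` asymmetry described in the comment above. A stripped-down sketch of that shape (names shortened, unrelated members elided):

```scala
trait CommonBackendSketch {
  protected def preStart(): Unit  // hook: e.g. capture the context class loader
  protected def postStart(): Unit // hook: usually a no-op

  protected def doStart(): Unit = {
    preStart()
    // ... spawn the MesosSchedulerDriver thread, then waitForRegister() ...
    postStart()
  }
}

class FineGrainedSketch extends CommonBackendSketch {
  protected def preStart(): Unit = { /* capture the class loader */ }
  protected def postStart(): Unit = {}
  def start(): Unit = doStart() // SchedulerBackend.start is abstract: no super call
}
```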
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackend.scala
similarity index 58%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
rename to core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackend.scala
index cfb6592e14aa8..27e719e633926 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackend.scala
@@ -40,101 +40,50 @@ import org.apache.spark.util.Utils
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
* from multiple apps can run on different cores) and in time (a core can switch ownership).
+ *
+ * Unfortunately, there is some duplication with CoarseGrainedMesosSchedulerBackend
+ * that is hard to remove.
*/
-private[spark] class MesosSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext,
- master: String)
- extends SchedulerBackend
+private[spark] class FineGrainedMesosSchedulerBackend(
+ val scheduler: TaskSchedulerImpl,
+ val sparkContext: SparkContext,
+ val master: String)
+ extends CommonMesosSchedulerBackend
with MScheduler
with Logging {
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
- val taskIdToSlaveId = new HashMap[Long, String]
-
// An ExecutorInfo for our tasks
var execArgs: Array[Byte] = null
var classLoader: ClassLoader = null
// The listener bus to publish executor added/removed events.
- val listenerBus = sc.listenerBus
-
- @volatile var appId: String = _
-
- override def start() {
- synchronized {
- classLoader = Thread.currentThread.getContextClassLoader
-
- new Thread("MesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = MesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
+ val listenerBus = sparkContext.listenerBus
- waitForRegister()
- }
+ // ==== Definitions for start():
+
+ protected val executorBackend = classOf[MesosExecutorBackend]
+
+ // Initialize the classLoader.
+ protected def preStart(): Unit = {
+ classLoader = Thread.currentThread.getContextClassLoader
+ }
+
+ // Nothing to do
+ protected def postStart(): Unit = {}
+
+ /** @see CommonMesosSchedulerBackend.doStart() */
+ override def start(): Unit = {
+ doStart()
+ }
+
+ /** @see CommonMesosSchedulerBackend.doStop() */
+ override def stop(): Unit = {
+ doStop()
}
def createExecutorInfo(execId: String): MesosExecutorInfo = {
- val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
- .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
- .getOrElse {
- throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
- }
- val environment = Environment.newBuilder()
- sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
- environment.addVariables(
- Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
- }
- val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
-
- val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
- Utils.libraryPathEnvPrefix(Seq(p))
- }.getOrElse("")
-
- environment.addVariables(
- Environment.Variable.newBuilder()
- .setName("SPARK_EXECUTOR_OPTS")
- .setValue(extraJavaOpts)
- .build())
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val uri = sc.conf.get("spark.executor.uri", null)
- val executorBackendName = classOf[MesosExecutorBackend].getName
- if (uri == null) {
- val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath
- command.setValue(s"$prefixEnv $executorPath $executorBackendName")
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
- command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
- }
+ val command = createCommandInfo("")
val cpus = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
@@ -146,7 +95,7 @@ private[spark] class MesosSchedulerBackend(
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
- .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
+ .setValue(MemoryUtils.calculateTotalMemory(sparkContext)).build())
.build()
MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -164,7 +113,7 @@ private[spark] class MesosSchedulerBackend(
private def createExecArg(): Array[Byte] = {
if (execArgs == null) {
val props = new HashMap[String, String]
- for ((key,value) <- sc.conf.getAll) {
+ for ((key, value) <- sparkContext.conf.getAll) {
props(key) = value
}
// Serialize the map as an array of (String, String) pairs
@@ -173,28 +122,14 @@ private[spark] class MesosSchedulerBackend(
execArgs
}
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ /** TODO: Is wrapping in the separate class loader necessary? */
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {
inClassLoader() {
- appId = frameworkId.getValue
- logInfo("Registered as framework ID " + appId)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
+ doRegistered(d, frameworkId, masterInfo)
}
}
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
- }
-
- private def inClassLoader()(fun: => Unit) = {
+ private def inClassLoader()(fun: => Unit): Unit = {
val oldClassLoader = Thread.currentThread.getContextClassLoader
Thread.currentThread.setContextClassLoader(classLoader)
try {
@@ -204,39 +139,30 @@ private[spark] class MesosSchedulerBackend(
}
}
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
* tasks are balanced across the cluster.
*/
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]): Unit = {
inClassLoader() {
// Fail-fast on offers we know will be rejected
val (usableOffers, unUsableOffers) = offers.partition { o =>
+ val slaveId = o.getSlaveId.getValue
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
- val slaveId = o.getSlaveId.getValue
+ val minMemory = MemoryUtils.calculateTotalMemory(sparkContext)
// TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
- (mem >= MemoryUtils.calculateTotalMemory(sc) &&
- // need at least 1 for executor, 1 for task
- cpus >= 2 * scheduler.CPUS_PER_TASK) ||
- (slaveIdsWithExecutors.contains(slaveId) &&
- cpus >= scheduler.CPUS_PER_TASK)
+ (mem >= minMemory && cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+ (slaveHasExecutor(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
}
val workerOffers = usableOffers.map { o =>
- val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
- getResource(o.getResourcesList, "cpus").toInt
- } else {
- // If the executor doesn't exist yet, subtract CPU for executor
- // TODO(pwendell): Should below just subtract "1"?
- getResource(o.getResourcesList, "cpus").toInt -
- scheduler.CPUS_PER_TASK
- }
+ // If the executor doesn't exist yet, subtract CPU for executor
+ // TODO(pwendell): Should below just subtract "1"?
+ val slaveId = o.getSlaveId.getValue
+ val cpus1 = getResource(o.getResourcesList, "cpus").toInt
+ val cpus = if (slaveHasExecutor(slaveId)) cpus1 else cpus1 - scheduler.CPUS_PER_TASK
new WorkerOffer(
o.getSlaveId.getValue,
o.getHostname,
@@ -268,11 +194,12 @@ private[spark] class MesosSchedulerBackend(
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
mesosTasks.foreach { case (slaveId, tasks) =>
- slaveIdToWorkerOffer.get(slaveId).foreach(o =>
- listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
+ slaveIdToWorkerOffer.get(slaveId).foreach { o =>
+ val executorAdded = SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
// TODO: Add support for log urls for Mesos
- new ExecutorInfo(o.host, o.cores, Map.empty)))
- )
+ new ExecutorInfo(o.host, o.cores, Map.empty))
+ listenerBus.post(executorAdded)
+ }
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
@@ -287,14 +214,6 @@ private[spark] class MesosSchedulerBackend(
}
}
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- 0
- }
-
/** Turn a Spark TaskDescription into a Mesos task */
def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
@@ -313,19 +232,11 @@ private[spark] class MesosSchedulerBackend(
.build()
}
- /** Check whether a Mesos task state represents a finished task */
- def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ override def statusUpdate(d: SchedulerDriver, status: TaskStatus): Unit = {
inClassLoader() {
val tid = status.getTaskId.getValue.toLong
val state = TaskState.fromMesos(status.getState)
- synchronized {
+ stateLock.synchronized {
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
// We lost the executor on this slave, so remember that it's gone
removeExecutor(taskIdToSlaveId(tid), "Lost executor")
@@ -338,36 +249,28 @@ private[spark] class MesosSchedulerBackend(
}
}
- override def error(d: SchedulerDriver, message: String) {
+ /** TODO: is wrapping in the separate class loader necessary? */
+ override def error(d: SchedulerDriver, message: String): Unit = {
inClassLoader() {
- logError("Mesos error: " + message)
- scheduler.error(message)
- }
- }
-
- override def stop() {
- if (driver != null) {
- driver.stop()
+ doError(d, message)
}
}
- override def reviveOffers() {
+ override def reviveOffers(): Unit = {
driver.reviveOffers()
}
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
/**
* Remove executor associated with slaveId in a thread safe manner.
*/
- private def removeExecutor(slaveId: String, reason: String) = {
- synchronized {
+ private def removeExecutor(slaveId: String, reason: String): Unit = {
+ stateLock.synchronized {
listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
slaveIdsWithExecutors -= slaveId
}
}
- private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
+ private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason): Unit = {
inClassLoader() {
logInfo("Mesos slave lost: " + slaveId.getValue)
removeExecutor(slaveId.getValue, reason.toString)
@@ -375,12 +278,12 @@ private[spark] class MesosSchedulerBackend(
}
}
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
recordSlaveLost(d, slaveId, SlaveLost())
}
override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
- slaveId: SlaveID, status: Int) {
+ slaveId: SlaveID, status: Int): Unit = {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
recordSlaveLost(d, slaveId, ExecutorExited(status))
@@ -393,13 +296,12 @@ private[spark] class MesosSchedulerBackend(
)
}
- // TODO: query Mesos for number of cores
- override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
+ // Dynamic executor allocation is not supported in fine-grained mode; these are no-ops.
+ def requestExecutors(numAdditionalExecutors: Int): Boolean = false
+ def requestTotalExecutors(requestedTotal: Int): Boolean = false
+ def doKillExecutors(executorIds: Seq[String]): Boolean = false
- override def applicationId(): String =
- Option(appId).getOrElse {
- logWarning("Application ID is not initialized yet.")
- super.applicationId
- }
+ // TODO: query Mesos for number of cores
+ override def defaultParallelism() = sparkContext.conf.getInt("spark.default.parallelism", 8)
}
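
The fail-fast partition in `resourceOffers` above requires room for both an executor and at least one task on slaves that do not yet have an executor. A REPL-style sketch of that predicate, assuming `CPUS_PER_TASK = 1` (the Spark default):

```scala
val CPUS_PER_TASK = 1

// An offer is usable iff it can host a new executor plus one task,
// or the slave already has an executor and one more task still fits.
def usable(cpus: Int, memOk: Boolean, hasExecutor: Boolean): Boolean =
  (memOk && cpus >= 2 * CPUS_PER_TASK) || (hasExecutor && cpus >= CPUS_PER_TASK)

usable(cpus = 2, memOk = true, hasExecutor = false)  // true: executor + one task
usable(cpus = 1, memOk = true, hasExecutor = false)  // false: no core left for a task
usable(cpus = 1, memOk = false, hasExecutor = true)  // true: executor already running
```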
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index bbed8ddc6bafc..82eed7f3b4431 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseGrainedMesosSchedulerBackend, FineGrainedMesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
@@ -166,14 +166,14 @@ class SparkContextSchedulerCreationSuite
}
test("mesos fine-grained") {
- testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false)
+ testMesos("mesos://localhost:1234", classOf[FineGrainedMesosSchedulerBackend], coarse = false)
}
test("mesos coarse-grained") {
- testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true)
+ testMesos("mesos://localhost:1234", classOf[CoarseGrainedMesosSchedulerBackend], coarse = true)
}
test("mesos with zookeeper") {
- testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false)
+ testMesos("zk://localhost:1234,localhost:2345", classOf[FineGrainedMesosSchedulerBackend], coarse = false)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackendSuite.scala
new file mode 100644
index 0000000000000..e7c1fbd3278f5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackendSuite.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util
+import java.util.Collections
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.Protos._
+import org.apache.mesos.SchedulerDriver
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+import scala.collection.mutable
+
+class CoarseGrainedMesosSchedulerBackendSuite
+ extends FunSuite
+ with MesosSchedulerBackendSuiteHelper
+ with LocalSparkContext
+ with MockitoSugar {
+
+ protected def makeTestMesosSchedulerBackend(
+ taskScheduler: TaskSchedulerImpl): CoarseGrainedMesosSchedulerBackend = {
+ new CoarseGrainedMesosSchedulerBackend(taskScheduler, taskScheduler.sc, "master") {
+ override val driverUrl = ""
+ }
+ }
+
+ val (taskIDVal1, slaveIDVal1) = ("0", "s1")
+ val (taskIDVal2, slaveIDVal2) = ("1", "s2")
+
+ def makeMesosExecutorsTest(): (CommonMesosSchedulerBackend, SchedulerDriver) = {
+ val (backend, driver) = makeBackendAndDriver
+
+ val (minMem, minCPU) = minMemMinCPU(backend.sparkContext)
+
+ val mesosOffers1 = makeOffersList(makeOffer(taskIDVal1, slaveIDVal1, minMem, minCPU))
+
+ backend.resourceOffers(driver, mesosOffers1)
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers1.get(0).getId)),
+ any[util.Collection[TaskInfo]],
+ any[Filters])
+
+ // Verify we have one executor and the executor limit allows at least one.
+ assert(backend.slaveIdsWithExecutors.size === 1)
+ assert(backend.getExecutorLimit >= 1)
+
+ (backend, driver) // Return so this test can be embedded in others.
+ }
+
+ def killMesosExecutorDecrementByOneTest(): (CommonMesosSchedulerBackend, SchedulerDriver) = {
+ val (backend, driver) = makeMesosExecutorsTest()
+
+ // Calling doKillExecutors should invoke driver.killTask.
+ val taskID1 = makeTaskID(taskIDVal1)
+ assert(backend.doKillExecutors(Seq(s"$slaveIDVal1/$taskIDVal1")))
+ verify(driver, times(1)).killTask(taskID1)
+ // Must invoke the status update explicitly here.
+ // TODO: can we mock other parts of the API so this can be called automatically?
+ backend.statusUpdate(driver, makeKilledTaskStatus(taskIDVal1, slaveIDVal1))
+
+ // Verify we don't have any executors.
+ assert(backend.slaveIdsWithExecutors.size === 0)
+ // Verify that the executor limit is now 0.
+ assert(backend.getExecutorLimit === 0)
+
+ val (minMem, minCPU) = minMemMinCPU(backend.sparkContext)
+ val mesosOffers2 = makeOffersList(makeOffer(taskIDVal2, slaveIDVal2, minMem, minCPU))
+ backend.resourceOffers(driver, mesosOffers2)
+
+ verify(driver, times(1))
+ .declineOffer(makeOfferID(taskIDVal2))
+
+ // Verify we didn't launch any new executor
+ assert(backend.slaveIdsWithExecutors.size === 0)
+ assert(backend.getExecutorLimit === 0)
+
+ (backend, driver) // Return so this test can be embedded in others.
+ }
+
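+  // Requests one more executor after the limit has dropped to zero and verifies a new offer is accepted.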
+ def increaseAllowedMesosExecutorsTest(): (CommonMesosSchedulerBackend, SchedulerDriver) = {
+    val (backend, driver) = killMesosExecutorDecrementsLimitByOneTest()
+
+ val (minMem, minCPU) = minMemMinCPU(backend.sparkContext)
+ val mesosOffers2 = makeOffersList(makeOffer(taskIDVal2, slaveIDVal2, minMem, minCPU))
+
+ // Now allow one more executor:
+ backend.requestExecutors(1)
+ backend.resourceOffers(driver, mesosOffers2)
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)),
+ any[util.Collection[TaskInfo]],
+ any[Filters])
+
+ assert(backend.slaveIdsWithExecutors.size === 1)
+ assert(backend.getExecutorLimit >= 1)
+
+ (backend, driver) // Return so this test can be embedded in others.
+ }
+
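+  // A lost slave removes its executor, but must not lower the allowed executor limit.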
+ def slaveLostDoesntChangeMaxAllowedMesosExecutorsTest(): Unit = {
+ val (backend, driver) = increaseAllowedMesosExecutorsTest()
+
+ backend.slaveLost(driver, makeSlaveID(slaveIDVal2))
+ assert(backend.slaveIdsWithExecutors.size === 0)
+ assert(backend.getExecutorLimit >= 1)
+ }
+
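+  // Kills a running task, then verifies a fresh offer relaunches an executor on another slave.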
+ def killAndRelaunchTasksTest(): Unit = {
+    val (backend, driver) = makeBackendAndDriver()
+ val (minMem, minCPU) = minMemMinCPU(backend.sparkContext, 1024)
+ val offer1 = makeOffer(taskIDVal1, slaveIDVal1, minMem, minCPU)
+ val mesosOffers = makeOffersList(offer1)
+
+ backend.resourceOffers(driver, mesosOffers)
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(offer1.getId)),
+ anyObject(),
+ anyObject[Filters])
+ assert(backend.slaveIdsWithExecutors.contains(slaveIDVal1))
+
+ backend.statusUpdate(driver, makeKilledTaskStatus(taskIDVal1, slaveIDVal1))
+ assert(!backend.slaveIdsWithExecutors.contains(slaveIDVal1))
+ assert(backend.slaveIdsWithExecutors.size === 0)
+ assert(backend.getExecutorLimit >= 1)
+
+ val offer2 = makeOffer(taskIDVal2, slaveIDVal2, minMem, 1)
+ mesosOffers.clear()
+ mesosOffers.add(offer2)
+ backend.resourceOffers(driver, mesosOffers)
+ assert(!backend.slaveIdsWithExecutors.contains(slaveIDVal1))
+    assert(backend.slaveIdsWithExecutors.contains(slaveIDVal2))
+ assert(backend.slaveIdsWithExecutors.size === 1)
+ assert(backend.getExecutorLimit >= 1)
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(offer2.getId)),
+ anyObject(),
+ anyObject[Filters])
+
+ verify(driver, times(1)).reviveOffers()
+ }
+
+ test("When Mesos offers resources, a Mesos executor is created.") {
+ makeMesosExecutorsTest()
+ }
+
+ test("When a Mesos executor is killed, the maximum number of allowed Mesos executors is deprecated by one") {
+ killMesosExecutorDeprecateByOneTest()
+ }
+
+ test("The maximum number of allowed Mesos executors can be increased by explicitly requesting new Mesos executors") {
+ increaseAllowedMesosExecutorsTest()
+ }
+
+ test("Losing a slave and its Mesos executor doesn't change the maximum allowed number of Mesos executors") {
+ slaveLostDoesntChangeMaxAllowedMesosExecutorsTest()
+ }
+
+ test("mesos supports killing and relaunching tasks with executors") {
+ killAndRelaunchTasksTest()
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackendSuite.scala
new file mode 100644
index 0000000000000..4faac8d6662ef
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackendSuite.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.Collections
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.{TaskState => MesosTaskState}
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkContext}
+import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.scheduler.{SparkListenerExecutorAdded, TaskDescription,
+  TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
+class FineGrainedMesosSchedulerBackendSuite
+ extends FunSuite
+ with MesosSchedulerBackendSuiteHelper
+ with LocalSparkContext
+ with MockitoSugar {
+
+ protected def makeTestMesosSchedulerBackend(
+ taskScheduler: TaskSchedulerImpl): FineGrainedMesosSchedulerBackend = {
+ new FineGrainedMesosSchedulerBackend(taskScheduler, taskScheduler.sc, "master")
+ }
+
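+  // Two acceptable offers, plus two that fall below the memory and CPU minimums, respectively.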
+ protected def makeTestOffers(sc: SparkContext): (Offer, Offer, Offer, Offer) = {
+ val (minMem, minCpu) = minMemMinCPU(sc)
+ val goodOffer1 = makeOffer("o1", "s1", minMem, minCpu)
+ val badOffer1 = makeOffer("o2", "s2", minMem - 1, minCpu) // memory will be too small.
+ val goodOffer2 = makeOffer("o3", "s3", minMem, minCpu)
+ val badOffer2 = makeOffer("o4", "s4", minMem, minCpu - 1) // CPUs will be too small.
+ (goodOffer1, badOffer1, goodOffer2, badOffer2)
+ }
+
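+  // Stubs driver.launchTasks for the given offer and returns a captor for the launched TaskInfos.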
+  protected def checkLaunchTask(
+      driver: SchedulerDriver,
+      offer: Offer,
+      expectedValue: Int): ArgumentCaptor[util.Collection[TaskInfo]] = {
+ val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+ when(
+ driver.launchTasks(
+ Matchers.eq(Collections.singleton(offer.getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+ ).thenReturn(Status.valueOf(expectedValue))
+ capture
+ }
+
+ test("The spark-class location is correctly computed") {
+ val sc = makeMockSparkContext()
+ sc.conf.set("spark.mesos.executor.home" , "/mesos-home")
+
+ sc.listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+ val mesosSchedulerBackend = new FineGrainedMesosSchedulerBackend(taskScheduler, sc, "master")
+
+ // uri is null.
+ val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+ assert(executorInfo.getCommand.getValue === s""" "/mesos-home/bin/spark-class" ${classOf[MesosExecutorBackend].getName} """)
+
+ // uri exists.
+ sc.conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
+ val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+ assert(executorInfo1.getCommand.getValue === s"""cd test-app-1*; "./bin/spark-class" ${classOf[MesosExecutorBackend].getName} """)
+ }
+
+ // The mock taskScheduler will only accept the first offer.
+ private val expectedTaskId1 = 1L
+ private val expectedTaskDescriptions =
+ Seq(Seq(new TaskDescription(expectedTaskId1, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))))
+
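+  // Feeds all four test offers to resourceOffers; only the first good offer should launch a task,
+  // while the remaining offers are declined.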
+  protected def offerResourcesHelper(): (CommonMesosSchedulerBackend, SchedulerDriver,
+      ArgumentCaptor[util.Collection[TaskInfo]], JArrayList[Offer]) = {
+
+ val (backend, driver) = makeBackendAndDriver()
+ val taskScheduler = backend.scheduler
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+ val sc = taskScheduler.sc // a mocked object
+ when(sc.getSparkHome()).thenReturn(Option("/path"))
+    sc.listenerBus.post(
+      SparkListenerExecutorAdded(0L, "s1", new ExecutorInfo("host_s1", 2, Map.empty)))
+
+ val (goodOffer1, badOffer1, goodOffer2, badOffer2) = makeTestOffers(sc)
+ val mesosOffers = makeOffersList(goodOffer1, badOffer1, goodOffer2, badOffer2)
+
+ val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
+ expectedWorkerOffers.append(new WorkerOffer(
+ goodOffer1.getSlaveId.getValue,
+ goodOffer1.getHostname,
+ 2
+ ))
+ expectedWorkerOffers.append(new WorkerOffer(
+ goodOffer2.getSlaveId.getValue,
+ goodOffer2.getHostname,
+ 2
+ ))
+
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(expectedTaskDescriptions)
+
+ val capture = checkLaunchTask(driver, goodOffer1, 1)
+ when(driver.declineOffer(badOffer1.getId)).thenReturn(Status.valueOf(1))
+ when(driver.declineOffer(goodOffer2.getId)).thenReturn(Status.valueOf(1))
+ when(driver.declineOffer(badOffer2.getId)).thenReturn(Status.valueOf(1))
+
+ backend.resourceOffers(driver, mesosOffers)
+
+ (backend, driver, capture, mesosOffers)
+ }
+
+ test("When acceptable Mesos resource offers are received, tasks are launched for them") {
+
+ val (backend, driver, capture, mesosOffers) = offerResourcesHelper()
+ val goodOffer1 = mesosOffers.get(0)
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(goodOffer1.getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+
+ assert(capture.getValue.size() === 1)
+ val taskInfo = capture.getValue.iterator().next()
+ assert(taskInfo.getName === "n1")
+ val cpus = taskInfo.getResourcesList.get(0)
+ assert(cpus.getName === "cpus")
+    // The launched task should request CPUS_PER_TASK (2.0) cores, within floating-point tolerance.
+    assert(math.abs(cpus.getScalar.getValue - 2.0) <= 0.00001)
+ assert(taskInfo.getSlaveId.getValue === "s1")
+ }
+
+ test("When unacceptable Mesos resource offers are received, no tasks are launched for them") {
+
+ val (backend, driver, capture, mesosOffers) = offerResourcesHelper()
+ val badOffer1 = mesosOffers.get(1)
+ val goodOffer2 = mesosOffers.get(2)
+ val badOffer2 = mesosOffers.get(3)
+
+ verify(driver, times(1)).declineOffer(badOffer1.getId)
+ verify(driver, times(1)).declineOffer(goodOffer2.getId)
+ verify(driver, times(1)).declineOffer(badOffer2.getId)
+ }
+
+ test("When acceptable Mesos resource offers are received for a node that already has an executor, they are declined") {
+
+ val (backend, driver, capture, mesosOffers) = offerResourcesHelper()
+ val goodOffer1 = mesosOffers.get(0)
+ val taskScheduler = backend.scheduler
+ resetTaskScheduler(taskScheduler)
+ reset(driver)
+
+ when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+ when(driver.declineOffer(goodOffer1.getId)).thenReturn(Status.valueOf(1))
+
+ backend.resourceOffers(driver, makeOffersList(goodOffer1))
+ verify(driver, times(1)).declineOffer(goodOffer1.getId)
+ }
+
+ test("When acceptable Mesos resource offers are received for a node that had an executor that is now gone, they are accepted") {
+
+ val (backend, driver, capture, mesosOffers) = offerResourcesHelper()
+ val goodOffer1 = mesosOffers.get(0)
+ val slaveId1 = goodOffer1.getSlaveId.getValue
+ val taskScheduler = backend.scheduler
+
+ resetTaskScheduler(taskScheduler)
+ reset(driver)
+
+ // First, reconfirm that offers are rejected while the executor exists.
+ when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+ when(driver.declineOffer(goodOffer1.getId)).thenReturn(Status.valueOf(1))
+
+ backend.resourceOffers(driver, makeOffersList(goodOffer1))
+ verify(driver, times(1)).declineOffer(goodOffer1.getId)
+
+ // Now, kill the executor, re-offer, and confirm an offer is now accepted (again).
+ resetTaskScheduler(taskScheduler)
+ reset(driver)
+
+ val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+ expectedWorkerOffers.append(new WorkerOffer(
+ goodOffer1.getSlaveId.getValue,
+ goodOffer1.getHostname,
+ 2
+ ))
+
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(expectedTaskDescriptions)
+
+    backend.statusUpdate(driver,
+      makeKilledTaskStatus(expectedTaskId1.toString, slaveId1, MesosTaskState.TASK_LOST))
+ assert(backend.slaveHasExecutor(slaveId1) === false)
+ checkLaunchTask(driver, goodOffer1, 1)
+
+ backend.resourceOffers(driver, makeOffersList(goodOffer1))
+ verify(driver, times(0)).declineOffer(goodOffer1.getId)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuiteHelper.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuiteHelper.scala
new file mode 100644
index 0000000000000..759c5465533a4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuiteHelper.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.mutable
+
+import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.Mockito._
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.scheduler.{LiveListenerBus, TaskSchedulerImpl}
+
+trait MesosSchedulerBackendSuiteHelper {
+ self: FunSuite with LocalSparkContext with MockitoSugar =>
+
+ protected def makeTestMesosSchedulerBackend(
+ taskScheduler: TaskSchedulerImpl): CommonMesosSchedulerBackend
+
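+  // Builds a Mesos Offer carrying scalar "mem" and "cpus" resources for the given slave.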
+  protected def makeOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(mem))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(cpu))
+ builder.setId(OfferID.newBuilder().setValue(offerId).build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+ .setHostname(s"host_$slaveId").build()
+ }
+
+ protected def makeOffersList(offers: Offer*): JArrayList[Offer] = {
+ val mesosOffers = new JArrayList[Offer]
+ for (o <- offers) mesosOffers.add(o)
+ mesosOffers
+ }
+
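+  // Creates a mocked SparkContext with just enough stubbed state for the scheduler backends.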
+ protected def makeMockSparkContext(): SparkContext = {
+ val sparkConf = new SparkConf
+ sparkConf.set("spark.driver.host", "driverHost")
+ sparkConf.set("spark.driver.port", "1234")
+ val se = mock[SparkEnv]
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/path"))
+
+ val emptyHashMap = mutable.HashMap.empty[String, String]
+ when(sc.executorEnvs).thenReturn(emptyHashMap)
+ when(sc.conf).thenReturn(sparkConf)
+ when(sc.env).thenReturn(se)
+
+ val listenerBus = mock[LiveListenerBus]
+ when(sc.listenerBus).thenReturn(listenerBus)
+
+ sc
+ }
+
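+  // Resets the mock scheduler while restoring the stubbings that every test relies on.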
+ protected def resetTaskScheduler(taskScheduler: TaskSchedulerImpl): TaskSchedulerImpl = {
+ val sc = taskScheduler.sc
+ reset(taskScheduler)
+ when(taskScheduler.sc).thenReturn(sc)
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+ taskScheduler
+ }
+
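+  // Wires up a mocked SparkContext, TaskSchedulerImpl, and SchedulerDriver.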
+ protected def makeMockEnvironment(): (SparkContext, TaskSchedulerImpl, SchedulerDriver) = {
+ val sc = makeMockSparkContext()
+ val driver = mock[SchedulerDriver]
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.sc).thenReturn(sc)
+ (sc, taskScheduler, driver)
+ }
+
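+  // Builds the backend under test and attaches the mocked driver to it.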
+ protected def makeBackendAndDriver(): (CommonMesosSchedulerBackend, SchedulerDriver) = {
+ val (sc, taskScheduler, driver) = makeMockEnvironment()
+ val backend = makeTestMesosSchedulerBackend(taskScheduler)
+ backend.driver = driver
+ (backend, driver)
+ }
+
+  protected def makeTaskID(id: String): TaskID = TaskID.newBuilder().setValue(id).build()
+ protected def makeSlaveID(id: String): SlaveID = SlaveID.newBuilder().setValue(id).build()
+ protected def makeOfferID(id: String): OfferID = OfferID.newBuilder().setValue(id).build()
+
+ // Simulate task killed message, signaling that an executor is no longer running.
+  protected def makeKilledTaskStatus(
+      taskId: String,
+      slaveId: String,
+      state: MesosTaskState = MesosTaskState.TASK_KILLED): TaskStatus =
+ TaskStatus.newBuilder()
+ .setTaskId(makeTaskID(taskId))
+ .setSlaveId(makeSlaveID(slaveId))
+ .setState(state)
+ .build
+
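+  // The minimum memory and CPU an offer must carry to be accepted by the backends.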
+  protected def minMemMinCPU(sc: SparkContext, extraMemory: Int = 0, numCores: Int = 4): (Int, Int) =
+ (MemoryUtils.calculateTotalMemory(sc).toInt + extraMemory, numCores)
+
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
similarity index 96%
rename from core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala
rename to core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
index 86a42a7398e4d..f2cb02a06870c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.mesos
+package org.apache.spark.scheduler.cluster.mesos
import java.nio.ByteBuffer
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
deleted file mode 100644
index afbaa9ade811f..0000000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.mesos
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.Collections
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.mesos.SchedulerDriver
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
-import org.apache.spark.executor.MesosExecutorBackend
-import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
- TaskDescription, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerBackend, MemoryUtils}
-
-class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar {
-
- test("check spark-class location correctly") {
- val conf = new SparkConf
- conf.set("spark.mesos.executor.home" , "/mesos-home")
-
- val listenerBus = mock[LiveListenerBus]
- listenerBus.post(
- SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
- val sc = mock[SparkContext]
- when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
-
- when(sc.conf).thenReturn(conf)
- when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
- when(sc.executorMemory).thenReturn(100)
- when(sc.listenerBus).thenReturn(listenerBus)
- val taskScheduler = mock[TaskSchedulerImpl]
- when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-
- val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-
- // uri is null.
- val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
- assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
-
- // uri exists.
- conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
- val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
- assert(executorInfo1.getCommand.getValue === s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
- }
-
- test("mesos resource offers result in launching tasks") {
- def createOffer(id: Int, mem: Int, cpu: Int) = {
- val builder = Offer.newBuilder()
- builder.addResourcesBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(mem))
- builder.addResourcesBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(cpu))
- builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
- }
-
- val driver = mock[SchedulerDriver]
- val taskScheduler = mock[TaskSchedulerImpl]
-
- val listenerBus = mock[LiveListenerBus]
- listenerBus.post(
- SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
- val sc = mock[SparkContext]
- when(sc.executorMemory).thenReturn(100)
- when(sc.getSparkHome()).thenReturn(Option("/path"))
- when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
- when(sc.conf).thenReturn(new SparkConf)
- when(sc.listenerBus).thenReturn(listenerBus)
-
- val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
- val minCpu = 4
-
- val mesosOffers = new java.util.ArrayList[Offer]
- mesosOffers.add(createOffer(1, minMem, minCpu))
- mesosOffers.add(createOffer(2, minMem - 1, minCpu))
- mesosOffers.add(createOffer(3, minMem, minCpu))
-
- val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-
- val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
- expectedWorkerOffers.append(new WorkerOffer(
- mesosOffers.get(0).getSlaveId.getValue,
- mesosOffers.get(0).getHostname,
- 2
- ))
- expectedWorkerOffers.append(new WorkerOffer(
- mesosOffers.get(2).getSlaveId.getValue,
- mesosOffers.get(2).getHostname,
- 2
- ))
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
- when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
- when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-
- val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
- when(
- driver.launchTasks(
- Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
- capture.capture(),
- any(classOf[Filters])
- )
- ).thenReturn(Status.valueOf(1))
- when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
- when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))
-
- backend.resourceOffers(driver, mesosOffers)
-
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
- capture.capture(),
- any(classOf[Filters])
- )
- verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
- verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
- assert(capture.getValue.size() == 1)
- val taskInfo = capture.getValue.iterator().next()
- assert(taskInfo.getName.equals("n1"))
- val cpus = taskInfo.getResourcesList.get(0)
- assert(cpus.getName.equals("cpus"))
- assert(cpus.getScalar.getValue.equals(2.0))
- assert(taskInfo.getSlaveId.getValue.equals("s1"))
-
- // Unwanted resources offered on an existing node. Make sure they are declined
- val mesosOffers2 = new java.util.ArrayList[Offer]
- mesosOffers2.add(createOffer(1, minMem, minCpu))
- reset(taskScheduler)
- reset(driver)
- when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
- when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
- when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
-
- backend.resourceOffers(driver, mesosOffers2)
- verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
- }
-}