Commit 17f93a2

Fix head of line blocking in scheduling drivers.
1 parent 6ff8e5c commit 17f93a2

File tree

7 files changed: +87 -94 lines changed
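
The scheduling loop in MesosClusterScheduler previously polled one queued driver (or one supervised retry) at a time and returned as soon as a driver could not be matched to an offer, so a single oversized submission at the head of the queue stalled every driver behind it. This commit reworks the loop to fetch all candidates and launch every one that fits the current offers. The sketch below illustrates that pattern only; Candidate, Offer, and the fit check are illustrative stand-ins, not the scheduler's real types.

// Illustrative sketch of the head-of-line-blocking fix; Candidate and Offer are
// stand-ins, not the scheduler's real types.
object SchedulingSketch {
  case class Candidate(id: String, cpus: Double, mem: Double)
  case class Offer(var cpus: Double, var mem: Double)

  // Before: stop at the first candidate that does not fit, blocking everything behind it.
  def scheduleBlocking(poll: () => Option[Candidate], offers: Seq[Offer]): Seq[String] = {
    val launched = scala.collection.mutable.ArrayBuffer[String]()
    var next = poll()
    while (next.isDefined) {
      val c = next.get
      offers.find(o => o.cpus >= c.cpus && o.mem >= c.mem) match {
        case Some(o) =>
          o.cpus -= c.cpus
          o.mem -= c.mem
          launched += c.id
          next = poll()
        case None =>
          return launched // head of line blocks the rest of the queue
      }
    }
    launched
  }

  // After: consider every candidate and launch whichever ones fit the remaining offers.
  def scheduleAll(candidates: Seq[Candidate], offers: Seq[Offer]): Seq[String] =
    candidates.flatMap { c =>
      offers.find(o => o.cpus >= c.cpus && o.mem >= c.mem).map { o =>
        o.cpus -= c.cpus
        o.mem -= c.mem
        c.id
      }
    }
}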

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala

Lines changed: 7 additions & 6 deletions

@@ -58,18 +58,19 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(

   private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
     <tr>
-      <td>{submission.submissionId}</td>
-      <td>{submission.submissionDate}</td>
+      <td>{submission.submissionId.get}</td>
+      <td>{submission.submissionDate.get}</td>
       <td>{submission.command.mainClass}</td>
       <td>cpus: {submission.cores}, mem: {submission.mem}</td>
     </tr>
   }

   private def driverRow(state: MesosClusterTaskState): Seq[Node] = {
     <tr>
-      <td>{state.submission.submissionId}</td>
-      <td>{state.submission.submissionDate}</td>
+      <td>{state.submission.submissionId.get}</td>
+      <td>{state.submission.submissionDate.get}</td>
       <td>{state.submission.command.mainClass}</td>
+      <td>cpus: {state.submission.cores}, mem: {state.submission.mem}</td>
       <td>{state.startDate}</td>
       <td>{state.slaveId.getValue}</td>
       <td>{stateString(state.taskState)}</td>

@@ -78,8 +79,8 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(

   private def retryRow(state: RetryState): Seq[Node] = {
     <tr>
-      <td>{state.submission.submissionId}</td>
-      <td>{state.submission.submissionDate}</td>
+      <td>{state.submission.submissionId.get}</td>
+      <td>{state.submission.submissionDate.get}</td>
       <td>{state.submission.command.mainClass}</td>
       <td>{state.lastFailureStatus}</td>
       <td>{state.nextRetry}</td>

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala

Lines changed: 1 addition & 1 deletion

@@ -61,7 +61,7 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
     assertProperty[Boolean](key, "boolean", _.toBoolean)

   private def assertPropertyIsNumeric(key: String): Unit =
-    assertProperty[Int](key, "numeric", _.toInt)
+    assertProperty[Double](key, "numeric", _.toDouble)

   private def assertPropertyIsMemory(key: String): Unit =
     assertProperty[Int](key, "memory", Utils.memoryStringToMb)
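
A note on the validation change: parsing "numeric" properties with _.toDouble lets fractional values pass the REST-submission check where _.toInt would reject them, which presumably matters for fractional resource settings under Mesos. A standalone illustration (not scheduler code):

// Standalone illustration of the parse behavior:
"1.5".toDouble  // 1.5 -- accepted by the relaxed numeric check
"1.5".toInt     // throws java.lang.NumberFormatException -- rejected by the old check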

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/DriverQueue.scala

Lines changed: 5 additions & 18 deletions

@@ -62,30 +62,17 @@ private[mesos] class DriverQueue(state: MesosClusterPersistenceEngine, capacity:
   }

   def remove(submissionId: String): Boolean = {
-    val removed = queue.dequeueFirst(d => d.submissionId.equals(submissionId))
-    if (removed.isDefined) {
-      state.expunge(removed.get.submissionId.get)
+    val removed = queue.dequeueFirst { d =>
+      d.submissionId.map(_.equals(submissionId)).getOrElse(false)
     }
-
+    if (removed.isDefined) state.expunge(removed.get.submissionId.get)
     removed.isDefined
   }

-  def peek(): Option[MesosDriverDescription] = {
-    queue.headOption
-  }
-
-  def poll(): Option[MesosDriverDescription] = {
-    if (queue.isEmpty) {
-      None
-    } else {
-      val item = queue.dequeue()
-      state.expunge(item.submissionId.get)
-      Some(item)
-    }
-  }
+  def drivers: Seq[MesosDriverDescription] = queue

   // Returns a copy of the queued drivers.
-  def drivers: Iterable[MesosDriverDescription] = {
+  def copyDrivers: Iterable[MesosDriverDescription] = {
     val buffer = new Array[MesosDriverDescription](queue.size)
     queue.copyToArray(buffer)
     buffer
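
With peek/poll gone, the queue no longer decides which driver is scheduled next; the scheduler walks every queued driver and removes only the ones it actually launched (the call sites appear in MesosClusterScheduler.scala below). A hedged sketch of that consumption pattern; fitsSomeOffer and launch are illustrative placeholders, not methods from this commit:

// Hedged sketch of how the reworked queue is meant to be consumed.
def scheduleQueuedDrivers(
    queue: DriverQueue,
    fitsSomeOffer: MesosDriverDescription => Boolean,
    launch: MesosDriverDescription => Unit): Unit = {
  for (driver <- queue.drivers if fitsSomeOffer(driver)) {
    launch(driver)                        // build and stage the TaskInfo
    queue.remove(driver.submissionId.get) // drop only drivers that were launched
  }
}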

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 47 additions & 55 deletions

@@ -76,7 +76,7 @@ private[spark] class MesosClusterTaskState(
  */
 private[spark] class MesosClusterSchedulerState(
     val frameworkId: String,
-    val masterUrl: String,
+    val masterUrl: Option[String],
     val queuedDrivers: Iterable[MesosDriverDescription],
     val launchedDrivers: Iterable[MesosClusterTaskState],
     val finishedDrivers: Iterable[MesosClusterTaskState],

@@ -135,7 +135,7 @@ private[spark] class MesosClusterSchedulerDriver(
   // All supervised drivers that are waiting to retry after termination.
   var superviseRetryList: SuperviseRetryList = _

-  private var masterInfo: MasterInfo = _
+  private var masterInfo: Option[MasterInfo] = None

   private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
   private def newDriverId(submitDate: Date): String = {

@@ -252,7 +252,7 @@ private[spark] class MesosClusterSchedulerDriver(
     markRegistered()

     stateLock.synchronized {
-      this.masterInfo = masterInfo
+      this.masterInfo = Some(masterInfo)
       if (!launchedDrivers.pendingRecover.isEmpty) {
         // Start task reconciliation if we need to recover.
         val statuses = launchedDrivers.pendingRecover.collect {

@@ -350,20 +350,14 @@ private[spark] class MesosClusterSchedulerDriver(
         getResource(o.getResourcesList, "cpus"),
         getResource(o.getResourcesList, "mem"))
     }
-    logTrace(s"Received offers from Mesos: ${printOffers(currentOffers)}")
+    logTrace(s"Received offers from Mesos: \n${printOffers(currentOffers)}")
     val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
     val currentTime = new Date()
-
     def scheduleTasks(
-        taskFunc: () => (Option[MesosDriverDescription], Option[RetryState]),
+        tasksFunc: () => Seq[(MesosDriverDescription, Option[RetryState])],
         scheduledCallback: (String) => Unit): Unit = {
-      var nextItem = taskFunc()
-      // TODO: We should not stop scheduling at the very first task
-      // that cannot be scheduled. Instead we should exhaust the
-      // candidate list and remove drivers that cannot scheduled
-      // over a configurable period of time.
-      while (nextItem._1.isDefined) {
-        val (submission, retryState) = (nextItem._1.get, nextItem._2)
+      val candidates = tasksFunc()
+      for ((submission, retryState) <- candidates) {
         val driverCpu = submission.cores
         val driverMem = submission.mem
         logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")

@@ -374,56 +368,54 @@ private[spark] class MesosClusterSchedulerDriver(
         if (offerOption.isEmpty) {
           logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId.get}," +
             s"cpu: $driverCpu, mem: $driverMem")
-          return
-        }
-
-        val offer = offerOption.get
-        offer.cpu -= driverCpu
-        offer.mem -= driverMem
-        val taskId = TaskID.newBuilder().setValue(submission.submissionId.get).build()
-        val cpuResource = Resource.newBuilder()
-          .setName("cpus").setType(Value.Type.SCALAR)
-          .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
-        val memResource = Resource.newBuilder()
-          .setName("mem").setType(Value.Type.SCALAR)
-          .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
-        val commandInfo = buildCommand(submission)
-        val appName = submission.schedulerProperties("spark.app.name")
-        val taskInfo = TaskInfo.newBuilder()
-          .setTaskId(taskId)
-          .setName(s"Driver for $appName")
-          .setSlaveId(offer.offer.getSlaveId)
-          .setCommand(commandInfo)
-          .addResources(cpuResource)
-          .addResources(memResource)
-          .build
-        val queuedTasks = if (!tasks.contains(offer.offer.getId)) {
-          val buffer = new ArrayBuffer[TaskInfo]
-          tasks(offer.offer.getId) = buffer
-          buffer
         } else {
-          tasks(offer.offer.getId)
+          val offer = offerOption.get
+          offer.cpu -= driverCpu
+          offer.mem -= driverMem
+          val taskId = TaskID.newBuilder().setValue(submission.submissionId.get).build()
+          val cpuResource = Resource.newBuilder()
+            .setName("cpus").setType(Value.Type.SCALAR)
+            .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
+          val memResource = Resource.newBuilder()
+            .setName("mem").setType(Value.Type.SCALAR)
+            .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
+          val commandInfo = buildCommand(submission)
+          val appName = submission.schedulerProperties("spark.app.name")
+          val taskInfo = TaskInfo.newBuilder()
+            .setTaskId(taskId)
+            .setName(s"Driver for $appName")
+            .setSlaveId(offer.offer.getSlaveId)
+            .setCommand(commandInfo)
+            .addResources(cpuResource)
+            .addResources(memResource)
+            .build
+          val queuedTasks = if (!tasks.contains(offer.offer.getId)) {
+            val buffer = new ArrayBuffer[TaskInfo]
+            tasks(offer.offer.getId) = buffer
+            buffer
+          } else {
+            tasks(offer.offer.getId)
+          }
+          queuedTasks += taskInfo
+          logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+            submission.submissionId.get)
+
+          launchedDrivers.set(
+            submission.submissionId.get,
+            new MesosClusterTaskState(submission, taskId, offer.offer.getSlaveId,
              None, new Date(), retryState))
+          scheduledCallback(submission.submissionId.get)
         }
-        queuedTasks += taskInfo
-        logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
-          submission.submissionId.get)
-
-        launchedDrivers.set(
-          submission.submissionId.get,
-          new MesosClusterTaskState(submission, taskId, offer.offer.getSlaveId,
-            None, new Date(), retryState))
-        scheduledCallback(submission.submissionId.get)
-        nextItem = taskFunc()
       }
     }

     stateLock.synchronized {
       scheduleTasks(() => {
-        superviseRetryList.getNextRetry(currentTime)
+        superviseRetryList.getNextRetries(currentTime)
       }, (id: String) => {
         superviseRetryList.remove(id)
       })
-      scheduleTasks(() => (queue.peek(), None), (_) => queue.poll())
+      scheduleTasks(() => queue.drivers.map(d => (d, None)), (id) => queue.remove(id))
     }

     tasks.foreach { case (offerId, tasks) =>

@@ -439,8 +431,8 @@ private[spark] class MesosClusterSchedulerDriver(
     stateLock.synchronized {
       new MesosClusterSchedulerState(
         frameworkId,
-        s"http://${masterInfo.getIp}:${masterInfo.getPort}",
-        queue.drivers,
+        masterInfo.map(m => s"http://${m.getIp}:${m.getPort}"),
+        queue.copyDrivers,
         launchedDrivers.states,
         finishedDrivers.collect { case s => s.copy() },
         superviseRetryList.retries)
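
Two related changes ride along here: masterInfo is only known once registered() fires, so it becomes an Option and the reported masterUrl becomes Option[String]; and getSchedulerState now reports copyDrivers so the UI sees a snapshot rather than the live queue. A minimal hedged illustration of the Option-based URL construction, mirroring the diff above:

// Hedged illustration: masterInfo starts as None and is only filled in by registered(),
// so any consumer of the reported masterUrl has to handle the not-yet-registered case.
import org.apache.mesos.Protos.MasterInfo

def reportedMasterUrl(masterInfo: Option[MasterInfo]): Option[String] =
  masterInfo.map(m => s"http://${m.getIp}:${m.getPort}")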

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 10 additions & 1 deletion

@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.util.List
 import java.util.concurrent.CountDownLatch

-import org.apache.mesos.Protos.{FrameworkInfo, Resource}
+import org.apache.mesos.Protos.{Status, FrameworkInfo, Resource}
 import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
 import org.apache.spark.Logging

@@ -64,6 +64,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       try {
         val ret = driver.run()
         logInfo("driver.run() returned with code " + ret)
+        onDriverExit(ret)
       } catch {
         case e: Exception => logError("driver.run() failed", e)
       }

@@ -74,6 +75,14 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     }
   }

+  def onDriverExit(status: Status): Unit = {
+    // Exit the process when the Mesos framework driver was aborted.
+    // This behavior can be overriden by the scheduler.
+    if (status.equals(Status.DRIVER_ABORTED)) {
+      System.exit(1)
+    }
+  }
+
   /**
    * Waits for the scheduler to be registered, which the scheduler will signal by calling
    * markRegistered().
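
onDriverExit turns the post-run behavior into a hook: by default the JVM exits when the Mesos framework driver ends with DRIVER_ABORTED, but, as the added comment notes, a scheduler can override it. A hedged sketch of such an override; KeepAliveOnAbort is an illustrative name, not part of this commit:

// Hedged sketch: a scheduler mixing in MesosSchedulerUtils can override the hook
// instead of letting the process exit on an aborted driver.
import org.apache.mesos.Protos.Status

trait KeepAliveOnAbort extends MesosSchedulerUtils {
  override def onDriverExit(status: Status): Unit = {
    if (status == Status.DRIVER_ABORTED) {
      logError("Mesos framework driver aborted; keeping the process alive")
    }
  }
}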

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/SuperviseRetryList.scala

Lines changed: 15 additions & 11 deletions

@@ -59,28 +59,32 @@ private[mesos] class SuperviseRetryList(state: MesosClusterPersistenceEngine) {

   def size: Int = drivers.size

-  def contains(submissionId: String): Boolean =
-    drivers.exists(d => d.submission.submissionId.equals(submissionId))
-
-  def getNextRetry(currentTime: Date): (Option[MesosDriverDescription], Option[RetryState]) = {
-    val retry = drivers.find(d => d.nextRetry.before(currentTime))
-    if (retry.isDefined) {
-      (Some(retry.get.submission), retry)
-    } else {
-      (None, None)
+  def contains(submissionId: String): Boolean = {
+    drivers.exists { d =>
+      d.submission.submissionId.map(_.equals(submissionId)).getOrElse(false)
+    }
+  }
+
+  def getNextRetries(currentTime: Date): Seq[(MesosDriverDescription, Option[RetryState])] = {
+    drivers.filter(d => d.nextRetry.before(currentTime)).map { d =>
+      (d.submission, Some(d))
     }
   }

   def get(submissionId: String): Option[RetryState] = {
-    drivers.find(d => d.submission.submissionId.equals(submissionId))
+    drivers.find { d =>
+      d.submission.submissionId.map(_.equals(submissionId)).getOrElse(false)
+    }
   }

   def retries: Iterable[RetryState] = {
     drivers.map(d => d.copy).toList
   }

   def remove(submissionId: String): Boolean = {
-    val index = drivers.indexWhere(s => s.submission.submissionId.equals(submissionId))
+    val index = drivers.indexWhere { s =>
+      s.submission.submissionId.map(_.equals(submissionId)).getOrElse(false)
+    }

     if (index != -1) {
       drivers.remove(index)
core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -49,8 +49,8 @@ class MesosClusterSchedulerSuite extends FunSuite with LocalSparkContext with Mo
     assert(response2.success)

     val state = scheduler.getState()
-    assert(state.queuedDrivers.exists(d => d.submissionId == response.submissionId))
-    assert(state.queuedDrivers.exists(d => d.submissionId == response2.submissionId))
+    assert(state.queuedDrivers.exists(d => d.submissionId.get == response.submissionId))
+    assert(state.queuedDrivers.exists(d => d.submissionId.get == response2.submissionId))
   }

   test("can kill queued drivers") {
