Skip to content

Commit a3ed2c3

Browse files
tnachenAndrew Or
authored andcommitted
[SPARK-10124] [MESOS] Fix removing queued driver in mesos cluster mode.
Currently the spark applications can be queued to the Mesos cluster dispatcher, but when multiple jobs are in queue we don't handle removing jobs from the buffer correctly while iterating and causes null pointer exception. This patch copies the buffer before iterating them, so exceptions aren't thrown when the jobs are removed. Author: Timothy Chen <[email protected]> Closes #8322 from tnachen/fix_cluster_mode. (cherry picked from commit 73431d8) Signed-off-by: Andrew Or <[email protected]>
1 parent 16414da commit a3ed2c3

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler(
507507
val driversToRetry = pendingRetryDrivers.filter { d =>
508508
d.retryState.get.nextRetry.before(currentTime)
509509
}
510+
510511
scheduleTasks(
511-
driversToRetry,
512+
copyBuffer(driversToRetry),
512513
removeFromPendingRetryDrivers,
513514
currentOffers,
514515
tasks)
516+
515517
// Then we walk through the queued drivers and try to schedule them.
516518
scheduleTasks(
517-
queuedDrivers,
519+
copyBuffer(queuedDrivers),
518520
removeFromQueuedDrivers,
519521
currentOffers,
520522
tasks)
@@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler(
527529
.foreach(o => driver.declineOffer(o.getId))
528530
}
529531

532+
private def copyBuffer(
533+
buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
534+
val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
535+
buffer.copyToBuffer(newBuffer)
536+
newBuffer
537+
}
538+
530539
def getSchedulerState(): MesosClusterSchedulerState = {
531-
def copyBuffer(
532-
buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
533-
val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
534-
buffer.copyToBuffer(newBuffer)
535-
newBuffer
536-
}
537540
stateLock.synchronized {
538541
new MesosClusterSchedulerState(
539542
frameworkId,

0 commit comments

Comments
 (0)