Skip to content

Commit b801f93

Browse files
committed
Fix removing queued driver in mesos cluster mode.
1 parent b0dbaec commit b801f93

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler(
507507
val driversToRetry = pendingRetryDrivers.filter { d =>
508508
d.retryState.get.nextRetry.before(currentTime)
509509
}
510+
510511
scheduleTasks(
511-
driversToRetry,
512+
copyBuffer(driversToRetry),
512513
removeFromPendingRetryDrivers,
513514
currentOffers,
514515
tasks)
516+
515517
// Then we walk through the queued drivers and try to schedule them.
516518
scheduleTasks(
517-
queuedDrivers,
519+
copyBuffer(queuedDrivers),
518520
removeFromQueuedDrivers,
519521
currentOffers,
520522
tasks)
@@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler(
527529
.foreach(o => driver.declineOffer(o.getId))
528530
}
529531

532+
private def copyBuffer(
533+
buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
534+
val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
535+
buffer.copyToBuffer(newBuffer)
536+
newBuffer
537+
}
538+
530539
def getSchedulerState(): MesosClusterSchedulerState = {
531-
def copyBuffer(
532-
buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
533-
val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
534-
buffer.copyToBuffer(newBuffer)
535-
newBuffer
536-
}
537540
stateLock.synchronized {
538541
new MesosClusterSchedulerState(
539542
frameworkId,

0 commit comments

Comments
 (0)