@@ -65,6 +65,9 @@ private[spark] class ExecutorAllocationManager(
65
65
listenerBus : LiveListenerBus ,
66
66
conf : SparkConf )
67
67
extends Logging {
68
+
69
+ allocationManager =>
70
+
68
71
import ExecutorAllocationManager ._
69
72
70
73
// Lower and upper bounds on the number of executors. These are required.
@@ -121,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
121
124
private var clock : Clock = new RealClock
122
125
123
126
// Listener for Spark events that impact the allocation policy
124
- private val listener = new ExecutorAllocationListener ( this )
127
+ private val listener = new ExecutorAllocationListener
125
128
126
129
/**
127
130
* Verify that the settings specified through the config are valid.
@@ -209,11 +212,12 @@ private[spark] class ExecutorAllocationManager(
209
212
addTime += sustainedSchedulerBacklogTimeout * 1000
210
213
}
211
214
212
- removeTimes.foreach { case (executorId, expireTime) =>
213
- if (now >= expireTime) {
215
+ removeTimes.retain { case (executorId, expireTime) =>
216
+ val expired = now >= expireTime
217
+ if (expired) {
214
218
removeExecutor(executorId)
215
- removeTimes.remove(executorId)
216
219
}
220
+ ! expired
217
221
}
218
222
}
219
223
@@ -291,7 +295,7 @@ private[spark] class ExecutorAllocationManager(
291
295
// Do not kill the executor if we have already reached the lower bound
292
296
val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
293
297
if (numExistingExecutors - 1 < minNumExecutors) {
294
- logInfo (s " Not removing idle executor $executorId because there are only " +
298
+ logDebug (s " Not removing idle executor $executorId because there are only " +
295
299
s " $numExistingExecutors executor(s) left (limit $minNumExecutors) " )
296
300
return false
297
301
}
@@ -315,7 +319,11 @@ private[spark] class ExecutorAllocationManager(
315
319
private def onExecutorAdded (executorId : String ): Unit = synchronized {
316
320
if (! executorIds.contains(executorId)) {
317
321
executorIds.add(executorId)
318
- executorIds.foreach(onExecutorIdle)
322
+ // If an executor (call this executor X) is not removed because the lower bound
323
+ // has been reached, it will no longer be marked as idle. When new executors join,
324
+ // however, we are no longer at the lower bound, and so we must mark executor X
325
+ // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
326
+ executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
319
327
logInfo(s " New executor $executorId has registered (new total is ${executorIds.size}) " )
320
328
if (numExecutorsPending > 0 ) {
321
329
numExecutorsPending -= 1
@@ -373,10 +381,14 @@ private[spark] class ExecutorAllocationManager(
373
381
* the executor is not already marked as idle.
374
382
*/
375
383
private def onExecutorIdle (executorId : String ): Unit = synchronized {
376
- if (! removeTimes.contains(executorId) && ! executorsPendingToRemove.contains(executorId)) {
377
- logDebug(s " Starting idle timer for $executorId because there are no more tasks " +
378
- s " scheduled to run on the executor (to expire in $executorIdleTimeout seconds) " )
379
- removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
384
+ if (executorIds.contains(executorId)) {
385
+ if (! removeTimes.contains(executorId) && ! executorsPendingToRemove.contains(executorId)) {
386
+ logDebug(s " Starting idle timer for $executorId because there are no more tasks " +
387
+ s " scheduled to run on the executor (to expire in $executorIdleTimeout seconds) " )
388
+ removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
389
+ }
390
+ } else {
391
+ logWarning(s " Attempted to mark unknown executor $executorId idle " )
380
392
}
381
393
}
382
394
@@ -396,25 +408,24 @@ private[spark] class ExecutorAllocationManager(
396
408
* and consistency of events returned by the listener. For simplicity, it does not account
397
409
* for speculated tasks.
398
410
*/
399
- private class ExecutorAllocationListener (allocationManager : ExecutorAllocationManager )
400
- extends SparkListener {
411
+ private class ExecutorAllocationListener extends SparkListener {
401
412
402
413
private val stageIdToNumTasks = new mutable.HashMap [Int , Int ]
403
414
private val stageIdToTaskIndices = new mutable.HashMap [Int , mutable.HashSet [Int ]]
404
415
private val executorIdToTaskIds = new mutable.HashMap [String , mutable.HashSet [Long ]]
405
416
406
417
override def onStageSubmitted (stageSubmitted : SparkListenerStageSubmitted ): Unit = {
407
- synchronized {
408
- val stageId = stageSubmitted.stageInfo.stageId
409
- val numTasks = stageSubmitted.stageInfo.numTasks
418
+ val stageId = stageSubmitted.stageInfo.stageId
419
+ val numTasks = stageSubmitted.stageInfo.numTasks
420
+ allocationManager. synchronized {
410
421
stageIdToNumTasks(stageId) = numTasks
411
422
allocationManager.onSchedulerBacklogged()
412
423
}
413
424
}
414
425
415
426
override def onStageCompleted (stageCompleted : SparkListenerStageCompleted ): Unit = {
416
- synchronized {
417
- val stageId = stageCompleted.stageInfo.stageId
427
+ val stageId = stageCompleted.stageInfo.stageId
428
+ allocationManager. synchronized {
418
429
stageIdToNumTasks -= stageId
419
430
stageIdToTaskIndices -= stageId
420
431
@@ -426,47 +437,62 @@ private[spark] class ExecutorAllocationManager(
426
437
}
427
438
}
428
439
429
- override def onTaskStart (taskStart : SparkListenerTaskStart ): Unit = synchronized {
440
+ override def onTaskStart (taskStart : SparkListenerTaskStart ): Unit = {
430
441
val stageId = taskStart.stageId
431
442
val taskId = taskStart.taskInfo.taskId
432
443
val taskIndex = taskStart.taskInfo.index
433
444
val executorId = taskStart.taskInfo.executorId
434
445
435
- // If this is the last pending task, mark the scheduler queue as empty
436
- stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet [Int ]) += taskIndex
437
- val numTasksScheduled = stageIdToTaskIndices(stageId).size
438
- val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, - 1 )
439
- if (numTasksScheduled == numTasksTotal) {
440
- // No more pending tasks for this stage
441
- stageIdToNumTasks -= stageId
442
- if (stageIdToNumTasks.isEmpty) {
443
- allocationManager.onSchedulerQueueEmpty()
446
+ allocationManager.synchronized {
447
+ // This guards against the race condition in which the `SparkListenerTaskStart`
448
+ // event is posted before the `SparkListenerBlockManagerAdded` event, which is
449
+ // possible because these events are posted in different threads. (see SPARK-4951)
450
+ if (! allocationManager.executorIds.contains(executorId)) {
451
+ allocationManager.onExecutorAdded(executorId)
452
+ }
453
+
454
+ // If this is the last pending task, mark the scheduler queue as empty
455
+ stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet [Int ]) += taskIndex
456
+ val numTasksScheduled = stageIdToTaskIndices(stageId).size
457
+ val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, - 1 )
458
+ if (numTasksScheduled == numTasksTotal) {
459
+ // No more pending tasks for this stage
460
+ stageIdToNumTasks -= stageId
461
+ if (stageIdToNumTasks.isEmpty) {
462
+ allocationManager.onSchedulerQueueEmpty()
463
+ }
444
464
}
445
- }
446
465
447
- // Mark the executor on which this task is scheduled as busy
448
- executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet [Long ]) += taskId
449
- allocationManager.onExecutorBusy(executorId)
466
+ // Mark the executor on which this task is scheduled as busy
467
+ executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet [Long ]) += taskId
468
+ allocationManager.onExecutorBusy(executorId)
469
+ }
450
470
}
451
471
452
- override def onTaskEnd (taskEnd : SparkListenerTaskEnd ): Unit = synchronized {
472
+ override def onTaskEnd (taskEnd : SparkListenerTaskEnd ): Unit = {
453
473
val executorId = taskEnd.taskInfo.executorId
454
474
val taskId = taskEnd.taskInfo.taskId
455
-
456
- // If the executor is no longer running scheduled any tasks, mark it as idle
457
- if (executorIdToTaskIds.contains(executorId)) {
458
- executorIdToTaskIds(executorId) -= taskId
459
- if (executorIdToTaskIds(executorId).isEmpty) {
460
- executorIdToTaskIds -= executorId
461
- allocationManager.onExecutorIdle(executorId)
475
+ allocationManager.synchronized {
476
+ // If the executor is no longer running scheduled any tasks, mark it as idle
477
+ if (executorIdToTaskIds.contains(executorId)) {
478
+ executorIdToTaskIds(executorId) -= taskId
479
+ if (executorIdToTaskIds(executorId).isEmpty) {
480
+ executorIdToTaskIds -= executorId
481
+ allocationManager.onExecutorIdle(executorId)
482
+ }
462
483
}
463
484
}
464
485
}
465
486
466
487
override def onBlockManagerAdded (blockManagerAdded : SparkListenerBlockManagerAdded ): Unit = {
467
488
val executorId = blockManagerAdded.blockManagerId.executorId
468
489
if (executorId != SparkContext .DRIVER_IDENTIFIER ) {
469
- allocationManager.onExecutorAdded(executorId)
490
+ // This guards against the race condition in which the `SparkListenerTaskStart`
491
+ // event is posted before the `SparkListenerBlockManagerAdded` event, which is
492
+ // possible because these events are posted in different threads. (see SPARK-4951)
493
+ if (! allocationManager.executorIds.contains(executorId)) {
494
+ allocationManager.onExecutorAdded(executorId)
495
+ }
470
496
}
471
497
}
472
498
@@ -478,12 +504,23 @@ private[spark] class ExecutorAllocationManager(
478
504
/**
479
505
* An estimate of the total number of pending tasks remaining for currently running stages. Does
480
506
* not account for tasks which may have failed and been resubmitted.
507
+ *
508
+ * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
481
509
*/
482
510
def totalPendingTasks (): Int = {
483
511
stageIdToNumTasks.map { case (stageId, numTasks) =>
484
512
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0 )
485
513
}.sum
486
514
}
515
+
516
+ /**
517
+ * Return true if an executor is not currently running a task, and false otherwise.
518
+ *
519
+ * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
520
+ */
521
+ def isExecutorIdle (executorId : String ): Boolean = {
522
+ ! executorIdToTaskIds.contains(executorId)
523
+ }
487
524
}
488
525
489
526
}
0 commit comments