Skip to content

Commit c306555

Browse files
zsxwingrxin
authored andcommitted
[SPARK-5219][Core] Add locks to avoid scheduling race conditions
Author: zsxwing <[email protected]> Closes apache#4019 from zsxwing/SPARK-5219 and squashes the following commits: 36a8b4e [zsxwing] Add locks to avoid race conditions
1 parent 60f67e7 commit c306555

File tree

2 files changed

+3
-3
lines changed

2 files changed

+3
-3
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl(
361361
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
362362
}
363363

364-
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
364+
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
365365
taskSetManager.handleTaskGettingResult(tid)
366366
}
367367

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ private[spark] class TaskSetManager(
542542
/**
543543
* Check whether has enough quota to fetch the result with `size` bytes
544544
*/
545-
def canFetchMoreResults(size: Long): Boolean = synchronized {
545+
def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
546546
totalResultSize += size
547547
calculatedTasks += 1
548548
if (maxResultSize > 0 && totalResultSize > maxResultSize) {
@@ -671,7 +671,7 @@ private[spark] class TaskSetManager(
671671
maybeFinishTaskSet()
672672
}
673673

674-
def abort(message: String) {
674+
def abort(message: String): Unit = sched.synchronized {
675675
// TODO: Kill running tasks if we were not terminated due to a Mesos error
676676
sched.dagScheduler.taskSetFailed(taskSet, message)
677677
isZombie = true

0 commit comments

Comments
 (0)