
Commit 5059255

jiangxb1987 authored and mengxr committed
[SPARK-25161][CORE] Fix several bugs in failure handling of barrier execution mode
## What changes were proposed in this pull request?

Fix several bugs in failure handling of barrier execution mode:

* Mark TaskSet for a barrier stage as zombie when a task attempt fails;
* Multiple barrier task failures from a single barrier stage should not trigger multiple stage retries;
* Barrier task failure from a previous failed stage attempt should not trigger stage retry;
* Fail the job when a task from a barrier ResultStage failed;
* RDD.isBarrier() should not rely on `ShuffleDependency`s.

## How was this patch tested?

Added corresponding test cases in `DAGSchedulerSuite` and `TaskSchedulerImplSuite`.

Closes #22158 from jiangxb1987/failure.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>
1 parent b8788b3 commit 5059255
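
For context, here is a minimal sketch (not part of this commit) of the barrier execution mode that these failure-handling fixes apply to. The workload and variable names are illustrative only; a running SparkContext `sc` is assumed. `RDD.barrier()`, `RDDBarrier.mapPartitions` and `BarrierTaskContext` are the public barrier API this scheduler code supports.

import org.apache.spark.BarrierTaskContext

// All four tasks of the barrier stage are launched together; if any one of them
// fails, the whole stage is failed and handled by the scheduler logic patched below.
val result = sc.parallelize(1 to 100, 4)
  .barrier()
  .mapPartitions { iter =>
    val context = BarrierTaskContext.get()
    context.barrier()          // global synchronization point across all tasks
    iter.map(_ * 2)
  }
  .collect()

Because a single barrier task cannot be retried in isolation, the changes below always retry or abort the stage as a whole.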

File tree

5 files changed, +200 -56 lines changed


core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 1 deletion
@@ -1863,7 +1863,8 @@ abstract class RDD[T: ClassTag](
 
   // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
   // RDD chain.
-  @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
+  @transient protected lazy val isBarrier_ : Boolean =
+    dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
 }
 
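The intent of this change is that the barrier flag only propagates within a stage: a shuffle marks a stage boundary, so an RDD on the reduce side of a `ShuffleDependency` must not inherit `isBarrier()` from its map-side parent. A rough illustration of the resulting behavior (the `isBarrier()` values are shown as comments because the method is `private[spark]`; the RDD names are made up):

val barrierRdd = sc.parallelize(1 to 10, 2)
  .barrier()
  .mapPartitions(iter => iter.map(x => (x % 2, x)))   // isBarrier() == true: inside the barrier stage

val afterShuffle = barrierRdd.reduceByKey(_ + _)      // isBarrier() == false: the ShuffleDependency is
                                                      // skipped, so the reduce stage is a normal stage
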
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 70 additions & 55 deletions
@@ -1478,9 +1478,11 @@ private[spark] class DAGScheduler(
               mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId)
 
             case failedResultStage: ResultStage =>
-              // Mark all the partitions of the result stage to be not finished, to ensure retry
-              // all the tasks on resubmitted stage attempt.
-              failedResultStage.activeJob.map(_.resetAllPartitions())
+              // Abort the failed result stage since we may have committed output for some
+              // partitions.
+              val reason = "Could not recover from a failed barrier ResultStage. Most recent " +
+                s"failure reason: $failureMessage"
+              abortStage(failedResultStage, reason, None)
           }
         }
 
@@ -1553,62 +1555,75 @@ private[spark] class DAGScheduler(
 
         // Always fail the current stage and retry all the tasks when a barrier task fail.
         val failedStage = stageIdToStage(task.stageId)
-        logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
-          "failed.")
-        val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
-          failure.toErrorString
-        try {
-          // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
-          val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) failed."
-          taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason)
-        } catch {
-          case e: UnsupportedOperationException =>
-            // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
-            // TODO SPARK-24877 leave the zombie tasks and ignore their completion events.
-            logWarning(s"Could not kill all tasks for stage $stageId", e)
-            abortStage(failedStage, "Could not kill zombie barrier tasks for stage " +
-              s"$failedStage (${failedStage.name})", Some(e))
-        }
-        markStageAsFinished(failedStage, Some(message))
+        if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
+          logInfo(s"Ignoring task failure from $task as it's from $failedStage attempt" +
+            s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
+            s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
+        } else {
+          logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
+            "failed.")
+          val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
+            failure.toErrorString
+          try {
+            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
+              "failed."
+            taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason)
+          } catch {
+            case e: UnsupportedOperationException =>
+              // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
+              // TODO SPARK-24877 leave the zombie tasks and ignore their completion events.
+              logWarning(s"Could not kill all tasks for stage $stageId", e)
+              abortStage(failedStage, "Could not kill zombie barrier tasks for stage " +
+                s"$failedStage (${failedStage.name})", Some(e))
+          }
+          markStageAsFinished(failedStage, Some(message))
 
-        failedStage.failedAttemptIds.add(task.stageAttemptId)
-        // TODO Refactor the failure handling logic to combine similar code with that of
-        // FetchFailed.
-        val shouldAbortStage =
-          failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
-          disallowStageRetryForTest
+          failedStage.failedAttemptIds.add(task.stageAttemptId)
+          // TODO Refactor the failure handling logic to combine similar code with that of
+          // FetchFailed.
+          val shouldAbortStage =
+            failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
+            disallowStageRetryForTest
 
-        if (shouldAbortStage) {
-          val abortMessage = if (disallowStageRetryForTest) {
-            "Barrier stage will not retry stage due to testing config. Most recent failure " +
-              s"reason: $message"
+          if (shouldAbortStage) {
+            val abortMessage = if (disallowStageRetryForTest) {
+              "Barrier stage will not retry stage due to testing config. Most recent failure " +
+                s"reason: $message"
+            } else {
+              s"""$failedStage (${failedStage.name})
+                 |has failed the maximum allowable number of
+                 |times: $maxConsecutiveStageAttempts.
+                 |Most recent failure reason: $message
+               """.stripMargin.replaceAll("\n", " ")
+            }
+            abortStage(failedStage, abortMessage, None)
           } else {
-            s"""$failedStage (${failedStage.name})
-               |has failed the maximum allowable number of
-               |times: $maxConsecutiveStageAttempts.
-               |Most recent failure reason: $message""".stripMargin.replaceAll("\n", " ")
-          }
-          abortStage(failedStage, abortMessage, None)
-        } else {
-          failedStage match {
-            case failedMapStage: ShuffleMapStage =>
-              // Mark all the map as broken in the map stage, to ensure retry all the tasks on
-              // resubmitted stage attempt.
-              mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId)
-
-            case failedResultStage: ResultStage =>
-              // Mark all the partitions of the result stage to be not finished, to ensure retry
-              // all the tasks on resubmitted stage attempt.
-              failedResultStage.activeJob.map(_.resetAllPartitions())
-          }
+            failedStage match {
+              case failedMapStage: ShuffleMapStage =>
+                // Mark all the map as broken in the map stage, to ensure retry all the tasks on
+                // resubmitted stage attempt.
+                mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId)
 
-          // update failedStages and make sure a ResubmitFailedStages event is enqueued
-          failedStages += failedStage
-          logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " +
-            "failure.")
-          messageScheduler.schedule(new Runnable {
-            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-          }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+              case failedResultStage: ResultStage =>
+                // Abort the failed result stage since we may have committed output for some
+                // partitions.
+                val reason = "Could not recover from a failed barrier ResultStage. Most recent " +
+                  s"failure reason: $message"
+                abortStage(failedResultStage, reason, None)
+            }
+            // In case multiple task failures triggered for a single stage attempt, ensure we only
+            // resubmit the failed stage once.
+            val noResubmitEnqueued = !failedStages.contains(failedStage)
+            failedStages += failedStage
+            if (noResubmitEnqueued) {
+              logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " +
+                "failure.")
+              messageScheduler.schedule(new Runnable {
+                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+            }
+          }
         }
 
     case Resubmitted =>
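
The `noResubmitEnqueued` guard above is the heart of the "multiple barrier task failures should not trigger multiple stage retries" fix. Stripped of scheduler details, the pattern is just a membership check before enqueueing. A standalone sketch with hypothetical names (`failedStages` here is a plain set standing in for the DAGScheduler's internal state):

import scala.collection.mutable

val failedStages = mutable.Set[Int]()

def onBarrierTaskFailure(stageId: Int): Unit = {
  // Only the first failure reported for this stage attempt enqueues a resubmission;
  // later failures find the stage already recorded and do nothing extra.
  val noResubmitEnqueued = !failedStages.contains(stageId)
  failedStages += stageId
  if (noResubmitEnqueued) {
    println(s"Resubmitting stage $stageId")   // the real code posts ResubmitFailedStages
  }
}

onBarrierTaskFailure(0)   // prints "Resubmitting stage 0"
onBarrierTaskFailure(0)   // prints nothing: no duplicate retry for the same stage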

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 0 deletions
@@ -893,6 +893,10 @@ private[spark] class TaskSetManager(
       None
     }
 
+    if (tasks(index).isBarrier) {
+      isZombie = true
+    }
+
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
 
     if (!isZombie && reason.countTowardsTaskFailures) {
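
These four added lines lean on the TaskSetManager's zombie flag: a zombie task set keeps tracking its running tasks but never launches new attempts, leaving the retry decision to the DAGScheduler at stage granularity. A toy model of that behavior (a hypothetical stand-in, not Spark's actual class):

// Hypothetical stand-in for TaskSetManager, only to illustrate the zombie semantics.
class ToyTaskSetManager(numTasks: Int, isBarrierStage: Boolean) {
  var isZombie = false
  private var nextIndex = 0

  // Offer-driven scheduling: a zombie set declines every new launch.
  def launchNextTask(): Option[Int] =
    if (!isZombie && nextIndex < numTasks) { nextIndex += 1; Some(nextIndex - 1) } else None

  def handleFailedTask(index: Int): Unit = {
    // Mirrors the change above: one failed barrier task poisons the whole task set.
    if (isBarrierStage) isZombie = true
  }
}

val tsm = new ToyTaskSetManager(numTasks = 3, isBarrierStage = true)
tsm.launchNextTask()       // Some(0)
tsm.handleFailedTask(0)
tsm.launchNextTask()       // None: the set is zombie, no further task launches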

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 106 additions & 0 deletions
@@ -1119,6 +1119,33 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("Fail the job if a barrier ResultTask failed") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+      .barrier()
+      .mapPartitions(iter => iter)
+    submit(reduceRdd, Array(0, 1))
+
+    // Complete the map stage.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty))
+
+    // The first ResultTask fails
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      TaskKilled("test"),
+      null))
+
+    // Assert the stage has been cancelled.
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(failure.getMessage.startsWith("Job aborted due to stage failure: Could not recover " +
+      "from a failed barrier ResultStage."))
+  }
+
   /**
    * This tests the case where another FetchFailed comes in while the map stage is getting
    * re-run.
@@ -2521,6 +2548,85 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  test("Barrier task failures from the same stage attempt don't trigger multiple stage retries") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions(iter => iter)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0, 1))
+
+    val mapStageId = 0
+    def countSubmittedMapStageAttempts(): Int = {
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
+    }
+
+    // The map stage should have been submitted.
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(countSubmittedMapStageAttempts() === 1)
+
+    // The first map task fails with TaskKilled.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0),
+      TaskKilled("test"),
+      null))
+    assert(sparkListener.failedStages === Seq(0))
+
+    // The second map task fails with TaskKilled.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1),
+      TaskKilled("test"),
+      null))
+
+    // Trigger resubmission of the failed map stage.
+    runEvent(ResubmitFailedStages)
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
+    assert(countSubmittedMapStageAttempts() === 2)
+  }
+
+  test("Barrier task failures from a previous stage attempt don't trigger stage retry") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions(iter => iter)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0, 1))
+
+    val mapStageId = 0
+    def countSubmittedMapStageAttempts(): Int = {
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
+    }
+
+    // The map stage should have been submitted.
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(countSubmittedMapStageAttempts() === 1)
+
+    // The first map task fails with TaskKilled.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0),
+      TaskKilled("test"),
+      null))
+    assert(sparkListener.failedStages === Seq(0))
+
+    // Trigger resubmission of the failed map stage.
+    runEvent(ResubmitFailedStages)
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
+    assert(countSubmittedMapStageAttempts() === 2)
+
+    // The second map task fails with TaskKilled.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1),
+      TaskKilled("test"),
+      null))
+
+    // The second map task failure doesn't trigger stage retry.
+    runEvent(ResubmitFailedStages)
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(countSubmittedMapStageAttempts() === 2)
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -1118,4 +1118,22 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!tsm.isZombie)
     assert(taskScheduler.taskSetManagerForAttempt(0, 0).isDefined)
   }
+
+  test("mark taskset for a barrier stage as zombie in case a task fails") {
+    val taskScheduler = setupScheduler()
+
+    val attempt = FakeTask.createBarrierTaskSet(3)
+    taskScheduler.submitTasks(attempt)
+
+    val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
+    val offers = (0 until 3).map{ idx =>
+      WorkerOffer(s"exec-$idx", s"host-$idx", 1, Some(s"192.168.0.101:4962$idx"))
+    }
+    taskScheduler.resourceOffers(offers)
+    assert(tsm.runningTasks === 3)
+
+    // Fail a task from the stage attempt.
+    tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test"))
+    assert(tsm.isZombie)
+  }
 }
