
Commit 4bd85d0

Ilya Ganelin authored and Andrew Or committed
[SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException
The ```Stage``` class now tracks whether there were a sufficient number of consecutive failures of that stage to trigger an abort.

To avoid an infinite loop of stage retries, we abort the job completely after 4 consecutive stage failures for one stage. We still allow more than 4 consecutive stage failures if there is an intervening successful attempt for the stage, so that in very long-lived applications, where a stage may get reused many times, we don't abort the job after failures that have been recovered from successfully.

I've added test cases to exercise the most obvious scenarios.

Author: Ilya Ganelin <[email protected]>

Closes #5636 from ilganeli/SPARK-5945.
1 parent 44948a2 commit 4bd85d0
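Before the diffs, here is a minimal, self-contained sketch (not the committed code) of the abort rule the commit message describes: each fetch-failed stage attempt is recorded, a successful completion clears the record, and the job is aborted once 4 consecutive attempts of the same stage have failed. The `ConsecutiveFetchFailureSketch` and `FailureTracker` names and the `main` driver are illustrative assumptions; the real change lives in `Stage.scala` and `DAGScheduler.scala`, shown in the diffs that follow.

```scala
import scala.collection.mutable.HashSet

object ConsecutiveFetchFailureSketch {
  // Mirrors Stage.MAX_CONSECUTIVE_FETCH_FAILURES introduced by this commit.
  val MaxConsecutiveFetchFailures = 4

  class FailureTracker {
    // One entry per failed stage attempt, so several FetchFailed tasks from the
    // same attempt are only counted once.
    private val fetchFailedAttemptIds = new HashSet[Int]

    // Record a fetch-failed attempt and report whether the job should be aborted.
    def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
      fetchFailedAttemptIds += stageAttemptId
      fetchFailedAttemptIds.size >= MaxConsecutiveFetchFailures
    }

    // Called on a successful stage completion, so only consecutive failures count.
    def clearFailures(): Unit = fetchFailedAttemptIds.clear()
  }

  def main(args: Array[String]): Unit = {
    val tracker = new FailureTracker
    // Three consecutive failed attempts stay below the limit of 4.
    assert(!tracker.failedOnFetchAndShouldAbort(0))
    assert(!tracker.failedOnFetchAndShouldAbort(1))
    assert(!tracker.failedOnFetchAndShouldAbort(2))
    // An intervening success resets the count ...
    tracker.clearFailures()
    assert(!tracker.failedOnFetchAndShouldAbort(3))
    // ... so an abort now needs four consecutive failed attempts again (3 through 6).
    assert(!tracker.failedOnFetchAndShouldAbort(4))
    assert(!tracker.failedOnFetchAndShouldAbort(5))
    assert(tracker.failedOnFetchAndShouldAbort(6))
    println("abort triggered only after 4 consecutive failed attempts")
  }
}
```

Keeping a set of attempt IDs, rather than a plain counter, is what lets the scheduler ignore duplicate FetchFailed reports coming from different tasks of the same stage attempt.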

File tree: 3 files changed (+320, -5 lines)


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 12 additions & 1 deletion
@@ -1101,7 +1101,6 @@ class DAGScheduler(
             s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
             s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
         } else {
-
           // It is likely that we receive multiple FetchFailed for a single stage (because we have
           // multiple tasks running concurrently on different executors). In that case, it is
           // possible the fetch failure has already been handled by the scheduler.
@@ -1117,6 +1116,11 @@ class DAGScheduler(
           if (disallowStageRetryForTest) {
             abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
               None)
+          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
+            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
+              s"has failed the maximum allowable number of " +
+              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
+              s"Most recent failure reason: ${failureMessage}", None)
           } else if (failedStages.isEmpty) {
             // Don't schedule an event to resubmit failed stages if failed isn't empty, because
             // in that case the event will already have been scheduled.
@@ -1240,10 +1244,17 @@ class DAGScheduler(
     if (errorMessage.isEmpty) {
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+
+      // Clear failure count for this stage, now that it's succeeded.
+      // We only limit consecutive failures of stage attempts,so that if a stage is
+      // re-used many times in a long-running job, unrelated failures don't eventually cause the
+      // stage to be aborted.
+      stage.clearFailures()
     } else {
       stage.latestInfo.stageFailed(errorMessage.get)
       logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
     }
+
     outputCommitCoordinator.stageEnd(stage.id)
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     runningStages -= stage

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 29 additions & 1 deletion
@@ -46,7 +46,7 @@ import org.apache.spark.util.CallSite
  * be updated for each attempt.
  *
  */
-private[spark] abstract class Stage(
+private[scheduler] abstract class Stage(
     val id: Int,
     val rdd: RDD[_],
     val numTasks: Int,
@@ -92,6 +92,29 @@ private[spark] abstract class Stage(
    */
   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

+  /**
+   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
+   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
+   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
+   * multiple tasks from the same stage attempt fail (SPARK-5945).
+   */
+  private val fetchFailedAttemptIds = new HashSet[Int]
+
+  private[scheduler] def clearFailures() : Unit = {
+    fetchFailedAttemptIds.clear()
+  }
+
+  /**
+   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
+   *
+   * This method updates the running set of failed stage attempts and returns
+   * true if the number of failures exceeds the allowable number of failures.
+   */
+  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
+    fetchFailedAttemptIds.add(stageAttemptId)
+    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
+  }
+
   /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
   def makeNewStageAttempt(
       numPartitionsToCompute: Int,
@@ -110,3 +133,8 @@ private[spark] abstract class Stage(
     case _ => false
   }
 }
+
+private[scheduler] object Stage {
+  // The number of consecutive failures allowed before a stage is aborted
+  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
+}

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 279 additions & 3 deletions
@@ -26,11 +26,11 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._

 import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.CallSite
-import org.apache.spark.executor.TaskMetrics

 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -473,6 +473,282 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }

+
+  // Helper function to validate state when creating tests for task failures
+  private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
+    assert(stageAttempt.stageId === stageId)
+    assert(stageAttempt.stageAttemptId == attempt)
+  }
+
+
+  // Helper functions to extract commonly used code in Fetch Failure test cases
+  private def setupStageAbortTest(sc: SparkContext) {
+    sc.listenerBus.addListener(new EndListener())
+    ended = false
+    jobResult = null
+  }
+
+  // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+  // when we abort the stage. This message will also be consumed by the EventLoggingListener
+  // so this will propagate up to the user.
+  var ended = false
+  var jobResult : JobResult = null
+
+  class EndListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      jobResult = jobEnd.jobResult
+      ended = true
+    }
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * successfully.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param numShufflePartitions - The number of partitions in the next stage
+   */
+  private def completeShuffleMapStageSuccessfully(
+      stageId: Int,
+      attemptIdx: Int,
+      numShufflePartitions: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
+      case (task, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * with all FetchFailure.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param shuffleDep - The shuffle dependency of the stage with a fetch failure
+   */
+  private def completeNextStageWithFetchFailure(
+      stageId: Int,
+      attemptIdx: Int,
+      shuffleDep: ShuffleDependency[_, _, _]): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next result stage attempt, confirm it's the one we expect, and
+   * complete it with a success where we return 42.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   */
+  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+  }
+
+  /**
+   * In this test, we simulate a job where many tasks in the same stage fail. We want to show
+   * that many fetch failures inside a single stage attempt do not trigger an abort
+   * on their own, but only when there are enough failing stage attempts.
+   */
+  test("Single stage fetch failure should not abort the stage.") {
+    setupStageAbortTest(sc)
+
+    val parts = 8
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep))
+    submit(reduceRdd, (0 until parts).toArray)
+
+    completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)
+
+    completeNextStageWithFetchFailure(1, 0, shuffleDep)
+
+    // Resubmit and confirm that now all is well
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Complete stage 0 and then stage 1 with a "42"
+    completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
+    completeNextResultStageWithSuccess(1, 1)
+
+    // Confirm job finished succesfully
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we simulate a job failure where the first stage completes successfully and
+   * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
+   * trigger an overall job abort to avoid endless retries.
+   */
+  test("Multiple consecutive stage fetch failures should lead to job being aborted.") {
+    setupStageAbortTest(sc)
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail all these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDep)
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+        assert(scheduler.runningStages.nonEmpty)
+        assert(!ended)
+      } else {
+        // Stage should have been aborted and removed from running stages
+        assertDataStructuresEmpty()
+        sc.listenerBus.waitUntilEmpty(1000)
+        assert(ended)
+        jobResult match {
+          case JobFailed(reason) =>
+            assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
+          case other => fail(s"expected JobFailed, not $other")
+        }
+      }
+    }
+  }
+
+  /**
+   * In this test, we create a job with two consecutive shuffles, and simulate 2 failures for each
+   * shuffle fetch. In total In total, the job has had four failures overall but not four failures
+   * for a particular stage, and as such should not be aborted.
+   */
+  test("Failures in different stages should not trigger an overall abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // In the first two iterations, Stage 0 succeeds and stage 1 fails. In the next two iterations,
+    // stage 2 fails.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
+        // Now we should have a new taskSet, for a new attempt of stage 1.
+        // Fail all these tasks with FetchFailure
+        completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+      } else {
+        completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)
+
+        // Fail stage 2
+        completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
+          shuffleDepTwo)
+      }
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+    }
+
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)
+
+    // Succeed stage2 with a "42"
+    completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES/2)
+
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we demonstrate that only consecutive failures trigger a stage abort. A stage may
+   * fail multiple times, succeed, then fail a few more times (because its run again by downstream
+   * dependencies). The total number of failed attempts for one stage will go over the limit,
+   * but that doesn't matter, since they have successes in the middle.
+   */
+  test("Non-consecutive stage failures don't trigger abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+      // Make each task in stage 0 success
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+
+      scheduler.resubmitFailedStages()
+
+      // Confirm we have not yet aborted
+      assert(scheduler.runningStages.nonEmpty)
+      assert(!ended)
+    }
+
+    // Rerun stage 0 and 1 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)
+
+    // Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
+
+    scheduler.resubmitFailedStages()
+
+    // Rerun stage 0 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+
+    // Now again, fail stage 1 (up to MAX_FAILURES) but confirm that this doesn't trigger an abort
+    // since we succeeded in between.
+    completeNextStageWithFetchFailure(1, 4, shuffleDepOne)
+
+    scheduler.resubmitFailedStages()
+
+    // Confirm we have not yet aborted
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Next, succeed all and confirm output
+    // Rerun stage 0 + 1
+    completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)
+
+    // Succeed stage 2 and verify results
+    completeNextResultStageWithSuccess(2, 1)
+
+    assertDataStructuresEmpty()
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === Map(0 -> 42))
+  }
+
   test("trivial shuffle with multiple fetch failures") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -810,15 +1086,15 @@ class DAGSchedulerSuite
     submit(finalRdd, Array(0))
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
-    // complete stage 2
+    // complete stage 0
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 2)),
       (Success, makeMapStatus("hostB", 2))))
     // complete stage 1
     complete(taskSets(1), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
-    // pretend stage 0 failed because hostA went down
+    // pretend stage 2 failed because hostA went down
     complete(taskSets(2), Seq(
       (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:
