
Commit d5fa622

Author: Ilya Ganelin (committed)

Moved failure tracking to the Stage class. Added clearing of the failure count upon Stage success.

1 parent 729b7ef commit d5fa622

File tree

2 files changed (+27, -57 lines):
  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
  core/src/main/scala/org/apache/spark/scheduler/Stage.scala


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 57 deletions
@@ -95,30 +95,6 @@ class DAGScheduler(
 
   // Stages that must be resubmitted due to fetch failures
   private[scheduler] val failedStages = new HashSet[Stage]
-
-  // The maximum number of times to retry a stage before aborting
-  val maxStageFailures = sc.conf.getInt("spark.stage.maxFailures", 5)
-
-  // To avoid cyclical stage failures (see SPARK-5945) we limit the number of times that a stage
-  // may be retried. However, it only makes sense to limit the number of times that a stage fails
-  // if it's failing for the same reason every time. Therefore, track why a stage fails as well as
-  // how many times it has failed.
-  private[scheduler] case class StageFailure(failureReason: String) {
-    var count = 1
-    def fail(): Unit = { count += 1 }
-    def shouldAbort(): Boolean = { count >= maxStageFailures }
-
-    override def equals(other: Any): Boolean =
-      other match {
-        case that: StageFailure => that.failureReason.equals(this.failureReason)
-        case _ => false
-      }
-
-    override def hashCode: Int = failureReason.hashCode()
-  }
-
-  // Map to track failure reasons for a given stage (indexed by stage ID)
-  private[scheduler] val stageFailureReasons = new HashMap[Stage, HashSet[StageFailure]]
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
@@ -484,10 +460,6 @@ class DAGScheduler(
         logDebug("Removing stage %d from failed set.".format(stageId))
         failedStages -= stage
       }
-      if (stageFailureReasons.contains(stage)) {
-        logDebug("Removing stage %d from failure reasons set.".format(stageId))
-        stageFailureReasons -= stage
-      }
     }
     // data structures based on StageId
     stageIdToStage -= stageId
@@ -968,31 +940,6 @@ class DAGScheduler(
     }
   }
 
-  /**
-   * Check whether we should abort the failedStage due to multiple failures for the same reason.
-   * This method updates the running count of failures for a particular stage and returns
-   * true if the number of failures for any single reason exceeds the allowable number
-   * of failures.
-   * @return An Option that contains the failure reason that caused the abort
-   */
-  private[scheduler]
-  def shouldAbortStage(failedStage: Stage, failureReason: String): Option[String] = {
-    if (!stageFailureReasons.contains(failedStage)) {
-      stageFailureReasons.put(failedStage, new HashSet[StageFailure]())
-    }
-
-    val failures = stageFailureReasons.get(failedStage).get
-    val failure = StageFailure(failureReason)
-    failures.find(s => s.equals(failure)) match {
-      case Some(f) => f.fail()
-      case None => failures.add(failure)
-    }
-    failures.find(_.shouldAbort()) match {
-      case Some(f) => Some(f.failureReason)
-      case None => None
-    }
-  }
-
   /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -1021,6 +968,10 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
+        // Clear the failure count for this stage now that it has succeeded. This ensures that
+        // even if subsequent stages fail, triggering a recompute of this stage, we only abort
+        // because of those new failures.
+        stage.clearFailures()
         listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
           event.reason, event.taskInfo, event.taskMetrics))
         stage.pendingTasks -= task
@@ -1136,13 +1087,12 @@ class DAGScheduler(
          markStageAsFinished(failedStage, Some(failureMessage))
        }
 
-        val shouldAbort = shouldAbortStage(failedStage, failureMessage)
        if (disallowStageRetryForTest) {
          abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
-        } else if (shouldAbort.isDefined) {
+        } else if (failedStage.failAndShouldAbort()) {
          abortStage(failedStage, s"Fetch failure - aborting stage. Stage ${failedStage.name} " +
-            s"has failed the maximum allowable number of times: ${maxStageFailures}. " +
-            s"Failure reason: ${shouldAbort.get}")
+            s"has failed the maximum allowable number of times: ${failedStage.maxStageFailures}. " +
+            s"Failure reason: ${failureMessage}")
        } else if (failedStages.isEmpty) {
          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
          // in that case the event will already have been scheduled.
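
Worth noting: although the comparison operator changes, the abort threshold is effectively preserved. The old code aborted when count >= maxStageFailures with a configurable default of 5, and a StageFailure started at count = 1, so the fifth failure for a given reason tripped the check. The new code increments failCount before testing failCount > maxStageFailures with a hard-coded 4, so the fifth call to failAndShouldAbort() drives failCount to 5 > 4. Both abort on the fifth failure; what actually changes is that failures are no longer keyed by reason and the limit is no longer configurable via spark.stage.maxFailures. A quick sketch of the arithmetic (illustrative only, not Spark code):

  // Old rule: a StageFailure starts at count = 1 and aborts when count >= 5.
  val oldAbortsAt = (1 to 5).map(count => count >= 5)
  // New rule: failCount starts at 0, is incremented, then tested against > 4.
  val newAbortsAt = (1 to 5).map(failCount => failCount > 4)
  assert(oldAbortsAt == newAbortsAt) // both abort only on the fifth failure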

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 20 additions & 0 deletions
@@ -70,6 +70,26 @@ private[spark] abstract class Stage(
   /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
   var latestInfo: StageInfo = StageInfo.fromStage(this)
 
+  // The maximum number of times to retry a stage before aborting
+  final val maxStageFailures = 4
+
+  // To avoid cyclical stage failures (see SPARK-5945) we limit the number of times that a stage
+  // may be retried.
+  private var failCount = 0
+  private[scheduler] def fail(): Unit = { failCount += 1 }
+  private[scheduler] def shouldAbort(): Boolean = { failCount > maxStageFailures }
+  private[scheduler] def clearFailures(): Unit = { failCount = 0 }
+
+  /**
+   * Check whether we should abort this stage due to repeated failures.
+   * This method updates the running count of failures for the stage and returns
+   * true if the number of failures exceeds the allowable number of failures.
+   */
+  private[scheduler] def failAndShouldAbort(): Boolean = {
+    fail()
+    shouldAbort()
+  }
+
   /** Return a new attempt id, starting with 0. */
   def newAttemptId(): Int = {
     val id = nextAttemptId
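
To make the new semantics concrete, here is a minimal, self-contained sketch. StageLike is a stand-in for the real Stage (which is abstract and constructed by the scheduler); it reproduces the members added in this commit and demonstrates both behaviors: the per-stage counter that trips once maxStageFailures is exceeded, and the reset on success that the DAGScheduler's Success handler now performs.

  object StageFailureSketch {
    // Stand-in for the failure-tracking members added to Stage in this commit.
    class StageLike {
      final val maxStageFailures = 4
      private var failCount = 0
      private def fail(): Unit = { failCount += 1 }
      def shouldAbort(): Boolean = failCount > maxStageFailures
      def clearFailures(): Unit = { failCount = 0 }
      def failAndShouldAbort(): Boolean = { fail(); shouldAbort() }
    }

    def main(args: Array[String]): Unit = {
      val stage = new StageLike
      // Four failures stay under the limit; the fifth trips it.
      println((1 to 5).map(_ => stage.failAndShouldAbort()))
      // => Vector(false, false, false, false, true)

      // A success clears the count (as handleTaskCompletion now does), so
      // later recompute-driven failures start counting from zero again.
      stage.clearFailures()
      println(stage.failAndShouldAbort()) // => false
    }
  }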
