@@ -1049,27 +1049,25 @@ class DAGScheduler(
1049
1049
val mapStage = shuffleToMapStage(shuffleId)
1050
1050
// It is likely that we receive multiple FetchFailed for a single stage (because we have
1051
1051
// multiple tasks running concurrently on different executors). In that case, it is possible
1052
- // the fetch failure has already been handled by the executor .
1052
+ // the fetch failure has already been handled by the scheduler .
1053
1053
if (runningStages.contains(failedStage)) {
1054
1054
markStageAsFinished(failedStage, Some (" Fetch failure" ))
1055
1055
runningStages -= failedStage
1056
1056
// TODO: Cancel running tasks in the stage
1057
- logInfo(" Marking " + failedStage + " (" + failedStage.name +
1058
- " ) for resubmision due to a fetch failure" )
1059
-
1060
- logInfo(" The failed fetch was from " + mapStage + " (" + mapStage.name +
1061
- " ); marking it for resubmission" )
1062
- if (failedStages.isEmpty && eventProcessActor != null ) {
1063
- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
1064
- // in that case the event will already have been scheduled. eventProcessActor may be
1065
- // null during unit tests.
1066
- import env .actorSystem .dispatcher
1067
- env.actorSystem.scheduler.scheduleOnce(
1068
- RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1069
- }
1070
- failedStages += failedStage
1071
- failedStages += mapStage
1057
+ logInfo(s " Marking $failedStage ( ${failedStage.name}) for resubmision " +
1058
+ s " due to a fetch failure from $mapStage ( ${mapStage.name}" )
1059
+ }
1060
+
1061
+ if (failedStages.isEmpty && eventProcessActor != null ) {
1062
+ // Don't schedule an event to resubmit failed stages if failed isn't empty, because
1063
+ // in that case the event will already have been scheduled. eventProcessActor may be
1064
+ // null during unit tests.
1065
+ import env .actorSystem .dispatcher
1066
+ env.actorSystem.scheduler.scheduleOnce(
1067
+ RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1072
1068
}
1069
+ failedStages += failedStage
1070
+ failedStages += mapStage
1073
1071
1074
1072
// Mark the map whose fetch failed as broken in the map stage
1075
1073
if (mapId != - 1 ) {
0 commit comments