@@ -1045,31 +1045,37 @@ class DAGScheduler(
1045
1045
stage.pendingTasks += task
1046
1046
1047
1047
case FetchFailed (bmAddress, shuffleId, mapId, reduceId) =>
1048
- // Mark the stage that the reducer was in as unrunnable
1049
1048
val failedStage = stageIdToStage(task.stageId)
1050
- markStageAsFinished(failedStage, Some (" Fetch failure" ))
1051
- runningStages -= failedStage
1052
- // TODO: Cancel running tasks in the stage
1053
- logInfo(" Marking " + failedStage + " (" + failedStage.name +
1054
- " ) for resubmision due to a fetch failure" )
1055
- // Mark the map whose fetch failed as broken in the map stage
1056
- val mapStage = shuffleToMapStage(shuffleId)
1057
- if (mapId != - 1 ) {
1058
- mapStage.removeOutputLoc(mapId, bmAddress)
1059
- mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1060
- }
1061
- logInfo(" The failed fetch was from " + mapStage + " (" + mapStage.name +
1062
- " ); marking it for resubmission" )
1063
- if (failedStages.isEmpty && eventProcessActor != null ) {
1064
- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
1065
- // in that case the event will already have been scheduled. eventProcessActor may be
1066
- // null during unit tests.
1067
- import env .actorSystem .dispatcher
1068
- env.actorSystem.scheduler.scheduleOnce(
1069
- RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1049
+ // It is likely that we receive multiple FetchFailed for a single stage (because we have
1050
+ // multiple tasks running concurrently on different executors). In that case, it is possible
1051
+ // the fetch failure has already been handled by the executor.
1052
+ if (runningStages.contains(failedStage)) {
1053
+ markStageAsFinished(failedStage, Some (" Fetch failure" ))
1054
+ runningStages -= failedStage
1055
+ // TODO: Cancel running tasks in the stage
1056
+ logInfo(" Marking " + failedStage + " (" + failedStage.name +
1057
+ " ) for resubmision due to a fetch failure" )
1058
+
1059
+ // Mark the map whose fetch failed as broken in the map stage
1060
+ val mapStage = shuffleToMapStage(shuffleId)
1061
+ if (mapId != - 1 ) {
1062
+ mapStage.removeOutputLoc(mapId, bmAddress)
1063
+ mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1064
+ }
1065
+
1066
+ logInfo(" The failed fetch was from " + mapStage + " (" + mapStage.name +
1067
+ " ); marking it for resubmission" )
1068
+ if (failedStages.isEmpty && eventProcessActor != null ) {
1069
+ // Don't schedule an event to resubmit failed stages if failed isn't empty, because
1070
+ // in that case the event will already have been scheduled. eventProcessActor may be
1071
+ // null during unit tests.
1072
+ import env .actorSystem .dispatcher
1073
+ env.actorSystem.scheduler.scheduleOnce(
1074
+ RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1075
+ }
1076
+ failedStages += failedStage
1077
+ failedStages += mapStage
1070
1078
}
1071
- failedStages += failedStage
1072
- failedStages += mapStage
1073
1079
// TODO: mark the executor as failed only if there were lots of fetch failures on it
1074
1080
if (bmAddress != null ) {
1075
1081
handleExecutorLost(bmAddress.executorId, Some (task.epoch))
0 commit comments