@@ -594,11 +594,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }
 
   /**
@@ -1054,6 +1060,47 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  /**
+   * Run two jobs, with a shared dependency. We simulate a fetch failure in the second job, which
+   * requires regenerating some outputs of the shared dependency. One key aspect of this test is
+   * that the second job actually uses a different stage for the shared dependency (a "skipped"
+   * stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage. If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // the scheduler now creates a new task set to regenerate the missing map output, but this time
+    // using a different stage, the "skipped" one
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure we've still got internal
+    // accumulators setup
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
   /**
    * This test runs a three stage job, with a fetch failure in stage 1. but during the retry, we
    * have completions from both the first & second attempt of stage 1. So all the map output is
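
For readers skimming the diff, here is a minimal, self-contained Scala sketch (not part of the commit; the object and method below are hypothetical and only mirror the shape of the new parameter) of the default-argument pattern that partitionToResult relies on:

// Illustrative only -- mirrors completeNextResultStageWithSuccess's new
// partitionToResult: Int => Int parameter and its `_ => 42` default.
object PartitionToResultSketch {
  // Map each partition index to the result value the function produces for it.
  def resultsFor(numPartitions: Int, partitionToResult: Int => Int = _ => 42): Map[Int, Int] =
    (0 until numPartitions).map(idx => idx -> partitionToResult(idx)).toMap

  def main(args: Array[String]): Unit = {
    // With the default, every partition reports 42, as in the first job's assertion.
    assert(resultsFor(2) == Map(0 -> 42, 1 -> 42))
    // With an explicit function, partitions get distinct values, as in the reused-shuffle test.
    assert(resultsFor(2, idx => idx + 1234) == Map(0 -> 1234, 1 -> 1235))
  }
}

Keeping `_ => 42` as the default preserves the behavior of existing callers, while tests that need distinguishable per-partition results (such as the new reused-shuffle-dependency test) can pass a function like `idx => idx + 1234`.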