@@ -237,11 +237,12 @@ class DAGScheduler(
237
237
case Some (stage) => stage
238
238
case None =>
239
239
// We are going to register ancestor shuffle dependencies
240
- registerShuffleDependencies(shuffleDep, firstJobId)
240
+ getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
241
+ shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
242
+ }
241
243
// Then register current shuffleDep
242
244
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
243
245
shuffleToMapStage(shuffleDep.shuffleId) = stage
244
-
245
246
stage
246
247
}
247
248
}
@@ -352,16 +353,6 @@ class DAGScheduler(
352
353
parents.toList
353
354
}
354
355
355
- /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
356
- private def registerShuffleDependencies (shuffleDep : ShuffleDependency [_, _, _], firstJobId : Int ) {
357
- val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
358
- while (parentsWithNoMapStage.nonEmpty) {
359
- val currentShufDep = parentsWithNoMapStage.pop()
360
- val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
361
- shuffleToMapStage(currentShufDep.shuffleId) = stage
362
- }
363
- }
364
-
365
356
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
366
357
private def getAncestorShuffleDependencies (rdd : RDD [_]): Stack [ShuffleDependency [_, _, _]] = {
367
358
val parents = new Stack [ShuffleDependency [_, _, _]]
@@ -378,11 +369,9 @@ class DAGScheduler(
378
369
if (! shuffleToMapStage.contains(shufDep.shuffleId)) {
379
370
parents.push(shufDep)
380
371
}
381
-
382
- waitingForVisit.push(shufDep.rdd)
383
372
case _ =>
384
- waitingForVisit.push(dep.rdd)
385
373
}
374
+ waitingForVisit.push(dep.rdd)
386
375
}
387
376
}
388
377
}
@@ -1039,39 +1028,22 @@ class DAGScheduler(
1039
1028
// we registered these map outputs.
1040
1029
mapOutputTracker.registerMapOutputs(
1041
1030
shuffleStage.shuffleDep.shuffleId,
1042
- shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head ).toArray,
1031
+ shuffleStage.outputLocs.map(_.headOption.orNull ).toArray,
1043
1032
changeEpoch = true )
1044
1033
1045
1034
clearCacheLocs()
1035
+
1036
+ // Some tasks had failed; let's resubmit this shuffleStage
1037
+ // TODO: Lower-level scheduler should also deal with this
1046
1038
if (shuffleStage.outputLocs.contains(Nil )) {
1047
- // Some tasks had failed; let's resubmit this shuffleStage
1048
- // TODO: Lower-level scheduler should also deal with this
1049
1039
logInfo(" Resubmitting " + shuffleStage + " (" + shuffleStage.name +
1050
1040
" ) because some of its tasks had failed: " +
1051
1041
shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
1052
1042
.map(_._2).mkString(" , " ))
1053
1043
submitStage(shuffleStage)
1054
- } else {
1055
- val newlyRunnable = new ArrayBuffer [Stage ]
1056
- for (shuffleStage <- waitingStages) {
1057
- logInfo(" Missing parents for " + shuffleStage + " : " +
1058
- getMissingParentStages(shuffleStage))
1059
- }
1060
- for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
1061
- {
1062
- newlyRunnable += shuffleStage
1063
- }
1064
- waitingStages --= newlyRunnable
1065
- runningStages ++= newlyRunnable
1066
- for {
1067
- shuffleStage <- newlyRunnable.sortBy(_.id)
1068
- jobId <- activeJobForStage(shuffleStage)
1069
- } {
1070
- logInfo(" Submitting " + shuffleStage + " (" +
1071
- shuffleStage.rdd + " ), which is now runnable" )
1072
- submitMissingTasks(shuffleStage, jobId)
1073
- }
1074
1044
}
1045
+
1046
+ // Note: newly runnable stages will be submitted below when we submit waiting stages
1075
1047
}
1076
1048
}
1077
1049
@@ -1169,7 +1141,7 @@ class DAGScheduler(
1169
1141
// TODO: This will be really slow if we keep accumulating shuffle map stages
1170
1142
for ((shuffleId, stage) <- shuffleToMapStage) {
1171
1143
stage.removeOutputsOnExecutor(execId)
1172
- val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head ).toArray
1144
+ val locs = stage.outputLocs.map(_.headOption.orNull ).toArray
1173
1145
mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true )
1174
1146
}
1175
1147
if (shuffleToMapStage.isEmpty) {
0 commit comments