@@ -659,21 +659,41 @@ class DAGSchedulerSuite
659
659
val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf [ShuffleMapStage ]
660
660
assert(shuffleStage.numAvailableOutputs === 0 )
661
661
// should be ignored for being too old
662
- runEvent(CompletionEvent (taskSet.tasks(0 ), Success , makeMapStatus(" hostA" ,
663
- reduceRdd.partitions.size), null , createFakeTaskInfo(), null ))
662
+ runEvent(CompletionEvent (
663
+ taskSet.tasks(0 ),
664
+ Success ,
665
+ makeMapStatus(" hostA" , reduceRdd.partitions.size),
666
+ null ,
667
+ createFakeTaskInfo(),
668
+ null ))
664
669
assert(shuffleStage.numAvailableOutputs === 0 )
665
670
// should work because it's a non-failed host (so the available map outputs will increase)
666
- runEvent(CompletionEvent (taskSet.tasks(0 ), Success , makeMapStatus(" hostB" ,
667
- reduceRdd.partitions.size), null , createFakeTaskInfo(), null ))
671
+ runEvent(CompletionEvent (
672
+ taskSet.tasks(0 ),
673
+ Success ,
674
+ makeMapStatus(" hostB" , reduceRdd.partitions.size),
675
+ null ,
676
+ createFakeTaskInfo(),
677
+ null ))
668
678
assert(shuffleStage.numAvailableOutputs === 1 )
669
679
// should be ignored for being too old
670
- runEvent(CompletionEvent (taskSet.tasks(0 ), Success , makeMapStatus(" hostA" ,
671
- reduceRdd.partitions.size), null , createFakeTaskInfo(), null ))
680
+ runEvent(CompletionEvent (
681
+ taskSet.tasks(0 ),
682
+ Success ,
683
+ makeMapStatus(" hostA" , reduceRdd.partitions.size),
684
+ null ,
685
+ createFakeTaskInfo(),
686
+ null ))
672
687
assert(shuffleStage.numAvailableOutputs === 1 )
673
688
// should work because it's a new epoch
674
689
taskSet.tasks(1 ).epoch = newEpoch
675
- runEvent(CompletionEvent (taskSet.tasks(1 ), Success , makeMapStatus(" hostA" ,
676
- reduceRdd.partitions.size), null , createFakeTaskInfo(), null ))
690
+ runEvent(CompletionEvent (
691
+ taskSet.tasks(1 ),
692
+ Success ,
693
+ makeMapStatus(" hostA" , reduceRdd.partitions.size),
694
+ null ,
695
+ createFakeTaskInfo(),
696
+ null ))
677
697
assert(shuffleStage.numAvailableOutputs === 2 )
678
698
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0 ).map(_._1).toSet ===
679
699
HashSet (makeBlockManagerId(" hostB" ), makeBlockManagerId(" hostA" )))
0 commit comments