@@ -1658,7 +1658,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
1658
1658
assert(availableResources(GPU ) sameElements Array (" 0" , " 1" , " 2" , " 3" ))
1659
1659
}
1660
1660
1661
- test(" SPARK-26755 Ensure that a speculative task is submitted only once for execution" ) {
1661
+ test(" SPARK-26755 Ensure that a speculative task is submitted only once for execution and" +
1662
+ " must also obey original locality preferences" ) {
1662
1663
sc = new SparkContext (" local" , " test" )
1663
1664
sched = new FakeTaskScheduler (sc, (" exec1" , " host1" ), (" exec2" , " host2" ))
1664
1665
val taskSet = FakeTask .createTaskSet(4 )
@@ -1682,7 +1683,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
1682
1683
assert(sched.startedTasks.toSet === Set (0 , 1 , 2 , 3 ))
1683
1684
clock.advance(1 )
1684
1685
// Complete 2 of the tasks and leave the other 2 tasks running
1685
- for (id <- Set (0 , 1 )) {
1686
+ for (id <- Set (0 , 2 )) {
1686
1687
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
1687
1688
assert(sched.endedTasks(id) === Success )
1688
1689
}
@@ -1691,79 +1692,74 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
1691
1692
// > 0ms, so advance the clock by 1ms here.
1692
1693
clock.advance(1 )
1693
1694
assert(manager.checkSpeculatableTasks(0 ))
1694
- assert(sched.speculativeTasks.toSet === Set (2 , 3 ))
1695
- assert(manager.copiesRunning(2 ) === 1 )
1695
+ assert(sched.speculativeTasks.toSet === Set (1 , 3 ))
1696
+ assert(manager.copiesRunning(1 ) === 1 )
1696
1697
assert(manager.copiesRunning(3 ) === 1 )
1697
1698
1698
1699
// Offer resource to start the speculative attempt for the running task. We offer more
1699
1700
// resources, and ensure that speculative tasks get scheduled appropriately -- only one extra
1700
1701
// copy per speculatable task
1701
1702
val taskOption2 = manager.resourceOffer(" exec1" , " host1" , NO_PREF )
1702
- val taskOption3 = manager.resourceOffer(" exec1 " , " host1 " , NO_PREF )
1703
+ val taskOption3 = manager.resourceOffer(" exec2 " , " host2 " , NO_PREF )
1703
1704
assert(taskOption2.isDefined)
1704
1705
val task2 = taskOption2.get
1706
+ // Ensure that task index 3 is launched on host1 and task index 1 on host2
1705
1707
assert(task2.index === 3 )
1706
1708
assert(task2.taskId === 4 )
1707
1709
assert(task2.executorId === " exec1" )
1708
1710
assert(task2.attemptNumber === 1 )
1709
1711
assert(taskOption3.isDefined)
1710
1712
val task3 = taskOption3.get
1711
- assert(task3.index === 2 )
1713
+ assert(task3.index === 1 )
1712
1714
assert(task3.taskId === 5 )
1713
- assert(task3.executorId === " exec1 " )
1715
+ assert(task3.executorId === " exec2 " )
1714
1716
assert(task3.attemptNumber === 1 )
1715
1717
clock.advance(1 )
1716
1718
// Running checkSpeculatableTasks again should return false
1717
1719
assert(! manager.checkSpeculatableTasks(0 ))
1718
- assert(manager.copiesRunning(2 ) === 2 )
1720
+ assert(manager.copiesRunning(1 ) === 2 )
1719
1721
assert(manager.copiesRunning(3 ) === 2 )
1720
1722
// Offering additional resources should not lead to any speculative tasks being respawned
1721
1723
assert(manager.resourceOffer(" exec1" , " host1" , ANY ).isEmpty)
1722
1724
assert(manager.resourceOffer(" exec2" , " host2" , ANY ).isEmpty)
1723
1725
assert(manager.resourceOffer(" exec3" , " host3" , ANY ).isEmpty)
1724
- }
1725
1726
1726
- test(" SPARK-26755 Ensure that a speculative task obeys the original locality preferences" ) {
1727
- sc = new SparkContext (" local" , " test" )
1727
+ // Launch a new set of tasks with locality preferences
1728
1728
sched = new FakeTaskScheduler (sc, (" exec1" , " host1" ),
1729
1729
(" exec2" , " host2" ), (" exec3" , " host3" ), (" exec4" , " host4" ))
1730
- // Create 3 tasks with locality preferences
1731
- val taskSet = FakeTask .createTaskSet(3 ,
1730
+ val taskSet2 = FakeTask .createTaskSet(3 ,
1732
1731
Seq (TaskLocation (" host1" ), TaskLocation (" host3" )),
1733
1732
Seq (TaskLocation (" host2" )),
1734
1733
Seq (TaskLocation (" host3" )))
1735
- // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
1736
- sc.conf.set(config.SPECULATION_MULTIPLIER , 0.0 )
1737
- sc.conf.set(config.SPECULATION_ENABLED , true )
1738
- sc.conf.set(config.SPECULATION_QUANTILE , 0.5 )
1739
- val clock = new ManualClock ()
1740
- val manager = new TaskSetManager (sched, taskSet, MAX_TASK_FAILURES , clock = clock)
1741
- val accumUpdatesByTask : Array [Seq [AccumulatorV2 [_, _]]] = taskSet.tasks.map { task =>
1734
+ val clock2 = new ManualClock ()
1735
+ val manager2 = new TaskSetManager (sched, taskSet2, MAX_TASK_FAILURES , clock = clock2)
1736
+ val accumUpdatesByTask2 : Array [Seq [AccumulatorV2 [_, _]]] = taskSet2.tasks.map { task =>
1742
1737
task.metrics.internalAccums
1743
1738
}
1739
+
1744
1740
// Offer resources for 3 tasks to start
1745
1741
Seq (" exec1" -> " host1" , " exec2" -> " host2" , " exec3" -> " host3" ).foreach { case (exec, host) =>
1746
- val taskOption = manager .resourceOffer(exec, host, NO_PREF )
1742
+ val taskOption = manager2 .resourceOffer(exec, host, NO_PREF )
1747
1743
assert(taskOption.isDefined)
1748
1744
assert(taskOption.get.executorId === exec)
1749
1745
}
1750
1746
assert(sched.startedTasks.toSet === Set (0 , 1 , 2 ))
1751
- clock .advance(1 )
1747
+ clock2 .advance(1 )
1752
1748
// Finish one task and mark the others as speculatable
1753
- manager .handleSuccessfulTask(2 , createTaskResult(2 , accumUpdatesByTask (2 )))
1749
+ manager2 .handleSuccessfulTask(2 , createTaskResult(2 , accumUpdatesByTask2 (2 )))
1754
1750
assert(sched.endedTasks(2 ) === Success )
1755
- clock .advance(1 )
1756
- assert(manager .checkSpeculatableTasks(0 ))
1751
+ clock2 .advance(1 )
1752
+ assert(manager2 .checkSpeculatableTasks(0 ))
1757
1753
assert(sched.speculativeTasks.toSet === Set (0 , 1 ))
1758
1754
// Ensure that the speculatable tasks obey the original locality preferences
1759
- assert(manager .resourceOffer(" exec4" , " host4" , NODE_LOCAL ).isEmpty)
1760
- assert(manager .resourceOffer(" exec2" , " host2" , NODE_LOCAL ).isEmpty)
1761
- assert(manager .resourceOffer(" exec3" , " host3" , NODE_LOCAL ).isDefined)
1762
- assert(manager .resourceOffer(" exec4" , " host4" , ANY ).isDefined)
1755
+ assert(manager2 .resourceOffer(" exec4" , " host4" , NODE_LOCAL ).isEmpty)
1756
+ assert(manager2 .resourceOffer(" exec2" , " host2" , NODE_LOCAL ).isEmpty)
1757
+ assert(manager2 .resourceOffer(" exec3" , " host3" , NODE_LOCAL ).isDefined)
1758
+ assert(manager2 .resourceOffer(" exec4" , " host4" , ANY ).isDefined)
1763
1759
// Since all speculatable tasks have been launched, making another offer
1764
1760
// should not schedule any more tasks
1765
- assert(manager .resourceOffer(" exec1" , " host1" , ANY ).isEmpty)
1766
- assert(! manager .checkSpeculatableTasks(0 ))
1767
- assert(manager .resourceOffer(" exec1" , " host1" , ANY ).isEmpty)
1761
+ assert(manager2 .resourceOffer(" exec1" , " host1" , ANY ).isEmpty)
1762
+ assert(! manager2 .checkSpeculatableTasks(0 ))
1763
+ assert(manager2 .resourceOffer(" exec1" , " host1" , ANY ).isEmpty)
1768
1764
}
1769
1765
}
0 commit comments