
Commit 466849e

Author: pgandhi (committed)
[SPARK-26755] : Addressing Reviews July 16, 2019
Adding a unit test to check for locality prefs for speculatable tasks and other changes
1 parent 62a15d7 commit 466849e

2 files changed, +97 -52 lines

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 27 additions & 25 deletions
@@ -131,29 +131,15 @@ private[spark] class TaskSetManager(
   // same time for a barrier stage.
   private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier
 
-  // Set of pending tasks for various levels of locality: executor, host, rack,
-  // noPrefs and anyPrefs. These collections are actually
-  // treated as stacks, in which new tasks are added to the end of the
-  // ArrayBuffer and removed from the end. This makes it faster to detect
-  // tasks that repeatedly fail because whenever a task failed, it is put
-  // back at the head of the stack. These collections may contain duplicates
-  // for two reasons:
-  // (1): Tasks are only removed lazily; when a task is launched, it remains
-  // in all the pending lists except the one that it was launched from.
-  // (2): Tasks may be re-added to these lists multiple times as a result
-  // of failures.
-  // Duplicates are handled in dequeueTaskFromList, which ensures that a
-  // task hasn't already started running before launching it.
+  // Store tasks waiting to be scheduled by locality preferences
   private[scheduler] val pendingTasks = new PendingTasksByLocality()
 
   // Tasks that can be speculated. Since these will be a small fraction of total
   // tasks, we'll just hold them in a HashSet. The HashSet here ensures that we do not add
-  // duplicate speculative tasks.
+  // duplicate speculatable tasks.
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
-  // Set of pending tasks marked as speculative for various levels of locality: executor, host,
-  // rack, noPrefs and anyPrefs
+  // Store speculatable tasks by locality preferences
   private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality()
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
@@ -229,8 +215,8 @@ private[spark] class TaskSetManager(
   private[spark] def addPendingTask(
       index: Int,
       resolveRacks: Boolean = true,
-      speculative: Boolean = false): Unit = {
-    val pendingTaskSetToAddTo = if (speculative) pendingSpeculatableTasks else pendingTasks
+      speculatable: Boolean = false): Unit = {
+    val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
@@ -281,7 +267,7 @@ private[spark] class TaskSetManager(
       indexOffset -= 1
       val index = list(indexOffset)
       if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
-          !(speculative && hasAttemptOnHost(index, host))) {
+        !(speculative && hasAttemptOnHost(index, host))) {
         // This should almost always be list.trimEnd(1) to remove tail
         list.remove(indexOffset)
         if (!successful(index)) {
@@ -314,8 +300,10 @@ private[spark] class TaskSetManager(
    *
    * @return An option containing (task index within the task set, locality, is speculative?)
    */
-  private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value, Boolean)] = {
+  private def dequeueTask(
+      execId: String,
+      host: String,
+      maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
     // Tries to schedule a regular task first; if it returns None, then schedules
     // a speculative task
     dequeueTaskHelper(execId, host, maxLocality, false).orElse(
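The hunk cuts off after the first dequeueTaskHelper call; judging from the comment above it, the orElse falls back to a second lookup over the speculatable queues. Below is a minimal, self-contained sketch of that fallback pattern, with standalone illustrative names rather than the Spark code itself.

import scala.collection.mutable.ArrayBuffer

// Illustrative only: a regular queue and a speculatable queue, both used as stacks.
object DequeueFallbackSketch {
  private val pendingTasks = ArrayBuffer.empty[Int]       // regular pending tasks
  private val pendingSpeculatableTasks = ArrayBuffer(7)   // speculatable copies

  // Pop from the end of the buffer, mirroring the stack behaviour described in the moved comment.
  private def dequeueFrom(list: ArrayBuffer[Int]): Option[Int] =
    if (list.isEmpty) None else Some(list.remove(list.size - 1))

  // Regular tasks win; a speculatable task is only offered when no regular task is left,
  // which is what dequeueTaskHelper(..., false).orElse(dequeueTaskHelper(..., true)) expresses.
  def dequeueTask(): Option[(Int, Boolean)] =
    dequeueFrom(pendingTasks).map(i => (i, false))
      .orElse(dequeueFrom(pendingSpeculatableTasks).map(i => (i, true)))

  def main(args: Array[String]): Unit =
    println(dequeueTask()) // prints Some((7,true)) because the regular queue is empty here
}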
@@ -980,10 +968,11 @@ private[spark] class TaskSetManager(
         val index = info.index
         if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
           !speculatableTasks.contains(index)) {
-          addPendingTask(index, speculative = true)
+          addPendingTask(index, speculatable = true)
           logInfo(
-            "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
-              .format(index, taskSet.id, info.host, threshold))
+            ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
+              " than %.0f ms(%d speculatable tasks in this taskset now)")
+              .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
           speculatableTasks += index
           sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
           foundTasks = true
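As a quick sanity check on the new five-argument format string, here is a standalone snippet with made-up values (task index 3, stage "1.0", host "host1", 100 ms threshold, one speculatable task). The output keeps the missing space before the parenthesis exactly as written in the diff.

object SpeculatableLogFormatCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; the real ones come from TaskInfo and the speculation threshold.
    val (index, stageId, host, threshold) = (3, "1.0", "host1", 100.0)
    val speculatableCount = 1
    val msg = ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
      " than %.0f ms(%d speculatable tasks in this taskset now)")
      .format(index, stageId, host, threshold, speculatableCount)
    // Prints, on one line: Marking task 3 in stage 1.0 (on host1) as speculatable because
    // it ran more than 100 ms(1 speculatable tasks in this taskset now)
    println(msg)
  }
}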
@@ -1054,6 +1043,19 @@ private[spark] object TaskSetManager {
   val TASK_SIZE_TO_WARN_KIB = 1000
 }
 
+// Set of pending tasks for various levels of locality: executor, host, rack,
+// noPrefs and anyPrefs. These collections are actually
+// treated as stacks, in which new tasks are added to the end of the
+// ArrayBuffer and removed from the end. This makes it faster to detect
+// tasks that repeatedly fail because whenever a task failed, it is put
+// back at the head of the stack. These collections may contain duplicates
+// for two reasons:
+// (1): Tasks are only removed lazily; when a task is launched, it remains
+// in all the pending lists except the one that it was launched from.
+// (2): Tasks may be re-added to these lists multiple times as a result
+// of failures.
+// Duplicates are handled in dequeueTaskFromList, which ensures that a
+// task hasn't already started running before launching it.
 private[scheduler] class PendingTasksByLocality {
 
   // Set of pending tasks for each executor.
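The hunk ends just inside PendingTasksByLocality, showing only the class header and its first comment. For readers following along, here is a plausible shape of that container inferred from the comments above; the field names and types are assumptions, not part of this diff.

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Sketch only: the real class is package-private inside org.apache.spark.scheduler.
class PendingTasksByLocality {
  // Pending tasks for each executor.
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  // Pending tasks for each host (covers executors the scheduler has not seen yet).
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  // Pending tasks with no locality preferences.
  val noPrefs = new ArrayBuffer[Int]
  // Pending tasks for each rack.
  val forRack = new HashMap[String, ArrayBuffer[Int]]
  // All pending tasks, also treated as a stack as described above.
  val all = new ArrayBuffer[Int]
}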

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 70 additions & 27 deletions
@@ -740,7 +740,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // Mark the task as available for speculation, and then offer another resource,
     // which should be used to launch a speculative copy of the task.
     manager.speculatableTasks += singleTask.partitionId
-    manager.addPendingTask(singleTask.partitionId, speculative = true)
+    manager.addPendingTask(singleTask.partitionId, speculatable = true)
     val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get
 
     assert(manager.runningTasks === 2)
@@ -886,7 +886,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
 
     manager.speculatableTasks += 1
-    manager.addPendingTask(1, speculative = true)
+    manager.addPendingTask(1, speculatable = true)
     clock.advance(LOCALITY_WAIT_MS)
     // schedule the nonPref task
     assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
@@ -1671,16 +1671,13 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
       task.metrics.internalAccums
     }
-    // Offer resources for 4 tasks to start
-    for ((k, v) <- List(
-        "exec1" -> "host1",
-        "exec1" -> "host1",
-        "exec2" -> "host2",
-        "exec2" -> "host2")) {
-      val taskOption = manager.resourceOffer(k, v, NO_PREF)
-      assert(taskOption.isDefined)
-      val task = taskOption.get
-      assert(task.executorId === k)
+    // Offer resources for 4 tasks to start, 2 on each exec
+    Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) =>
+      (0 until 2).foreach { _ =>
+        val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+        assert(taskOption.isDefined)
+        assert(taskOption.get.executorId === exec)
+      }
     }
     assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
     clock.advance(1)
@@ -1698,21 +1695,23 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.copiesRunning(2) === 1)
     assert(manager.copiesRunning(3) === 1)
 
-    // Offer resource to start the speculative attempt for the running task
-    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
-    val taskOption6 = manager.resourceOffer("exec1", "host1", NO_PREF)
-    assert(taskOption5.isDefined)
-    val task5 = taskOption5.get
-    assert(task5.index === 3)
-    assert(task5.taskId === 4)
-    assert(task5.executorId === "exec1")
-    assert(task5.attemptNumber === 1)
-    assert(taskOption6.isDefined)
-    val task6 = taskOption6.get
-    assert(task6.index === 2)
-    assert(task6.taskId === 5)
-    assert(task6.executorId === "exec1")
-    assert(task6.attemptNumber === 1)
+    // Offer resource to start the speculative attempt for the running task. We offer more
+    // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra
+    // copy per speculatable task
+    val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption3 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption2.isDefined)
+    val task2 = taskOption2.get
+    assert(task2.index === 3)
+    assert(task2.taskId === 4)
+    assert(task2.executorId === "exec1")
+    assert(task2.attemptNumber === 1)
+    assert(taskOption3.isDefined)
+    val task3 = taskOption3.get
+    assert(task3.index === 2)
+    assert(task3.taskId === 5)
+    assert(task3.executorId === "exec1")
+    assert(task3.attemptNumber === 1)
     clock.advance(1)
     // Running checkSpeculatableTasks again should return false
     assert(!manager.checkSpeculatableTasks(0))
@@ -1723,4 +1722,48 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
     assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)
   }
+
+  test("SPARK-26755 Ensure that a speculative task obeys the original locality preferences") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4"))
+    // Create 3 tasks with locality preferences
+    val taskSet = FakeTask.createTaskSet(3,
+      Seq(TaskLocation("host1"), TaskLocation("host3")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host3")))
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 3 tasks to start
+    Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) =>
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === exec)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2))
+    clock.advance(1)
+    // Finish one task and mark the others as speculatable
+    manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2)))
+    assert(sched.endedTasks(2) === Success)
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(0, 1))
+    // Ensure that the speculatable tasks obey the original locality preferences
+    assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
+    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
+    assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined)
+    assert(manager.resourceOffer("exec4", "host4", ANY).isDefined)
+    // Since, all speculatable tasks have been launched, making another offer
+    // should not schedule any more tasks
+    assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
+  }
 }
