
Commit 924b708

Rui Li authored and mateiz committed
SPARK-1937: fix issue with task locality
Don't check executor/host availability when creating a TaskSetManager. The executors may not have been registered yet when the TaskSetManager is created, in which case all tasks would be considered to have no preferred locations, and data locality would be lost in later scheduling.

Author: Rui Li <[email protected]>
Author: lirui-intel <[email protected]>

Closes apache#892 from lirui-intel/delaySchedule and squashes the following commits:

8444d7c [Rui Li] fix code style
fafd57f [Rui Li] keep locality constraints within the valid levels
18f9e05 [Rui Li] restrict allowed locality
5b3fb2f [Rui Li] refine UT
99f843e [Rui Li] add unit test and fix bug
fff4123 [Rui Li] fix computing valid locality levels
685ed3d [Rui Li] remove delay shedule for pendingTasksWithNoPrefs
7b0177a [Rui Li] remove redundant code
c7b93b5 [Rui Li] revise patch
3d7da02 [lirui-intel] Update TaskSchedulerImpl.scala
cab4c71 [Rui Li] revised patch
539a578 [Rui Li] fix code style
cf0d6ac [Rui Li] fix code style
3dfae86 [Rui Li] re-compute pending tasks when new host is added
a225ac2 [Rui Li] SPARK-1937: fix issue with task locality
1 parent 420c1c3 commit 924b708
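For context, the heart of the fix can be summarized in a small standalone sketch (the SketchTaskSet class, its executorAdded(host) method, and the two-level locality enum below are illustrative inventions for this note, not Spark's real API): locality preferences are recorded regardless of which executors happen to be alive, and the no-preference list plus the valid locality levels are re-derived whenever a new executor registers, which is what the TaskSetManager changes in this commit do.

import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only -- not Spark's actual TaskSetManager.
object SketchLocality extends Enumeration {
  val NODE_LOCAL, ANY = Value
}

// preferredHosts(i) lists the hosts task i would like to run on (possibly empty).
class SketchTaskSet(preferredHosts: IndexedSeq[Seq[String]]) {
  private val aliveHosts = scala.collection.mutable.Set[String]()

  // Tasks whose preferred hosts are not alive yet fall back to the no-pref list.
  var pendingTasksWithNoPrefs: Seq[Int] = recomputeNoPrefs()
  var myLocalityLevels: Seq[SketchLocality.Value] = recomputeLevels()

  private def recomputeNoPrefs(): Seq[Int] =
    preferredHosts.indices.filterNot(i => preferredHosts(i).exists(aliveHosts.contains))

  private def recomputeLevels(): Seq[SketchLocality.Value] = {
    val levels = ArrayBuffer[SketchLocality.Value]()
    if (preferredHosts.exists(_.exists(aliveHosts.contains))) levels += SketchLocality.NODE_LOCAL
    levels += SketchLocality.ANY
    levels.toSeq
  }

  // Called by the scheduler whenever a new executor registers on `host`.
  def executorAdded(host: String): Unit = {
    aliveHosts += host
    pendingTasksWithNoPrefs = recomputeNoPrefs()
    myLocalityLevels = recomputeLevels()
  }
}

object SketchDemo extends App {
  val ts = new SketchTaskSet(IndexedSeq(Seq("host1"), Seq("host2"), Seq()))
  println(ts.myLocalityLevels)        // only ANY: no executors registered yet
  ts.executorAdded("host1")
  println(ts.myLocalityLevels)        // NODE_LOCAL and ANY
  println(ts.pendingTasksWithNoPrefs) // tasks 1 and 2: host2 is still not alive, task 2 has no preference
}

The real patch performs the same re-derivation in TaskSetManager.executorAdded() below, driven by TaskSchedulerImpl whenever a round of resource offers brings in an executor it has not seen before.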

3 files changed: +71 −14 lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 7 additions & 1 deletion
@@ -210,11 +210,14 @@ private[spark] class TaskSchedulerImpl(
     SparkEnv.set(sc.env)
 
     // Mark each slave as alive and remember its hostname
+    // Also track if new executor is added
+    var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
+        newExecAvail = true
       }
     }
 
@@ -227,12 +230,15 @@
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))
+      if (newExecAvail) {
+        taskSet.executorAdded()
+      }
     }
 
     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
-    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
       do {
         launchedTask = false
         for (i <- 0 until shuffledOffers.size) {
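One consequence of this change worth calling out: the offer loop now iterates over taskSet.myLocalityLevels instead of every member of TaskLocality.values, so a task set is only offered the locality levels it can actually use. A tiny standalone sketch of that loop shape (the FakeTaskSet case class is hypothetical, included only to make the snippet runnable):

// Hypothetical stand-in for a TaskSetManager, carrying only the field the loop needs.
case class FakeTaskSet(name: String, myLocalityLevels: Seq[String])

object OfferLoopSketch extends App {
  val sortedTaskSets = Seq(
    FakeTaskSet("ts0", Seq("PROCESS_LOCAL", "NODE_LOCAL", "ANY")),
    FakeTaskSet("ts1", Seq("ANY"))) // e.g. its executors were unknown when it was created

  // Same nested-generator shape as the patched loop: each task set is offered
  // work only at its own valid locality levels.
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    println(s"offering ${taskSet.name} at max locality $maxLocality")
  }
}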

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 30 additions & 13 deletions
@@ -118,7 +118,7 @@ private[spark] class TaskSetManager(
   private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
 
   // Set containing pending tasks with no locality preferences.
-  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
+  var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
 
   // Set containing all pending tasks (also used as a stack, as above).
   val allPendingTasks = new ArrayBuffer[Int]
@@ -153,8 +153,8 @@ private[spark] class TaskSetManager(
   }
 
   // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
-  val myLocalityLevels = computeValidLocalityLevels()
-  val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
+  var myLocalityLevels = computeValidLocalityLevels()
+  var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
 
   // Delay scheduling variables: we keep track of our current locality level and the time we
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
@@ -181,16 +181,14 @@
     var hadAliveLocations = false
     for (loc <- tasks(index).preferredLocations) {
       for (execId <- loc.executorId) {
-        if (sched.isExecutorAlive(execId)) {
-          addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
-          hadAliveLocations = true
-        }
+        addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
       }
       if (sched.hasExecutorsAliveOnHost(loc.host)) {
-        addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
-        for (rack <- sched.getRackForHost(loc.host)) {
-          addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
-        }
+        hadAliveLocations = true
+      }
+      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+      for (rack <- sched.getRackForHost(loc.host)) {
+        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
         hadAliveLocations = true
       }
     }
@@ -725,10 +723,12 @@ private[spark] class TaskSetManager(
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
-    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
+    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
+        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
       levels += PROCESS_LOCAL
     }
-    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
+    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
+        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
     if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
@@ -738,4 +738,21 @@
     logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
     levels.toArray
   }
+
+  // Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
+  def executorAdded() {
+    def newLocAvail(index: Int): Boolean = {
+      for (loc <- tasks(index).preferredLocations) {
+        if (sched.hasExecutorsAliveOnHost(loc.host) ||
+          sched.getRackForHost(loc.host).isDefined) {
+          return true
+        }
+      }
+      false
+    }
+    logInfo("Re-computing pending task lists.")
+    pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
+    myLocalityLevels = computeValidLocalityLevels()
+    localityWaits = myLocalityLevels.map(getLocalityWait)
+  }
 }
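The other half of the fix is the extra guard in computeValidLocalityLevels: a pending-task bucket only makes its level valid if at least one of its keys refers to an executor or host that is alive right now. A minimal, self-contained illustration of that guard (the names aliveExecutors, isExecutorAlive, and processLocalValid are made up for this sketch, and the getLocalityWait check is omitted):

import scala.collection.mutable.{ArrayBuffer, HashMap}

object LevelGuardSketch extends App {
  // Pending PROCESS_LOCAL work keyed by executor id, as in TaskSetManager.
  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  pendingTasksForExecutor.getOrElseUpdate("execA", new ArrayBuffer) += 0

  // Stand-in for the scheduler's view of registered executors.
  var aliveExecutors = Set.empty[String]
  def isExecutorAlive(execId: String): Boolean = aliveExecutors.contains(execId)

  // Mirrors the shape of the patched condition: pending tasks alone are not
  // enough, at least one of the executors they name must actually be alive.
  def processLocalValid: Boolean =
    pendingTasksForExecutor.nonEmpty &&
      pendingTasksForExecutor.keySet.exists(isExecutorAlive(_))

  println(processLocalValid) // false: execA has pending tasks but has not registered yet
  aliveExecutors += "execA"
  println(processLocalValid) // true: PROCESS_LOCAL is now a valid level
}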

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 34 additions & 0 deletions
@@ -77,6 +77,10 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
   override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
 
   override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+
+  def addExecutor(execId: String, host: String) {
+    executors.put(execId, host)
+  }
 }
 
 class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
@@ -400,6 +404,36 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(sched.taskSetsFailed.contains(taskSet.id))
   }
 
+  test("new executors get added") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc)
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1", "execA")),
+      Seq(TaskLocation("host1", "execB")),
+      Seq(TaskLocation("host2", "execC")),
+      Seq())
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    // All tasks added to no-pref list since no preferred location is available
+    assert(manager.pendingTasksWithNoPrefs.size === 4)
+    // Only ANY is valid
+    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
+    // Add a new executor
+    sched.addExecutor("execD", "host1")
+    manager.executorAdded()
+    // Task 0 and 1 should be removed from no-pref list
+    assert(manager.pendingTasksWithNoPrefs.size === 2)
+    // Valid locality should contain NODE_LOCAL and ANY
+    assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
+    // Add another executor
+    sched.addExecutor("execC", "host2")
+    manager.executorAdded()
+    // No-pref list now only contains task 3
+    assert(manager.pendingTasksWithNoPrefs.size === 1)
+    // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
+    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+  }
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)

0 commit comments
