Commit 3dfae86
Rui Li committed
re-compute pending tasks when new host is added
1 parent a225ac2 commit 3dfae86

File tree

2 files changed: +22 -8 lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 8 additions & 0 deletions
@@ -111,6 +111,8 @@ private[spark] class TaskSchedulerImpl(
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
 
+  private val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true)
+
   override def setDAGScheduler(dagScheduler: DAGScheduler) {
     this.dagScheduler = dagScheduler
   }
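The flag is read straight from the Spark configuration with a default of true, so the new behavior is on unless a job opts out. A minimal sketch of opting out, assuming the property can be set like any other configuration key (the key name comes from this diff; the master URL and app name are illustrative):

    import org.apache.spark.SparkConf

    object DelayScheduleOptOut extends App {
      // Sketch: turn off the delaySchedule path added in this commit.
      // "spark.schedule.delaySchedule" defaults to true per the hunk above.
      val conf = new SparkConf()
        .setMaster("local[2]")              // illustrative master URL
        .setAppName("delay-schedule-demo")  // illustrative app name
        .set("spark.schedule.delaySchedule", "false")
      println(conf.getBoolean("spark.schedule.delaySchedule", true))  // false
    }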
@@ -210,11 +212,14 @@ private[spark] class TaskSchedulerImpl(
     SparkEnv.set(sc.env)
 
     // Mark each slave as alive and remember its hostname
+    // also track if a new executor is added
+    var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
+        newExecAvail = true
       }
     }
 
@@ -233,6 +238,9 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+      if (delaySchedule && newExecAvail) {
+        taskSet.reAddPendingTasks()
+      }
       do {
         launchedTask = false
         for (i <- 0 until shuffledOffers.size) {
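Net effect in TaskSchedulerImpl: resourceOffers now notices when an offer arrives from a previously unseen host and, with delay scheduling enabled, asks each sorted task set to rebuild its pending lists before the locality-ordered launch loop runs. A standalone sketch of just the detection step, with a simplified Offer case class standing in for the real WorkerOffer:

    import scala.collection.mutable.{HashMap, HashSet}

    object NewExecutorDetection extends App {
      case class Offer(executorId: String, host: String)  // simplified stand-in for WorkerOffer

      val executorsByHost = new HashMap[String, HashSet[String]]()

      // Returns true if any offer came from a host we had not seen before,
      // mirroring the newExecAvail flag added in this commit.
      def processOffers(offers: Seq[Offer]): Boolean = {
        var newExecAvail = false
        for (o <- offers) {
          if (!executorsByHost.contains(o.host)) {
            executorsByHost(o.host) = new HashSet[String]()
            newExecAvail = true
          }
          executorsByHost(o.host) += o.executorId
        }
        newExecAvail
      }

      println(processOffers(Seq(Offer("exec-1", "host-a"))))  // true: host-a is new
      println(processOffers(Seq(Offer("exec-2", "host-a"))))  // false: host-a already known
    }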

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 14 additions & 8 deletions
@@ -150,8 +150,7 @@ private[spark] class TaskSetManager(
   // of task index so that tasks with low indices get launched first.
   val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true)
   for (i <- (0 until numTasks).reverse) {
-    //if delay schedule is set, we shouldn't enforce check since executors may haven't registered yet
-    addPendingTask(i, enforceCheck = !delaySchedule)
+    addPendingTask(i)
   }
 
   // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
@@ -171,10 +170,8 @@ private[spark] class TaskSetManager(
   /**
    * Add a task to all the pending-task lists that it should be on. If readding is set, we are
    * re-adding the task so only include it in each list if it's not already there.
-   * If enforceCheck is set, we'll check the availability of executors/hosts before adding a task
-   * to the pending list, otherwise, we simply add the task according to its preference.
    */
-  private def addPendingTask(index: Int, readding: Boolean = false, enforceCheck: Boolean = true) {
+  private def addPendingTask(index: Int, readding: Boolean = false) {
     // Utility method that adds `index` to a list only if readding=false or it's not already there
     def addTo(list: ArrayBuffer[Int]) {
       if (!readding || !list.contains(index)) {
@@ -185,12 +182,12 @@ private[spark] class TaskSetManager(
     var hadAliveLocations = false
     for (loc <- tasks(index).preferredLocations) {
       for (execId <- loc.executorId) {
-        if (!enforceCheck || sched.isExecutorAlive(execId)) {
+        if (sched.isExecutorAlive(execId)) {
           addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
           hadAliveLocations = true
         }
       }
-      if (!enforceCheck || sched.hasExecutorsAliveOnHost(loc.host)) {
+      if (sched.hasExecutorsAliveOnHost(loc.host)) {
         addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
         for (rack <- sched.getRackForHost(loc.host)) {
           addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
@@ -199,7 +196,8 @@ private[spark] class TaskSetManager(
       }
     }
 
-    if (!hadAliveLocations) {
+    if (tasks(index).preferredLocations.isEmpty ||
+      (!delaySchedule && !hadAliveLocations)) {
       // Even though the task might've had preferred locations, all of those hosts or executors
       // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
       addTo(pendingTasksWithNoPrefs)
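This hunk changes when a task lands in pendingTasksWithNoPrefs: only tasks with no preferred locations at all, or (when delay scheduling is off) tasks whose preferred executors and hosts are all dead, get scheduled anywhere right away; with delay scheduling on, a task whose preferences simply have not registered yet is held back until reAddPendingTasks picks it up. A toy model of that predicate, with plain host strings standing in for TaskLocation:

    object NoPrefsDecision extends App {
      // Mirrors the rewritten condition: route a task to the no-prefs list only
      // if it is truly preference-free, or if delay scheduling is disabled and
      // none of its preferred hosts are alive.
      def goesToNoPrefs(preferredHosts: Seq[String],
                        aliveHosts: Set[String],
                        delaySchedule: Boolean): Boolean = {
        val hadAliveLocations = preferredHosts.exists(aliveHosts.contains)
        preferredHosts.isEmpty || (!delaySchedule && !hadAliveLocations)
      }

      // A task preferring host-b, which has not registered yet:
      println(goesToNoPrefs(Seq("host-b"), Set("host-a"), delaySchedule = true))   // false: keep waiting
      println(goesToNoPrefs(Seq("host-b"), Set("host-a"), delaySchedule = false))  // true: run anywhere
      println(goesToNoPrefs(Seq.empty, Set("host-a"), delaySchedule = true))       // true: no preferences
    }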
@@ -742,4 +740,12 @@ private[spark] class TaskSetManager(
     logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
     levels.toArray
   }
+
+  // Re-compute the pending lists. This should be called when a new executor is added.
+  def reAddPendingTasks() {
+    logInfo("Re-computing pending task lists.")
+    for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && !successful(index))) {
+      addPendingTask(i, readding = true)
+    }
+  }
 }
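reAddPendingTasks re-runs addPendingTask with readding = true over every task that is neither running nor finished, and the readding flag makes addTo skip indices that are already queued, so recomputation never duplicates entries. A toy model of that dedup-on-readd behavior, with one host list standing in for the per-executor/host/rack maps:

    import scala.collection.mutable.ArrayBuffer

    object ReAddSketch extends App {
      val pendingForHost = new ArrayBuffer[Int]()

      // Mirrors addTo(...) inside addPendingTask: append only when readding
      // is false or the index is not already present.
      def addTo(list: ArrayBuffer[Int], index: Int, readding: Boolean): Unit = {
        if (!readding || !list.contains(index)) {
          list += index
        }
      }

      addTo(pendingForHost, 3, readding = false)
      addTo(pendingForHost, 3, readding = true)  // skipped: already queued
      addTo(pendingForHost, 7, readding = true)  // added: newly eligible after a new executor joined
      println(pendingForHost)                    // ArrayBuffer(3, 7)
    }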
