
Commit be652ed

avoid unnecessary delay when we only have nopref tasks
1 parent dee9e22

5 files changed: 168 additions & 116 deletions

core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 object TaskLocality extends Enumeration {
   // Process local is expected to be used ONLY within TaskSetManager for now.
-  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
+  val PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY = Value
 
   type TaskLocality = Value
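
The position of NOPREF in the enumeration is what makes this one-line change work: Scala Enumeration values are ordered by declaration, and the scheduler compares levels with < and isAllowed to decide how far from its data a task may run. A minimal standalone sketch (a toy re-declaration, not the Spark source) of that ordering:

object LocalityOrderingDemo {
  // Mirror of the enum above: Enumeration values are Ordered by declaration index.
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY = Value
  }

  def main(args: Array[String]): Unit = {
    import TaskLocality._
    println(NODE_LOCAL < NOPREF)   // true: NOPREF is searched after node-local tasks
    println(NOPREF < RACK_LOCAL)   // true: but before rack-local tasks
  }
}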

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 5 additions & 4 deletions

@@ -89,11 +89,11 @@ private[spark] class TaskSchedulerImpl(
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
-  private val executorsByHost = new HashMap[String, HashSet[String]]
+  protected val executorsByHost = new HashMap[String, HashSet[String]]
 
   protected val hostsByRack = new HashMap[String, HashSet[String]]
 
-  private val executorIdToHost = new HashMap[String, String]
+  protected val executorIdToHost = new HashMap[String, String]
 
   // Listener object to pass upcalls into
   var dagScheduler: DAGScheduler = null

@@ -257,15 +257,16 @@ private[spark] class TaskSchedulerImpl(
         val execId = shuffledOffers(i).executorId
         val host = shuffledOffers(i).host
         if (availableCpus(i) >= CPUS_PER_TASK) {
-          for (task <- taskSet.resourceOffer(execId, host, preferredLocality)) {
+          for (task <- taskSet.resourceOffer(execId, host, preferredLocality,
+              TaskLocality.PROCESS_LOCAL, true)) {
             tasks(i) += task
             val tid = task.taskId
             taskIdToTaskSetId(tid) = taskSet.taskSet.id
             taskIdToExecutorId(tid) = execId
             activeExecutorIds += execId
             executorsByHost(host) += execId
             availableCpus(i) -= CPUS_PER_TASK
-            assert (availableCpus(i) >= 0)
+            assert(availableCpus(i) >= 0)
             launchedTask = true
           }
         }
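
The call-site change threads the two new resourceOffer arguments through: TaskLocality.PROCESS_LOCAL fixes the bottom of findTask's search window (the min == max special case keeps the very first, most-local pass inclusive), and true for allowAdjustPrefLocality keeps delay scheduling active on this normal path. The surrounding CPU bookkeeping is untouched; a simplified, self-contained sketch of it (plain Scala, not the Spark classes):

object OfferLoopDemo {
  val CPUS_PER_TASK = 2

  def main(args: Array[String]): Unit = {
    val availableCpus = Array(4, 3, 1) // free cores on each offered executor
    var launchedTask = true
    while (launchedTask) {             // keep offering while any task launches
      launchedTask = false
      for (i <- availableCpus.indices if availableCpus(i) >= CPUS_PER_TASK) {
        availableCpus(i) -= CPUS_PER_TASK // each launched task consumes its cores
        assert(availableCpus(i) >= 0)     // the tightened assert guards over-allocation
        launchedTask = true
      }
    }
    println(availableCpus.mkString(", ")) // prints: 0, 1, 1
  }
}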

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 62 additions & 23 deletions
@@ -336,30 +336,49 @@
    * Dequeue a pending task for a given node and return its index and locality level.
    * Only search for tasks matching the given locality constraint.
    *
+   * NOTE: minLocality is used to avoid traversing the same lists twice (especially when
+   * we pass NOPREF as maxLocality after the other levels).
+   *
    * @return An option containing (task index within the task set, locality, is speculative?)
    */
-  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value, Boolean)] =
+  private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value,
+      minLocality: TaskLocality.Value)
+    : Option[(Int, TaskLocality.Value, Boolean)] =
   {
-    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL, false))
+    def withinAllowedLocality(locality: TaskLocality.TaskLocality): Boolean = {
+      TaskLocality.isAllowed(maxLocality, locality) && {
+        if (maxLocality != minLocality) {
+          minLocality < locality
+        } else {
+          true
+        }
+      }
+    }
+
+    if (withinAllowedLocality(TaskLocality.PROCESS_LOCAL)) {
+      for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
+        return Some((index, TaskLocality.PROCESS_LOCAL, false))
+      }
     }
 
-    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+    if (withinAllowedLocality(TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
         return Some((index, TaskLocality.NODE_LOCAL, false))
       }
+    }
+
+    if (withinAllowedLocality(TaskLocality.NOPREF)) {
       // Look for noPref tasks after NODE_LOCAL to minimize cross-rack traffic
       for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
         return Some((index, TaskLocality.PROCESS_LOCAL, false))
       }
       // find a speculative task if all noPref tasks have been scheduled
-      val specTask = findSpeculativeTask(execId, host, locality).map {
+      val specTask = findSpeculativeTask(execId, host, maxLocality).map {
         case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
       if (specTask != None) return specTask
     }
 
-    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
+    if (withinAllowedLocality(TaskLocality.RACK_LOCAL)) {
       for {
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
@@ -368,31 +387,53 @@
       }
     }
 
-    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
+    if (withinAllowedLocality(TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
         return Some((index, TaskLocality.ANY, false))
       }
     }
+
     None
   }
 
   /**
    * Respond to an offer of a single executor from the scheduler by finding a task
+   * @param execId the executor Id of the offered resource
+   * @param host the host Id of the offered resource
+   * @param preferredLocality the maximum locality level at which we want to schedule the tasks
+   * @param bottomLocality the minimum locality level at which we want to schedule the tasks;
+   *                       mainly used to avoid duplicate traversals of the task lists once we
+   *                       have determined that certain levels hold no candidate tasks
+   * @param allowAdjustPrefLocality mainly for scheduling noPref tasks, to which we do not
+   *                                want to apply delay scheduling
    */
   def resourceOffer(
       execId: String,
       host: String,
-      preferredLocality: TaskLocality.TaskLocality)
+      preferredLocality: TaskLocality.TaskLocality,
+      bottomLocality: TaskLocality.TaskLocality,
+      allowAdjustPrefLocality: Boolean = true)
     : Option[TaskDescription] =
   {
     if (!isZombie) {
       val curTime = clock.getTime()
 
       var allowedLocality = getAllowedLocalityLevel(curTime)
+
       if (allowedLocality > preferredLocality) {
-        allowedLocality = preferredLocality // We're not allowed to search for farther-away tasks
+        // We're not allowed to search for farther-away tasks
+        allowedLocality = preferredLocality
       }
-      findTask(execId, host, allowedLocality) match {
+
+      val foundTask = {
+        if (allowAdjustPrefLocality) {
+          findTask(execId, host, allowedLocality, bottomLocality)
+        } else {
+          findTask(execId, host, preferredLocality, bottomLocality)
+        }
+      }
+      foundTask match {
         case Some((index, taskLocality, speculative)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
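
The maxLocality/minLocality pair is what prevents re-traversing the pending lists: minLocality acts as an exclusive lower bound unless it equals maxLocality, so a round of offers with a growing maximum touches each list once. A standalone sketch of the window predicate, assuming (as in Spark) that TaskLocality.isAllowed(max, l) means l <= max:

object SearchWindowDemo {
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY = Value
  }
  import TaskLocality._

  // Mirrors withinAllowedLocality: a level qualifies if it is within max and,
  // unless min == max, lies strictly beyond min (already-searched levels are skipped).
  def within(level: Value, max: Value, min: Value): Boolean =
    level <= max && (max == min || min < level)

  def main(args: Array[String]): Unit = {
    // First offer of a round: the window collapses to PROCESS_LOCAL itself.
    println(values.filter(within(_, PROCESS_LOCAL, PROCESS_LOCAL)))
    // A later offer with a wider max: PROCESS_LOCAL was already searched.
    println(values.filter(within(_, NODE_LOCAL, PROCESS_LOCAL)))
    // The NOPREF retry after a NODE_LOCAL pass: only the noPref list is scanned.
    println(values.filter(within(_, NOPREF, NODE_LOCAL)))
  }
}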
@@ -433,6 +474,10 @@
           return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
         }
         case _ =>
+          if (preferredLocality != TaskLocality.NOPREF) {
+            return resourceOffer(execId, host, TaskLocality.NOPREF, preferredLocality,
+              allowAdjustPrefLocality = false)
+          }
       }
     }
     None
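
This case _ => branch is where the commit title is realized: when nothing is found at the offered level, the offer is retried once at NOPREF with allowAdjustPrefLocality = false, so tasks with no locality preference launch immediately instead of waiting out the locality delay, and the preferredLocality != NOPREF guard bounds the recursion to a single retry. A toy sketch of that control flow (illustrative names and a fake task store, not the Spark method):

object NoprefFallbackDemo {
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY = Value
  }
  import TaskLocality._

  // Pretend finder: the only pending task has no locality preference.
  def findTask(max: Value, min: Value): Option[String] =
    if (min < NOPREF && NOPREF <= max) Some("nopref-task") else None

  def resourceOffer(preferred: Value, bottom: Value): Option[String] =
    findTask(preferred, bottom).orElse {
      // Retry once at NOPREF; the guard stops infinite recursion.
      if (preferred != NOPREF) resourceOffer(NOPREF, preferred) else None
    }

  def main(args: Array[String]): Unit = {
    // A NODE_LOCAL-bounded offer finds nothing, but the NOPREF retry
    // launches the no-preference task without any scheduling delay.
    println(resourceOffer(NODE_LOCAL, PROCESS_LOCAL)) // Some(nopref-task)
  }
}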
@@ -634,8 +679,7 @@
   override def executorLost(execId: String, host: String) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
 
-    // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
-    // task that used to have locations on only this host might now go to the no-prefs list. Note
+    // Re-enqueue pending tasks for this host based on the status of the cluster. Note
     // that it's okay if we add a task to the same queue twice (if it had multiple preferred
     // locations), because findTaskFromList will skip already-running tasks.
     for (index <- getPendingTasksForExecutor(execId)) {
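
The retained comment explains why re-enqueueing into multiple queues is safe: the dequeue side skips anything already running. A toy sketch of that mechanism (the real findTaskFromList also checks copiesRunning and successful; this version is simplified to a running set):

object DedupDequeueDemo {
  import scala.collection.mutable

  val running = mutable.Set[Int]()

  // Mirrors findTaskFromList's shape: pop from the back, skip running tasks.
  def findTaskFromList(list: mutable.ArrayBuffer[Int]): Option[Int] = {
    while (list.nonEmpty) {
      val index = list.remove(list.size - 1)
      if (!running.contains(index)) return Some(index)
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val hostList = mutable.ArrayBuffer(7, 7)        // task 7 enqueued twice
    findTaskFromList(hostList).foreach(running.add) // first copy launches it
    println(findTaskFromList(hostList))             // None: the duplicate is skipped
  }
}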
@@ -666,6 +710,9 @@
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
     }
+    // recalculate valid locality levels and waits when an executor is lost
+    myLocalityLevels = computeValidLocalityLevels()
+    localityWaits = myLocalityLevels.map(getLocalityWait)
   }
 
   /**
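
Recomputing here matters because losing an executor can eliminate a locality level outright: once no alive host can satisfy a task's preference, the TaskSet should stop paying the delay-scheduling wait for that level. A toy illustration (a hypothetical validLevels helper, far simpler than computeValidLocalityLevels):

object RecomputeLevelsDemo {
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY = Value
  }
  import TaskLocality._

  // A level is valid only if some pending task's preferred host is still alive.
  def validLevels(pendingHosts: Set[String], aliveHosts: Set[String]): Seq[Value] = {
    val levels = Seq.newBuilder[Value]
    if (pendingHosts.exists(aliveHosts.contains)) levels += NODE_LOCAL
    levels += ANY
    levels.result()
  }

  def main(args: Array[String]): Unit = {
    println(validLevels(Set("host1"), Set("host1", "host2"))) // List(NODE_LOCAL, ANY)
    println(validLevels(Set("host1"), Set("host2")))          // List(ANY): stop waiting
  }
}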
@@ -725,6 +772,8 @@
   /**
    * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
    * added to queues using addPendingTask.
+   *
+   * NOTE: we don't need to handle NOPREF here, because NOPREF tasks are scheduled as PROCESS_LOCAL
    */
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
@@ -747,16 +796,6 @@
   }
 
   def executorAdded() {
-    def newLocAvail(index: Int): Boolean = {
-      for (loc <- tasks(index).preferredLocations) {
-        if (sched.hasExecutorsAliveOnHost(loc.host) ||
-          (sched.getRackForHost(loc.host).isDefined &&
-            sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
-          return true
-        }
-      }
-      false
-    }
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)
   }

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 5 additions & 2 deletions

@@ -80,7 +80,9 @@ class FakeTaskSetManager(
   override def resourceOffer(
       execId: String,
       host: String,
-      preferredLocality: TaskLocality.TaskLocality)
+      preferredLocality: TaskLocality.TaskLocality,
+      bottomLocality: TaskLocality.TaskLocality,
+      allowedAdjustPrefLocality: Boolean)
     : Option[TaskDescription] =
   {
     if (tasksSuccessful + numRunningTasks < numTasks) {

@@ -124,7 +126,8 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging
         manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
     }
     for (taskSet <- taskSetQueue) {
-      taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY) match {
+      taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY,
+        TaskLocality.PROCESS_LOCAL) match {
         case Some(task) =>
           return taskSet.stageId
         case None => {}
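
One Scala detail makes the updated fake work: an override that omits a parameter's default value inherits it from the overridden method (and may rename the parameter, as the allowedAdjustPrefLocality spelling here does), so the four-argument call in the second hunk still compiles against the five-parameter signature. A standalone sketch of that rule (toy classes, not the Spark ones):

object DefaultArgDemo {
  class Parent {
    def offer(level: String, allowAdjust: Boolean = true): String =
      s"parent: $level, $allowAdjust"
  }

  class Fake extends Parent {
    // No '= true' here: the default is inherited from Parent.offer.
    override def offer(level: String, allowedAdjust: Boolean): String =
      s"fake: $level, $allowedAdjust"
  }

  def main(args: Array[String]): Unit = {
    val m: Parent = new Fake
    println(m.offer("ANY")) // prints: fake: ANY, true
  }
}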
