
[SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks… #23677


Closed
pgandhi999 wants to merge 17 commits into apache:master from pgandhi999:SPARK-26755

Conversation

@pgandhi999 commented Jan 28, 2019

… more efficiently

This PR improves the performance of scheduling speculative tasks to be O(1) instead of O(numSpeculativeTasks), using the same approach used for scheduling regular tasks. The performance of this method is particularly important because a lock is held on the TaskSchedulerImpl, which is a bottleneck for all scheduling operations. We ran a join query on a large dataset with speculation enabled; out of 100000 tasks for the ShuffleMapStage, the maximum number of speculatable tasks observed at any point was close to 7900-8000, which is when the bottleneck on the scheduler lock starts to show.

In particular, this works by storing a separate stack of tasks by executor, node, and rack locality preferences. Then, when trying to schedule a speculative task, rather than scanning all speculative tasks to find ones which match the given executor (or node, or rack) preference, we can jump to a quick check of tasks matching the resource offer. This technique was already used for regular tasks -- this change refactors the code to allow sharing between regular and speculative task scheduling.

What changes were proposed in this pull request?

Have split the main queue "speculatableTasks" into 5 separate queues based on locality preference, similar to how normal tasks are enqueued. Thus, the "dequeueSpeculativeTask" method will avoid performing locality checks for each task at runtime and simply return the preferred task to be executed.
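
For illustration only, here is a minimal, stand-alone sketch of the idea (the names are simplified stand-ins, not the exact fields added by this PR): every speculatable task index is bucketed by the executors, hosts, and racks it prefers, so a resource offer only has to look at the buckets that match it instead of scanning the whole speculatable set.

    import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

    // Simplified sketch of per-locality queues for speculatable tasks.
    class SpeculativePendingTasksSketch {
      val forExecutor = new HashMap[String, ArrayBuffer[Int]]  // PROCESS_LOCAL candidates
      val forHost = new HashMap[String, ArrayBuffer[Int]]      // NODE_LOCAL candidates
      val forRack = new HashMap[String, ArrayBuffer[Int]]      // RACK_LOCAL candidates
      val noPrefs = new ArrayBuffer[Int]                       // NO_PREF candidates
      val all = new ArrayBuffer[Int]                           // ANY (every speculatable task)

      // Guard so the same task index is only enqueued once.
      private val enqueued = new HashSet[Int]

      def add(index: Int, execs: Seq[String], hosts: Seq[String], racks: Seq[String]): Unit = {
        if (enqueued.add(index)) {
          execs.foreach(e => forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index)
          hosts.foreach(h => forHost.getOrElseUpdate(h, new ArrayBuffer) += index)
          racks.foreach(r => forRack.getOrElseUpdate(r, new ArrayBuffer) += index)
          if (execs.isEmpty && hosts.isEmpty && racks.isEmpty) noPrefs += index
          all += index
        }
      }

      // A resource offer for (execId, host, rack) only inspects the matching buckets.
      def candidatesFor(execId: String, host: String, rack: Option[String]): ArrayBuffer[Int] =
        forExecutor.getOrElse(execId, new ArrayBuffer[Int]) ++
          forHost.getOrElse(host, new ArrayBuffer[Int]) ++
          rack.flatMap(forRack.get).getOrElse(new ArrayBuffer[Int]) ++
          noPrefs
    }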

How was this patch tested?

We ran a spark job that performed a join on a 10 TB dataset to test the code change.
Original Code:
screen shot 2019-01-28 at 5 07 22 pm

Optimized Code:
screen shot 2019-01-28 at 5 08 19 pm

As you can see, the run time of the ShuffleMapStage came down from approximately 40 minutes to 6 minutes, reducing the overall running time of the Spark job by a significant amount.

Another example for the same job:

Original Code:
screen shot 2019-01-28 at 5 11 30 pm

Optimized Code:
screen shot 2019-01-28 at 5 12 16 pm

… more efficiently

Have split the main queue "speculatableTasks" into 5 separate queues based on locality preference similar to how normal tasks are enqueued.
@pgandhi999 (Author)

ok to test

@pgandhi999 (Author) commented Jan 28, 2019

@SparkQA commented Jan 29, 2019

Test build #101775 has finished for PR 23677 at commit 2d87a62.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold) {
addPendingSpeculativeTask(index)
Contributor

In this check we ensure exactly one copy of the task is running (so it has been dequeued). While that copy is running and we mark it for potential speculation, we no longer protect against duplicates. So each of the queues could contain duplicate tasks to speculate.

Could this lead to multiple speculative tasks for a single real task? And also, could it lead to a snowball effect where speculative tasks are created off of speculative tasks that are running? It seems the previous code was protecting against that with !speculatableTasks.contains(index).

Author

Yes, you are right; I have updated the code to use a HashSet instead of an ArrayBuffer. Thank you.
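
For illustration, a tiny stand-alone sketch (not the PR's code) of why a set-backed pending list cannot accumulate duplicate entries, no matter how often the speculation check re-adds the same index:

    import scala.collection.mutable.HashSet

    object DedupSketch {
      // Stand-in for a pending-speculatable-task list backed by a HashSet.
      val pendingSpeculatable = new HashSet[Int]

      // Re-adding the same index is a no-op, so repeated speculation checks are safe.
      def markSpeculatable(index: Int): Unit = pendingSpeculatable += index

      def main(args: Array[String]): Unit = {
        markSpeculatable(7); markSpeculatable(7); markSpeculatable(7)
        assert(pendingSpeculatable.size == 1)
      }
    }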

Contributor

can you also add a test case that if you have multiple calls to checkSpeculatableTasks followed by multiple resource offers, you still only submit one speculative task?

Author

Have added the test case.

}

// No point scanning this whole list to find the old task there
allPendingSpeculatableTasks += index


qq: does this need to hold all tasks, or only those with the TaskLocality.ANY preference? It seems like a waste to scan all the tasks in it when dequeuing non-local tasks; could be wrong in my understanding, however. Thanks.


Also, in your tests it would be interesting to note the locality affinity: do you see more process-local tasks vs. node-local, etc.? That might be related to the scheduling logic; just curious to know about it.

Author

So AFAIK, task locality here means a preference for the individual task to run at a desired locality level (kind of like flight scheduling: my preference would be to fly from airport A as it is the nearest to my house, but if the flight I want to book does not take off from airport A, then airport B, which is farther than A, will do). If the desired locality level cannot be assigned to the task by the scheduler due to locality constraints and other checks, it will be assigned the next best locality. That is why we need all tasks in the ANY locality preference queue. The method hasAttemptOnHost takes care of duplicate scheduling of tasks anyway.

Author

Good point. Will definitely test for locality affinity, but ideally, that logic should not have changed with this PR.

@squito (Contributor) commented Jan 29, 2019

I haven't looked very closely yet, but general idea makes sense, and results are very compelling.

High level question though -- any reason TaskSetManager.canFetchMoreResults needs to lock the TaskSchedulerImpl? Couldn't we protect tsm.totalResultSize and tsm.calculatedTasks with something local to the tsm, which would be entirely separate from the lock needed for speculation?

I'm really just brainstorming at this point -- that change might actually be a lot harder to reason about.

@pgandhi999 pgandhi999 changed the title [SPARK-26755] : Optimize Spark Scheduler to dequeue speculative tasks… [SPARK-26755][Scheduler] : Optimize Spark Scheduler to dequeue speculative tasks… Jan 29, 2019
@pgandhi999 pgandhi999 changed the title [SPARK-26755][Scheduler] : Optimize Spark Scheduler to dequeue speculative tasks… [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks… Jan 29, 2019
@SparkQA commented Jan 29, 2019

Test build #101833 has finished for PR 23677 at commit 65d0926.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pgandhi999 (Author)

@squito That is a good point to look into. The lock at the task scheduler level was introduced to TaskSetManager.canFetchMoreResults in c306555, with the corresponding JIRA https://issues.apache.org/jira/browse/SPARK-5219. The JIRA is not descriptive enough, so I do not know the exact reason why the lock was added.

@SparkQA commented Mar 4, 2019

Test build #102999 has finished for PR 23677 at commit 65d0926.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pgandhi999 (Author)

Hi @squito, any chance you could take a look at this one? We have experimented quite a lot with this code change and have seen quite positive results, especially in terms of running time improvements. Thank you.

@pgandhi999 (Author)

@tgravescs @vanzin @squito We have been running Spark on our clusters with this patch and have observed runtime improvements of 20-25 minutes for large jobs. Would appreciate your reviews. Thank you.

@SparkQA commented Jun 25, 2019

Test build #106899 has finished for PR 23677 at commit e525b1c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pgandhi999 (Author)

test this please

@SparkQA commented Jun 25, 2019

Test build #106902 has finished for PR 23677 at commit e525b1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment

sorry it's taken me so long to get back to this @pgandhi999. Makes a lot of sense to me; just wondering if we can clean things up some, improve tests, etc.

pendingSpeculatableTasksWithNoPrefs += index
}

// No point scanning this whole list to find the old task there
Contributor

I couldn't figure out what this comment meant, till I realized you were copying it from addPendingTask, though I couldn't figure out what it meant there either, till I looked in history and realized it was long obsolete -- added here, then removed here (but the comment was left). Can you delete both comments? (unless you can see a way it still has some relevance ...)

Author

Done

private def dequeueSpeculativeTaskFromList(
execId: String,
host: String,
list: HashSet[Int]): Option[Int] = {
Contributor

nit: double-indent method params

Author

Done


private[scheduler] var pendingSpeculatableTasksForRack = new HashMap[String, HashSet[Int]]

// Set of all pending tasks that can be speculated.
private[scheduler] val allPendingSpeculatableTasks = new HashSet[Int]
Contributor

overall, this makes a lot of sense. I see how you're replacing a bunch of O(numSpeculatableTasks) operations with things specific to tasks targeted at each executor, etc., just like the other scheduler methods.

I don't really like how there is so much code duplication between the speculative and non-speculative versions, for what seem like minor differences. I was thinking you could create a private case class like

case class PendingsTasksByLocality(forExecutor: HashMap[String, ArrayBuffer[Int]], forHost: HashMap[String, ArrayBuffer[Int]], ...)

with one instance for all pending tasks and one for pending speculative tasks. Then a lot of the functions could be combined (eg. dequeueSpeculativeTaskFromList & dequeueTaskFromList) just with a speculative: Boolean parameter added. Your new data structures are HashMap[String, HashSet] instead of HashMap[String, ArrayBuffer], but I think you can go back to ArrayBuffer if you also keep val speculatableTasks = new HashSet[Int] to avoid adding duplicate entries.
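
A rough sketch of the suggested shape (field names are assumed from this thread, not necessarily the final code):

    import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

    // One container per kind of pending task, so the enqueue/dequeue helpers can be
    // shared between regular and speculative scheduling via a `speculative: Boolean` flag.
    case class PendingTasksByLocality(
        forExecutor: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]],
        forHost: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]],
        noPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int],
        forRack: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]],
        anyPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int])

    // The TaskSetManager would then hold two instances plus a de-dup set, e.g.:
    //   val pendingTasks = PendingTasksByLocality()
    //   val pendingSpeculatableTasks = PendingTasksByLocality()
    //   val speculatableTasks = new HashSet[Int]  // avoids enqueueing duplicates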

Author

makes sense @squito, I have refactored the code. Please let me know if it looks different from what you had thought earlier. Thank you.

Adding Unit test, refactoring code to remove duplicate methods, refactoring comments etc.
@SparkQA commented Jul 1, 2019

Test build #107083 has finished for PR 23677 at commit 41ddf23.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PendingTasksByLocality(

forHost = new HashMap[String, ArrayBuffer[Int]],
noPrefs = new ArrayBuffer[Int],
forRack = new HashMap[String, ArrayBuffer[Int]],
anyPrefs = new ArrayBuffer[Int])
Contributor

you can clean this up a bit by having all the initialization be inside the constructor (or apply method, doesn't matter)

Author

Done

case _ =>
resolveRacks: Boolean = true,
speculative: Boolean = false): Unit = {
if (speculative) {
Contributor

I think you could combine the code here even more

    val pendingTaskSetToAddTo = if (speculative) pendingSpeculatableTasks else pendingTasks
    // ... mostly the original code from `addPendingTask` here, just adding into pendingTaskSetToAddTo

the only difference seems to be that speculative tasks didn't look at HDFSCacheTaskLocation, which was a bug

Author

Done

list -= index
if (!successful(index)) {
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
Contributor

again, can these be combined more? There is a small logical difference here, but seems that can be pushed down. Otherwise the differences are confusing (and even sometimes wrong -- eg. for speculative tasks, you're scanning from the front of the list, so the comment about removing tail is incorrect).

    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && 
          !(speculative && hasAttemptOnHost(index, host))) {
        // This should almost always be list.trimEnd(1) to remove tail
        list.remove(indexOffset)
        if ((copiesRunning(index) == 0 || speculative) && !successful(index)) {
          return Some(index)
        }
      }
    }
    None

(I'm also wondering if we need copiesRunning <= 1 even with speculation, since you've got multiple lists now, will need to step through that carefully and make sure there is a test)

Author

Done

if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold) {
addPendingSpeculativeTask(index)
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
Contributor

nit: indent the continuation of the if condition more

Author

Done

}

/**
* Return the pending tasks list for a given executor ID, or an empty list if
* there is no map entry for that host
*/
private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
pendingTasks.forExecutor.getOrElse(executorId, ArrayBuffer())
Contributor

I don't think we even need these private getPendingTasksForXXX helpers anymore (especially with some of my other suggested refactoring). If you want to keep them, they should go on class PendingTasksByLocality

Author

Done. Removed the methods.

execId, host, pendingSpeculatableTasksForExecutor.getOrElse(execId, HashSet()))) {
for (index <- dequeueTaskFromList(
execId, host, pendingSpeculatableTasks.forExecutor.getOrElse(execId, ArrayBuffer()),
speculative = true)) {
Contributor

once more, I'd just get rid of dequeueSpeculativeTask completely

  private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
      : Option[(Int, TaskLocality.Value, Boolean)] = {
    dequeueTaskHelper(execId, host, maxLocality, false)
      // if we didn't schedule a regular task, try to schedule a speculative one
      .orElse(dequeueTaskHelper(execId, host, maxLocality, true))
  }

  private def dequeueTaskHelper(
      execId: String,
      host: String,
      maxLocality: TaskLocality.Value,
      speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = {
    if (speculative && speculatableTasks.isEmpty) {
      return None
    }
    val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks
    def dequeue(list: ArrayBuffer[Int]): Option[Int] = {
      dequeueTaskFromList(execId, host, list, speculative)
    }

    dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index =>
      return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index =>
        return Some((index, TaskLocality.NODE_LOCAL, speculative))
      }
    }

    // Look for noPref tasks after NODE_LOCAL to minimize cross-rack traffic
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
        return Some((index, TaskLocality.NO_PREF, speculative))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, speculative))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      logInfo(s"about to try to dequeue from ${pendingTaskSetToUse.anyPrefs}")
      dequeue(pendingTaskSetToUse.anyPrefs).foreach { index =>
        return Some((index, TaskLocality.ANY, speculative))
      }
    }
    None
  }

as this suggestion is a little long I put it here: https://github.com/squito/spark/tree/speculation_cleanup

Author

Done. Thank you for your valuable help in coming up with the code.

// allPendingSpeculativeTasks will still have two pending tasks but
// pendingSpeculatableTasksWithNoPrefs should have none
assert(manager.pendingSpeculatableTasks.anyPrefs.size === 2)
assert(manager.pendingSpeculatableTasks.noPrefs.size === 0)
Contributor

these structures are actually an implementation detail, in particular since they leave some "stale" state and are meant to lazily correct themselves. So I wouldn't test their size at all.

However, here you should test what happens if you have another resource offer, with ANY locality, and make sure you don't schedule two tasks on them. Furthermore, you should probably originally give your tasks some locality preferences, and then make offers with some meeting the locality preferences, and some not meeting the locality constraints. Since you'll have the speculatable task listed in multiple places inside pendingSpeculatableTasks, we want to make sure we don't run multiple copies.

Author

@squito Your comment does make sense and I can implement it in the test. The only part I am stuck on is figuring out a way to test that we are not resubmitting speculative tasks without checking the HashMap or its size. Would appreciate your guidance on this matter. Thank you.

Contributor

Just make more resource offers, and see if they result in anything getting scheduled. Be sure it includes an offer on a different host, to avoid the canRunOnHost filter. Eg. like this (lots of extra logs in there):

squito@6cdd8ee

my added test fails because that last offer, on host3, results in another speculative task. I think we need a filter w/ speculative execution that there are <= 2 tasks running.

Author

@squito I have modified the test. Thank you for pointing me in the right direction. It should more or less look good now; please let me know if it still needs modification. Thank you.

pgandhi added 2 commits July 5, 2019 11:21
…RK-26755

[SPARK-26755] : Upmerging with master branch
Restructuring code and eliminating duplicate code
@SparkQA commented Jul 8, 2019

Test build #107361 has finished for PR 23677 at commit 1fa17ec.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 8, 2019

Test build #107362 has finished for PR 23677 at commit 025e548.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

execId: String,
host: String,
maxLocality: TaskLocality.Value,
speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = {
Contributor

nit: double indent method params

Author

Done

pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
val pendingTaskSetToAddTo = if (speculative) pendingSpeculatableTasks else pendingTasks
// ... mostly the original code from `addPendingTask` here, just adding
// into pendingTaskSetToAddTo
Contributor

I wouldn't leave this in as a comment in the code -- the comments should make sense for somebody reading the code without having to look at history to figure out what it's referring to.

Author

Done, removed the comment

for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
: Option[(Int, TaskLocality.Value, Boolean)] = {
// if we didn't schedule a regular task, try to schedule a speculative one
Contributor

super-duper nit: if you're going to move this comment up here, before either of the calls to dequeueTaskHelper, then don't use past tense for one of them, eg. "If we don't schedule a regular task, try to schedule a speculative one".

(or just delete the comment completely)

Author

makes sense, have modified the comment.


@pgandhi999 (Author)

@squito @Ngone51 Have updated the PR with the changes. Thank you once again for your valuable reviews.

@squito (Contributor) left a comment

Can you just remove the first part of the PR description, before the updated part I suggested? That is all good info to have in the jira, but I don't think it's useful in the commit msg.

Also can you update the testing section above to mention the number of tasks you mentioned above? More important than the size of the dataset is that you ran with 100K tasks and ~8K speculatable tasks.

// (2): Tasks may be re-added to these lists multiple times as a result
// of failures.
// Duplicates are handled in dequeueTaskFromList, which ensures that a
// task hasn't already started running before launching it.
Contributor

turn this into a scaladoc comment so its shows up in IDEs for PendingTasksByLocality

/**
 * Set of ...

Author

Done

assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined)
assert(manager.resourceOffer("exec4", "host4", ANY).isDefined)
Contributor

any particular reason to pull this out into a separate test case? Seems like it could be combined. It's fine if there is a good reason, but I don't like a proliferation of test cases that are all doing more or less the same thing. It seems the only thing which you aren't doing here, but are doing above, is checking the taskId etc. of the speculative tasks.

also, another thing missing from both tests -- there is no check that we do not schedule a speculative task on the same host as the original task, even when locality preferences point there.

(I realize some of these tests were missing before, but this logic is getting a little trickier now, and maybe those tests always should have been there)

Author

@squito I have combined the two test cases into one. I have also added the missing test (checking whether a speculatable task is launched on the same host as the original). Please let me know if anything else needs to be added to or removed from the test. Thank you.

@SparkQA commented Jul 16, 2019

Test build #107762 has finished for PR 23677 at commit 466849e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Modifying comment in Scaladoc style and combining two unit tests into one
@SparkQA commented Jul 17, 2019

Test build #107790 has finished for PR 23677 at commit 7b23ef3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


test("SPARK-26755 Ensure that a speculative task obeys the original locality preferences") {
sc = new SparkContext("local", "test")
// Launch a new set of tasks with locality preferences
Contributor

I meant to test everything we want to test with one scheduler and one taskset. My reason for consolidating things is just so it's easier to find tests related to speculative execution, to get a sense of what's already tested (eg. in the code today, I had a hard time telling if there were any tests which make sure we don't launch speculative tasks on the same node as the original).

anyway if this is a pain, just go back to having it be two separate tests. It's really two very independent sets of tests, so better to have them in separate tests.

Author

I see, I apologize for misunderstanding your original intent. The thing was that I wanted to test one set of tasks created with no locality preferences and another set created with locality preferences, so I ended up creating two tests. I guess the best thing would then be to create two independent tests, as I am not sure how to test both cases with one taskset. Would appreciate your suggestions. Thank you.

assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined)
assert(manager.resourceOffer("exec4", "host4", ANY).isDefined)
assert(manager2.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
assert(manager2.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
Contributor

a comment here would be helpful -- eg. "task 1 does have a node-local preference for host2 -- but we've already got a regular task running there, so we should not schedule a speculative task there as well."

Author

Done

@SparkQA commented Jul 17, 2019

Test build #107797 has finished for PR 23677 at commit 7b23b37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Jul 18, 2019

lgtm

I am going to be unreachable next week, so I am going to wait till I'm back to merge it (though another committer is free to merge in the meantime of course). Sorry I know this has been delayed a lot already, but this is too brittle a part of spark for me to merge without being around to follow up on. thanks again for all the work on it @pgandhi999 , feel free to bug me after that.

@pgandhi999 (Author)

Makes complete sense @squito , thank you once again for all your time.

@Ngone51 (Member) commented Jul 18, 2019

@pgandhi999 I'll do another look tomorrow :)

// Tries to schedule a regular task first; if it returns None, then schedules
// a speculative task
dequeueTaskHelper(execId, host, maxLocality, false).orElse(
dequeueTaskHelper(execId, host, maxLocality, true))
Member

Do speculatableTasks -= index when this returns Some(index) ?

Author

Good catch, fixed it.

@@ -1166,7 +1168,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
// killed, so the FakeTaskScheduler is only told about the successful completion
// of the speculated task.
assert(sched.endedTasks(3) === Success)
assert(sched.endedTasks(4) === Success)
Member

Why does this change to 4?

Author

We have changed the previous HashSet to an ArrayBuffer, so the dequeue logic remains the same but the order of tasks stored in the ArrayBuffer is different; since dequeuing picks the first task that satisfies all conditions, a different task will be dequeued now.
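
To make the ordering point concrete, a tiny stand-alone illustration (not from this PR) of how the backing collection changes which element comes first:

    import scala.collection.mutable.{ArrayBuffer, HashSet}

    object OrderingSketch {
      def main(args: Array[String]): Unit = {
        val buf = ArrayBuffer(3, 4)  // preserves insertion order: head is always 3
        val set = HashSet(3, 4)      // iteration order depends on hashing, not insertion
        println(buf.head)
        println(set.head)            // not guaranteed to be 3
      }
    }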

Member

Makes sense.

@SparkQA commented Jul 19, 2019

Test build #107922 has finished for PR 23677 at commit 32fa4d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member) left a comment

+1 for both the better performance and the code reuse.


/**
* Return the pending tasks list for a given executor ID, or an empty list if
* there is no map entry for that host
Member

I think we can move these three methods and comment into PendingTasksByLocality, then there will be fewer code changes from the original dequeueTask to dequeueTaskHelper.

Author

Not sure if we really need those three helper methods anymore, might lead to unnecessary code duplication.

private[scheduler] val speculatableTasks = new HashSet[Int]

// Store speculatable tasks by locality preferences
private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality()
Member

If speculationEnabled is false, we could even skip creating the instance here.

Author

We were not doing this earlier, so not sure whether we should do it now.

if (!successful(index)) {
if (copiesRunning(index) == 0) {
return Some(index)
} else if (speculative && copiesRunning(index) == 1) {
Member

So this is the logic that ensures a task is speculated only once? How about adding a comment to highlight it.

Author

Added comment, thank you.

if (copiesRunning(index) == 0) {
return Some(index)
} else if (speculative && copiesRunning(index) == 1) {
speculatableTasks -= index
@Ngone51 (Member) commented Jul 21, 2019

When copiesRunning(index) == 0, it is still possible to return a speculatable task -- think of the case where a long-running task fails after it has already got a pending speculatable copy. I mean, the if (copiesRunning(index) == 0) above actually covers two cases: 1) not speculative, 2) speculative (it would be nice to add a comment for this). So I think it's better to put this after dequeueTaskHelper(execId, host, maxLocality, true).

Author

Makes sense, have updated the code.
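
For reference, a stand-alone sketch (hypothetical names, not the exact change) of dropping an index from the de-dup set only once a speculative copy has actually been dequeued:

    import scala.collection.mutable.HashSet

    object SpeculativeDequeueSketch {
      val speculatableTasks = new HashSet[Int]  // de-dup guard for speculatable indices

      def dequeueRegular(): Option[Int] = None  // stand-in for the regular dequeue path
      def dequeueSpeculative(): Option[Int] = speculatableTasks.headOption  // stand-in

      // Try a regular task first; only if that fails, dequeue a speculative one and
      // forget it in speculatableTasks so the same index is not speculated again.
      def dequeueTask(): Option[(Int, Boolean)] = {
        dequeueRegular().map(i => (i, false)).orElse {
          dequeueSpeculative().map { index =>
            speculatableTasks -= index
            (index, true)
          }
        }
      }
    }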

Removing task index from speculatableTasks after the task is dequeued, adding comment
@SparkQA commented Jul 22, 2019

Test build #108019 has finished for PR 23677 at commit 685bbae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member) commented Jul 26, 2019

LGTM

@@ -107,7 +107,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet]
new TaskSetManager(mockTaskScheduler, taskSet, 4) {
private var hasDequeuedSpeculatedTask = false
override def dequeueSpeculativeTask(execId: String,
def dequeueSpeculativeTask(execId: String,
Contributor

sorry I hadn't noticed this before -- this change is wrong as now this whole setup isn't doing anything. To keep the same behavior as before, I think you need to change dequeueTaskHelper to be protected, and then override it here.

squito@366e916

Author

Makes sense, have updated the code. Sorry, I missed that one earlier. Thanks @squito.

pgandhi added 2 commits July 29, 2019 14:39
Fixing OutputCommitCoordinatorSuite Test to override dequeueTaskHelper method.
@SparkQA commented Jul 29, 2019

Test build #108346 has finished for PR 23677 at commit 7a8c992.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment

lgtm

@squito (Contributor) commented Jul 30, 2019

merged to master, thanks @pgandhi999 !

@pgandhi999 (Author) commented Jul 30, 2019

Thank you @squito, @Ngone51, @abellina @redsanket and @xuanyuanking for all your valuable reviews.

@asfgit closed this in 70910e6 on Jul 30, 2019
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…ative tasks…

… more efficiently

Refer: LIHADOOP-52383


Closes apache#23677 from pgandhi999/SPARK-26755.

Lead-authored-by: pgandhi <[email protected]>
Co-authored-by: pgandhi <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>

RB=2065158
BUG=LIHADOOP-52383
G=spark-reviewers
R=rhu,ekrogen
A=rhu