[SPARK-8881][SPARK-9260] Fix algorithm for scheduling executors on workers #7274

Closed
wants to merge 13 commits into from

Conversation

nishkamravi2
Contributor

The current scheduling algorithm allocates one core at a time and, in doing so, ends up ignoring spark.executor.cores. As a result, when spark.cores.max / spark.executor.cores (i.e., num_executors) < num_workers, executors are not launched and the app hangs. This PR fixes and refactors the scheduling algorithm.
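The fix can be sketched roughly as follows. This is a minimal, hypothetical Scala sketch of allocating a whole executor's worth of cores at a time; the names `Worker`, `scheduleExecutors`, and the parameters are illustrative only and are not the actual Master.scala API.

```scala
// Minimal sketch (assumed/simplified, NOT the actual Master.scala code):
// allocate a whole executor's worth of cores at a time, so that
// spark.executor.cores is respected and the loop terminates once no
// worker can host another full executor.
case class Worker(coresFree: Int, memoryFree: Int)

def scheduleExecutors(
    workers: IndexedSeq[Worker],
    coresMax: Int,          // spark.cores.max
    coresPerExecutor: Int,  // spark.executor.cores
    memoryPerExecutor: Int): Array[Int] = {
  val assignedCores  = Array.fill(workers.length)(0)
  val assignedMemory = Array.fill(workers.length)(0)
  var coresToAssign  = coresMax

  // A worker can host another executor only if it has a full executor's
  // worth of cores and memory still unassigned.
  def canLaunch(pos: Int): Boolean =
    workers(pos).coresFree  - assignedCores(pos)  >= coresPerExecutor &&
    workers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor

  // Keep scheduling only while a full executor still fits somewhere.
  while (coresToAssign >= coresPerExecutor && workers.indices.exists(canLaunch)) {
    val pos = workers.indices.find(canLaunch).get
    assignedCores(pos)  += coresPerExecutor
    assignedMemory(pos) += memoryPerExecutor
    coresToAssign       -= coresPerExecutor
  }
  assignedCores
}
```

With 4 workers of 16 cores each, coresMax = 48, and coresPerExecutor = 16, this assigns 16 cores to each of three workers and leaves the fourth idle, instead of spreading 12 cores everywhere and launching nothing.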

@andrewor14

while (toAssign > 0) {
  if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
      usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
    toAssign -= coresPerExecutor
Member

If I understand what you're trying to change, this won't help. If there aren't enough cores on any worker, then this becomes an infinite loop.

Contributor Author

Please read the code carefully.

Member

Eh, sorry, that is not addressing my question, but I think I see the situation now: I need 16 cores for two 8-core executors and I have 4 workers, so each worker fails to have enough cores to launch an executor anywhere? An example or a test would be really helpful.

If so, I think this is also fixable by just considering no more workers than executors.

Member

Also, yes, I see you still have the filtering on cores available, so this shouldn't keep looping over workers, right? Unless the available count can drop while this is in progress, but that is either not a problem or already a problem, so it's not directly relevant.

Contributor Author

Consider the following: 4 workers each with 16 cores, spark.cores.max=48, spark.executor.cores = 16. When we spread out, we allocate one core at a time and in doing so end up allocating 12 cores from each worker. First, we ended up ignoring spark.executor.cores during allocation, which isn't right. Second, when the following condition is checked: while (coresLeft >= coresPerExecutor), coresLeft is 12 and coresPerExecutor is 16. As a result, executors don't launch.
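The arithmetic in this example can be checked directly. This is a hypothetical standalone snippet, not Spark code; it just replays the numbers above.

```scala
// Reproducing the scenario above: 4 workers x 16 cores,
// spark.cores.max = 48, spark.executor.cores = 16.
val numWorkers = 4
val coresMax = 48
val coresPerExecutor = 16

// Old algorithm (spread-out mode): hand out one core at a time,
// round-robin across all workers.
val coresLeftPerWorker = coresMax / numWorkers // 12 cores end up on each worker

// Per-worker launch check: while (coresLeft >= coresPerExecutor) ...
val executorsLaunched = coresLeftPerWorker / coresPerExecutor // 12 / 16 == 0

// No worker ever holds 16 assigned cores, so nothing launches and the
// app hangs waiting for resources.
```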

Contributor Author

Yep, that's right.

Member

Yes, makes sense. Is it maybe more direct to never spread the allocation over more than 3 workers in this case, since only 3 executors are needed? Same effect, but I also see the value in allocating whole executors' worth of cores at a time for clarity.

Contributor Author

Yep. Allocating spark.executor.cores at a time is cleaner and directly enforces semantics.

@nishkamravi2
Contributor Author

Overview of changes:

  1. scheduleExecutorsOnWorkers rewritten and separated out (so it can be unit tested)
  2. allocateWorkerResourceToExecutors modified accordingly and simplified

Comments:

  1. The two while loops in scheduleExecutorsOnWorkers can potentially be fused into one
  2. Would be good to add a couple of unit tests (we don't have any for executor scheduling at the moment)

@andrewor14
Contributor

add to whitelist

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36810 has finished for PR 7274 at commit 66362d5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36831 has finished for PR 7274 at commit 2d6371c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nishkamravi2
Contributor Author

Can this be retested please?

@SparkQA

SparkQA commented Jul 9, 2015

Test build #1016 has finished for PR 7274 at commit 2d6371c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nishkamravi2
Contributor Author

Hey @andrewor14, not sure what to make of these test results. Are you able to see which tests failed?

@andrewor14
Contributor

Not sure... retest this please

@SparkQA

SparkQA commented Jul 9, 2015

Test build #36897 has finished for PR 7274 at commit 2d6371c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nishkamravi2
Contributor Author

Can this be retested please?

@SparkQA

SparkQA commented Jul 9, 2015

Test build #36923 has finished for PR 7274 at commit 5d6a19c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jul 9, 2015

EventLoggingListenerSuite seems to be failing regularly. Does it pass when you run locally?

@nishkamravi2
Contributor Author

Thanks Imran. I thought my local run had gone through; will check again. Btw, were you able to make this out from the test results tab or by scanning the console output?

@SparkQA

SparkQA commented Jul 10, 2015

Test build #37005 has finished for PR 7274 at commit c11c689.

  • This patch fails some tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 10, 2015

Test build #37007 has finished for PR 7274 at commit 40c8f9f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 10, 2015

Test build #37032 has finished for PR 7274 at commit a06da76.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 14, 2015

Test build #1064 has finished for PR 7274 at commit a06da76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Jul 14, 2015

This looks pretty good to my eyes, but it would be good to get another pair of eyes on it. @squito @andrewor14 and maybe @sryza, what do you think of the logic refactoring? I think it preserves the original behavior and fixes the issue at hand.

@andrewor14
Contributor

retest this please

var coresLeft = coresToAllocate
while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {

var numExecutors = assignedCores/coresPerExecutor
Contributor

please add spaces around /

Contributor

also, this can be a val

Contributor

can you add a comment here stating your implicit assumptions:

// If cores per executor is specified, then this division should have a remainder of zero

@SparkQA

SparkQA commented Jul 18, 2015

Test build #37684 has finished for PR 7274 at commit f279cdf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
coresToAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
assignedMemory(pos) += memoryPerExecutor
Contributor

So I stared at this loop for a little bit and I think it could bring us into an infinite loop.

E.g. We have 3 workers, with 3, 3, and 4 free cores respectively, so that coresToAssign == 10. Now let's say coresPerExecutor == 3, so after allocating 3 executors we end up with coresToAssign == 1. What happens next? Well, none of the usable workers can accommodate a new executor, and coresToAssign > 0 is still true, so this loop never exits.

Let me know if I'm missing something.
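The termination concern can be sketched like this. A hypothetical, simplified snippet (not the PR's actual loop) for the 3/3/4-core scenario described above, with a guard that also requires some worker to still fit a full executor:

```scala
// Sketch of the 3/3/4-core scenario above (hypothetical, simplified).
val coresPerExecutor = 3
val freeCores = Array(3, 3, 4) // free cores on each of 3 workers
var coresToAssign = 10

def canLaunch(pos: Int): Boolean = freeCores(pos) >= coresPerExecutor

// Guarding only on (coresToAssign > 0) would spin forever once no worker
// fits another executor. Also requiring that some worker can still launch,
// and that a full executor's worth of cores remains, guarantees exit.
while (coresToAssign >= coresPerExecutor && freeCores.indices.exists(canLaunch)) {
  val pos = freeCores.indices.find(canLaunch).get
  freeCores(pos) -= coresPerExecutor
  coresToAssign -= coresPerExecutor
}
// Exits with coresToAssign == 1: the one leftover core simply goes unserved.
```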

Contributor

(same for the non spread out case)

Member

Wouldn't app.coresLeft be a multiple of 3 in this case? So 9 or 12 rather than 10? But yeah, it still raises the question of what happens if there simply aren't enough cores on one worker: I want 4x 3-core executors, and I have 3x 4-core workers. It will never schedule. Previously, I think we'd just manage to schedule 3x 3-core executors, but I think this would keep looping. There needs to be some logic for detecting when no worker is left that could possibly fit another.

I haven't thought this through either, but are there race condition problems here too? As long as the worst case is just that resources that looked available aren't anymore and we fail to schedule, that's fine.

Contributor Author

"resources that looked available aren't anymore and fail to schedule, that's fine." This is the assumption being made here. If the user didn't care about the size of the executor, they would skip executor.cores and the algorithm would proceed as before (best-effort: one-core at a time). If they do, we should either schedule as requested or not at all. If we care to be extra-friendly, we could add a check to log a message from within the loop: "Not enough resources, please check spark.cores.max and spark.executor.cores" ?

Member

Seems OK but I think there is an infinite loop problem here still?

Contributor Author

We could potentially return assignedCores that we have thus far and proceed with scheduling. But as discussed earlier, we are better off failing than scheduling incorrectly. Do you feel otherwise?

Contributor Author

Didn't see your note. I would think that by failing and allowing the user to reconfigure, we would be doing them a favor. But I can see the value in scheduling whatever we can as well.

Contributor Author

Now we have both versions. We can choose to keep this or revert to the previous one.

Member

I think the previous behavior was to schedule as much as possible, since before it would only try to assign as many cores as are available, not necessarily as many as are requested. If so, I think it's best to retain that behavior.

Contributor

Yeah, I don't think it's really a user error. The contract of setting spark.executor.cores is that every executor has exactly that many cores. If the total number of cores across the cluster is not a multiple of that, then there will be some unused cores, but scheduling should still work.

@SparkQA

SparkQA commented Jul 21, 2015

Test build #37934 has finished for PR 7274 at commit 79084e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -533,6 +533,7 @@ private[master] class Master(

/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker (None if scheduling fails)
Member

Nit, for if another change is needed: this could be a @return tag

Contributor

also, this shouldn't say None anymore

@SparkQA

SparkQA commented Jul 22, 2015

Test build #38019 has finished for PR 7274 at commit da0f491.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

freeWorkers = freeWorkers.filter(canLaunchExecutor)
freeWorkers.foreach { pos =>
  var keepScheduling = true
  while (keepScheduling && canLaunchExecutor(pos) && coresToAssign > 0) {
Contributor

I just tested this out locally myself and found a bug. The comparison here should be coresToAssign >= coresPerExecutor. Otherwise, we could end up allocating more than spark.cores.max. Same in L573.

E.g. spark.executor.cores == 3, and spark.cores.max == 10, then we'll allocate 12 cores because in the last iteration coresToAssign == 1 > 0.
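The effect of the corrected guard can be sketched with those same numbers (hypothetical standalone snippet, not the actual Master.scala loop):

```scala
// Illustrating the guard discussed above (hypothetical values):
// spark.executor.cores = 3, spark.cores.max = 10.
val coresPerExecutor = 3
var coresToAssign = 10
var assigned = 0

// With the loose guard (coresToAssign > 0), the final iteration would still
// grant a full 3-core executor, assigning 12 cores and overshooting the cap.
// The corrected guard stops one executor earlier:
while (coresToAssign >= coresPerExecutor) {
  assigned += coresPerExecutor
  coresToAssign -= coresPerExecutor
}
// assigned == 9 <= 10; the one leftover core stays unallocated.
```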

Contributor Author

Good catch @andrewor14. Hopefully we've covered everything now.

@andrewor14
Contributor

Hey @nishkamravi2 the latest changes look great other than the one bug I pointed out. By the way, I think this also happens to solve SPARK-9260, which is a completely separate issue. Would you mind adding that JIRA to the title of this patch as well?

@andrewor14
Contributor

Proof that this fixes SPARK-9260:

Before: [screenshot "bad"]

After: [screenshot "good"]

@nishkamravi2
Contributor Author

Sure, thanks.

@nishkamravi2 nishkamravi2 changed the title [SPARK-8881] Fix algorithm for scheduling executors on workers [SPARK-8881][SPARK-9260] Fix algorithm for scheduling executors on workers Jul 22, 2015
@nishkamravi2
Contributor Author

Can this be retested please?

@squito
Contributor

squito commented Jul 23, 2015

Jenkins add to whitelist

@SparkQA

SparkQA commented Jul 23, 2015

Test build #1186 has finished for PR 7274 at commit b998097.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 23, 2015

Test build #1187 has finished for PR 7274 at commit b998097.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

LGTM, this is mergeable as is, but I will wait for some unit tests before doing so. Thanks for following up on the comments promptly @nishkamravi2.

@andrewor14
Contributor

@nishkamravi2 I'm going to go ahead and merge this patch since it's blocking development in other patches. I have written the unit tests locally and will push a PR for it immediately after this is merged. Thanks everyone for your input.

asfgit pushed a commit that referenced this pull request Jul 26, 2015
[SPARK-8881][SPARK-9260] Fix algorithm for scheduling executors on workers

The current scheduling algorithm allocates one core at a time and, in doing so, ends up ignoring spark.executor.cores. As a result, when spark.cores.max / spark.executor.cores (i.e., num_executors) < num_workers, executors are not launched and the app hangs. This PR fixes and refactors the scheduling algorithm.

andrewor14

Author: Nishkam Ravi <[email protected]>
Author: nishkamravi2 <[email protected]>

Closes #7274 from nishkamravi2/master_scheduler and squashes the following commits:

b998097 [nishkamravi2] Update Master.scala
da0f491 [Nishkam Ravi] Update Master.scala
79084e8 [Nishkam Ravi] Update Master.scala
1daf25f [Nishkam Ravi] Update Master.scala
f279cdf [Nishkam Ravi] Update Master.scala
adec84b [Nishkam Ravi] Update Master.scala
a06da76 [nishkamravi2] Update Master.scala
40c8f9f [nishkamravi2] Update Master.scala (to trigger retest)
c11c689 [nishkamravi2] Update EventLoggingListenerSuite.scala
5d6a19c [nishkamravi2] Update Master.scala (for the purpose of issuing a retest)
2d6371c [Nishkam Ravi] Update Master.scala
66362d5 [nishkamravi2] Update Master.scala
ee7cf0e [Nishkam Ravi] Improved scheduling algorithm for executors

(cherry picked from commit 41a7cdf)
Signed-off-by: Andrew Or <[email protected]>
@asfgit asfgit closed this in 41a7cdf Jul 26, 2015
@andrewor14
Contributor

#7668

@nishkamravi2
Contributor Author

Hey @andrewor14, thanks for taking care of this! Sorry, couldn't respond sooner, was out for a couple of days.
