SPARK-2294: fix locality inversion bug in TaskManager #1313

Closed
CodingCat wants to merge 16 commits

Conversation

CodingCat
Contributor

Copied from the original JIRA (https://issues.apache.org/jira/browse/SPARK-2294):

If an executor E is free, a task may be speculatively assigned to E when there are other tasks in the job that have not been launched (at all) yet. Similarly, a task without any locality preferences may be assigned to E when there is another NODE_LOCAL task that could have been scheduled.
This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer (which in turn calls TaskSetManager.findTask) with increasing locality levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on up to the highest currently allowed level. Now, suppose NODE_LOCAL is the highest currently allowed locality level. The first time findTask is called, it will be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL tasks, it will try to schedule tasks with no locality preferences or speculative tasks. As a result, speculative tasks or tasks with no preferences may be scheduled instead of NODE_LOCAL tasks.

I added an additional parameter, maxLocality, to resourceOffer and findTask, indicating when we should consider tasks without a locality preference.
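
To make the inversion concrete, here is a minimal, self-contained Scala sketch of the scheduling sweep. It is an illustration only, with simplified stand-ins for the locality levels and pending-task queues rather than the actual TaskSetManager code: the buggy shape falls through to no-preference tasks on every sweep and can therefore fire during the PROCESS_LOCAL pass, while the fixed shape gates that fallback on maxLocality.

object LocalityInversionSketch {
  // Simplified locality levels; lower rank means more local.
  sealed abstract class Locality(val rank: Int)
  case object PROCESS_LOCAL extends Locality(0)
  case object NODE_LOCAL extends Locality(1)
  case object ANY extends Locality(2)

  private def isAllowed(max: Locality, level: Locality): Boolean =
    level.rank <= max.rank

  // Buggy shape: the no-preference fallback ignores maxLocality,
  // so it can win while the sweep is still at PROCESS_LOCAL.
  def findTaskBuggy(max: Locality, processLocal: Seq[Int],
                    nodeLocal: Seq[Int], noPref: Seq[Int]): Option[Int] =
    processLocal.headOption
      .orElse(noPref.headOption)
      .orElse(if (isAllowed(max, NODE_LOCAL)) nodeLocal.headOption else None)

  // Fixed shape: no-preference tasks are considered only once the sweep
  // has reached a level at which NODE_LOCAL tasks have had their chance.
  def findTaskFixed(max: Locality, processLocal: Seq[Int],
                    nodeLocal: Seq[Int], noPref: Seq[Int]): Option[Int] =
    processLocal.headOption
      .orElse(if (isAllowed(max, NODE_LOCAL)) nodeLocal.headOption else None)
      .orElse(if (isAllowed(max, NODE_LOCAL)) noPref.headOption else None)
}

With nodeLocal = Seq(1), noPref = Seq(2), and a PROCESS_LOCAL sweep, findTaskBuggy returns Some(2) while findTaskFixed returns None, leaving task 1 to be scheduled NODE_LOCAL on a later sweep.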

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16360/

@@ -238,7 +238,7 @@ private[spark] class TaskSetManager(
   */
  private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size

    println("findingTaskFromList:" + list)
Contributor

Remove this? :-)

@mridulm
Contributor

mridulm commented Jul 7, 2014

Might be simpler to move the noprefs block inside the NODE_LOCAL condition [1] - after NODE_LOCAL tasks have been checked.

[1] if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
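
For illustration, the suggested shape would be roughly the following. This is a sketch against the findTask structure of the time, not a quote from the patch; getPendingTasksForHost and pendingTasksWithNoPrefs approximate the TaskSetManager internals.

if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
  for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
    return Some((index, TaskLocality.NODE_LOCAL))
  }
  // Only after node-local candidates are exhausted, fall back to
  // tasks that declared no locality preference at all.
  for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }
}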

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16370/

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16371/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16373/

@CodingCat
Contributor Author

Closing temporarily for a local test.

@CodingCat CodingCat closed this Jul 7, 2014
@CodingCat CodingCat reopened this Jul 7, 2014
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@CodingCat
Contributor Author

Will this modification cause the streaming test case to fail (timeout)?

I cannot reproduce it consistently on my local machine.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16388/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@CodingCat
Contributor Author

Finally!

@@ -293,7 +291,7 @@ private[spark] class TaskSetManager(
    for (index <- speculatableTasks if canRunOnHost(index)) {
      val prefs = tasks(index).preferredLocations
      val executors = prefs.flatMap(_.executorId)
-     if (prefs.size == 0 || executors.contains(execId)) {
+     if (executors.contains(execId)) {
Contributor

In the comment above this, remove "preference-less tasks" since we've moved them below
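
For context, the restructuring behind this hunk splits speculative candidates by preference: the loop above now matches only executor-preferred tasks at the PROCESS_LOCAL level, while preference-less speculatable tasks get a separate pass once a lower locality level is allowed. Roughly, as a sketch rather than the patch verbatim:

// Executor-preferred speculatable tasks, tried at the PROCESS_LOCAL level:
for (index <- speculatableTasks if canRunOnHost(index)) {
  val executors = tasks(index).preferredLocations.flatMap(_.executorId)
  if (executors.contains(execId)) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }
}
// Later, once the allowed level is low enough, tasks with no
// preferences at all are considered in their own pass:
for (index <- speculatableTasks if canRunOnHost(index)) {
  if (tasks(index).preferredLocations.isEmpty) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }
}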

@mateiz
Contributor

mateiz commented Aug 5, 2014

Thanks, this looks good. I made some more small comments on it and there were a couple left over from before. Please take a look at https://github.com/apache/spark/pull/1313/files and fix them. Then I think it will be good to go!

@SparkQA

SparkQA commented Aug 5, 2014

QA tests have started for PR 1313. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17957/consoleFull

@SparkQA

SparkQA commented Aug 6, 2014

QA results for PR 1313:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17957/consoleFull

@@ -192,56 +211,59 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // An executor that is not NODE_LOCAL should be rejected.
-   assert(manager.resourceOffer("execC", "host2", ANY) === None)
+   assert(manager.resourceOffer("execC", "host2", NODE_LOCAL) === None)
Contributor

Hey, sorry, one other question: why did you change this? It seems that this breaks what we were testing for before.

@mateiz
Contributor

mateiz commented Aug 6, 2014

Cool, thanks! One final thing, I made some comments on the tests since it seemed that some of the changes broke previous stuff we were testing for. If they didn't, please explain why you changed them.

@SparkQA

SparkQA commented Aug 6, 2014

QA tests have started for PR 1313. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17970/consoleFull

@CodingCat
Contributor Author

That was to fit an earlier version of the patch; sorry about that, I just forgot to undo the changes...

@mateiz
Contributor

mateiz commented Aug 6, 2014

Alright, thanks for all your work on this. Just want to make sure we don't miss stuff here.

@SparkQA

SparkQA commented Aug 6, 2014

QA results for PR 1313:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17970/consoleFull

@CodingCat
Contributor Author

Thanks for the patient review, @mateiz @mridulm @kayousterhout @lirui-intel!

@mateiz
Contributor

mateiz commented Aug 6, 2014

Alright, I've merged this in. Thanks Nan!

@asfgit asfgit closed this in 63bdb1f Aug 6, 2014
asfgit pushed a commit that referenced this pull request Aug 6, 2014
Copied from the original JIRA (https://issues.apache.org/jira/browse/SPARK-2294):

If an executor E is free, a task may be speculatively assigned to E when there are other tasks in the job that have not been launched (at all) yet. Similarly, a task without any locality preferences may be assigned to E when there is another NODE_LOCAL task that could have been scheduled.
This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer (which in turn calls TaskSetManager.findTask) with increasing locality levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on up to the highest currently allowed level. Now, suppose NODE_LOCAL is the highest currently allowed locality level. The first time findTask is called, it will be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL tasks, it will try to schedule tasks with no locality preferences or speculative tasks. As a result, speculative tasks or tasks with no preferences may be scheduled instead of NODE_LOCAL tasks.

----

I added an additional parameter, maxLocality, to resourceOffer and findTask, indicating when we should consider tasks without a locality preference.

Author: CodingCat <[email protected]>

Closes #1313 from CodingCat/SPARK-2294 and squashes the following commits:

bf3f13b [CodingCat] rollback some forgotten changes
89f9bc0 [CodingCat] address matei's comments
18cae02 [CodingCat] add test case for node-local tasks
2ba6195 [CodingCat] fix failed test cases
87dd09e [CodingCat] fix style
9b9432f [CodingCat] remove hasNodeLocalOnlyTasks
fdd1573 [CodingCat] fix failed test cases
941a4fd [CodingCat] see my shocked face..........
f600085 [CodingCat] remove hasNodeLocalOnlyTasks checking
0b8a46b [CodingCat] test whether hasNodeLocalOnlyTasks affect the results
73ceda8 [CodingCat] style fix
b3a430b [CodingCat] remove fine granularity tracking for node-local only tasks
f9a2ad8 [CodingCat] simplify the logic in TaskSchedulerImpl
c8c1de4 [CodingCat] simplify the patch
be652ed [CodingCat] avoid unnecessary delay when we only have nopref tasks
dee9e22 [CodingCat] fix locality inversion bug in TaskManager by moving nopref branch

(cherry picked from commit 63bdb1f)
Signed-off-by: Matei Zaharia <[email protected]>
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014