[SPARK-18890][CORE] Move task serialization from the TaskSetManager to the CoarseGrainedSchedulerBackend #15505
Conversation
Test build #67033 has finished for PR 15505 at commit
Test build #67035 has finished for PR 15505 at commit
Test build #67062 has finished for PR 15505 at commit
Test build #67056 has finished for PR 15505 at commit
There are many unnecessary changes; can you revert them to minimize the diff? That'll make it easier for others to review. :)
@wzhfy
Test build #67109 has finished for PR 15505 at commit
Test build #67110 has finished for PR 15505 at commit
Test build #67126 has finished for PR 15505 at commit
Test build #67159 has finished for PR 15505 at commit
cc @rxin
Test build #67179 has finished for PR 15505 at commit
Test build #67184 has finished for PR 15505 at commit
Test build #67242 has finished for PR 15505 at commit
Test build #68141 has finished for PR 15505 at commit
This is pretty big and will miss 2.1. But cc @kayousterhout / @squito / @JoshRosen
@@ -486,7 +481,7 @@ private[spark] class Executor(
   * Download any missing dependencies if we receive a new set of files and JARs from the
   * SparkContext. Also adds any new JARs we fetched to the class loader.
   */
-  private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+  private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) {
Is this change necessary?
This is not necessary, I will change it back.
@wzhfy The source of these parameters is sc.addedFiles and sc.addedJars, whose types are mutable.Map[String, Long], so this change is reasonable.
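(A minimal sketch of the point being made above, with made-up names rather than the actual Spark code: a method that accepts the mutable.Map interface can still be called with any concrete mutable map, so widening the parameter type from HashMap does not break call sites.)

```scala
import scala.collection.mutable

// Hypothetical illustration, not the actual Spark code: accepting the
// mutable.Map interface instead of a concrete HashMap lets callers pass
// any mutable map implementation unchanged.
def updateDeps(newFiles: mutable.Map[String, Long]): Unit = {
  newFiles.foreach { case (path, timestamp) =>
    println(s"would fetch $path (timestamp $timestamp)")
  }
}

val addedFiles = mutable.HashMap("file.jar" -> 1L)
updateDeps(addedFiles) // a HashMap is a mutable.Map, so this compiles either way
```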
ping @kayousterhout / @squito / @JoshRosen
Just did a very brief review -- the idea here makes a lot of sense. The only big problem I see with the patch now is the tests that have been eliminated, some of those we definitely need to bring back. But I need to do a longer pass to think about how the pieces fit together.
    _serializedTask: ByteBuffer)
  extends Serializable {
private[spark] class TaskDescription private(
    val taskId: Long,
nit: double indent (4 spaces) for constructor params
ok
    private var taskProps: Properties) {

  def this(taskId: Long,
      attemptNumber: Int,
nit: each parameter on its own line (even the first one), and double indent all params
ok
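(For readers unfamiliar with the convention the two nits above refer to, here is a sketch of that formatting style; the class and parameter names are invented for the example, not the actual TaskDescription code.)

```scala
// Illustrative sketch of the requested style: every constructor or method
// parameter on its own line, double-indented (4 spaces for the primary
// constructor, one extra level for auxiliary constructors).
class Example private(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String) {

  def this(
      taskId: Long,
      attemptNumber: Int) = this(taskId, attemptNumber, "unknown")
}
```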
@@ -139,29 +139,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   assert(!failedTaskSet)
 }

-test("Scheduler does not crash when tasks are not serializable") {
doesn't look like there is any replacement for this test, right? We certainly want to keep this check in some form.
Yes.
For RDDs and closures, there are already related test cases; see DAGSchedulerSuite.scala#L506.
For tasks, a user can use a custom partition, and the partition instance may not be serializable. There are no test cases for this; I will add one.
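(A sketch of what such a non-serializable-partition scenario boils down to; the class and field names here are illustrative, and this is not the test that was actually added to the PR.)

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustrative sketch: a partition-like class holding a non-serializable
// field (a Thread), and a check that Java serialization fails with
// NotSerializableException rather than succeeding silently. In the
// scheduler, this failure should abort the task set, not crash Spark.
class NonSerializablePartition extends Serializable {
  val index: Int = 0
  val unserializable = new Thread() // Thread is not serializable
}

val out = new ObjectOutputStream(new ByteArrayOutputStream())
try {
  out.writeObject(new NonSerializablePartition())
  assert(false, "expected serialization to fail")
} catch {
  case _: NotSerializableException => // expected
}
```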
The test case has been added.
If I understand right, you are saying there are now test cases which cover all of the little individual pieces inside a Task, so we don't need an explicit test for serializing the entire task.
I'd still prefer a test for serializing the entire task (to prevent future regressions), but I guess it's hard to do because now the task actually gets serialized by the SchedulerBackends.
@@ -592,47 +579,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
 }

-test("do not emit warning when serialized task is small") {
same thing on replacements for these tests. we definitely want to keep the test on non-serializability. The others should probably stay as well, unless there is some reason why it's extremely hard to do.
For ShuffleMapTask and ResultTask, since they no longer contain the RDD instance, they are very small.
Detecting the size of the serialized RDD should be more reasonable. I added the corresponding code in the DAGScheduler class, but did not add a test case.
This may be a stretch, but Task could be big, right? E.g. if there was a long chain of partitions, and for some reason they had a lot of data? That seems unlikely, but I also have no idea whether it ever happens or not. It seems easy enough to add the check back in -- is there any reason not to?
Yeah I agree with @squito that we should keep this test / check. I've seen many huge task sizes due to a developer mistake, so think we should absolutely keep warning about it.
Ok, I will add this test case back.
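(For context, the kind of size warning being discussed can be sketched as below; the threshold constant and function names are invented for illustration and are not Spark's actual implementation.)

```scala
import java.nio.ByteBuffer

// Illustrative sketch of a serialized-task size warning. A hypothetical
// 100 KiB threshold is assumed; large tasks usually mean a developer
// accidentally captured big data structures in a closure.
val TaskSizeToWarnKib = 100

def maybeWarnOnTaskSize(taskId: Long, serializedTask: ByteBuffer): Unit = {
  val sizeKib = serializedTask.limit() / 1024
  if (sizeKib > TaskSizeToWarnKib) {
    println(s"WARN: task $taskId serialized size is $sizeKib KiB, above " +
      s"$TaskSizeToWarnKib KiB; check for accidentally large closures.")
  }
}

maybeWarnOnTaskSize(1L, ByteBuffer.allocate(200 * 1024)) // would emit the warning
```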
  taskMetrics.incMemoryBytesSpilled(10)
  override def runTask(tc: TaskContext): Int = 0
}
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0,
nit: with so many args, all with pretty generic types, it's really helpful to name each one, e.g.
TaskDescription(
  taskId = 1L,
  attemptNumber = 0,
  ...
(here and elsewhere)
Okay, I'll change it.
Test build #73694 has finished for PR 15505 at commit
This commit also does all task serialization in the encode() method, so now the encode() method just takes the TaskDescription as an input parameter.
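(A rough sketch of the shape that comment describes; the case-class fields and byte layout here are invented for the example and are not the exact Spark code.)

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.ByteBuffer

// Illustrative sketch only: encode() takes the whole TaskDescription and
// produces the wire bytes, so callers no longer serialize the task
// themselves before handing it to the scheduler backend.
case class TaskDescription(
    taskId: Long,
    attemptNumber: Int,
    executorId: String,
    serializedTask: ByteBuffer)

def encode(desc: TaskDescription): ByteBuffer = {
  val bytesOut = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytesOut)
  out.writeLong(desc.taskId)
  out.writeInt(desc.attemptNumber)
  out.writeUTF(desc.executorId)
  // copy the already-serialized task bytes last, length-prefixed
  val taskBytes = new Array[Byte](desc.serializedTask.remaining())
  desc.serializedTask.duplicate().get(taskBytes)
  out.writeInt(taskBytes.length)
  out.write(taskBytes)
  out.flush()
  ByteBuffer.wrap(bytesOut.toByteArray)
}
```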
Test build #73721 has finished for PR 15505 at commit
@kayousterhout It takes some time to update the test report.
@witgo OK, I'll hold off on doing another pass on the code until you have the test results.
@kayousterhout I don't know which PR caused the run time of this test case to be reduced from 21.764 s to 9.566 s.
@witgo I don't think the ~1.5% improvement in runtime merits the added complexity of this change. I could be convinced to merge this if it simplified the code or the ability to reason about the code, but unfortunately this makes things somewhat more complicated because of the new logic about aborting task sets.
@kayousterhout I am surprised it is not more, but I agree that the added complexity for such low returns is not worth it.
Yes, maybe multithreaded task serialization code could achieve better performance; let me close the PR.
SPARK-18890_20170303's code is older, but the test case running time is 5.2 s.
@squito I like this idea very much. I just encountered deserialization times that are too long (more than 10 s for some tasks). Is there any PR trying to solve this? If there is no related PR, I would like to open an issue and try to solve it. :-D
@djvulee this is https://issues.apache.org/jira/browse/SPARK-19108. Note that there is some discussion there about this being a bit harder than what I originally thought, though I think it's still worth exploring where task serialization is an issue.
What changes were proposed in this pull request?
Performance Testing:
The code and the spark-defaults.conf file:
The test results are as follows:
How was this patch tested?
Existing tests.