[SPARK-27963][core] Allow dynamic allocation without a shuffle service. #24817
Conversation
This change adds a new option that enables dynamic allocation without the need for a shuffle service. This mode works by tracking which stages generate shuffle files, and keeping executors that generate data for those shuffles alive while the jobs that use them are active. A separate timeout is also added for shuffle data, so that executors that hold shuffle data can use a separate timeout before being removed because of being idle. This allows the shuffle data to be kept around in case it is needed by some new job, or allows users to be more aggressive in timing out executors that don't have shuffle data in active use. The code also hooks up to the context cleaner so that shuffles that are garbage collected are detected, and the respective executors are not held unnecessarily. Testing done with added unit tests, and also with TPC-DS workloads on YARN without a shuffle service.
Notes: I'm uploading this as a WIP since I made some changes lately that I have not fully tested, but this seems to be working ok. I'll also not be able to respond to feedback for ~2 weeks, but wanted to give people time to look at it / play with it while I'm not around.
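For readers who want to try this out, here is a minimal sketch of how the mode is enabled. The configuration key names below follow the released Spark 3.x documentation (spark.dynamicAllocation.shuffleTracking.enabled and spark.dynamicAllocation.shuffleTracking.timeout); the exact names and defaults used in this WIP may differ.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: dynamic allocation with shuffle tracking, no external shuffle service.
// Key names follow the released Spark 3.x docs and may differ from this WIP.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // Executors holding live shuffle data use this (typically longer) timeout instead.
  .set("spark.dynamicAllocation.shuffleTracking.timeout", "30min")

val spark = SparkSession.builder().config(conf).getOrCreate()
```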
cc @dbtsai
Test build #106259 has finished for PR 24817 at commit
Retest this please.
Test build #106263 has finished for PR 24817 at commit
Test build #106276 has finished for PR 24817 at commit
Retest this please.
Test build #106285 has finished for PR 24817 at commit
// If executors hold shuffle data that is related to an active job, then the executor is
// considered to be in "shuffle busy" state; meaning that the executor is not allowed to be
// removed. If the executor has shuffle data but it doesn't relate to any active job, then it
// may be removed when idle, following the same timeout configuration used for cache blocks.
As a side note, there is a trade-off here: you don't want to keep things around wasting resources when idle, but you also don't want to re-start executors when you have shuffle data that is being used by the job. Getting this timeout right becomes use-case dependent, but it also adds another headache to the configuration options people need to think of. Configuration is the number one problem I have seen people facing with Spark (including DA), and it is never obvious how they should configure things. Of course for this addition it is reasonable to make it configurable. The good thing is you may have a chance to keep latency stable after an idle period of time, since the executors you care about are around, at least.
Getting this timeout right becomes use case dependent but also adds another headache to the configuration options people need to think of
Actually the heuristic is written in a way that for usual applications, this timeout shouldn't matter much. The context cleaner runs the GC periodically, which would clean up shuffles related to orphaned RDDs. Normal application run would also cause these orphaned RDDs to be collected. So for all practical purposes, in a normal application the executor would become idle soon after jobs are run (doubly so for SQL jobs, which don't seem to reuse the underlying RDDs).
For shells this is a little more tricky because of the way the shell itself holds on to RDD references. So if you're directly playing with RDDs in a shell this may not perform as well, and then you'd be relying on the timeout. But, like the above, for SQL jobs you'd still end up with idle executors more often than not.
But, on a side note, this comment is inaccurate since I ended up adding a separate timeout for the shuffle.
What if the executor is currently idle (no active job, so it can be removed), but a later job depends on the previous shuffle stage? From my understanding, that job will fail to fetch the shuffle data and rerun the parent stages. My feeling is that if the idle timeout is short, this exception will be quite common and confuse users.
@lwwmanning - we built something similar to this before, think your feedback might be helpful here!
need more time to digest it fully, some comments from the first pass
}

var updateExecutors = false
shuffleStages.foreach { case (sid, shuffle) =>
nit: can you rename `sid` -> `stageId`? `sid` sounds like it could also be `shuffleId`.
}

if (updateExecutors) {
  val active = shuffleStages.map(_._2).toSeq
`activeShuffleIds`? (there are a lot of different things with some notion of "active")
if (shuffleIds.add(id)) {
  hasActiveShuffle = true
  if (!isIdle) {
    updateTimeout()
`isIdle` is always false here, because you've just set `hasActiveShuffle = true`, right? I guess I am not sure why you'd need to call `updateTimeout()` here, so I'm not sure the right condition to check ...
I think this can go away; I changed `onTaskEnd` at some point so that the task count is updated after this call, so that code should handle the necessary updates.
override def shuffleCleaned(shuffleId: Int): Unit = {
  // Because this is called in a completely separate thread, we post a custom event to the
  // listener bus so that the internal state is safely updated.
  listenerBus.post(ShuffleCleanedEvent(shuffleId))
it is weird that this will be visible to all user listeners as well ...
Well, all listeners already have to deal with events they don't understand; that's the contract for `onOtherEvent`, since we're allowed to add new events (public or not).
The only odd thing here is that this event is not public, so you can't really handle it outside of Spark (without reflection), but I don't really see an easy way to solve this issue differently (outside of adding locking which I don't want to).
yeah, that's the weird part, you get this event with a class that is not public.
The only things I can think of are (1) to create a new event loop here, which just forwards the listener events and also this new event or (2) create a "InternalListenerEvent" which is automatically filtered from all non-internal listeners. But both of those seem like overkill.
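For reference, here is a minimal, self-contained sketch of the pattern being discussed: a custom event case class posted to the listener bus and handled in `onOtherEvent`, so that state is only mutated on the bus thread. The class and field names are illustrative stand-ins, not the PR's actual code.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Stand-in for the PR's (non-public) ShuffleCleanedEvent.
case class ExampleShuffleCleanedEvent(shuffleId: Int) extends SparkListenerEvent

class ExampleMonitor extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case ExampleShuffleCleanedEvent(id) =>
      // Runs on the listener bus thread, so internal state can be updated here
      // without extra locking against the thread that posted the event.
      println(s"shuffle $id cleaned")
    case _ =>
      // Per the onOtherEvent contract, listeners simply ignore events they
      // don't understand.
  }
}
```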
I think that is correct. To be clear, this isn't meant to be a perfect replacement for dynamic allocation + external shuffle service, but it's a reasonable heuristic. It might make sense to put in a separate config for the shuffle timeout (longer than the cache timeout), since recomputing lost shuffle data is actually much more expensive. I'm actually more worried about the opposite problem: that it will not scale down aggressively. If a long ETL job has an early stage which is really big (say it uses 1000 executors) but is followed by many stages that need only a small number of executors (say 10), you'll hold on to all 1000 executors for the whole job. But I don't think you can do any better without having something else to serve that shuffle data (a la SPARK-25299).
I think that the scheduler will just detect that the needed map outputs don't exist (instead of failing to fetch data), but the end result is the same - parent stages will be re-run. Note that by default the shuffle timeout is basically "infinite", so the situation you describe wouldn't happen.
Yeah, that's a little trickier. You could start tracking things based on which stage is currently active (and the shuffle it needs), instead of per-job, and that would make this particular problem go away, at the expense of being worse when failures happen and you end up needing the shuffle data from an earlier stage. But I think the current version is good enough to start playing with.
/cc
Test build #106848 has finished for PR 24817 at commit
idleStart + (if (cachedBlocks.nonEmpty) storageTimeoutMs else idleTimeoutMs)
val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) {
  val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutMs else Long.MaxValue
  val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
Shall we add `hasActiveShuffle` to the if condition, or start with `if (isIdle)` rather than `if (idleStart >= 0)`?
This was intentional. The `hasActiveShuffle` tracking is not meant to change when the executor times out, just when it should not be removed.
Doing that would mean that every time a job starts with a shuffle that this executor has data for, its timeout would be reset, even if it ends up not running any tasks. (Which is unlikely, but well.)
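To make the timeout interplay concrete, here is a small, self-contained paraphrase of the deadline logic in the snippet above (not the exact PR code): an idle executor expires after the smallest timeout that applies to the data it still holds.

```scala
// Sketch of the idle-deadline computation discussed above; names and the
// overflow guard are illustrative, not copied from the PR.
def idleDeadline(
    idleStart: Long,
    hasCachedBlocks: Boolean,
    hasShuffleData: Boolean,
    idleTimeoutMs: Long,
    storageTimeoutMs: Long,
    shuffleTimeoutMs: Long): Long = {
  val timeout =
    if (hasCachedBlocks || hasShuffleData) {
      val cacheTimeout = if (hasCachedBlocks) storageTimeoutMs else Long.MaxValue
      val shuffleTimeout = if (hasShuffleData) shuffleTimeoutMs else Long.MaxValue
      math.min(cacheTimeout, shuffleTimeout)
    } else {
      idleTimeoutMs
    }
  val deadline = idleStart + timeout
  if (deadline >= 0) deadline else Long.MaxValue // guard against Long overflow
}
```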
}

if (updateExecutors) {
  val activeShuffleIds = shuffleStages.map(_._2).toSeq
Here you're only grabbing the active shuffles of this job. I don't think that is right when you've got two concurrent jobs submitted with distinct sets of shuffles: the first job will need shuffle A on executor X, then the second needs shuffle B on executor Y, and it would look like executor X does not have an active shuffle anymore.
it might also help to rename `shuffleStages` --> `thisJobShuffleStage`.
While what you say is true in isolation, the check for `if (!exec.hasActiveShuffle)` below makes sure the end result is correct. When the second job starts, executor X has an active shuffle, so this code would not change that state.
my fault, you are right!
if (!exec.hasActiveShuffle) {
  exec.updateActiveShuffles(activeShuffleIds)
  if (exec.hasActiveShuffle) {
    logDebug(s"Executor $id has data needed by new active job.")
Just a thought: I'm imagining debugging this down the road, and I might be hesitant to turn on debug logging b/c of how many times this will get logged. You could build up the list of execs and then combine it into one log line, with all the exec ids, after going through the loop (see the sketch below).
executors.asScala.foreach { case (id, exec) =>
  if (exec.hasActiveShuffle) {
    exec.updateActiveShuffles(activeShuffles)
is it worthwhile to logdebug here too? (with similar comment to above for combining them)
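A sketch of the suggestion above, written against the names that appear in the diff (`executors`, `activeShuffleIds`, and `logDebug` are assumed to come from the surrounding class): collect the affected executor ids and emit a single debug line after the loop.

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Hypothetical rewrite of the loop above; one combined logDebug instead of
// one line per executor.
val affected = ArrayBuffer.empty[String]
executors.asScala.foreach { case (id, exec) =>
  if (!exec.hasActiveShuffle) {
    exec.updateActiveShuffles(activeShuffleIds)
    if (exec.hasActiveShuffle) {
      affected += id
    }
  }
}
if (affected.nonEmpty) {
  logDebug(s"Executors ${affected.mkString(", ")} have data needed by the new active job.")
}
```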
@@ -137,6 +254,21 @@ private[spark] class ExecutorMonitor(
val executorId = event.taskInfo.executorId
val exec = executors.get(executorId)
if (exec != null) {
  // If the task succeeded and the stage generates shuffle data, record that this executor
  // holds data for the shuffle. Note that this ignores speculation, since this code is not
  // directly tied to the map output tracker that knows exactly which shuffle blocks are
I think this comment about speculation is a bit confusing. It sounds a bit like you're saying that speculative tasks are ignored. Really, what you are saying is that if there is a speculative and a non-speculative task, and both succeed, you say both have shuffle data. (right?)
and I actually think both tasks will write shuffle data locally and update that state in the driver -- it's just that the scheduler only sends one location when it sends out the next downstream tasks. But if the stage fails, it might resubmit immediately using the other copy (but that is off the top of my head, I need to step through that part carefully ...)
if there is a speculative and non-speculative task, and both succeed, you say both have shuffle data
Right.
scheduler only sends one location when it sends out the next downstream tasks
IIRC the map output tracker only keeps one location for each shuffle block.
def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized {
  if (mapStatuses(mapId) == null) {
    _numAvailableOutputs += 1
    invalidateSerializedMapOutputStatusCache()
  }
  mapStatuses(mapId) = status
}
BTW that code is interesting in that it seems like it's possible to have the tracker pointing at one executor while the serialized version is pointing at another... so I guess, inadvertently, the behavior the comment is talking about is "the right thing".
ah you are right. I guess I was remembering pre-SPARK-20715.
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))

// Clean up the shuffles, executor now should now time out at the idle deadline.
you should also test another job after this, which references the same shuffles from the earlier jobs, to make sure things get properly moved out of being idle.
Also one thing to keep an eye on is that if you re-use old shuffles, the DAGScheduler creates new stages, which then get skipped, but should keep the consistent shuffle ids. I'm thinking of simple pipelines where they get broken with a count or something, but logically it's all one thing, e.g.:
val cachedAfterFirstShuffle = someRdd.reduceByKey{ ...}.cache()
cachedAfterFirstShuffle.count() // job 1
cachedAfterFirstShuffle.map ... // job 2, etc.
That's job 3, isn't it? (L318)
again, my fault, I looked at this test too quickly after thinking I had found the bug above
retest this please
Test build #107025 has finished for PR 24817 at commit
I'll file a bug for the flaky test.
retest this please
Test build #107543 has finished for PR 24817 at commit
Any more feedback? Otherwise I'll merge next week.
LGTM :)
@@ -363,6 +363,17 @@ package object config {
    .checkValue(_ >= 0L, "Timeout must be >= 0.")
    .createWithDefault(60)

  private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING =
documenting these in configuration.md?
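For context, this is roughly how such an entry is declared with Spark's internal `ConfigBuilder` inside the `config` package object shown in the diff; the key name, doc string, and default here follow the released documentation and may not match this PR exactly.

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Illustrative sketch of the config entry referenced in the diff above.
private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING =
  ConfigBuilder("spark.dynamicAllocation.shuffleTracking.enabled")
    .doc("Enables shuffle file tracking for executors, which allows dynamic " +
      "allocation without the need for an external shuffle service.")
    .booleanConf
    .createWithDefault(false)
```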
still lgtm
thanks for adding the docs, those look good
Test build #107695 has finished for PR 24817 at commit
retest this please
Test build #107748 has finished for PR 24817 at commit
Retest this please.
Test build #107757 has finished for PR 24817 at commit
Merging to master.
This change adds a new option that enables dynamic allocation without the need for a shuffle service. This mode works by tracking which stages generate shuffle files, and keeping executors that generate data for those shuffles alive while the jobs that use them are active.

A separate timeout is also added for shuffle data, so that executors that hold shuffle data can use a separate timeout before being removed because of being idle. This allows the shuffle data to be kept around in case it is needed by some new job, or allows users to be more aggressive in timing out executors that don't have shuffle data in active use.

The code also hooks up to the context cleaner so that shuffles that are garbage collected are detected, and the respective executors are not held unnecessarily.

Testing done with added unit tests, and also with TPC-DS workloads on YARN without a shuffle service.

Closes apache#24817 from vanzin/SPARK-27963.

Authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
…tion and shuffle service are on

### What changes were proposed in this pull request?

This PR proposes to avoid throwing an NPE at the context cleaner when the shuffle service is on - it is kind of a small followup of #24817

Seems like it sets `null` for `shuffleIds` to track when the service is on. Later, `removeShuffle` tries to remove an element at `shuffleIds` which leads to NPE. It fixes it by explicitly not sending the event (`ShuffleCleanedEvent`) in this case.

See the code path below:

https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/SparkContext.scala#L584
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L125
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L190
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L220-L230
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L353-L357
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L347
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L400-L406
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L475
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L427

### Why are the changes needed?

This is a bug fix.

### Does this PR introduce any user-facing change?

It prevents the exception:

```
19/08/21 06:44:01 ERROR AsyncEventQueue: Listener ExecutorMonitor threw an exception
java.lang.NullPointerException
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor$Tracker.removeShuffle(ExecutorMonitor.scala:479)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.$anonfun$cleanupShuffle$2(ExecutorMonitor.scala:408)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.$anonfun$cleanupShuffle$2$adapted(ExecutorMonitor.scala:407)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.cleanupShuffle(ExecutorMonitor.scala:407)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.onOtherEvent(ExecutorMonitor.sc
```

### How was this patch tested?

Unittest was added.

Closes #25551 from HyukjinKwon/SPARK-28839.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
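A minimal sketch of the kind of guard that follow-up describes, assuming a `shuffleTrackingEnabled` flag on the monitor (the flag name is an assumption, not necessarily the exact fix): only post the internal event when shuffle tracking is enabled, since the per-executor `shuffleIds` set is left null when an external shuffle service manages shuffle data.

```scala
// Sketch only; `shuffleTrackingEnabled` is an assumed flag, not necessarily
// the name used in the actual fix.
override def shuffleCleaned(shuffleId: Int): Unit = {
  if (shuffleTrackingEnabled) {
    // shuffleIds is only tracked (non-null) when shuffle tracking is enabled,
    // so skip the event otherwise and avoid the NPE described above.
    listenerBus.post(ShuffleCleanedEvent(shuffleId))
  }
}
```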