[WIP][SPARK-4673][SQL] Optimizing limit using coalesce #3531
Conversation
Test build #23978 has started for PR 3531 at commit
Test build #23978 has finished for PR 3531 at commit
Test PASSed.
    iter.take(limit).map(row => (false, row.copy()))

    if (sortBasedShuffleOn) {
      child.execute().map(_.copy).coalesce(1).mapPartitions { iter =>
        iter.take(limit)
Can we move the map(_.copy) after take(limit)?
You do need to copy() before any take or collect operation, because Spark SQL reuses row objects, and these operations create arrays that end up all containing the same object.
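To illustrate the row-reuse behavior described above, here is a minimal, self-contained sketch; RowReuseDemo is a hypothetical example, not Spark code, and a mutable buffer stands in for the row object Spark SQL reuses:

    // Minimal sketch of the row-reuse pitfall: one mutable buffer stands in
    // for the row object that Spark SQL reuses across an iterator.
    object RowReuseDemo extends App {
      val buffer = Array(0)

      // Each element is the same buffer, mutated in place per "row".
      def rows = Iterator.range(0, 3).map { i => buffer(0) = i; buffer }

      // Without copying, every slot of the collected array aliases the buffer.
      println(rows.take(3).toArray.map(_(0)).mkString(","))               // 2,2,2

      // Copying each row before take materializes independent values.
      println(rows.map(_.clone).take(3).toArray.map(_(0)).mkString(","))  // 0,1,2
    }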
Will this actually always be faster? It seems like in some cases you are just eliminating a bunch of parallelism. Do you have some benchmarks?
Hi @marmbrus, the old version also reduces the parallelism to 1, via a ShuffledRDD; the difference is that this PR uses coalesce to avoid the shuffle.
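For readers following along, a rough sketch of the two strategies being compared; this is a simplification with illustrative names (limitViaShuffle, limitViaCoalesce), not the actual Spark operators:

    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // Old strategy (simplified): take per partition, shuffle the surviving
    // rows into a single partition, then take again. The shuffle writes and
    // reads every candidate row.
    def limitViaShuffle[T: ClassTag](rdd: RDD[T], limit: Int): RDD[T] =
      rdd.mapPartitions(_.take(limit))
        .repartition(1)
        .mapPartitions(_.take(limit))

    // This PR's strategy (simplified): coalesce to one partition with no
    // shuffle; a single task reads the upstream partitions directly.
    def limitViaCoalesce[T: ClassTag](rdd: RDD[T], limit: Int): RDD[T] =
      rdd.coalesce(1).mapPartitions(_.take(limit))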
Is there an assumption that the LIMIT number is quite small?
I tested with a limit of 5000, and I am running more tests. I do not think the limit number has a big effect.
@scwf I am not sure if this is a good idea in general. Think about a highly selective filter, e.g.

    select * from every_body_in_the_world where company = "Databricks" limit 5;

In this case, with your patch this query is going to run slowly on a single thread to scan all the data.
Yes, I also realize this; it will not always be faster, since in a case like that a single task ends up scanning all the data.
I think it is too risky to do it this way right now. It seems to me the advantage of coalesce only shows up when you have a huge number of partitions without a highly selective filter. Maybe we can have two variants of Limit and, in the optimizer, pick the coalesce one if there is no filter at all?
@rxin Yes, we cannot just switch to coalesce here. I agree with you about the situations where coalesce has an advantage, and I will try to do the optimization with coalesce for the no-filter case. Thanks :)
BTW, it doesn't have to be a new operator; we can also just add a flag to Limit.
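A minimal sketch of that flag idea, using hypothetical stand-in plan classes rather than the actual Catalyst types: an optimizer step enables the coalesce strategy only when no Filter appears below the Limit.

    // Hypothetical stand-in plan nodes; not the real Catalyst classes.
    sealed trait Plan { def children: Seq[Plan] }
    case class Scan(table: String) extends Plan { def children = Nil }
    case class Filter(condition: String, child: Plan) extends Plan { def children = Seq(child) }
    case class Limit(n: Int, child: Plan, useCoalesce: Boolean = false) extends Plan {
      def children = Seq(child)
    }

    // Optimizer step (sketch): turn the flag on only for filter-free subtrees.
    object ChooseLimitStrategy {
      private def hasFilter(p: Plan): Boolean =
        p.isInstanceOf[Filter] || p.children.exists(hasFilter)

      def apply(plan: Plan): Plan = plan match {
        case l: Limit => l.copy(useCoalesce = !hasFilter(l.child))
        case other    => other
      }
    }

Applied to Limit(5, Scan("t")) this sets useCoalesce = true, while Limit(5, Filter("company = 'Databricks'", Scan("t"))) keeps the shuffle path.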
Actually, one more question before you make big changes: executeCollect should be called most of the time (if you run a SQL query). In what cases did you run into this problem?
Ah, I see. We should absolutely fix that one. Once that is fixed, do you think we still need this? It seems very unlikely that execute() will be called on this.
If the limit is in a subquery, execute() will be called, right? But that is really rare :)
It seems to me that case would be rare enough that we probably don't need to care about it at this point. There is a lot of other low-hanging fruit we can optimize.
Agree, closing this.

Sent from my iPhone
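For background on the executeCollect point above: when the limit sits at the root of a query, collecting the result can bypass execute() entirely, because RDD.take already fetches rows incrementally from the driver. A simplified stand-in, not the actual Limit.executeCollect source:

    import org.apache.spark.rdd.RDD

    // take() starts with one partition and scans more only if it still
    // needs rows, so no shuffle or single-partition RDD is built at all.
    def executeCollectSketch[T](child: RDD[T], limit: Int): Array[T] =
      child.take(limit)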
Optimizing limit using coalesce to avoid shuffle.