-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[Spark] RDD take() method: overestimate too much #2648
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Can one of the admins verify this patch? |
This seems right to me yingjie. Let's see if the tests work |
Jenkins, test this please. |
Changes LGTM. |
QA tests have started for PR 2648 at commit
|
QA tests have started for PR 2648 at commit
|
Tests timed out for PR 2648 at commit |
Test FAILed. |
Tests timed out for PR 2648 at commit |
It seems like this leads to some infinite loop and tests are timing out because of that. |
hmm... |
@rxin |
Could you make this change in rdd.py as well? The code should be kept equivalent. |
updated |
Jenkins, retest this please. |
QA tests have started for PR 2648 at commit
|
QA tests have finished for PR 2648 at commit
|
Test FAILed. |
oops, looks like |
@rxin |
@@ -84,10 +84,10 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi | |||
if (results.size == 0) { | |||
numPartsToTry = totalParts - 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also change this to partsScanned * 4 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I can. The comment says: "If we didn't find any rows after the first iteration, just try all partitions next" . I had little context about these decisions. But I agree that we should keep logic equivalent in these methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The take() was fixed in yingjieMiao@ba5bcad, but AsyncRDDActions was missed in that patch, thanks for bringing this on top of the table.
@davies addressed your comments. |
if (results.size == 0) { | ||
numPartsToTry = totalParts - 1 | ||
numPartsToTry = totalParts * 4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
totalParts should be partsScanned
LGTM, thanks! Jenkins, retest this please. |
retest? @davies |
QA tests have started for PR 2648 at commit
|
QA tests have finished for PR 2648 at commit
|
numPartsToTry = (1.5 * num * partsScanned / results.size).toInt | ||
// the left side of max is >=1 whenever partsScanned >= 2 | ||
numPartsToTry = ((1.5 * num * partsScanned / results.size).toInt - partsScanned) max 1 | ||
numPartsToTry = numPartsToTry min (partsScanned * 4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Infix Methods
Don't use infix notation for methods that aren't operators. For example, instead of list map func, use list.map(func), or instead of string contains "foo", use string.contains("foo"). This is to improve familiarity to developers coming from other languages.
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for the pointer. Will fix!
@davies style fixed. thanks! |
it failed in python style check:
|
@davies fixed. thank you. |
Can one of the admins verify this patch? |
QA tests have started for PR 2648 at commit
|
QA tests have finished for PR 2648 at commit
|
Jenkins, test this please. |
QA tests have started for PR 2648 at commit
|
QA tests have finished for PR 2648 at commit
|
Test PASSed. |
@davies OK to merge? |
@yingjieMiao it looks good to me, waiting for other people. |
Merging in master. Thanks! |
I just realized we didn't have a jira for this. Let's make sure we create a jira ticket tracking updates. Thanks. |
In the comment (Line 1083), it says: "Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%."
(1.5 * num * partsScanned / buf.size).toInt
is the guess of "num of total partitions needed". In every iteration, we should consider the increment(1.5 * num * partsScanned / buf.size).toInt - partsScanned
Existing implementation 'exponentially' grows
partsScanned
( roughly:x_{n+1} >= (1.5 + 1) x_n
)This could be a performance problem. (unless this is the intended behavior)