[SPARK-4327] [PySpark] Python API for RDD.randomSplit() #3193

Closed
wants to merge 9 commits into apache:master from davies:randomSplit

Conversation

@davies
Contributor

davies commented Nov 10, 2014

```
pyspark.RDD.randomSplit(self, weights, seed=None)
    Randomly splits this RDD with the provided weights.

    :param weights: weights for splits, will be normalized if they don't sum to 1
    :param seed: random seed
    :return: split RDDs in a list

    >>> rdd = sc.parallelize(range(10), 1)
    >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11)
    >>> rdd1.collect()
    [3, 6]
    >>> rdd2.collect()
    [0, 5, 7]
    >>> rdd3.collect()
    [1, 2, 4, 8, 9]
```
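
To make the semantics above concrete, here is a minimal, self-contained sketch of the range-sampling idea behind randomSplit (the helper names random_split_sketch and keep_range are hypothetical, and this is a sketch rather than the code in this PR): the weights are normalized into cumulative [low, high) ranges, and each resulting RDD re-scans the parent with the same per-partition seed, keeping the rows whose uniform draw falls into its range.

```
import random

def random_split_sketch(rdd, weights, seed=None):
    # Normalize the weights so they sum to 1, then turn them into
    # cumulative [low, high) acceptance ranges, one range per split.
    total = float(sum(weights))
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)
    if seed is None:
        seed = random.randint(0, 2 ** 32 - 1)

    def keep_range(low, high):
        def part(split_index, iterator):
            # Re-seeding per partition with the same base seed means every
            # split sees the identical stream of draws, so each row lands
            # in exactly one of the resulting RDDs.
            rng = random.Random(seed ^ split_index)
            for row in iterator:
                if low <= rng.random() < high:
                    yield row
        return rdd.mapPartitionsWithIndex(part, preservesPartitioning=True)

    return [keep_range(low, high) for low, high in zip(bounds, bounds[1:])]
```

Because every split replays the same deterministic stream of draws, the splits are disjoint and together cover the parent RDD exactly once, which is why the three doctest outputs above partition all ten elements.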

@davies
Contributor Author

davies commented Nov 10, 2014

cc @mengxr @JoshRosen @mateiz, since this PR adds a public API.

@SparkQA

SparkQA commented Nov 11, 2014

Test build #23172 has started for PR 3193 at commit 41fce54.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 11, 2014

Test build #23172 has finished for PR 3193 at commit 41fce54.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23172/


```
/**
 * A helper to convert java.util.List[Double] into Array[Double]
 * @param list
```
Contributor

Do you mind removing these empty Scaladoc tags?

@JoshRosen
Contributor

@mengxr I think that you should review this, since I don't understand how randomSplit is implemented in Scala or what sorts of properties / correctness guarantees it's supposed to exhibit.

@davies
Contributor Author

davies commented Nov 12, 2014

@mengxr Good catch! I have updated it.

@SparkQA

SparkQA commented Nov 12, 2014

Test build #23283 has started for PR 3193 at commit 0d9b256.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 13, 2014

Test build #23283 has finished for PR 3193 at commit 0d9b256.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23283/

@davies
Contributor Author

davies commented Nov 13, 2014

@JoshRosen @mengxr I have moved the implementation into Python; it avoids the problem of changing the batchSize. Please review it again.

@SparkQA

SparkQA commented Nov 13, 2014

Test build #23320 has started for PR 3193 at commit f866bcf.

  • This patch merges cleanly.

```
@@ -111,7 +112,7 @@ def func(self, split, iterator):
                 yield obj
         else:
             for obj in iterator:
-                if self.getUniformSample(split) <= self._fraction:
+                if self._lowbound <= self.getUniformSample(split) < self._fraction:
```
Contributor

There is an issue with the name here. Maybe we should keep the name fraction and rename lowbound to acceptanceRangeStart. Then check acceptanceRangeStart + fraction <= 1.0 + eps in the constructor, and call RDDSampler(False, ub - lb, seed, lb).func in randomSplit.
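
As a rough illustration of the suggested check (a sketch only, using the names proposed in the comment above; not the code that was merged):

```
class RDDSampler(object):
    # Sketch of the suggestion: keep `fraction` as the width of the
    # acceptance range and validate that the range stays within [0, 1].
    def __init__(self, withReplacement, fraction, seed=None, acceptanceRangeStart=0.0):
        eps = 1e-6
        if acceptanceRangeStart + fraction > 1.0 + eps:
            raise ValueError("acceptance range [%g, %g) falls outside [0, 1]"
                             % (acceptanceRangeStart, acceptanceRangeStart + fraction))
        self._withReplacement = withReplacement
        self._fraction = fraction
        self._seed = seed
        self._acceptanceRangeStart = acceptanceRangeStart
```

randomSplit would then construct each split as RDDSampler(False, ub - lb, seed, lb), as suggested.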

@mengxr
Contributor

mengxr commented Nov 13, 2014

@davies Did you compare the performance?

@davies
Contributor Author

davies commented Nov 13, 2014

@mengxr I ran randomSplit([0.2, 0.3]) on an RDD of a million ints; the Scala version finished in 16.4 seconds and the Python version in 20.5 seconds (25% slower).

@SparkQA

SparkQA commented Nov 13, 2014

Test build #23325 has started for PR 3193 at commit 4dfa2cd.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 13, 2014

Test build #23320 has finished for PR 3193 at commit f866bcf.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23320/

@SparkQA

SparkQA commented Nov 13, 2014

Test build #23326 has started for PR 3193 at commit f5fdf63.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 14, 2014

Test build #23351 has started for PR 3193 at commit 51649f5.

  • This patch merges cleanly.

@davies
Contributor Author

davies commented Nov 14, 2014

@mengxr I have simplified RDDSampler by removing numpy; the reasoning has been added to the description of this PR. Please re-review it.

@mengxr
Contributor

mengxr commented Nov 14, 2014

@davies Did you measure only the rdd.sample(...).count()? Sampling 1 million items took about 0.6s without replacement and 2.5s with replacement on my computer. I think we use the same MacBook model, or yours is better. :)

Maybe part of the time in your case was spent on broadcasting the RDD. Could you try the following:

```
from pyspark.mllib.random import RandomRDDs
rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache()
rdd.count()
rdd.sample(True, 0.9).count()
```

@SparkQA

SparkQA commented Nov 14, 2014

Test build #23351 has finished for PR 3193 at commit 51649f5.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class RDDRangeSampler(RDDSamplerBase):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23351/

@davies
Contributor Author

davies commented Nov 14, 2014

@mengxr I got the same results as you (using your test code); I will update the results in the description.

@davies
Contributor Author

davies commented Nov 14, 2014

@mengxr I have updated it; numpy is even slower when withReplacement is False.

@davies
Contributor Author

davies commented Nov 14, 2014

Removed the test for withReplacement=True, because random.expovariate() is not consistent between Python 2.6 and 2.7.
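
For context on why expovariate() matters here: with-replacement sampling needs a Poisson-distributed duplication count per row, which can be built from exponential inter-arrival times. A minimal sketch of that construction (the helper name poisson_sample is hypothetical):

```
import random

def poisson_sample(rng, mean):
    # Count how many Exp(1) inter-arrival times fit before `mean`
    # elapses; that count is Poisson(mean)-distributed.
    elapsed = rng.expovariate(1.0)
    count = 0
    while elapsed < mean:
        count += 1
        elapsed += rng.expovariate(1.0)
    return count
```

If expovariate() yields different sequences on 2.6 and 2.7 for the same seed, the counts (and hence a doctest's sampled output) differ across versions, so the test cannot be pinned to one expected output.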

@SparkQA

SparkQA commented Nov 14, 2014

Test build #23359 has started for PR 3193 at commit f583023.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 14, 2014

Test build #23359 has finished for PR 3193 at commit f583023.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class RDDRangeSampler(RDDSamplerBase):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23359/

@mengxr
Contributor

mengxr commented Nov 14, 2014

@davies I'm a little concerned about removing numpy.random in this PR: 1) it is beyond the scope of this PR, and 2) it brings a performance regression. Since we are comparing Python's random vs. numpy's random, we can easily measure the performance outside Spark; numpy's random is about 2x faster than Python's random on my machine. Besides speed, another issue is the quality of the RNG, whose specification we would need to spend more time on.
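
One way to compare the two RNGs outside Spark is a quick timeit run; a sketch (absolute numbers, and the ratio, will vary with the machine and with whether the numpy draws are batched):

```
import timeit

# One million uniform draws: one call per draw in pure Python vs. one
# batched call in numpy; adjust to match how the sampler actually draws.
py_time = timeit.timeit(
    "[rng.random() for _ in range(1000000)]",
    setup="import random; rng = random.Random(42)",
    number=1)
np_time = timeit.timeit(
    "rng.random_sample(1000000)",
    setup="import numpy; rng = numpy.random.RandomState(42)",
    number=1)
print("python random: %.3fs, numpy random: %.3fs" % (py_time, np_time))
```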

@davies
Contributor Author

davies commented Nov 14, 2014

For 1), I could put the refactoring in another JIRA/PR.

For the performance regression, I think it's an acceptable trade-off between performance and code maintainability. There are lots of ways to improve the performance of PySpark, such as numpy/Cython/numba/pypy/pandas; we should balance the dependencies against the complexity.

Actually, the current approach introduces problems: if numpy is available on the driver but not installed on the slaves, the job will fail. Someone tried to fix this in #2313, but that PR may introduce another problem: sample() would be non-reproducible if some of the slaves have numpy but others do not. This complicates things a lot without contributing a huge performance gain.

@mengxr
Contributor

mengxr commented Nov 14, 2014

For the quality of the RNG, both Python and numpy use the Mersenne Twister (http://docs.scipy.org/doc/numpy/reference/generated/numpy.random.RandomState.html):

The Python stdlib module “random” also contains a Mersenne Twister pseudo-random number generator with a number of methods that are similar to the ones available in RandomState. RandomState, besides being NumPy-aware, has the advantage that it provides a much larger number of probability distributions to choose from.

I can tell from its source code that numpy.random uses MT19937, and Python presumably implements the same RNG. So quality-wise, there should be no issue with always using Python's random.

But for the performance/code-complexity trade-off, maybe @JoshRosen should decide.

@davies
Contributor Author

davies commented Nov 15, 2014

@JoshRosen What do you think of this? The MLlib tests may be blocked by this.

@JoshRosen
Contributor

I don't really feel qualified to give an opinion here.

@davies
Contributor Author

davies commented Nov 15, 2014

@mengxr @JoshRosen I have reverted the numpy changes, since they were blocking this PR; let's think about it later.

@SparkQA

SparkQA commented Nov 15, 2014

Test build #23427 has started for PR 3193 at commit 78bf997.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 15, 2014

Test build #23427 has finished for PR 3193 at commit 78bf997.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class RDDRangeSampler(RDDSamplerBase):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23427/

@mengxr
Contributor

mengxr commented Nov 19, 2014

LGTM. Merged into master and branch-1.2. Thanks! Let's create a new JIRA for the python random vs. numpy random discussion.

@davies
Contributor Author

davies commented Nov 19, 2014

I have created https://issues.apache.org/jira/browse/SPARK-4477 for it.

davies pushed a commit to davies/spark that referenced this pull request Nov 19, 2014
```
pyspark.RDD.randomSplit(self, weights, seed=None)
    Randomly splits this RDD with the provided weights.

    :param weights: weights for splits, will be normalized if they don't sum to 1
    :param seed: random seed
    :return: split RDDs in a list

    >>> rdd = sc.parallelize(range(10), 1)
    >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11)
    >>> rdd1.collect()
    [3, 6]
    >>> rdd2.collect()
    [0, 5, 7]
    >>> rdd3.collect()
    [1, 2, 4, 8, 9]
```

Author: Davies Liu <[email protected]>

Closes apache#3193 from davies/randomSplit and squashes the following commits:

78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()
@davies
Contributor Author

davies commented Nov 19, 2014

merged.

@davies davies closed this Nov 19, 2014
davies pushed a commit that referenced this pull request Nov 19, 2014
```
pyspark.RDD.randomSplit(self, weights, seed=None)
    Randomly splits this RDD with the provided weights.

    :param weights: weights for splits, will be normalized if they don't sum to 1
    :param seed: random seed
    :return: split RDDs in a list

    >>> rdd = sc.parallelize(range(10), 1)
    >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11)
    >>> rdd1.collect()
    [3, 6]
    >>> rdd2.collect()
    [0, 5, 7]
    >>> rdd3.collect()
    [1, 2, 4, 8, 9]
```

Author: Davies Liu <[email protected]>

Closes #3193 from davies/randomSplit and squashes the following commits:

78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()

(cherry picked from commit 7f22fa8)
Signed-off-by: Xiangrui Meng <[email protected]>