Skip to content

Commit d39f2e9

Browse files
Davies Liumengxr
authored and committed
[SPARK-4477] [PySpark] remove numpy from RDDSampler
In RDDSampler, it try use numpy to gain better performance for possion(), but the number of call of random() is only (1+faction) * N in the pure python implementation of possion(), so there is no much performance gain from numpy. numpy is not a dependent of pyspark, so it maybe introduce some problem, such as there is no numpy installed in slaves, but only installed master, as reported in SPARK-927. It also complicate the code a lot, so we may should remove numpy from RDDSampler. I also did some benchmark to verify that: ``` >>> from pyspark.mllib.random import RandomRDDs >>> rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache() >>> rdd.count() # cache it >>> rdd.sample(True, 0.9).count() # measure this line ``` the results: |withReplacement | random | numpy.random | ------- | ------------ | ------- |True | 1.5 s| 1.4 s| |False| 0.6 s | 0.8 s| closes apache#2313 Note: this patch including some commits that not mirrored to github, it will be OK after it catches up. Author: Davies Liu <[email protected]> Author: Xiangrui Meng <[email protected]> Closes apache#3351 from davies/numpy and squashes the following commits: 5c438d7 [Davies Liu] fix comment c5b9252 [Davies Liu] Merge pull request #1 from mengxr/SPARK-4477 98eb31b [Xiangrui Meng] make poisson sampling slightly faster ee17d78 [Davies Liu] remove = for float 13f7b05 [Davies Liu] Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy f583023 [Davies Liu] fix tests 51649f5 [Davies Liu] remove numpy in RDDSampler 78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain f5fdf63 [Davies Liu] fix bug with int in weights 4dfa2cd [Davies Liu] refactor f866bcf [Davies Liu] remove unneeded change c7a2007 [Davies Liu] switch to python implementation 95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit 0d9b256 [Davies Liu] refactor 1715ee3 [Davies Liu] address comments 41fce54 [Davies Liu] randomSplit()
1 parent ad5f1f3 commit d39f2e9

File tree

2 files changed

+40
-69
lines changed

2 files changed

+40
-69
lines changed

python/pyspark/rdd.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,8 +310,11 @@ def distinct(self, numPartitions=None):
310310

311311
def sample(self, withReplacement, fraction, seed=None):
312312
"""
313-
Return a sampled subset of this RDD (relies on numpy and falls back
314-
on default random generator if numpy is unavailable).
313+
Return a sampled subset of this RDD.
314+
315+
>>> rdd = sc.parallelize(range(100), 4)
316+
>>> rdd.sample(False, 0.1, 81).count()
317+
10
315318
"""
316319
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
317320
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
@@ -343,8 +346,7 @@ def randomSplit(self, weights, seed=None):
343346
# this is ported from scala/spark/RDD.scala
344347
def takeSample(self, withReplacement, num, seed=None):
345348
"""
346-
Return a fixed-size sampled subset of this RDD (currently requires
347-
numpy).
349+
Return a fixed-size sampled subset of this RDD.
348350
349351
>>> rdd = sc.parallelize(range(0, 10))
350352
>>> len(rdd.takeSample(True, 20, 1))

python/pyspark/rddsampler.py

Lines changed: 34 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -17,81 +17,48 @@
1717

1818
import sys
1919
import random
20+
import math
2021

2122

2223
class RDDSamplerBase(object):
2324

2425
def __init__(self, withReplacement, seed=None):
25-
try:
26-
import numpy
27-
self._use_numpy = True
28-
except ImportError:
29-
print >> sys.stderr, (
30-
"NumPy does not appear to be installed. "
31-
"Falling back to default random generator for sampling.")
32-
self._use_numpy = False
33-
34-
self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1)
26+
self._seed = seed if seed is not None else random.randint(0, sys.maxint)
3527
self._withReplacement = withReplacement
3628
self._random = None
37-
self._split = None
38-
self._rand_initialized = False
3929

4030
def initRandomGenerator(self, split):
41-
if self._use_numpy:
42-
import numpy
43-
self._random = numpy.random.RandomState(self._seed ^ split)
44-
else:
45-
self._random = random.Random(self._seed ^ split)
31+
self._random = random.Random(self._seed ^ split)
4632

4733
# mixing because the initial seeds are close to each other
4834
for _ in xrange(10):
4935
self._random.randint(0, 1)
5036

51-
self._split = split
52-
self._rand_initialized = True
53-
54-
def getUniformSample(self, split):
55-
if not self._rand_initialized or split != self._split:
56-
self.initRandomGenerator(split)
57-
58-
if self._use_numpy:
59-
return self._random.random_sample()
37+
def getUniformSample(self):
38+
return self._random.random()
39+
40+
def getPoissonSample(self, mean):
41+
# Using Knuth's algorithm described in
42+
# http://en.wikipedia.org/wiki/Poisson_distribution
43+
if mean < 20.0:
44+
# one exp and k+1 random calls
45+
l = math.exp(-mean)
46+
p = self._random.random()
47+
k = 0
48+
while p > l:
49+
k += 1
50+
p *= self._random.random()
6051
else:
61-
return self._random.uniform(0.0, 1.0)
62-
63-
def getPoissonSample(self, split, mean):
64-
if not self._rand_initialized or split != self._split:
65-
self.initRandomGenerator(split)
66-
67-
if self._use_numpy:
68-
return self._random.poisson(mean)
69-
else:
70-
# here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
71-
# drawing a sequence of numbers delta_j ~ Exp(mean)
72-
num_arrivals = 1
73-
cur_time = 0.0
74-
75-
cur_time += self._random.expovariate(mean)
52+
# switch to the log domain, k+1 expovariate (random + log) calls
53+
p = self._random.expovariate(mean)
54+
k = 0
55+
while p < 1.0:
56+
k += 1
57+
p += self._random.expovariate(mean)
58+
return k
7659

77-
if cur_time > 1.0:
78-
return 0
79-
80-
while(cur_time <= 1.0):
81-
cur_time += self._random.expovariate(mean)
82-
num_arrivals += 1
83-
84-
return (num_arrivals - 1)
85-
86-
def shuffle(self, vals):
87-
if self._random is None:
88-
self.initRandomGenerator(0) # this should only ever called on the master so
89-
# the split does not matter
90-
91-
if self._use_numpy:
92-
self._random.shuffle(vals)
93-
else:
94-
self._random.shuffle(vals, self._random.random)
60+
def func(self, split, iterator):
61+
raise NotImplementedError
9562

9663

9764
class RDDSampler(RDDSamplerBase):
@@ -101,31 +68,32 @@ def __init__(self, withReplacement, fraction, seed=None):
10168
self._fraction = fraction
10269

10370
def func(self, split, iterator):
71+
self.initRandomGenerator(split)
10472
if self._withReplacement:
10573
for obj in iterator:
10674
# For large datasets, the expected number of occurrences of each element in
10775
# a sample with replacement is Poisson(frac). We use that to get a count for
10876
# each element.
109-
count = self.getPoissonSample(split, mean=self._fraction)
77+
count = self.getPoissonSample(self._fraction)
11078
for _ in range(0, count):
11179
yield obj
11280
else:
11381
for obj in iterator:
114-
if self.getUniformSample(split) <= self._fraction:
82+
if self.getUniformSample() < self._fraction:
11583
yield obj
11684

11785

11886
class RDDRangeSampler(RDDSamplerBase):
11987

12088
def __init__(self, lowerBound, upperBound, seed=None):
12189
RDDSamplerBase.__init__(self, False, seed)
122-
self._use_numpy = False # no performance gain from numpy
12390
self._lowerBound = lowerBound
12491
self._upperBound = upperBound
12592

12693
def func(self, split, iterator):
94+
self.initRandomGenerator(split)
12795
for obj in iterator:
128-
if self._lowerBound <= self.getUniformSample(split) < self._upperBound:
96+
if self._lowerBound <= self.getUniformSample() < self._upperBound:
12997
yield obj
13098

13199

@@ -136,15 +104,16 @@ def __init__(self, withReplacement, fractions, seed=None):
136104
self._fractions = fractions
137105

138106
def func(self, split, iterator):
107+
self.initRandomGenerator(split)
139108
if self._withReplacement:
140109
for key, val in iterator:
141110
# For large datasets, the expected number of occurrences of each element in
142111
# a sample with replacement is Poisson(frac). We use that to get a count for
143112
# each element.
144-
count = self.getPoissonSample(split, mean=self._fractions[key])
113+
count = self.getPoissonSample(self._fractions[key])
145114
for _ in range(0, count):
146115
yield key, val
147116
else:
148117
for key, val in iterator:
149-
if self.getUniformSample(split) <= self._fractions[key]:
118+
if self.getUniformSample() < self._fractions[key]:
150119
yield key, val

0 commit comments

Comments
 (0)