SPARK-1170 Added histogram(buckets) to pyspark and not histogram(noOfBuckets). #121

Closed
@@ -129,7 +129,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     if (buckets.length < 2) {
       throw new IllegalArgumentException("buckets array must have at least two elements")
     }
-    // The histogramPartition function computes the partail histogram for a given
+    // The histogramPartition function computes the partial histogram for a given
     // partition. The provided bucketFunction determines which bucket in the array
     // to increment or returns None if there is no bucket. This is done so we can
     // specialize for uniformly distributed buckets and save the O(log n) binary
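The Scala comment above captures the design this PR mirrors in Python: each partition builds a partial histogram through a pluggable bucket-lookup function, and evenly spaced buckets allow a constant-time arithmetic lookup in place of the O(log n) binary search. A minimal standalone sketch of the two lookup strategies (plain Python; the names search_bucket and even_bucket are hypothetical, not from the PR):

from bisect import bisect_right

buckets = [0, 10, 20, 30]  # three buckets: [0, 10), [10, 20), [20, 30]

def search_bucket(e):
    # General case: O(log n) binary search over the bucket boundaries.
    loc = bisect_right(buckets, e)
    if 0 < loc < len(buckets):
        return loc - 1
    return len(buckets) - 2 if e == buckets[-1] else None

def even_bucket(e):
    # Evenly spaced buckets: O(1) arithmetic lookup.
    if e < buckets[0] or e > buckets[-1]:
        return None
    width = buckets[1] - buckets[0]
    return min(int((e - buckets[0]) / width), len(buckets) - 2)

# The two strategies must agree on every input, in range or not.
assert all(search_bucket(e) == even_bucket(e) for e in range(-5, 36))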
python/pyspark/rdd.py (50 changes: 49 additions & 1 deletion)
@@ -37,7 +37,7 @@
from pyspark.rddsampler import RDDSampler

from py4j.java_collections import ListConverter, MapConverter
from bisect import bisect_right

__all__ = ["RDD"]

@@ -609,7 +609,55 @@ def sampleVariance(self):
        1.0
        """
        return self.stats().sampleVariance()

    def histogram(self, buckets, evenBuckets=False):
        """
        Compute a histogram using the provided buckets. The buckets are all
        open to the right except for the last, which is closed. e.g. for
        the array [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50],
        i.e. 1<=x<10, 10<=x<20, 20<=x<=50. On the input of 1 and 50 we would
        have a histogram of 1, 0, 1. Values outside the bucket range are
        ignored. If the buckets are evenly spaced, pass evenBuckets=True to
        replace the per-element binary search with a constant-time lookup.

        >>> a = sc.parallelize(range(100))
        >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True)
        [10, 10, 10, 10, 10, 10, 10, 10, 11]
        >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90])
        [10, 10, 10, 10, 10, 10, 10, 10, 11]
        >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99])
        [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
        """

        def histogramPartition(iterator):
            # Compute a partial histogram for a single partition.
            counters = [0] * (len(buckets) - 1)
            for i in iterator:
                if evenBuckets:
                    t = fastBucketFunction(buckets[0], buckets[-1],
                                           buckets[1] - buckets[0],
                                           len(counters), i)
                else:
                    t = basicBucketFunction(i)
                if t is not None:
                    counters[t] += 1
            return [counters]

        def mergeCounters(a1, a2):
            # Element-wise sum of two partial histograms.
            for i in range(len(a1)):
                a1[i] += a2[i]
            return a1

        def basicBucketFunction(e):
            # O(log n) binary search; bisect_right yields the right-open
            # [a, b) semantics described in the docstring.
            loc = bisect_right(buckets, e)
            if 0 < loc < len(buckets):
                return loc - 1
            if e == buckets[-1]:
                # The last bucket is closed on the right.
                return len(buckets) - 2
            return None

        def fastBucketFunction(minimum, maximum, inc, count, e):
            # O(1) lookup for evenly spaced buckets.
            if e < minimum or e > maximum:
                return None
            # e == maximum falls into the (closed) last bucket.
            return min(int((e - minimum) / inc), count - 1)

        return self.mapPartitions(histogramPartition).reduce(mergeCounters)


    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
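To sanity-check the documented semantics, the new method can be exercised against a tiny RDD. A hedged usage sketch, assuming a local SparkContext bound to sc as in the doctests above:

# [1, 10) gets 1; [10, 20) gets nothing; [20, 50] gets 20 and 50.
data = sc.parallelize([1, 10, 20, 50])
print(data.histogram([1, 10, 20, 50]))  # [1, 1, 2]

# Out-of-range values are ignored rather than raising an error.
print(sc.parallelize([-5, 60]).histogram([1, 10, 20, 50]))  # [0, 0, 0]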