diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index a7b6b3b5146ce..93eb63331078d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -129,7 +129,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     if (buckets.length < 2) {
       throw new IllegalArgumentException("buckets array must have at least two elements")
     }
-    // The histogramPartition function computes the partail histogram for a given
+    // The histogramPartition function computes the partial histogram for a given
     // partition. The provided bucketFunction determines which bucket in the array
     // to increment or returns None if there is no bucket. This is done so we can
     // specialize for uniformly distributed buckets and save the O(log n) binary
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index be23f87f5ed2d..c5513e5e57cc4 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -37,7 +37,7 @@
 from pyspark.rddsampler import RDDSampler
 from py4j.java_collections import ListConverter, MapConverter
-
+from bisect import bisect_right
 
 __all__ = ["RDD"]
@@ -609,7 +609,67 @@ def sampleVariance(self):
         1.0
         """
         return self.stats().sampleVariance()
+
+    def histogram(self, buckets, evenBuckets=False):
+        """
+        Compute a histogram using the provided buckets. The buckets are all
+        open to the right, except for the last one, which is also closed,
+        e.g. for the array [1, 10, 20, 50] the buckets are [1, 10) [10, 20)
+        [20, 50], i.e. 1<=x<10, 10<=x<20, 20<=x<=50. On the input of 1 and
+        50 we would have a histogram of 1, 0, 1. If evenBuckets is True, the
+        buckets are assumed to be evenly spaced and bucket lookup takes
+        constant time instead of a binary search.
+
+        >>> a = sc.parallelize(range(100))
+        >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True)
+        [10, 10, 10, 10, 10, 10, 10, 10, 11]
+        >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90])
+        [10, 10, 10, 10, 10, 10, 10, 10, 11]
+        >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99])
+        [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
+        """
+        if len(buckets) < 2:
+            raise ValueError("buckets array must have at least two elements")
+
+        def histogramPartition(iterator):
+            # Compute the partial histogram for a single partition.
+            counters = [0] * (len(buckets) - 1)
+            for i in iterator:
+                if evenBuckets:
+                    t = fastBucketFunction(buckets[0], buckets[1] - buckets[0],
+                                           len(counters), i)
+                else:
+                    t = basicBucketFunction(i)
+                if t is not None:
+                    counters[t] += 1
+            return [counters]
+
+        def mergeCounters(a1, a2):
+            for i in range(len(a1)):
+                a1[i] += a2[i]
+            return a1
+
+        def basicBucketFunction(e):
+            # Binary search; a value on an interior boundary belongs to the
+            # bucket on its right, and the last bucket is closed on the right.
+            loc = bisect_right(buckets, e)
+            if 0 < loc < len(buckets):
+                return loc - 1
+            elif loc == len(buckets) and e == buckets[-1]:
+                return len(buckets) - 2
+            else:
+                return None
+
+        def fastBucketFunction(minimum, inc, count, e):
+            # Constant time lookup for evenly spaced buckets; a value equal
+            # to the last boundary falls into the last (closed) bucket.
+            bucketNumber = (e - minimum) / float(inc)
+            if bucketNumber < 0 or bucketNumber > count:
+                return None
+            return min(int(bucketNumber), count - 1)
+
+        return self.mapPartitions(histogramPartition).reduce(mergeCounters)
+
 
     def countByValue(self):
         """
         Return the count of each unique value in this RDD as a dictionary of
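
Reviewer note: below is a minimal standalone sketch (not part of the patch) of
the two bucket-lookup strategies that the patch nests inside histogram(). The
helper names basic_bucket and fast_bucket are illustrative; only the Python
standard library is assumed. On evenly spaced buckets the two strategies must
agree, which the assertions check for a few boundary cases.

from bisect import bisect_right

def basic_bucket(buckets, e):
    # Binary search: buckets are closed on the left and open on the right,
    # except the last bucket, which is also closed on the right.
    loc = bisect_right(buckets, e)
    if 0 < loc < len(buckets):
        return loc - 1
    if loc == len(buckets) and e == buckets[-1]:
        return len(buckets) - 2
    return None

def fast_bucket(minimum, inc, count, e):
    # Constant-time lookup, valid only when the buckets are evenly spaced.
    n = (e - minimum) / float(inc)
    if n < 0 or n > count:
        return None
    # A value equal to the last boundary maps onto the last (closed) bucket.
    return min(int(n), count - 1)

buckets = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
for e in [-1, 0, 9, 10, 89, 90, 91, 99]:
    b = basic_bucket(buckets, e)
    f = fast_bucket(buckets[0], buckets[1] - buckets[0], len(buckets) - 1, e)
    assert b == f, (e, b, f)
print("both strategies agree on even buckets")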