
Commit b8afe30

ScrapCodes authored and mateiz committed
SPARK-1162 Added top in python.
Author: Prashant Sharma <[email protected]>

Closes #93 from ScrapCodes/SPARK-1162/pyspark-top-takeOrdered and squashes the following commits:

ece1fa4 [Prashant Sharma] Added top in python.
1 parent 5d1ec64 commit b8afe30

File tree: 1 file changed (+25, -0 lines)

python/pyspark/rdd.py (25 additions, 0 deletions)

@@ -29,6 +29,7 @@
 from tempfile import NamedTemporaryFile
 from threading import Thread
 import warnings
+from heapq import heappush, heappop, heappushpop
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
@@ -660,6 +661,30 @@ def mergeMaps(m1, m2):
                 m1[k] += v
             return m1
         return self.mapPartitions(countPartition).reduce(mergeMaps)
+
+    def top(self, num):
+        """
+        Get the top N elements from a RDD.
+
+        Note: It returns the list sorted in ascending order.
+        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
+        [12]
+        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
+        [5, 6]
+        """
+        def topIterator(iterator):
+            q = []
+            for k in iterator:
+                if len(q) < num:
+                    heappush(q, k)
+                else:
+                    heappushpop(q, k)
+            yield q
+
+        def merge(a, b):
+            return next(topIterator(a + b))
+
+        return sorted(self.mapPartitions(topIterator).reduce(merge))
 
     def take(self, num):
         """
