Skip to content

Commit 51a77e9

Browse files
ScrapCodes authored and mateiz committed
SPARK-1162 Added top in python.
Author: Prashant Sharma <[email protected]> Closes #93 from ScrapCodes/SPARK-1162/pyspark-top-takeOrdered and squashes the following commits: ece1fa4 [Prashant Sharma] Added top in python. (cherry picked from commit b8afe30) Signed-off-by: Matei Zaharia <[email protected]>
1 parent 7049164 commit 51a77e9

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

python/pyspark/rdd.py

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
from tempfile import NamedTemporaryFile
2929
from threading import Thread
3030
import warnings
31+
from heapq import heappush, heappop, heappushpop
3132

3233
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
3334
BatchedSerializer, CloudPickleSerializer, pack_long
@@ -616,6 +617,30 @@ def mergeMaps(m1, m2):
616617
m1[k] += v
617618
return m1
618619
return self.mapPartitions(countPartition).reduce(mergeMaps)
620+
621+
def top(self, num):
    """
    Get the top N elements from an RDD.

    Every partition is first reduced to a list of its own ``num``
    largest elements; those per-partition lists are then merged
    pairwise, keeping only the ``num`` largest of each merged pair.
    If fewer than ``num`` elements are available, all of them are
    returned.

    Note: It returns the list sorted in ascending order.

    >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
    [12]
    >>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
    [5, 6]
    """
    # Imported here so the method carries its own dependency and does not
    # rely on the exact list of names imported from heapq at file level.
    from heapq import nlargest

    def topIterator(iterator):
        # Yield exactly one list per partition (an empty list when the
        # partition has no elements) so that reduce() combines lists,
        # never bare elements.
        yield nlargest(num, iterator)

    def merge(a, b):
        # a and b are candidate lists produced by topIterator or by an
        # earlier merge; keep only the num largest of their union.
        return nlargest(num, a + b)

    # nlargest returns largest-first; sort to honour the documented
    # ascending order of the result.
    return sorted(self.mapPartitions(topIterator).reduce(merge))
619644

620645
def take(self, num):
621646
"""

0 commit comments

Comments
 (0)