
Commit 7f1cadf

foxik authored and markhamstra committed
[SPARK-5969][PySpark] Fix descending pyspark.rdd.sortByKey.
The samples should always be sorted in ascending order, because bisect.bisect_left is used on them. The reverse order of the result is already achieved in rangePartitioner by reversing the found index. The current implementation also works, but always uses only two partitions -- the first one and the last one (because bisect_left returns either "beginning" or "end" for a descending sequence).

Author: Milan Straka <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <[email protected]>

Closes apache#4761 from foxik/fix-descending-sort and squashes the following commits:

95896b5 [Milan Straka] Add regression test for SPARK-5969.
5757490 [Milan Straka] Fix descending pyspark.rdd.sortByKey.
1 parent 1f8cf66 commit 7f1cadf
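
To make the reasoning in the commit message concrete, here is a minimal standalone sketch (plain Python, independent of Spark; the boundary values [7, 5, 2] and [2, 5, 7] are made-up illustration values, not Spark's actual samples). It shows why bisect.bisect_left degenerates to "beginning" or "end" on a descending boundary list, while an ascending list spreads keys across all buckets:

import bisect

keys = list(range(10))

# Descending boundaries: bisect_left assumes an ascending sequence, so every
# key maps to either 0 ("beginning") or len(bounds) ("end") -- only the first
# and the last partition ever receive data.
desc_bounds = [7, 5, 2]
print([bisect.bisect_left(desc_bounds, k) for k in keys])
# [0, 0, 0, 0, 0, 0, 3, 3, 3, 3]

# Ascending boundaries: keys are spread across all buckets; a descending sort
# can then be obtained by reversing the found index.
asc_bounds = [2, 5, 7]
print([bisect.bisect_left(asc_bounds, k) for k in keys])
# [0, 0, 0, 1, 1, 1, 2, 2, 3, 3]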

File tree

2 files changed: +12, -1 lines


python/pyspark/rdd.py

Lines changed: 1 addition & 1 deletion
@@ -587,7 +587,7 @@ def sortPartition(iterator):
         maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
         fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
         samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
-        samples = sorted(samples, reverse=(not ascending), key=keyfunc)
+        samples = sorted(samples, key=keyfunc)
 
         # we have numPartitions many parts but one of the them has
         # an implicit boundary
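
For context, the descending order itself is produced later by sortByKey's range partitioner, as the commit message describes, by reversing the index that bisect_left finds against the (now always ascending) sample boundaries. A rough sketch of that idea, using a hypothetical function name rather than the exact Spark source:

import bisect

def range_partitioner_sketch(key, bounds, numPartitions, ascending):
    # bounds are the sampled split points and must be in ascending order,
    # because bisect_left assumes an ascending sequence.
    p = bisect.bisect_left(bounds, key)
    # For a descending sort, the partition index is simply mirrored.
    return p if ascending else numPartitions - 1 - p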

python/pyspark/tests.py

Lines changed: 11 additions & 0 deletions
@@ -744,6 +744,17 @@ def test_take_on_jrdd(self):
         rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
         rdd._jrdd.first()
 
+    def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
+        # Regression test for SPARK-5969
+        seq = [(i * 59 % 101, i) for i in range(101)]  # unsorted sequence
+        rdd = self.sc.parallelize(seq)
+        for ascending in [True, False]:
+            sort = rdd.sortByKey(ascending=ascending, numPartitions=5)
+            self.assertEqual(sort.collect(), sorted(seq, reverse=not ascending))
+            sizes = sort.glom().map(len).collect()
+            for size in sizes:
+                self.assertGreater(size, 0)
+
 
 class ProfilerTests(PySparkTestCase):
