
Commit 8edc9d0

Erik Selin authored and mateiz committed
[SPARK-1468] Modify the partition function used by partitionBy.
Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes.

Associated JIRA at https://issues.apache.org/jira/browse/SPARK-1468

Author: Erik Selin <[email protected]>

Closes #371 from tyro89/consistent_hashing and squashes the following commits:

201c301 [Erik Selin] Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes.
1 parent b1f2853 commit 8edc9d0
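The commit message describes the fix only in prose, so here is a minimal sketch of the idea in isolation. The names `none_safe_hash` and `bucket_for` are hypothetical and do not appear in the commit; only the `0 if x is None else hash(x)` expression is taken from the change itself. The sketch assumes, as the message states, that `hash(None)` can differ between Python processes on the interpreters this commit targets.

```python
def none_safe_hash(x):
    # Hypothetical helper name; the body mirrors the lambda added by this commit.
    # None is pinned to 0 so every worker process agrees on its hash value.
    return 0 if x is None else hash(x)


def bucket_for(key, num_partitions, partition_func=none_safe_hash):
    # Hypothetical helper: map a key to a partition index.
    return partition_func(key) % num_partitions


none_bucket = bucket_for(None, 4)  # always partition 0, in any process
int_bucket = bucket_for(10, 4)     # small ints hash to themselves: 10 % 4
```

Pinning `None` to a constant leaves every other key's hash unchanged, so records keyed by `None` land in the same partition no matter which worker computes the bucket.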

1 file changed: +4 -1 lines changed


python/pyspark/rdd.py

Lines changed: 4 additions & 1 deletion
@@ -1062,7 +1062,7 @@ def rightOuterJoin(self, other, numPartitions=None):
         return python_right_outer_join(self, other, numPartitions)
 
     # TODO: add option to control map-side combining
-    def partitionBy(self, numPartitions, partitionFunc=hash):
+    def partitionBy(self, numPartitions, partitionFunc=None):
         """
         Return a copy of the RDD partitioned using the specified partitioner.
 
@@ -1073,6 +1073,9 @@ def partitionBy(self, numPartitions, partitionFunc=hash):
         """
         if numPartitions is None:
             numPartitions = self.ctx.defaultParallelism
+
+        if partitionFunc is None:
+            partitionFunc = lambda x: 0 if x is None else hash(x)
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
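The trailing comment in the diff mentions forming hash buckets in Python as (splitNumber, [objects]) pairs. The following is a rough, standalone sketch of that grouping step, not the code in `rdd.py`: `form_buckets` is a hypothetical function, it works on a plain list rather than an RDD, and it omits the serialization and transfer to Java entirely.

```python
from collections import defaultdict


def form_buckets(pairs, num_partitions, partition_func=None):
    # Same default as the patched partitionBy: None always hashes to 0.
    if partition_func is None:
        partition_func = lambda x: 0 if x is None else hash(x)
    buckets = defaultdict(list)
    for key, value in pairs:
        # splitNumber is the partition function's result modulo the partition count.
        buckets[partition_func(key) % num_partitions].append((key, value))
    # One (splitNumber, [objects]) pair per non-empty bucket, ordered by split number.
    return sorted(buckets.items())


result = form_buckets([(None, "a"), (1, "b"), (5, "c"), (2, "d")], 4)
```

With four partitions, the `None` key goes to split 0, keys 1 and 5 share split 1, and key 2 goes to split 2, so only three bucket objects would need to cross over instead of four records.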
