Skip to content

Commit e36f4d4

Browse files
daviesconviva-zz
authored and committed
[SPARK-791] [PySpark] fix pickle itemgetter with cloudpickle
Fix the problem with pickling an operator.itemgetter that was created with multiple indices. Author: Davies Liu <[email protected]> Closes apache#1627 from davies/itemgetter and squashes the following commits: aabd7fa [Davies Liu] fix pickle itemgetter with cloudpickle
1 parent c35fa7c commit e36f4d4

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

python/pyspark/cloudpickle.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -560,8 +560,9 @@ class ItemGetterType(ctypes.Structure):
560560
]
561561

562562

563-
itemgetter_obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
564-
return self.save_reduce(operator.itemgetter, (itemgetter_obj.item,))
563+
obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
564+
return self.save_reduce(operator.itemgetter,
565+
obj.item if obj.nitems > 1 else (obj.item,))
565566

566567
if PyObject_HEAD:
567568
dispatch[operator.itemgetter] = save_itemgetter

python/pyspark/tests.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -284,6 +284,12 @@ def combOp(x, y):
284284
self.assertEqual(set([2]), sets[3])
285285
self.assertEqual(set([1, 3]), sets[5])
286286

287+
def test_itemgetter(self):
288+
rdd = self.sc.parallelize([range(10)])
289+
from operator import itemgetter
290+
self.assertEqual([1], rdd.map(itemgetter(1)).collect())
291+
self.assertEqual([(2, 3)], rdd.map(itemgetter(2, 3)).collect())
292+
287293

288294
class TestIO(PySparkTestCase):
289295

0 commit comments

Comments
 (0)