Skip to content

Commit 7877a2a

Browse files
committed
Fixed code
1 parent ba02414 commit 7877a2a

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

python/pyspark/join.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,13 @@ def dispatch(seq):
8080

8181

8282
def python_cogroup(rdds, numPartitions):
83-
vrdds = [rdd.map(lambda (k, v): (k, (i, v))) for i, rdd in enumerate(rdds)]
83+
def make_mapper(i):
84+
return lambda (k, v): (k, (i, v))
85+
vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)]
8486
union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
87+
rdd_len = len(vrdds)
8588
def dispatch(seq):
86-
bufs = [[] for rdd in vrdds]
89+
bufs = [[] for i in range(rdd_len)]
8790
for (n, v) in seq:
8891
bufs[n].append(v)
8992
return tuple(map(ResultIterable, bufs))

0 commit comments

Comments
 (0)