Skip to content

Commit d33df1c

Browse files
mengxrmateiz
authored andcommitted
[SPARK-1674] fix interrupted system call error in pyspark's RDD.pipe
`RDD.pipe`'s doctest throws interrupted system call exception on Mac. It can be fixed by wrapping `pipe.stdout.readline` in an iterator. Author: Xiangrui Meng <[email protected]> Closes #594 from mengxr/pyspark-pipe and squashes the following commits: cc32ac9 [Xiangrui Meng] fix interrupted system call error in pyspark's RDD.pipe
1 parent bf8d0aa commit d33df1c

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

python/pyspark/rdd.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -537,8 +537,8 @@ def pipe(self, command, env={}):
537537
"""
538538
Return an RDD created by piping elements to a forked external process.
539539
540-
>>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
541-
['1', '2', '3']
540+
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
541+
['1', '2', '', '3']
542542
"""
543543
def func(iterator):
544544
pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
@@ -547,7 +547,7 @@ def pipe_objs(out):
547547
out.write(str(obj).rstrip('\n') + '\n')
548548
out.close()
549549
Thread(target=pipe_objs, args=[pipe.stdin]).start()
550-
return (x.rstrip('\n') for x in pipe.stdout)
550+
return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
551551
return self.mapPartitions(func)
552552

553553
def foreach(self, f):

0 commit comments

Comments
 (0)