Skip to content

Commit 574b564

Browse files
Merge pull request #2 from megatron-me-uk/patch-4
Use boolean checkCode optional parameter
2 parents eb4801c + 0c1e762 commit 574b564

File tree

2 files changed

+6
-16
lines changed

2 files changed

+6
-16
lines changed

python/pyspark/rdd.py

Lines changed: 4 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -687,24 +687,15 @@ def groupBy(self, f, numPartitions=None):
687687
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
688688

689689
@ignore_unicode_prefix
690-
def pipe(self, command, env={}, mode='permissive'):
690+
def pipe(self, command, env={}, checkCode=False):
691691
"""
692692
Return an RDD created by piping elements to a forked external process.
693693
694694
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
695695
[u'1', u'2', u'', u'3']
696+
697+
:param checkCode: whether or not to check the return value of the shell command.
696698
"""
697-
if mode == 'permissive':
698-
def fail_condition(x):
699-
return False
700-
elif mode == 'strict':
701-
def fail_condition(x):
702-
return x != 0
703-
elif mode == 'grep':
704-
def fail_condition(x):
705-
return x != 0 and x != 1
706-
else:
707-
raise ValueError("mode must be one of 'permissive', 'strict' or 'grep'.")
708699

709700
def func(iterator):
710701
pipe = Popen(
@@ -719,7 +710,7 @@ def pipe_objs(out):
719710

720711
def check_return_code():
721712
pipe.wait()
722-
if fail_condition(pipe.returncode):
713+
if checkCode and pipe.returncode:
723714
raise Exception("Pipe function `%s' exited "
724715
"with error code %d" % (command, pipe.returncode))
725716
else:

python/pyspark/tests.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -879,13 +879,12 @@ def test_pipe_functions(self):
879879
rdd = self.sc.parallelize(data)
880880
with QuietTest(self.sc):
881881
self.assertEqual([], rdd.pipe('cc').collect())
882-
self.assertRaises(Py4JJavaError, rdd.pipe('cc', mode='strict').collect)
882+
self.assertRaises(Py4JJavaError, rdd.pipe('cc', checkCode=True).collect)
883883
result = rdd.pipe('cat').collect()
884884
result.sort()
885885
[self.assertEqual(x, y) for x, y in zip(data, result)]
886-
self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', mode='strict').collect)
886+
self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect)
887887
self.assertEqual([], rdd.pipe('grep 4').collect())
888-
self.assertEqual([], rdd.pipe('grep 4', mode='grep').collect())
889888

890889

891890
class ProfilerTests(PySparkTestCase):

0 commit comments

Comments
 (0)