Skip to content

Commit 58150f5

Browse files
committed
Changed the test cases to focus on the operation under test
1 parent 199e37f commit 58150f5

File tree

3 files changed

+51
-40
lines changed

3 files changed

+51
-40
lines changed

python/pyspark/java_gateway.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,9 @@ def run(self):
8484
java_import(gateway.jvm, "org.apache.spark.SparkConf")
8585
java_import(gateway.jvm, "org.apache.spark.api.java.*")
8686
java_import(gateway.jvm, "org.apache.spark.api.python.*")
87-
java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this?
8887
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
8988
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
90-
java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this?
89+
java_import(gateway.jvm, "org.apache.spark.streaming.*") # for Duration and Time
9190
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
9291
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
9392
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")

python/pyspark/streaming/dstream.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from pyspark.rdd import _JavaStackTrace
2626
from pyspark.storagelevel import StorageLevel
2727
from pyspark.resultiterable import ResultIterable
28-
from pyspark.streaming.utils import rddToFileName
28+
from pyspark.streaming.utils import rddToFileName, RDDFunction
2929

3030

3131
from py4j.java_collections import ListConverter, MapConverter
@@ -227,7 +227,6 @@ def foreachRDD(self, func):
227227
This is an output operator, so this DStream will be registered as an output
228228
stream and then materialized.
229229
"""
230-
from utils import RDDFunction
231230
wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
232231
self.ctx._jvm.PythonForeachDStream(self._jdstream.dstream(), wrapped_func)
233232

@@ -386,18 +385,18 @@ def saveAsTextFile(rdd, time):
386385

387386
return self.foreachRDD(saveAsTextFile)
388387

389-
def saveAsPickledFiles(self, prefix, suffix=None):
388+
def saveAsPickleFiles(self, prefix, suffix=None):
390389
"""
391390
Save this DStream as a SequenceFile of serialized objects. The serializer
392391
used is L{pyspark.serializers.PickleSerializer}, default batch size
393392
is 10.
394393
"""
395394

396-
def saveAsTextFile(rdd, time):
395+
def saveAsPickleFile(rdd, time):
397396
path = rddToFileName(prefix, suffix, time)
398397
rdd.saveAsPickleFile(path)
399398

400-
return self.foreachRDD(saveAsTextFile)
399+
return self.foreachRDD(saveAsPickleFile)
401400

402401

403402
# TODO: implement updateStateByKey

python/pyspark/streaming_tests.py

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class PySparkStreamingTestCase(unittest.TestCase):
3939
def setUp(self):
4040
class_name = self.__class__.__name__
4141
self.ssc = StreamingContext(appName=class_name, duration=Seconds(1))
42+
time.sleep(1)
4243

4344
def tearDown(self):
4445
# Do not call pyspark.streaming.context.StreamingContext.stop directly because
@@ -186,68 +187,73 @@ def test_func(dstream):
186187

187188
def test_reduceByKey_batch(self):
188189
"""Basic operation test for DStream.reduceByKey with batch deserializer"""
189-
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
190+
test_input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)],
191+
[("", 1),("", 1), ("", 1), ("", 1)],
192+
[(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]]
190193

191194
def test_func(dstream):
192-
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
193-
expected_output = [[("a", 2), ("b", 2)], [("", 4)]]
195+
return dstream.reduceByKey(operator.add)
196+
expected_output = [[("a", 2), ("b", 2)], [("", 4)], [(1, 2), (2, 2), (3 ,1)]]
194197
output = self._run_stream(test_input, test_func, expected_output)
198+
for result in (output, expected_output):
199+
self._sort_result_based_on_key(result)
195200
self.assertEqual(expected_output, output)
196201

197202
def test_reduceByKey_unbatch(self):
198203
"""Basic operation test for DStream.reduceByKey with unbatch deserilizer"""
199-
test_input = [["a", "a", "b"], ["", ""], []]
204+
test_input = [[("a", 1), ("a", 1), ("b", 1)], [("", 1), ("", 1)], []]
200205

201206
def test_func(dstream):
202-
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
207+
return dstream.reduceByKey(operator.add)
203208
expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
204209
output = self._run_stream(test_input, test_func, expected_output)
210+
for result in (output, expected_output):
211+
self._sort_result_based_on_key(result)
205212
self.assertEqual(expected_output, output)
206213

207214
def test_mapValues_batch(self):
208215
"""Basic operation test for DStream.mapValues with batch deserializer"""
209-
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
216+
test_input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
217+
[("", 4), (1, 1), (2, 2), (3, 3)]]
210218

211219
def test_func(dstream):
212-
return dstream.map(lambda x: (x, 1))\
213-
.reduceByKey(operator.add)\
214-
.mapValues(lambda x: x + 10)
215-
expected_output = [[("a", 12), ("b", 12)], [("", 14)]]
220+
return dstream.mapValues(lambda x: x + 10)
221+
expected_output = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)],
222+
[("", 14), (1, 11), (2, 12), (3, 13)]]
216223
output = self._run_stream(test_input, test_func, expected_output)
224+
for result in (output, expected_output):
225+
self._sort_result_based_on_key(result)
217226
self.assertEqual(expected_output, output)
218227

219228
def test_mapValues_unbatch(self):
220229
"""Basic operation test for DStream.mapValues with unbatch deserializer"""
221-
test_input = [["a", "a", "b"], ["", ""], []]
230+
test_input = [[("a", 2), ("b", 1)], [("", 2)], []]
222231

223232
def test_func(dstream):
224-
return dstream.map(lambda x: (x, 1))\
225-
.reduceByKey(operator.add)\
226-
.mapValues(lambda x: x + 10)
233+
return dstream.mapValues(lambda x: x + 10)
227234
expected_output = [[("a", 12), ("b", 11)], [("", 12)], []]
228235
output = self._run_stream(test_input, test_func, expected_output)
229236
self.assertEqual(expected_output, output)
230237

231238
def test_flatMapValues_batch(self):
232239
"""Basic operation test for DStream.flatMapValues with batch deserializer"""
233-
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
240+
test_input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], [("", 4), (1, 1), (2, 1), (3, 1)]]
234241

235242
def test_func(dstream):
236-
return dstream.map(lambda x: (x, 1))\
237-
.reduceByKey(operator.add)\
238-
.flatMapValues(lambda x: (x, x + 10))
239-
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)]]
243+
return dstream.flatMapValues(lambda x: (x, x + 10))
244+
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12),
245+
("c", 1), ("c", 11), ("d", 1), ("d", 11)],
246+
[("", 4), ("", 14), (1, 1), (1, 11),
247+
(2, 1), (2, 11), (3, 1), (3, 11)]]
240248
output = self._run_stream(test_input, test_func, expected_output)
241249
self.assertEqual(expected_output, output)
242250

243251
def test_flatMapValues_unbatch(self):
244252
"""Basic operation test for DStream.flatMapValues with unbatch deserializer"""
245-
test_input = [["a", "a", "b"], ["", ""], []]
253+
test_input = [[("a", 2), ("b", 1)], [("", 2)], []]
246254

247255
def test_func(dstream):
248-
return dstream.map(lambda x: (x, 1))\
249-
.reduceByKey(operator.add)\
250-
.flatMapValues(lambda x: (x, x + 10))
256+
return dstream.flatMapValues(lambda x: (x, x + 10))
251257
expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []]
252258
output = self._run_stream(test_input, test_func, expected_output)
253259
self.assertEqual(expected_output, output)
@@ -302,7 +308,7 @@ def f(iterator):
302308

303309
def test_countByValue_batch(self):
304310
"""Basic operation test for DStream.countByValue with batch deserializer."""
305-
test_input = [range(1, 5) + range(1,5), range(5, 7) + range(5, 9), ["a", "a", "b", ""]]
311+
test_input = [range(1, 5) * 2, range(5, 7) + range(5, 9), ["a", "a", "b", ""]]
306312

307313
def test_func(dstream):
308314
return dstream.countByValue()
@@ -330,9 +336,12 @@ def test_func(dstream):
330336

331337
def test_groupByKey_batch(self):
332338
"""Basic operation test for DStream.groupByKey with batch deserializer."""
333-
test_input = [range(1, 5), [1, 1, 1, 2, 2, 3], ["a", "a", "b", "", "", ""]]
339+
test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
340+
[(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
341+
[("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]
342+
334343
def test_func(dstream):
335-
return dstream.map(lambda x: (x, 1)).groupByKey()
344+
return dstream.groupByKey()
336345
expected_output = [[(1, [1]), (2, [1]), (3, [1]), (4, [1])],
337346
[(1, [1, 1, 1]), (2, [1, 1]), (3, [1])],
338347
[("a", [1, 1]), ("b", [1]), ("", [1, 1, 1])]]
@@ -344,10 +353,12 @@ def test_func(dstream):
344353

345354
def test_groupByKey_unbatch(self):
346355
"""Basic operation test for DStream.groupByKey with unbatch deserializer."""
347-
test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]]
356+
test_input = [[(1, 1), (2, 1), (3, 1)],
357+
[(1, 1), (1, 1), ("", 1)],
358+
[("a", 1), ("a", 1), ("b", 1)]]
348359

349360
def test_func(dstream):
350-
return dstream.map(lambda x: (x, 1)).groupByKey()
361+
return dstream.groupByKey()
351362
expected_output = [[(1, [1]), (2, [1]), (3, [1])],
352363
[(1, [1, 1]), ("", [1])],
353364
[("a", [1, 1]), ("b", [1])]]
@@ -359,11 +370,13 @@ def test_func(dstream):
359370

360371
def test_combineByKey_batch(self):
361372
"""Basic operation test for DStream.combineByKey with batch deserializer."""
362-
test_input = [range(1, 5), [1, 1, 1, 2, 2, 3], ["a", "a", "b", "", "", ""]]
373+
test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
374+
[(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
375+
[("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]
363376

364377
def test_func(dstream):
365378
def add(a, b): return a + str(b)
366-
return dstream.map(lambda x: (x, 1)).combineByKey(str, add, add)
379+
return dstream.combineByKey(str, add, add)
367380
expected_output = [[(1, "1"), (2, "1"), (3, "1"), (4, "1")],
368381
[(1, "111"), (2, "11"), (3, "1")],
369382
[("a", "11"), ("b", "1"), ("", "111")]]
@@ -374,11 +387,11 @@ def add(a, b): return a + str(b)
374387

375388
def test_combineByKey_unbatch(self):
376389
"""Basic operation test for DStream.combineByKey with unbatch deserializer."""
377-
test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]]
390+
test_input = [[(1, 1), (2, 1), (3 ,1)], [(1, 1), (1, 1), ("", 1)], [("a", 1), ("a", 1), ("b", 1)]]
378391

379392
def test_func(dstream):
380393
def add(a, b): return a + str(b)
381-
return dstream.map(lambda x: (x, 1)).combineByKey(str, add, add)
394+
return dstream.combineByKey(str, add, add)
382395
expected_output = [[(1, "1"), (2, "1"), (3, "1")],
383396
[(1, "11"), ("", "1")],
384397
[("a", "11"), ("b", "1")]]
@@ -446,4 +459,4 @@ def tearDownClass(cls):
446459

447460

448461
if __name__ == "__main__":
449-
unittest.main()
462+
unittest.main(verbosity=2)

0 commit comments

Comments
 (0)