Skip to content

Commit 5c04a5f

Browse files
committed
WIP: added PythonTestInputStream
1 parent 019ef38 commit 5c04a5f

File tree

3 files changed

+6
-10
lines changed

3 files changed

+6
-10
lines changed

examples/src/main/python/streaming/test_oprations.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,14 @@
66
from pyspark.streaming.duration import *
77

88
if __name__ == "__main__":
9-
if len(sys.argv) != 3:
10-
print >> sys.stderr, "Usage: wordcount <hostname> <port>"
11-
exit(-1)
129
conf = SparkConf()
1310
conf.setAppName("PythonStreamingNetworkWordCount")
1411
ssc = StreamingContext(conf=conf, duration=Seconds(1))
1512

16-
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
17-
words = lines.flatMap(lambda line: line.split(" "))
18-
# ssc.checkpoint("checkpoint")
19-
mapped_words = words.map(lambda word: (word, 1))
20-
count = mapped_words.reduceByKey(add)
13+
test_input = ssc._testInputStream([1,1,1,1])
14+
mapped = test_input.map(lambda x: (x, 1))
15+
mapped.pyprint()
2116

22-
count.pyprint()
2317
ssc.start()
24-
ssc.awaitTermination()
18+
# ssc.awaitTermination()
2519
# ssc.stop()

python/pyspark/streaming/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import sys
1919
from signal import signal, SIGTERM, SIGINT
20+
from tempfile import NamedTemporaryFile
2021

2122

2223
from pyspark.conf import SparkConf

python/pyspark/streaming/dstream.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def _mergeCombiners(iterator):
156156
combiners[k] = v
157157
else:
158158
combiners[k] = mergeCombiners(combiners[k], v)
159+
return combiners.iteritems()
159160

160161
return shuffled.mapPartitions(_mergeCombiners)
161162

0 commit comments

Comments
 (0)