Commit e54f986

add comments
1 parent 16aa64f commit e54f986

4 files changed: +59 -17 lines changed

python/pyspark/java_gateway.py

Lines changed: 2 additions & 3 deletions
@@ -108,15 +108,14 @@ def run(self):
 java_import(gateway.jvm, "org.apache.spark.SparkConf")
 java_import(gateway.jvm, "org.apache.spark.api.java.*")
 java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.streaming.*")
+java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this?
 java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
 java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*")
+java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this?
 java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
 java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
 java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
 java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
 java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
 java_import(gateway.jvm, "scala.Tuple2")
-
 return gateway
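A note on the two "# do we need this?" questions above: java_import only registers names on the py4j JVM view so Python code can refer to JVM classes by their short names; fully qualified access through gateway.jvm works with or without it. A minimal sketch, assuming SPARK_HOME is set so that launch_gateway() can start the JVM:

from pyspark.java_gateway import launch_gateway

gateway = launch_gateway()  # assumes SPARK_HOME points at a Spark build

# Because launch_gateway() calls java_import(gateway.jvm, "org.apache.spark.streaming.*"),
# the short class name resolves on the JVM view:
duration_short = gateway.jvm.Duration(1000)

# Fully qualified access works regardless of any java_import call:
duration_full = gateway.jvm.org.apache.spark.streaming.Duration(1000)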

python/pyspark/streaming/context.py

Lines changed: 7 additions & 6 deletions
@@ -64,7 +64,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
 pyFiles=pyFiles, environment=environment, batchSize=batchSize,
 serializer=serializer, conf=conf, gateway=gateway)

-# Start py4j callback server
+# Start the py4j callback server.
+# The callback server is needed only by Spark Streaming; therefore it
+# is started in StreamingContext.
 SparkContext._gateway.restart_callback_server()
 self._clean_up_trigger()
 self._jvm = self._sc._jvm
@@ -78,6 +80,8 @@ def _clean_up_trigger(self):
 """Kill py4j callback server properly using signal lib"""

 def clean_up_handler(*args):
+# Make sure the callback server is stopped.
+# How to terminate the callback server properly still needs improvement.
 SparkContext._gateway._shutdown_callback_server()
 SparkContext._gateway.shutdown()
 sys.exit(0)
@@ -100,7 +104,7 @@ def awaitTermination(self, timeout=None):
 else:
 self._jssc.awaitTermination(timeout)

-# start from simple one. storageLevel is not passed for now.
+# TODO: add storageLevel
 def socketTextStream(self, hostname, port):
 """
 Create an input from TCP source hostname:port. Data is received using
@@ -134,7 +138,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 def _testInputStream(self, test_inputs, numSlices=None):
 """
 This function is only for test.
-This implementation is inpired by QueStream implementation.
+This implementation is inspired by the QueueStream implementation.
 Give list of RDD to generate DStream which contains the RDD.
 """
 test_rdds = list()
@@ -144,9 +148,6 @@ def _testInputStream(self, test_inputs, numSlices=None):
 test_rdds.append(test_rdd._jrdd)
 test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

-# if len(set(test_rdd_deserializers)) > 1:
-# raise IOError("Deserializer should be one type to run test case. "
-# "See the SparkContext.parallelize to understand how to decide deserializer")
 jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
 jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
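The new comments concern py4j's callback server, which is what lets the JVM call back into Python (for example when a DStream operation hands a Python function to the JVM). The signal handling that _clean_up_trigger describes could look roughly like the sketch below; the helper name and the exact set of signals are assumptions, only SparkContext._gateway and clean_up_handler come from the diff:

import signal
import sys

from pyspark.context import SparkContext


def _install_cleanup_handler():
    # Illustrative sketch only: shut down the py4j callback server before
    # the gateway itself, so no callbacks arrive on a half-closed connection.
    def clean_up_handler(*args):
        SparkContext._gateway._shutdown_callback_server()
        SparkContext._gateway.shutdown()
        sys.exit(0)

    # Assumed signal set; the real code may register different signals.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, clean_up_handler)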

python/pyspark/streaming/dstream.py

Lines changed: 24 additions & 0 deletions
@@ -344,6 +344,17 @@ def checkpoint(self, interval):
 return self

 def groupByKey(self, numPartitions=None):
+"""
+Return a new DStream in which the values for each key in the
+DStream are grouped into a single sequence.
+Hash-partitions each resulting RDD in the DStream into
+numPartitions partitions.
+
+Note: If you are grouping in order to perform an aggregation (such as a
+sum or average) over each key, using reduceByKey will provide much
+better performance.
+
+"""
 def createCombiner(x):
 return [x]

@@ -359,6 +370,10 @@ def mergeCombiners(a, b):
 numPartitions).mapValues(lambda x: ResultIterable(x))

 def countByValue(self):
+"""
+Return a new DStream which contains the count of each unique value in this
+DStream as (value, count) pairs.
+"""
 def countPartition(iterator):
 counts = defaultdict(int)
 for obj in iterator:
@@ -373,6 +388,9 @@ def mergeMaps(m1, m2):
 return self.mapPartitions(countPartition).reduce(mergeMaps).flatMap(lambda x: x.items())

 def saveAsTextFiles(self, prefix, suffix=None):
+"""
+Save this DStream as text files, using string representations of the elements.
+"""

 def saveAsTextFile(rdd, time):
 path = rddToFileName(prefix, suffix, time)
@@ -381,6 +399,11 @@ def saveAsTextFile(rdd, time):
 return self.foreachRDD(saveAsTextFile)

 def saveAsPickledFiles(self, prefix, suffix=None):
+"""
+Save this DStream as SequenceFiles of serialized objects. The serializer
+used is L{pyspark.serializers.PickleSerializer}; the default batch size
+is 10.
+"""

 def saveAsTextFile(rdd, time):
 path = rddToFileName(prefix, suffix, time)
@@ -410,6 +433,7 @@ def saveAsTextFile(rdd, time):
 # TODO: implement leftOuterJoin
 # TODO: implemtnt rightOuterJoin

+
 class PipelinedDStream(DStream):
 def __init__(self, prev, func, preservesPartitioning=False):
 if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
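Taken together, the new docstrings describe the pair and output operations added so far. A small usage sketch under assumed conditions: a StreamingContext named ssc, the socketTextStream source from context.py above, and the usual map/flatMap transformations on this work-in-progress DStream; none of the names below are part of the commit itself.

# ssc is an already-created pyspark.streaming.context.StreamingContext (assumed).
lines = ssc.socketTextStream("localhost", 9999)

pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))

# groupByKey: one (word, ResultIterable of 1s) entry per key and batch.
grouped = pairs.groupByKey()

# countByValue: (value, count) pairs for each unique line in the batch.
line_counts = lines.countByValue()

# saveAsTextFiles: one directory of text files per batch interval,
# named from the prefix and the batch time.
line_counts.saveAsTextFiles("line_counts")

# awaitTermination appears in the context.py diff; start() is assumed here.
ssc.start()
ssc.awaitTermination()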

python/pyspark/streaming_tests.py

Lines changed: 26 additions & 8 deletions
@@ -18,12 +18,11 @@
 """
 Unit tests for PySpark; additional tests are implemented as doctests in
 individual modules.
-Other option is separate this test case with other tests.
-This makes sense becuase streaming tests takes long time due to waiting time
-for stoping callback server.

-This file will merged to tests.py. But for now, this file is separated due
-to focusing to streaming test case
+This file will be merged into tests.py after all functions are ready.
+But for now, it is kept separate to focus on the streaming test cases.
+
+The callback server seems unstable at times, which can cause errors in the test cases.

 """
 from itertools import chain
@@ -43,10 +42,10 @@ def setUp(self):

 def tearDown(self):
 # Do not call pyspark.streaming.context.StreamingContext.stop directly because
-# we do not wait to shutdowncall back server and py4j client
+# we do not wait to shut down the callback server and the py4j client
 self.ssc._jssc.stop()
 self.ssc._sc.stop()
-# Why does it long time to terminaete StremaingContext and SparkContext?
+# Why does it take so long to terminate the StreamingContext and SparkContext?
 # Should we change the sleep time if this depends on machine spec?
 time.sleep(10)

@@ -68,7 +67,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
 I am wondering if these test are enough or not.
 All tests input should have list of lists. This represents stream.
 Every batch interval, the first object of list are chosen to make DStream.
-Please see the BasicTestSuits in Scala or QueStream which is close to this implementation.
+Please see the BasicTestSuites in Scala, which are close to this implementation.
 """
 def setUp(self):
 PySparkStreamingTestCase.setUp(self)
@@ -358,5 +357,24 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):

 return self.result

+class TestSaveAsFilesSuite(PySparkStreamingTestCase):
+def setUp(self):
+PySparkStreamingTestCase.setUp(self)
+self.timeout = 10 # seconds
+self.numInputPartitions = 2
+self.result = list()
+
+def tearDown(self):
+PySparkStreamingTestCase.tearDown(self)
+
+@classmethod
+def tearDownClass(cls):
+PySparkStreamingTestCase.tearDownClass()
+
+
+
+
+
+
 if __name__ == "__main__":
 unittest.main()
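The new TestSaveAsFilesSuite is still an empty skeleton. For the basic-operations suite, the class docstring's list-of-lists convention (one inner list per batch) together with the _run_stream helper visible in the hunk header suggests test cases shaped roughly like the sketch below; the method name, the lambda, and the assertion style are assumptions, not part of this commit:

def test_map(self):
    # One inner list per batch interval, as described in the class docstring.
    test_input = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

    def test_func(dstream):
        return dstream.map(lambda x: x * 2)

    expected_output = [[2, 4, 6], [8, 10, 12], [14, 16, 18]]
    output = self._run_stream(test_input, test_func, expected_output)
    self.assertEqual(expected_output, output)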
