Commit 6d8190a

add comments
1 parent 4aa99e4 commit 6d8190a

4 files changed: +59 −17 lines


python/pyspark/java_gateway.py

Lines changed: 2 additions & 3 deletions
@@ -84,15 +84,14 @@ def run(self):
 java_import(gateway.jvm, "org.apache.spark.SparkConf")
 java_import(gateway.jvm, "org.apache.spark.api.java.*")
 java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.streaming.*")
+java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this?
 java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
 java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*")
+java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this?
 java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
 java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
 java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
 java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
 java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
 java_import(gateway.jvm, "scala.Tuple2")
-
 return gateway
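
For context, a minimal sketch (not taken from this commit) of how the imports above are consumed: once a class is registered with java_import, it can be referenced by its short name on gateway.jvm. It assumes launch_gateway() is the pyspark.java_gateway helper that performs these imports.

    from pyspark.java_gateway import launch_gateway

    gateway = launch_gateway()
    # SparkConf resolves via the java_import of org.apache.spark.SparkConf above;
    # without that import the full package path would be required.
    conf = gateway.jvm.SparkConf()
    conf.setAppName("java-import-example")
    print(conf.toDebugString())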

python/pyspark/streaming/context.py

Lines changed: 7 additions & 6 deletions
@@ -64,7 +64,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
 pyFiles=pyFiles, environment=environment, batchSize=batchSize,
 serializer=serializer, conf=conf, gateway=gateway)
 
-# Start py4j callback server
+# Start the py4j callback server.
+# The callback server is needed only by Spark Streaming; therefore it
+# is started here in StreamingContext.
 SparkContext._gateway.restart_callback_server()
 self._clean_up_trigger()
 self._jvm = self._sc._jvm
@@ -78,6 +80,8 @@ def _clean_up_trigger(self):
 """Kill py4j callback server properly using signal lib"""
 
 def clean_up_handler(*args):
+# Make sure the callback server is stopped.
+# TODO: improve how the callback server is terminated.
 SparkContext._gateway._shutdown_callback_server()
 SparkContext._gateway.shutdown()
 sys.exit(0)
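
A standalone sketch of the cleanup pattern used in _clean_up_trigger above, assuming only the standard signal module and a Py4J JavaGateway; _install_cleanup and its wiring are hypothetical, not the actual PySpark code.

    import signal
    import sys

    def _install_cleanup(gateway):
        # Register a handler so SIGINT/SIGTERM shut down the Py4J gateway
        # (and with it the callback server) before the Python process exits.
        def clean_up_handler(*args):
            gateway.shutdown()
            sys.exit(0)

        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, clean_up_handler)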
@@ -100,7 +104,7 @@ def awaitTermination(self, timeout=None):
 else:
 self._jssc.awaitTermination(timeout)
 
-# start from simple one. storageLevel is not passed for now.
+# TODO: add storageLevel
 def socketTextStream(self, hostname, port):
 """
 Create an input from TCP source hostname:port. Data is received using
@@ -134,7 +138,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 def _testInputStream(self, test_inputs, numSlices=None):
 """
 This function is only for test.
-This implementation is inpired by QueStream implementation.
+This implementation is inspired by QueStream implementation.
 Give list of RDD to generate DStream which contains the RDD.
 """
 test_rdds = list()
@@ -144,9 +148,6 @@ def _testInputStream(self, test_inputs, numSlices=None):
 test_rdds.append(test_rdd._jrdd)
 test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
 
-# if len(set(test_rdd_deserializers)) > 1:
-# raise IOError("Deserializer should be one type to run test case. "
-# "See the SparkContext.parallelize to understand how to decide deserializer")
 jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
 jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
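
A rough end-to-end sketch of the public API touched in this file; only socketTextStream, awaitTermination and stop appear in the diff above, so the constructor arguments (and whether a batch-interval argument is required) are assumptions about this work-in-progress branch.

    from pyspark.streaming.context import StreamingContext

    ssc = StreamingContext(master="local[2]", appName="SocketTextExample")
    lines = ssc.socketTextStream("localhost", 9999)  # storageLevel not supported yet (see TODO)
    counts = lines.countByValue()                    # (value, count) pairs per batch
    counts.saveAsTextFiles("counts")                 # one output directory per batch
    ssc.start()
    ssc.awaitTermination(timeout=30)
    ssc.stop(stopSparkContext=True)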

python/pyspark/streaming/dstream.py

Lines changed: 24 additions & 0 deletions
@@ -331,6 +331,17 @@ def checkpoint(self, interval):
 return self
 
 def groupByKey(self, numPartitions=None):
+"""
+Return a new DStream in which the values for each key in this
+DStream are grouped into a single sequence.
+Hash-partitions the resulting RDDs into numPartitions partitions in
+the DStream.
+
+Note: If you are grouping in order to perform an aggregation (such as a
+sum or average) over each key, using reduceByKey will provide much
+better performance.
+
+"""
 def createCombiner(x):
 return [x]
 
@@ -346,6 +357,10 @@ def mergeCombiners(a, b):
 numPartitions).mapValues(lambda x: ResultIterable(x))
 
 def countByValue(self):
+"""
+Return a new DStream which contains the count of each unique value in this
+DStream as (value, count) pairs.
+"""
 def countPartition(iterator):
 counts = defaultdict(int)
 for obj in iterator:
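
An illustrative sketch (not part of the commit) of the two operations documented above, applied per batch; pairs and words are assumed DStreams of (key, value) pairs and of plain values respectively.

    grouped = pairs.groupByKey(numPartitions=2)   # key -> iterable of all values for that key
    sums = pairs.reduceByKey(lambda a, b: a + b)  # preferred over groupByKey for aggregations
    histogram = words.countByValue()              # (value, count) pairs for each batch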
@@ -360,6 +375,9 @@ def mergeMaps(m1, m2):
 return self.mapPartitions(countPartition).reduce(mergeMaps).flatMap(lambda x: x.items())
 
 def saveAsTextFiles(self, prefix, suffix=None):
+"""
+Save each RDD in this DStream as a text file, using string representations of elements.
+"""
 
 def saveAsTextFile(rdd, time):
 path = rddToFileName(prefix, suffix, time)
@@ -368,6 +386,11 @@ def saveAsTextFile(rdd, time):
 return self.foreachRDD(saveAsTextFile)
 
 def saveAsPickledFiles(self, prefix, suffix=None):
+"""
+Save each RDD in this DStream as a SequenceFile of serialized objects. The serializer
+used is L{pyspark.serializers.PickleSerializer}; the default batch size
+is 10.
+"""
 
 def saveAsTextFile(rdd, time):
 path = rddToFileName(prefix, suffix, time)
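
A hedged usage sketch of the two save helpers documented above; words is an assumed DStream, and the output layout (one directory per batch, named from prefix and suffix via rddToFileName) follows the docstrings and code shown here.

    words.saveAsTextFiles("/tmp/stream-output/words", suffix="txt")  # plain-text files per batch
    words.saveAsPickledFiles("/tmp/stream-output/words-pickled")     # pickled SequenceFiles per batch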
@@ -397,6 +420,7 @@ def saveAsTextFile(rdd, time):
 # TODO: implement leftOuterJoin
 # TODO: implemtnt rightOuterJoin
 
+
 class PipelinedDStream(DStream):
 def __init__(self, prev, func, preservesPartitioning=False):
 if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():

python/pyspark/streaming_tests.py

Lines changed: 26 additions & 8 deletions
@@ -18,12 +18,11 @@
 """
 Unit tests for PySpark; additional tests are implemented as doctests in
 individual modules.
-Other option is separate this test case with other tests.
-This makes sense becuase streaming tests takes long time due to waiting time
-for stoping callback server.
 
-This file will merged to tests.py. But for now, this file is separated due
-to focusing to streaming test case
+This file will be merged into tests.py once all functions are ready.
+For now it is kept separate so that work can focus on the streaming test cases.
+
+The callback server sometimes seems unstable, which can cause test failures.
 
 """
 from itertools import chain
@@ -43,10 +42,10 @@ def setUp(self):
 
 def tearDown(self):
 # Do not call pyspark.streaming.context.StreamingContext.stop directly because
-# we do not wait to shutdowncall back server and py4j client
+# we do not wait to shut down the callback server and py4j client
 self.ssc._jssc.stop()
 self.ssc._sc.stop()
-# Why does it long time to terminaete StremaingContext and SparkContext?
+# Why does it take a long time to terminate StreamingContext and SparkContext?
 # Should we change the sleep time if this depends on machine spec?
 time.sleep(10)
 
@@ -68,7 +67,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
 I am wondering if these test are enough or not.
 All tests input should have list of lists. This represents stream.
 Every batch interval, the first object of list are chosen to make DStream.
-Please see the BasicTestSuits in Scala or QueStream which is close to this implementation.
+Please see the BasicTestSuits in Scala which is close to this implementation.
 """
 def setUp(self):
 PySparkStreamingTestCase.setUp(self)
@@ -358,5 +357,24 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
 
 return self.result
 
+class TestSaveAsFilesSuite(PySparkStreamingTestCase):
+def setUp(self):
+PySparkStreamingTestCase.setUp(self)
+self.timeout = 10 # seconds
+self.numInputPartitions = 2
+self.result = list()
+
+def tearDown(self):
+PySparkStreamingTestCase.tearDown(self)
+
+@classmethod
+def tearDownClass(cls):
+PySparkStreamingTestCase.tearDownClass()
+
+
+
+
+
+
 if __name__ == "__main__":
 unittest.main()
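
TestSaveAsFilesSuite is still an empty shell here; below is a rough sketch of the kind of test it could eventually hold, relying on the _testInputStream and saveAsTextFiles helpers added elsewhere in this commit. The method name, the temp-dir handling and the final assertion are assumptions, not code from this branch.

    import glob
    import os
    import tempfile
    import time

    def test_save_as_text_files(self):  # would live inside TestSaveAsFilesSuite
        test_input = [range(1, 4), range(4, 7)]        # two batches of input
        prefix = os.path.join(tempfile.mkdtemp(), "out")
        stream = self.ssc._testInputStream(test_input)
        stream.saveAsTextFiles(prefix)
        self.ssc.start()
        time.sleep(self.timeout)                       # crude wait for the batches to be written
        self.assertTrue(len(glob.glob(prefix + "-*")) > 0)  # expect one output dir per batch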
