
Commit 0b99bec

initial commit for pySparkStreaming
1 parent c214199 commit 0b99bec

15 files changed: +644 −3 lines

bin/spark-submit

Lines changed: 11 additions & 0 deletions
@@ -48,6 +48,7 @@ export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PR
 # paths, library paths, java options and memory early on. Otherwise, it will
 # be too late by the time the driver JVM has started.
 
+<<<<<<< HEAD
 if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
   # Parse the properties file only if the special configs exist
   contains_special_configs=$(
@@ -57,6 +58,16 @@ if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FI
   if [ -n "$contains_special_configs" ]; then
     export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
   fi
+=======
+# Figure out which Python executable to use
+if [[ -z "$PYSPARK_PYTHON" ]]; then
+  PYSPARK_PYTHON="python"
+fi
+export PYSPARK_PYTHON
+
+if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
+  export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
+>>>>>>> initial commit for pySparkStreaming
 fi
 
 exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

core/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -21,7 +21,11 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
+<<<<<<< HEAD
     <version>1.2.0-SNAPSHOT</version>
+=======
+    <version>1.0.0</version>
+>>>>>>> initial commit for pySparkStreaming
     <relativePath>../pom.xml</relativePath>
   </parent>
 

examples/src/main/python/streaming/wordcount.py

Lines changed: 16 additions & 0 deletions
@@ -1,14 +1,18 @@
 import sys
 from operator import add
 
+<<<<<<< HEAD
 from pyspark.conf import SparkConf
+=======
+>>>>>>> initial commit for pySparkStreaming
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.duration import *
 
 if __name__ == "__main__":
     if len(sys.argv) != 2:
         print >> sys.stderr, "Usage: wordcount <directory>"
         exit(-1)
+<<<<<<< HEAD
     conf = SparkConf()
     conf.setAppName("PythonStreamingWordCount")
 
@@ -20,5 +24,17 @@
     count = mapped_words.reduceByKey(add)
 
     count.pyprint()
+=======
+    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+
+    lines = ssc.textFileStream(sys.argv[1])
+    fm_lines = lines.flatMap(lambda x: x.split(" "))
+    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
+    mapped_lines = fm_lines.map(lambda x: (x, 1))
+
+    fm_lines.pyprint()
+    filtered_lines.pyprint()
+    mapped_lines.pyprint()
+>>>>>>> initial commit for pySparkStreaming
     ssc.start()
     ssc.awaitTermination()
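
For reference, a conflict-free sketch of the same script, resolved in favor of the pySparkStreaming side and folding in the reduceByKey(add) count from the HEAD version; it assumes the prototype API added by this commit (StreamingContext, Seconds, DStream.pyprint, and a DStream.reduceByKey matching the HEAD example):

    import sys
    from operator import add

    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print >> sys.stderr, "Usage: wordcount <directory>"
            exit(-1)

        # One-second micro-batches, same app name as the example above.
        ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))

        lines = ssc.textFileStream(sys.argv[1])
        words = lines.flatMap(lambda line: line.split(" "))
        counts = words.map(lambda word: (word, 1)).reduceByKey(add)
        counts.pyprint()

        ssc.start()
        ssc.awaitTermination()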

python/pyspark/java_gateway.py

Lines changed: 6 additions & 0 deletions
@@ -108,10 +108,16 @@ def run(self):
     java_import(gateway.jvm, "org.apache.spark.SparkConf")
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.api.python.*")
+<<<<<<< HEAD
     java_import(gateway.jvm, "org.apache.spark.streaming.*")  # do we need this?
     java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*")  # do we need this?
+=======
+    java_import(gateway.jvm, "org.apache.spark.streaming.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
+>>>>>>> initial commit for pySparkStreaming
     java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
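
These java_import calls only register package names on the Py4J JVM view; the new StreamingContext relies on them so JavaStreamingContext can be resolved without a fully qualified name. A minimal sketch of that lookup, assuming an existing SparkContext (and therefore a live gateway) plus the Seconds helper and _jduration attribute from this commit:

    from pyspark.context import SparkContext
    from pyspark.streaming.duration import Seconds

    sc = SparkContext(appName="GatewayImportDemo")  # assumed app name

    # Because org.apache.spark.streaming.api.java.* was imported on the gateway,
    # JavaStreamingContext resolves directly off the JVM view held by the SparkContext.
    jssc = sc._jvm.JavaStreamingContext(sc._jsc, Seconds(1)._jduration)
    print jssc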

python/pyspark/streaming/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+__author__ = 'ktakagiw'

python/pyspark/streaming/context.py

Lines changed: 99 additions & 0 deletions
@@ -1,3 +1,9 @@
+<<<<<<< HEAD
+=======
+__author__ = 'ktakagiw'
+
+
+>>>>>>> initial commit for pySparkStreaming
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements. See the NOTICE file distributed with
@@ -15,6 +21,7 @@
 # limitations under the License.
 #
 
+<<<<<<< HEAD
 import sys
 from signal import signal, SIGTERM, SIGINT
 
@@ -29,12 +36,43 @@ class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext represents the
     connection to a Spark cluster, and can be used to create L{DStream}s and
+=======
+import os
+import shutil
+import sys
+from threading import Lock
+from tempfile import NamedTemporaryFile
+
+from pyspark import accumulators
+from pyspark.accumulators import Accumulator
+from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
+from pyspark.files import SparkFiles
+from pyspark.java_gateway import launch_gateway
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.rdd import RDD
+from pyspark.context import SparkContext
+
+from py4j.java_collections import ListConverter
+
+from pyspark.streaming.dstream import DStream
+
+class StreamingContext(object):
+    """
+    Main entry point for Spark functionality. A StreamingContext represents the
+    connection to a Spark cluster, and can be used to create L{RDD}s and
+>>>>>>> initial commit for pySparkStreaming
     broadcast variables on that cluster.
     """
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+<<<<<<< HEAD
                  gateway=None, sparkContext=None, duration=None):
+=======
+                 gateway=None, duration=None):
+>>>>>>> initial commit for pySparkStreaming
         """
         Create a new StreamingContext. At least the master and app name and duration
         should be set, either through the named parameters here or through C{conf}.
@@ -55,6 +93,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
        @param conf: A L{SparkConf} object setting Spark properties.
        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
               will be instatiated.
+<<<<<<< HEAD
        @param sparkContext: L{SparkContext} object.
        @param duration: A L{Duration} object for SparkStreaming.
 
@@ -73,13 +112,23 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
            # is started in StreamingContext.
            SparkContext._gateway.restart_callback_server()
        self._clean_up_trigger()
+=======
+        @param duration: A L{Duration} Duration for SparkStreaming
+
+        """
+        # Create the Python Sparkcontext
+        self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
+                                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
+                                serializer=serializer, conf=conf, gateway=gateway)
+>>>>>>> initial commit for pySparkStreaming
        self._jvm = self._sc._jvm
        self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)
 
    # Initialize StremaingContext in function to allow subclass specific initialization
    def _initialize_context(self, jspark_context, jduration):
        return self._jvm.JavaStreamingContext(jspark_context, jduration)
 
+<<<<<<< HEAD
    def _clean_up_trigger(self):
        """Kill py4j callback server properly using signal lib"""
 
@@ -156,3 +205,53 @@ def _testInputStream(self, test_inputs, numSlices=None):
        jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
        return DStream(jinput_stream, self, test_rdd_deserializers[0])
+=======
+    def actorStream(self, props, name, storageLevel, supervisorStrategy):
+        raise NotImplementedError
+
+    def addStreamingListener(self, streamingListener):
+        raise NotImplementedError
+
+    def awaitTermination(self, timeout=None):
+        if timeout:
+            self._jssc.awaitTermination(timeout)
+        else:
+            self._jssc.awaitTermination()
+
+    def checkpoint(self, directory):
+        raise NotImplementedError
+
+    def fileStream(self, directory, filter=None, newFilesOnly=None):
+        raise NotImplementedError
+
+    def networkStream(self, receiver):
+        raise NotImplementedError
+
+    def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
+        raise NotImplementedError
+
+    def rawSocketStream(self, hostname, port, storagelevel):
+        raise NotImplementedError
+
+    def remember(self, duration):
+        raise NotImplementedError
+
+    def socketStream(hostname, port, converter,storageLevel):
+        raise NotImplementedError
+
+    def start(self):
+        self._jssc.start()
+
+    def stop(self, stopSparkContext=True):
+        raise NotImplementedError
+
+    def textFileStream(self, directory):
+        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+
+    def transform(self, seq):
+        raise NotImplementedError
+
+    def union(self, seq):
+        raise NotImplementedError
+
+>>>>>>> initial commit for pySparkStreaming
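
Net of the conflict markers, the usable surface of this first StreamingContext is small: textFileStream, start, and awaitTermination are implemented, while the remaining stream constructors and lifecycle methods raise NotImplementedError. A minimal driver against this prototype, assuming the Seconds helper and DStream.pyprint seen in the word count example, might look like:

    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    # The constructor builds its own SparkContext, so no separate sc is needed.
    ssc = StreamingContext(appName="PrototypeSmokeTest", duration=Seconds(1))

    lines = ssc.textFileStream("/tmp/streaming-input")  # assumed monitored directory
    lines.pyprint()

    ssc.start()
    ssc.awaitTermination()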
