Commit f6ce899

Author: Davies Liu (committed)
add example and fix bugs
1 parent 98c8d17 commit f6ce899

5 files changed: +100 -78 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 34 additions & 21 deletions
@@ -313,6 +313,7 @@ private object SpecialLengths {
   val PYTHON_EXCEPTION_THROWN = -2
   val TIMING_DATA = -3
   val END_OF_STREAM = -4
+  val NULL = -5
 }

 private[spark] object PythonRDD extends Logging {
@@ -374,49 +375,61 @@ private[spark] object PythonRDD extends Logging {
     // The right way to implement this would be to use TypeTags to get the full
     // type of T. Since I don't want to introduce breaking changes throughout the
     // entire Spark API, I have to use this hacky approach:
+    def write(bytes: Array[Byte]) {
+      if (bytes == null) {
+        dataOut.writeInt(SpecialLengths.NULL)
+      } else {
+        dataOut.writeInt(bytes.length)
+        dataOut.write(bytes)
+      }
+    }
+    def writeS(str: String) {
+      if (str == null) {
+        dataOut.writeInt(SpecialLengths.NULL)
+      } else {
+        writeUTF(str, dataOut)
+      }
+    }
     if (iter.hasNext) {
       val first = iter.next()
       val newIter = Seq(first).iterator ++ iter
       first match {
         case arr: Array[Byte] =>
-          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
-          }
+          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach(write)
         case string: String =>
-          newIter.asInstanceOf[Iterator[String]].foreach { str =>
-            writeUTF(str, dataOut)
-          }
+          newIter.asInstanceOf[Iterator[String]].foreach(writeS)
         case stream: PortableDataStream =>
           newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
-            val bytes = stream.toArray()
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
+            write(stream.toArray())
           }
         case (key: String, stream: PortableDataStream) =>
           newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
             case (key, stream) =>
-              writeUTF(key, dataOut)
-              val bytes = stream.toArray()
-              dataOut.writeInt(bytes.length)
-              dataOut.write(bytes)
+              writeS(key)
+              write(stream.toArray())
           }
         case (key: String, value: String) =>
          newIter.asInstanceOf[Iterator[(String, String)]].foreach {
            case (key, value) =>
-              writeUTF(key, dataOut)
-              writeUTF(value, dataOut)
+              writeS(key)
+              writeS(value)
          }
        case (key: Array[Byte], value: Array[Byte]) =>
          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
            case (key, value) =>
-              dataOut.writeInt(key.length)
-              dataOut.write(key)
-              dataOut.writeInt(value.length)
-              dataOut.write(value)
+              write(key)
+              write(value)
+          }
+        // key is null
+        case (null, v:Array[Byte]) =>
+          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
+            case (key, value) =>
+              write(key)
+              write(value)
          }
+
        case other =>
-          throw new SparkException("Unexpected element type " + first.getClass)
+          throw new SparkException("Unexpected element type " + other.getClass)
      }
    }
  }
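
The writer above frames every element as a 4-byte big-endian length followed by the payload, and reserves negative lengths as control codes (TIMING_DATA, END_OF_STREAM, and the new NULL = -5 for a null key or value). Below is a minimal standalone sketch of that framing on the Python side, assuming hypothetical helper names write_frame/read_frame and modelling only the NULL code; it is not PySpark code.

import struct
from io import BytesIO

NULL = -5  # mirrors SpecialLengths.NULL above (assumed for this sketch)

def write_frame(out, payload):
    # None is encoded as the NULL control code instead of a length.
    if payload is None:
        out.write(struct.pack("!i", NULL))
    else:
        out.write(struct.pack("!i", len(payload)))
        out.write(payload)

def read_frame(stream):
    # A NULL length means "no payload follows"; hand back None.
    length, = struct.unpack("!i", stream.read(4))
    if length == NULL:
        return None
    return stream.read(length)

buf = BytesIO()
write_frame(buf, b"key bytes")
write_frame(buf, None)             # what write()/writeS() emit for null
buf.seek(0)
assert read_frame(buf) == b"key bytes"
assert read_frame(buf) is None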

examples/src/main/python/streaming/kafka_wordcount.py

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ Usage: network_wordcount.py <zk> <topic>
+
+ To run this on your local machine, you need to setup Kafka and create a producer first
+    $ bin/zookeeper-server-start.sh config/zookeeper.properties
+    $ bin/kafka-server-start.sh config/server.properties
+    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
+
+ and then run the example
+    `$ bin/spark-submit --driver-class-path lib_managed/jars/kafka_*.jar:\
+      external/kafka/target/scala-*/spark-streaming-kafka_*.jar examples/src/main/python/\
+      streaming/kafka_wordcount.py localhost:2181 test`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kafka import KafkaUtils
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: network_wordcount.py <zk> <topic>"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    zkQuorum, topic = sys.argv[1:]
+    lines = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
+    counts = lines.map(lambda x: x[1]).flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()
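
KafkaUtils.createStream yields a DStream of (key, message) pairs, which is why the pipeline above starts with lambda x: x[1]; messages from the console producer typically carry no key, which is exactly the null case this commit handles. A self-contained sketch of the same per-batch word count on plain Python data (the sample messages are made up; no Spark or Kafka needed):

from collections import Counter

# Hypothetical (key, message) pairs as they might arrive from Kafka.
messages = [(None, "hello world"), (None, "hello spark")]

# Equivalent of: map(x[1]) -> flatMap(split) -> map((w, 1)) -> reduceByKey(+)
words = [w for _, msg in messages for w in msg.split(" ")]
counts = Counter(words)
print(counts)  # Counter({'hello': 2, 'world': 1, 'spark': 1})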

python/pyspark/serializers.py

Lines changed: 6 additions & 1 deletion
@@ -70,6 +70,7 @@ class SpecialLengths(object):
     PYTHON_EXCEPTION_THROWN = -2
     TIMING_DATA = -3
     END_OF_STREAM = -4
+    NULL = -5


 class Serializer(object):
@@ -145,8 +146,10 @@ def _read_with_length(self, stream):
         length = read_int(stream)
         if length == SpecialLengths.END_OF_DATA_SECTION:
             raise EOFError
+        if length == SpecialLengths.NULL:
+            return None
         obj = stream.read(length)
-        if obj == "":
+        if len(obj) < length:
             raise EOFError
         return self.loads(obj)

@@ -480,6 +483,8 @@ def loads(self, stream):
         length = read_int(stream)
         if length == SpecialLengths.END_OF_DATA_SECTION:
             raise EOFError
+        if length == SpecialLengths.NULL:
+            return None
         s = stream.read(length)
         return s.decode("utf-8") if self.use_unicode else s
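
The check change from `obj == ""` to `len(obj) < length` means any short read now raises EOFError, not only a completely empty one, while a legitimate zero-length payload no longer trips the check. A standalone illustration, with BytesIO standing in for the worker stream (not PySpark code):

from io import BytesIO

# Truncated frame: the header promised 5 bytes but only 3 arrive.
length, obj = 5, BytesIO(b"abc").read(5)
assert obj != b""                # old check (obj == "") would NOT raise EOFError
assert len(obj) < length         # new check correctly flags the short read

# Valid zero-length payload (e.g. an empty string).
length, obj = 0, BytesIO(b"").read(0)
assert obj == b""                # old check would raise EOFError spuriously
assert not (len(obj) < length)   # new check lets it through to loads()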

python/pyspark/streaming/kafka.py

Lines changed: 5 additions & 3 deletions
@@ -22,11 +22,12 @@
 from pyspark.serializers import PairDeserializer, NoOpSerializer
 from pyspark.streaming import DStream

-__all__ = ['KafkaUtils']
+__all__ = ['KafkaUtils', 'utf8_decoder']


 def utf8_decoder(s):
-    return s.decode('utf-8')
+    """ Decode the unicode as UTF-8 """
+    return s and s.decode('utf-8')


 class KafkaUtils(object):
@@ -70,7 +71,8 @@ def getClassByName(name):
            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
                                                       jparam, jtopics, jlevel)
        except Py4JError, e:
-            if 'call a package' in e.message:
+            # TODO: use --jar once it also work on driver
+            if not e.message or 'call a package' in e.message:
                print "No kafka package, please build it and add it into classpath:"
                print " $ sbt/sbt streaming-kafka/package"
                print " $ bin/submit --driver-class-path lib_managed/jars/kafka_2.10-0.8.0.jar:" \

python/pyspark/streaming/mqtt.py

Lines changed: 0 additions & 53 deletions
This file was deleted.
