
Commit 8f4014f

prabeesh authored and tdas committed
[SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python
This PR is based on apache#4229, thanks prabeesh. Closes apache#4229

Author: Prabeesh K <[email protected]>
Author: zsxwing <[email protected]>
Author: prabs <[email protected]>
Author: Prabeesh K <[email protected]>

Closes apache#7833 from zsxwing/pr4229 and squashes the following commits:

9570bec [zsxwing] Fix the variable name and check null in finally
4a9c79e [zsxwing] Fix pom.xml indentation
abf5f18 [zsxwing] Merge branch 'master' into pr4229
935615c [zsxwing] Fix the flaky MQTT tests
47278c5 [zsxwing] Include the project class files
478f844 [zsxwing] Add unpack
5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT tests
734db99 [zsxwing] Merge branch 'master' into pr4229
126608a [Prabeesh K] address the comments
b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229
d07f454 [zsxwing] Register StreamingListerner before starting StreamingContext; Revert unncessary changes; fix the python unit test
a6747cb [Prabeesh K] wait for starting the receiver before publishing data
87fc677 [Prabeesh K] address the comments:
97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt
80474d1 [Prabeesh K] fix
1f0cfe9 [Prabeesh K] python style fix
e1ee016 [Prabeesh K] scala style fix
a5a8f9f [Prabeesh K] added Python test
9767d82 [Prabeesh K] implemented Python-friendly class
a11968b [Prabeesh K] fixed python style
795ec27 [Prabeesh K] address comments
ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly
3f4df12 [Prabeesh K] updated version
b34c3c1 [prabs] adress comments
3aa7fff [prabs] Added Python streaming mqtt word count example
b7d42ff [prabs] Mqtt streaming support in Python

(cherry picked from commit 853809e)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 51406be commit 8f4014f

File tree

14 files changed: +565 / -109 lines


dev/run-tests.py

Lines changed: 2 additions & 0 deletions
@@ -303,6 +303,8 @@ def build_spark_sbt(hadoop_version):
                  "assembly/assembly",
                  "streaming-kafka-assembly/assembly",
                  "streaming-flume-assembly/assembly",
+                 "streaming-mqtt-assembly/assembly",
+                 "streaming-mqtt/test:assembly",
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
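
The two added goals make the build produce the shaded MQTT assembly jar and a test assembly jar that the Python MQTT tests depend on. Purely as an illustration, not code from this commit, the same goals could be driven from a small standalone Python script against a local Spark checkout; the build/sbt launcher path and the helper name below are assumptions:

# Hypothetical standalone helper: run the two sbt goals added above.
# Assumes a local Spark checkout with the standard build/sbt launcher.
import subprocess

def build_mqtt_assemblies(spark_home="."):
    goals = [
        "streaming-mqtt-assembly/assembly",  # shaded jar passed to spark-submit --jars
        "streaming-mqtt/test:assembly",      # test jar consumed by the Python MQTT tests
    ]
    # Hand the goal list to sbt, roughly as run-tests.py does for its own goals.
    subprocess.check_call(["build/sbt"] + goals, cwd=spark_home)

if __name__ == "__main__":
    build_mqtt_assemblies()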

dev/sparktestsupport/modules.py

Lines changed: 2 additions & 0 deletions
@@ -181,6 +181,7 @@ def contains_file(self, filename):
     dependencies=[streaming],
     source_file_regexes=[
         "external/mqtt",
+        "external/mqtt-assembly",
     ],
     sbt_test_goals=[
         "streaming-mqtt/test",
@@ -306,6 +307,7 @@ def contains_file(self, filename):
         streaming,
         streaming_kafka,
         streaming_flume_assembly,
+        streaming_mqtt,
         streaming_kinesis_asl
     ],
     source_file_regexes=[

docs/streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
 {:.no_toc}
 
 <span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
-out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.
+out of these sources, *only* Kafka, Flume and MQTT are available in the Python API. We will add more advanced sources in the Python API in future.
 
 This category of sources require interfacing with external non-Spark libraries, some of them with
 complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts

examples/src/main/python/streaming/mqtt_wordcount.py

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ A sample wordcount with MqttStream stream
+ Usage: mqtt_wordcount.py <broker url> <topic>
+
+ To run this in your local machine, you need to setup a MQTT broker and publisher first,
+ Mosquitto is one of the open source MQTT Brokers, see
+ http://mosquitto.org/
+ Eclipse paho project provides number of clients and utilities for working with MQTT, see
+ http://www.eclipse.org/paho/#getting-started
+
+ and then run the example
+    `$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\
+      spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \
+      tcp://localhost:1883 foo`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.mqtt import MQTTUtils
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingMQTTWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    brokerUrl = sys.argv[1]
+    topic = sys.argv[2]
+
+    lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    counts = lines.flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()
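
To exercise the example locally, something must publish messages to the chosen topic. Below is a minimal publisher sketch (not part of this commit) using the Eclipse Paho Python client that the docstring points to; it assumes the paho-mqtt package is installed and a Mosquitto broker is listening on localhost:1883:

# Hypothetical companion publisher for mqtt_wordcount.py.
# Assumes `pip install paho-mqtt` and a broker at tcp://localhost:1883.
import time

import paho.mqtt.publish as publish

if __name__ == "__main__":
    for i in range(10):
        # Publish one line per second to the "foo" topic from the usage string above.
        publish.single("foo", "hello mqtt streaming %d" % i,
                       hostname="localhost", port=1883)
        time.sleep(1)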

external/mqtt-assembly/pom.xml

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External MQTT Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>reference.conf</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                  <resource>log4j.properties</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

external/mqtt/pom.xml

Lines changed: 28 additions & 0 deletions
@@ -78,5 +78,33 @@
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+    <plugins>
+      <!-- Assemble a jar with test dependencies for Python tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test-jar-with-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <!-- Make sure the file path is same as the sbt build -->
+              <finalName>spark-streaming-mqtt-test-${project.version}</finalName>
+              <outputDirectory>${project.build.directory}/scala-${scala.binary.version}/</outputDirectory>
+              <appendAssemblyId>false</appendAssemblyId>
+              <!-- Don't publish it since it's only for Python tests -->
+              <attach>false</attach>
+              <descriptors>
+                <descriptor>src/main/assembly/assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
   </build>
 </project>

external/mqtt/src/main/assembly/assembly.xml

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly>
+  <id>test-jar-with-dependencies</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
+      <outputDirectory>/</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+  <dependencySets>
+    <dependencySet>
+      <useTransitiveDependencies>true</useTransitiveDependencies>
+      <scope>test</scope>
+      <unpack>true</unpack>
+      <excludes>
+        <exclude>org.apache.hadoop:*:jar</exclude>
+        <exclude>org.apache.zookeeper:*:jar</exclude>
+        <exclude>org.apache.avro:*:jar</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+
+</assembly>

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

Lines changed: 16 additions & 0 deletions
@@ -74,3 +74,19 @@ object MQTTUtils {
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }
 }
+
+/**
+ * This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
+ * function so that it can be easily instantiated and called from Python's MQTTUtils.
+ */
+private class MQTTUtilsPythonHelper {
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel
+    ): JavaDStream[String] = {
+    MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
+  }
+}
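
The Python-side module (pyspark/streaming/mqtt.py) is among the 14 changed files but is not reproduced on this page. As a rough sketch of how such a wrapper typically reaches MQTTUtilsPythonHelper through Py4J (not the actual file from this commit, and omitting its error handling), it would look roughly like this:

# Hypothetical sketch of the Python-side MQTTUtils wrapper; the real
# pyspark/streaming/mqtt.py in this commit may differ in details.
from pyspark.serializers import UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.streaming import DStream


class MQTTUtils(object):

    @staticmethod
    def createStream(ssc, brokerUrl, topic,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
        """Create a DStream of UTF-8 strings received from an MQTT topic."""
        # Instantiate the JVM-side helper shown above; this requires the
        # spark-streaming-mqtt-assembly jar on the classpath (e.g. via --jars).
        helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
        return DStream(jstream, ssc, UTF8Deserializer())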

0 commit comments
