Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit b7d42ff

Browse files
committed
Mqtt streaming support in Python
1 parent f35b0c3 commit b7d42ff

File tree

4 files changed

+166
-3
lines changed

4 files changed

+166
-3
lines changed

external/mqtt-assembly/pom.xml

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>org.apache.spark</groupId>
23+
<artifactId>spark-parent</artifactId>
24+
<version>1.3.0-SNAPSHOT</version>
25+
<relativePath>../../pom.xml</relativePath>
26+
</parent>
27+
28+
<groupId>org.apache.spark</groupId>
29+
<artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
30+
<packaging>jar</packaging>
31+
<name>Spark Project External MQTT Assembly</name>
32+
<url>http://spark.apache.org/</url>
33+
34+
<properties>
35+
<sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
36+
<spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
37+
<spark.jar.basename>spark-streaming-mqtt-assembly-${project.version}.jar</spark.jar.basename>
38+
<spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
39+
</properties>
40+
41+
<dependencies>
42+
<dependency>
43+
<groupId>org.apache.spark</groupId>
44+
<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
45+
<version>${project.version}</version>
46+
</dependency>
47+
<dependency>
48+
<groupId>org.apache.spark</groupId>
49+
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
50+
<version>${project.version}</version>
51+
<scope>provided</scope>
52+
</dependency>
53+
</dependencies>
54+
55+
<build>
56+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
57+
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
58+
<plugins>
59+
<plugin>
60+
<groupId>org.apache.maven.plugins</groupId>
61+
<artifactId>maven-shade-plugin</artifactId>
62+
<configuration>
63+
<shadedArtifactAttached>false</shadedArtifactAttached>
64+
<outputFile>${spark.jar}</outputFile>
65+
<artifactSet>
66+
<includes>
67+
<include>*:*</include>
68+
</includes>
69+
</artifactSet>
70+
<filters>
71+
<filter>
72+
<artifact>*:*</artifact>
73+
<excludes>
74+
<exclude>META-INF/*.SF</exclude>
75+
<exclude>META-INF/*.DSA</exclude>
76+
<exclude>META-INF/*.RSA</exclude>
77+
</excludes>
78+
</filter>
79+
</filters>
80+
</configuration>
81+
<executions>
82+
<execution>
83+
<phase>package</phase>
84+
<goals>
85+
<goal>shade</goal>
86+
</goals>
87+
<configuration>
88+
<transformers>
89+
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
90+
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
91+
<resource>reference.conf</resource>
92+
</transformer>
93+
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
94+
<resource>log4j.properties</resource>
95+
</transformer>
96+
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
97+
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
98+
</transformers>
99+
</configuration>
100+
</execution>
101+
</executions>
102+
</plugin>
103+
</plugins>
104+
</build>
105+
</project>

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
<module>external/flume-sink</module>
105105
<module>external/flume-assembly</module>
106106
<module>external/mqtt</module>
107+
<module>external/mqtt-assembly</module>
107108
<module>external/zeromq</module>
108109
<module>examples</module>
109110
<module>repl</module>

project/SparkBuild.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ object BuildCommons {
4545
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
4646
"kinesis-asl").map(ProjectRef(buildLocation, _))
4747

48-
val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
49-
Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
48+
val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingMqttAssembly) =
49+
Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-mqtt-assembly")
5050
.map(ProjectRef(buildLocation, _))
5151

5252
val tools = ProjectRef(buildLocation, "tools")
@@ -347,7 +347,7 @@ object Assembly {
347347
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
348348
},
349349
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
350-
if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
350+
if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-mqtt-assembly")) {
351351
// This must match the same name used in maven (see external/kafka-assembly/pom.xml)
352352
s"${mName}-${v}.jar"
353353
} else {

python/pyspark/streaming/mqtt.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from py4j.java_collections import MapConverter
19+
from py4j.java_gateway import java_import, Py4JError
20+
21+
from pyspark.storagelevel import StorageLevel
22+
from pyspark.serializers import PairDeserializer, NoOpSerializer
23+
from pyspark.streaming import DStream
24+
25+
__all__ = ['MQTTUtils']
26+
27+
28+
class MQTTUtils(object):
29+
30+
@staticmethod
31+
def createStream(ssc, brokerUrl, topic,
32+
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
33+
"""
34+
Create an input stream that pulls messages from an MQTT broker.
35+
:param ssc: StreamingContext object
36+
:param brokerUrl: URL of the remote MQTT publisher
37+
:param topic: topic name to subscribe to
38+
:param storageLevel: RDD storage level.
39+
:return: A DStream object
40+
"""
41+
java_import(ssc._jvm, "org.apache.spark.streaming.mqtt.MQTTUtils")
42+
43+
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
44+
45+
try:
46+
jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel)
47+
48+
except Py4JError, e:
49+
# TODO: use --jar once it also works on the driver
50+
if not e.message or 'call a package' in e.message:
51+
print "No Mqtt package, please put the assembly jar into classpath:"
52+
print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \
53+
"scala-*/spark-streaming-mqtt-assembly-*.jar"
54+
raise e
55+
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
56+
stream = DStream(jstream, ssc, ser)
57+
return stream

0 commit comments

Comments
 (0)