Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit a6747cb

Browse files
committed
wait for starting the receiver before publishing data
1 parent 87fc677 commit a6747cb

File tree

3 files changed

+13
-1
lines changed

3 files changed

+13
-1
lines changed

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt
1919

2020
import scala.reflect.ClassTag
2121

22-
import org.apache.spark.api.java.function.Function
2322
import org.apache.spark.storage.StorageLevel
2423
import org.apache.spark.streaming.StreamingContext
2524
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}

external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.eclipse.paho.client.mqttv3._
2828
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
2929

3030
import org.apache.spark.streaming.StreamingContext
31+
import org.apache.spark.streaming.api.java.JavaStreamingContext
3132
import org.apache.spark.streaming.scheduler.StreamingListener
3233
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
3334
import org.apache.spark.util.Utils
@@ -121,4 +122,15 @@ private class MQTTTestUtils extends Logging {
121122

122123
assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
123124
}
125+
126+
def waitForReceiverToStart(jssc: JavaStreamingContext) : Unit = {
127+
val latch = new CountDownLatch(1)
128+
jssc.addStreamingListener(new StreamingListener {
129+
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
130+
latch.countDown()
131+
}
132+
})
133+
134+
assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
135+
}
124136
}

python/pyspark/streaming/tests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,7 @@ def test_mqtt_stream(self):
884884
sendData = "MQTT demo for spark streaming"
885885
topic = self._randomTopic()
886886
result = self._startContext(topic)
887+
self._MQTTTestUtils.waitForReceiverToStart(self.ssc._jssc)
887888
self._publishData(topic, sendData)
888889
self.wait_for(result, len(sendData))
889890
self._validateStreamResult(sendData, result)

0 commit comments

Comments
 (0)