Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 935615c

Browse files
committed
Fix the flaky MQTT tests
1 parent 47278c5 commit 935615c

File tree

3 files changed

+22
-57
lines changed

3 files changed

+22
-57
lines changed

external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,11 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
6767
}
6868
}
6969

70-
MQTTTestUtils.registerStreamingListener(ssc)
71-
7270
ssc.start()
7371

74-
// wait for the receiver to start before publishing data, or we risk failing
75-
// the test nondeterministically. See SPARK-4631
76-
MQTTTestUtils.waitForReceiverToStart(ssc)
77-
78-
MQTTTestUtils.publishData(topic, sendMessage)
79-
72+
// Retry it because we don't know when the receiver will start.
8073
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
74+
MQTTTestUtils.publishData(topic, sendMessage)
8175
assert(sendMessage.equals(receiveMessage(0)))
8276
}
8377
ssc.stop()

external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.streaming.mqtt
1919

2020
import java.net.{ServerSocket, URI}
21-
import java.util.concurrent.{CountDownLatch, TimeUnit}
2221

2322
import scala.language.postfixOps
2423

@@ -28,10 +27,6 @@ import org.apache.commons.lang3.RandomUtils
2827
import org.eclipse.paho.client.mqttv3._
2928
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
3029

31-
import org.apache.spark.streaming.StreamingContext
32-
import org.apache.spark.streaming.api.java.JavaStreamingContext
33-
import org.apache.spark.streaming.scheduler.StreamingListener
34-
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
3530
import org.apache.spark.util.Utils
3631
import org.apache.spark.{Logging, SparkConf}
3732

@@ -47,8 +42,6 @@ private class MQTTTestUtils extends Logging {
4742
private var broker: BrokerService = _
4843
private var connector: TransportConnector = _
4944

50-
private var receiverStartedLatch = new CountDownLatch(1)
51-
5245
def brokerUri: String = {
5346
s"$brokerHost:$brokerPort"
5447
}
@@ -73,7 +66,6 @@ private class MQTTTestUtils extends Logging {
7366
connector = null
7467
}
7568
Utils.deleteRecursively(persistenceDir)
76-
receiverStartedLatch = null
7769
}
7870

7971
private def findFreePort(): Int = {
@@ -114,38 +106,4 @@ private class MQTTTestUtils extends Logging {
114106
}
115107
}
116108

117-
/**
118-
* Call this one before starting StreamingContext so that we won't miss the
119-
* StreamingListenerReceiverStarted event.
120-
*/
121-
def registerStreamingListener(jssc: JavaStreamingContext): Unit = {
122-
registerStreamingListener(jssc.ssc)
123-
}
124-
125-
/**
126-
* Call this one before starting StreamingContext so that we won't miss the
127-
* StreamingListenerReceiverStarted event.
128-
*/
129-
def registerStreamingListener(ssc: StreamingContext): Unit = {
130-
ssc.addStreamingListener(new StreamingListener {
131-
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
132-
receiverStartedLatch.countDown()
133-
}
134-
})
135-
}
136-
137-
/**
138-
* Block until at least one receiver has started or timeout occurs.
139-
*/
140-
def waitForReceiverToStart(jssc: JavaStreamingContext): Unit = {
141-
waitForReceiverToStart(jssc.ssc)
142-
}
143-
144-
/**
145-
* Block until at least one receiver has started or timeout occurs.
146-
*/
147-
def waitForReceiverToStart(ssc: StreamingContext): Unit = {
148-
assert(
149-
receiverStartedLatch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
150-
}
151109
}

python/pyspark/streaming/tests.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -931,14 +931,27 @@ def test_mqtt_stream(self):
931931
"""Test the Python MQTT stream API."""
932932
sendData = "MQTT demo for spark streaming"
933933
topic = self._randomTopic()
934-
self._MQTTTestUtils.registerStreamingListener(self.ssc._jssc)
935934
result = self._startContext(topic)
936-
self._MQTTTestUtils.waitForReceiverToStart(self.ssc._jssc)
937-
self._MQTTTestUtils.publishData(topic, sendData)
938-
self.wait_for(result, 1)
939-
# Because "publishData" sends duplicate messages, here we should use > 0
940-
self.assertTrue(len(result) > 0)
941-
self.assertEqual(sendData, result[0])
935+
936+
def retry():
937+
self._MQTTTestUtils.publishData(topic, sendData)
938+
# Because "publishData" sends duplicate messages, here we should use > 0
939+
self.assertTrue(len(result) > 0)
940+
self.assertEqual(sendData, result[0])
941+
942+
# Retry it because we don't know when the receiver will start.
943+
self._retry_or_timeout(retry)
944+
945+
def _retry_or_timeout(self, test_func):
946+
start_time = time.time()
947+
while True:
948+
try:
949+
test_func()
950+
break
951+
except:
952+
if time.time() - start_time > self.timeout:
953+
raise
954+
time.sleep(0.01)
942955

943956

944957
def search_kafka_assembly_jar():

0 commit comments

Comments (0)