@@ -35,28 +35,28 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
35
35
private val topic = " def"
36
36
37
37
private var ssc : StreamingContext = _
38
- private var MQTTTestUtils : MQTTTestUtils = _
38
+ private var mqttTestUtils : MQTTTestUtils = _
39
39
40
40
before {
41
41
ssc = new StreamingContext (master, framework, batchDuration)
42
- MQTTTestUtils = new MQTTTestUtils
43
- MQTTTestUtils .setup()
42
+ mqttTestUtils = new MQTTTestUtils
43
+ mqttTestUtils .setup()
44
44
}
45
45
46
46
after {
47
47
if (ssc != null ) {
48
48
ssc.stop()
49
49
ssc = null
50
50
}
51
- if (MQTTTestUtils != null ) {
52
- MQTTTestUtils .teardown()
53
- MQTTTestUtils = null
51
+ if (mqttTestUtils != null ) {
52
+ mqttTestUtils .teardown()
53
+ mqttTestUtils = null
54
54
}
55
55
}
56
56
57
57
test(" mqtt input stream" ) {
58
58
val sendMessage = " MQTT demo for spark streaming"
59
- val receiveStream = MQTTUtils .createStream(ssc, " tcp://" + MQTTTestUtils .brokerUri, topic,
59
+ val receiveStream = MQTTUtils .createStream(ssc, " tcp://" + mqttTestUtils .brokerUri, topic,
60
60
StorageLevel .MEMORY_ONLY )
61
61
62
62
@ volatile var receiveMessage : List [String ] = List ()
@@ -71,7 +71,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
71
71
72
72
// Retry it because we don't know when the receiver will start.
73
73
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
74
- MQTTTestUtils .publishData(topic, sendMessage)
74
+ mqttTestUtils .publishData(topic, sendMessage)
75
75
assert(sendMessage.equals(receiveMessage(0 )))
76
76
}
77
77
ssc.stop()
0 commit comments