18
18
package org .apache .spark .streaming .mqtt
19
19
20
20
import java .net .{ServerSocket , URI }
21
- import java .util .concurrent .{CountDownLatch , TimeUnit }
22
21
23
22
import scala .language .postfixOps
24
23
@@ -28,10 +27,6 @@ import org.apache.commons.lang3.RandomUtils
28
27
import org .eclipse .paho .client .mqttv3 ._
29
28
import org .eclipse .paho .client .mqttv3 .persist .MqttDefaultFilePersistence
30
29
31
- import org .apache .spark .streaming .StreamingContext
32
- import org .apache .spark .streaming .api .java .JavaStreamingContext
33
- import org .apache .spark .streaming .scheduler .StreamingListener
34
- import org .apache .spark .streaming .scheduler .StreamingListenerReceiverStarted
35
30
import org .apache .spark .util .Utils
36
31
import org .apache .spark .{Logging , SparkConf }
37
32
@@ -47,8 +42,6 @@ private class MQTTTestUtils extends Logging {
47
42
private var broker : BrokerService = _
48
43
private var connector : TransportConnector = _
49
44
50
- private var receiverStartedLatch = new CountDownLatch (1 )
51
-
52
45
def brokerUri : String = {
53
46
s " $brokerHost: $brokerPort"
54
47
}
@@ -73,7 +66,6 @@ private class MQTTTestUtils extends Logging {
73
66
connector = null
74
67
}
75
68
Utils .deleteRecursively(persistenceDir)
76
- receiverStartedLatch = null
77
69
}
78
70
79
71
private def findFreePort (): Int = {
@@ -114,38 +106,4 @@ private class MQTTTestUtils extends Logging {
114
106
}
115
107
}
116
108
117
- /**
118
- * Call this one before starting StreamingContext so that we won't miss the
119
- * StreamingListenerReceiverStarted event.
120
- */
121
- def registerStreamingListener (jssc : JavaStreamingContext ): Unit = {
122
- registerStreamingListener(jssc.ssc)
123
- }
124
-
125
- /**
126
- * Call this one before starting StreamingContext so that we won't miss the
127
- * StreamingListenerReceiverStarted event.
128
- */
129
- def registerStreamingListener (ssc : StreamingContext ): Unit = {
130
- ssc.addStreamingListener(new StreamingListener {
131
- override def onReceiverStarted (receiverStarted : StreamingListenerReceiverStarted ) {
132
- receiverStartedLatch.countDown()
133
- }
134
- })
135
- }
136
-
137
- /**
138
- * Block until at least one receiver has started or timeout occurs.
139
- */
140
- def waitForReceiverToStart (jssc : JavaStreamingContext ): Unit = {
141
- waitForReceiverToStart(jssc.ssc)
142
- }
143
-
144
- /**
145
- * Block until at least one receiver has started or timeout occurs.
146
- */
147
- def waitForReceiverToStart (ssc : StreamingContext ): Unit = {
148
- assert(
149
- receiverStartedLatch.await(10 , TimeUnit .SECONDS ), " Timeout waiting for receiver to start." )
150
- }
151
109
}
0 commit comments