@@ -835,9 +835,9 @@ class MQTTStreamTests(PySparkStreamingTestCase):
835
835
def setUp (self ):
836
836
super (MQTTStreamTests , self ).setUp ()
837
837
838
- utilsClz = self .ssc ._jvm .java .lang .Thread .currentThread ().getContextClassLoader () \
838
+ MQTTTestUtilsClz = self .ssc ._jvm .java .lang .Thread .currentThread ().getContextClassLoader () \
839
839
.loadClass ("org.apache.spark.streaming.mqtt.MQTTTestUtils" )
840
- self ._utils = utilsClz .newInstance ()
840
+ self ._MQTTTestUtils = MQTTTestUtilsClz .newInstance ()
841
841
self ._MQTTTestUtils .setup ()
842
842
843
843
def tearDown (self ):
@@ -850,7 +850,7 @@ def tearDown(self):
850
850
def _randomTopic (self ):
851
851
return "topic-%d" % random .randint (0 , 10000 )
852
852
853
- def _validateStreamResult (self , sendData , stream ):
853
+ def _validateStreamResult (self , sendData , dstream ):
854
854
result = []
855
855
856
856
def get_output (_ , rdd ):
@@ -862,16 +862,15 @@ def get_output(_, rdd):
862
862
self .assertEqual (sendData , receiveData )
863
863
864
864
def test_mqtt_stream (self ):
865
- """Test the Python Kafka stream API."""
865
+ """Test the Python MQTT stream API."""
866
866
topic = self ._randomTopic ()
867
867
sendData = "MQTT demo for spark streaming"
868
868
ssc = self .ssc
869
869
870
- self ._MQTTTestUtils .createTopic (topic )
871
870
self ._MQTTTestUtils .waitForReceiverToStart (ssc )
872
871
self ._MQTTTestUtils .publishData (topic , sendData )
873
872
874
- stream = MQTTUtils .createStream (ssc , "tcp://" + MQTTTestUtils . brokerUri , topic )
873
+ stream = MQTTUtils .createStream (ssc , "tcp://" + self . _MQTTTestUtils . brokerUri () , topic )
875
874
self ._validateStreamResult (sendData , stream )
876
875
877
876
0 commit comments