Skip to content

Commit 6550cda

Browse files
garyrussellartembilan
authored andcommitted
AMQP-4238: Detect Subscription Failures and QOS
JIRA: https://jira.spring.io/browse/INT-4238 Revert to using the sync client in the message-driven adapter so we can detect subscription failures (the sync client throws an exception). The only reason to use the async client was to timeout disconnects; this can be achieved with the sync client and `disconnectForcibly`. Also, the subscribe method updates the qos argument with the granted QOS values. Detect and log if any QOS does not match the request. Polishing Polishing - PR Comments Conflicts: spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java * Remove all new tests since Paho lib has class signature check, so we can't mock its classes Conflicts: spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java * Fix only the `subscribe()` bug; leave the `async` client
1 parent 1bbc8a6 commit 6550cda

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package org.springframework.integration.mqtt.inbound;
1718

1819
import java.util.Arrays;
1920
import java.util.concurrent.ScheduledFuture;
2021

2122
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
2223
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
24+
import org.eclipse.paho.client.mqttv3.IMqttToken;
2325
import org.eclipse.paho.client.mqttv3.MqttCallback;
2426
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2527
import org.eclipse.paho.client.mqttv3.MqttException;
@@ -155,7 +157,7 @@ protected void doStop() {
155157
try {
156158
if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
157159
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
158-
&& this.cleanSession)) {
160+
&& this.cleanSession)) {
159161
this.client.unsubscribe(getTopic())
160162
.waitForCompletion(this.completionTimeout);
161163
}
@@ -187,8 +189,12 @@ public void addTopic(String topic, int qos) {
187189
try {
188190
super.addTopic(topic, qos);
189191
if (this.client != null && this.client.isConnected()) {
190-
this.client.subscribe(topic, qos)
191-
.waitForCompletion(this.completionTimeout);
192+
IMqttToken mqttToken = this.client.subscribe(topic, qos);
193+
mqttToken.waitForCompletion(this.completionTimeout);
194+
int[] grantedQos = mqttToken.getGrantedQos();
195+
if (grantedQos != null && grantedQos.length == 1 && grantedQos[0] == 0x80) {
196+
throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
197+
}
192198
}
193199
}
194200
catch (MqttException e) {
@@ -211,7 +217,7 @@ public void removeTopic(String... topic) {
211217
super.removeTopic(topic);
212218
}
213219
catch (MqttException e) {
214-
throw new MessagingException("Failed to unsubscribe from topic " + Arrays.asList(topic), e);
220+
throw new MessagingException("Failed to unsubscribe from topic " + Arrays.toString(topic), e);
215221
}
216222
finally {
217223
this.topicLock.unlock();
@@ -230,17 +236,18 @@ private void connectAndSubscribe() throws MqttException {
230236
this.client.setCallback(this);
231237

232238
this.topicLock.lock();
239+
String[] topics = getTopic();
233240
try {
234241
this.client.connect(connectionOptions)
235242
.waitForCompletion(this.completionTimeout);
236-
this.client.subscribe(getTopic(), getQos())
243+
this.client.subscribe(topics, getQos())
237244
.waitForCompletion(this.completionTimeout);
238245
}
239246
catch (MqttException e) {
240247
if (this.applicationEventPublisher != null) {
241248
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
242249
}
243-
logger.error("Error connecting or subscribing to " + Arrays.asList(getTopic()), e);
250+
logger.error("Error connecting or subscribing to " + Arrays.toString(topics), e);
244251
this.client.disconnect()
245252
.waitForCompletion(this.completionTimeout);
246253
throw e;
@@ -250,7 +257,7 @@ private void connectAndSubscribe() throws MqttException {
250257
}
251258
if (this.client.isConnected()) {
252259
this.connected = true;
253-
String message = "Connected and subscribed to " + Arrays.asList(getTopic());
260+
String message = "Connected and subscribed to " + Arrays.toString(topics);
254261
if (logger.isDebugEnabled()) {
255262
logger.debug(message);
256263
}

0 commit comments

Comments
 (0)