Skip to content

Commit e9a577d

Browse files
authored
GH-9428: Emit MQTT delivery events even if share client instance
Fixes: #9428 Issue link: #9428 When `ClientManager` is used for MQTT channel adapters, a `MqttMessageDeliveredEvent` is not emitted since callback for the `ClientManager` is not aware about `deliveryComplete` * Use a `MqttActionListener` abstraction for the `publish` operation instead of a `deliveryComplete` from a common callback * Make some other refactoring into the `MqttPahoMessageHandler` and `Mqttv5PahoMessageHandler` extracting a common logic into their `AbstractMqttMessageHandler` superclass * Introduce an `MqttMessageNotDeliveredEvent` to be emitted from the `MqttActionListener.onFailure()` callback * Adapt mocks in the `MqttAdapterTests` for a new code flow * Add delivery events verification into the `ClientManagerBackToBackTests` * Fix race condition in the `ClientManagerBackToBackTests` Looks like the message can be consumed even before we just emit that `MqttMessageSentEvent`
1 parent 49a0aaa commit e9a577d

File tree

8 files changed

+206
-96
lines changed

8 files changed

+206
-96
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.event;
18+
19+
import java.io.Serial;
20+
21+
/**
22+
* An event emitted (when using aysnc) when the client indicates the message
23+
* was not delivered on publish operation.
24+
*
25+
* @author Artem Bilan
26+
*
27+
* @since 6.4
28+
*
29+
*/
30+
public class MqttMessageNotDeliveredEvent extends MqttMessageDeliveryEvent {
31+
32+
@Serial
33+
private static final long serialVersionUID = 8983514811627569920L;
34+
35+
private final Throwable exception;
36+
37+
public MqttMessageNotDeliveredEvent(Object source, int messageId, String clientId,
38+
int clientInstance, Throwable exception) {
39+
40+
super(source, messageId, clientId, clientInstance);
41+
this.exception = exception;
42+
}
43+
44+
public Throwable getException() {
45+
return this.exception;
46+
}
47+
48+
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,9 @@
2929
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
3030
import org.springframework.integration.handler.MessageProcessor;
3131
import org.springframework.integration.mqtt.core.ClientManager;
32+
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
33+
import org.springframework.integration.mqtt.event.MqttMessageNotDeliveredEvent;
34+
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
3235
import org.springframework.integration.mqtt.support.MqttHeaders;
3336
import org.springframework.integration.mqtt.support.MqttMessageConverter;
3437
import org.springframework.integration.support.management.ManageableLifecycle;
@@ -76,6 +79,10 @@ public abstract class AbstractMqttMessageHandler<T, C> extends AbstractMessageHa
7679

7780
private final ClientManager<T, C> clientManager;
7881

82+
private boolean async;
83+
84+
private boolean asyncEvents;
85+
7986
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
8087

8188
private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;
@@ -319,6 +326,32 @@ protected ClientManager<T, C> getClientManager() {
319326
return this.clientManager;
320327
}
321328

329+
/**
330+
* Set to true if you don't want to block when sending messages. Default false.
331+
* When true, message sent/delivered events will be published for reception
332+
* by a suitably configured 'ApplicationListener' or an event
333+
* inbound-channel-adapter.
334+
* @param async true for async.
335+
* @see #setAsyncEvents(boolean)
336+
*/
337+
public void setAsync(boolean async) {
338+
this.async = async;
339+
}
340+
341+
protected boolean isAsync() {
342+
return this.async;
343+
}
344+
345+
/**
346+
* When {@link #setAsync(boolean)} is true, setting this to true enables
347+
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
348+
* to be emitted. Default false.
349+
* @param asyncEvents the asyncEvents.
350+
*/
351+
public void setAsyncEvents(boolean asyncEvents) {
352+
this.asyncEvents = asyncEvents;
353+
}
354+
322355
@Override
323356
protected void onInit() {
324357
super.onInit();
@@ -372,6 +405,31 @@ protected void handleMessageInternal(Message<?> message) {
372405
publish(topic, mqttMessage, message);
373406
}
374407

408+
protected void messageSentEvent(Message<?> message, String topic, int messageId) {
409+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
410+
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
411+
applicationEventPublisher.publishEvent(
412+
new MqttMessageSentEvent(this, message, topic, messageId, getClientId(),
413+
getClientInstance()));
414+
}
415+
}
416+
417+
protected void sendDeliveryCompleteEvent(int messageId) {
418+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
419+
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
420+
applicationEventPublisher.publishEvent(
421+
new MqttMessageDeliveredEvent(this, messageId, getClientId(), getClientInstance()));
422+
}
423+
}
424+
425+
protected void sendFailedDeliveryEvent(int messageId, Throwable exception) {
426+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
427+
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
428+
applicationEventPublisher.publishEvent(
429+
new MqttMessageNotDeliveredEvent(this, messageId, getClientId(), getClientInstance(), exception));
430+
}
431+
}
432+
375433
protected abstract void publish(String topic, Object mqttMessage, Message<?> message);
376434

377435
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package org.springframework.integration.mqtt.outbound;
1818

19+
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
1920
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
2021
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
22+
import org.eclipse.paho.client.mqttv3.IMqttToken;
2123
import org.eclipse.paho.client.mqttv3.MqttCallback;
2224
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2325
import org.eclipse.paho.client.mqttv3.MqttException;
@@ -29,8 +31,6 @@
2931
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
3032
import org.springframework.integration.mqtt.core.MqttPahoComponent;
3133
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
32-
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
33-
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
3434
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
3535
import org.springframework.integration.mqtt.support.MqttMessageConverter;
3636
import org.springframework.integration.mqtt.support.MqttUtils;
@@ -60,9 +60,7 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler<IMqttAsyn
6060

6161
private final MqttPahoClientFactory clientFactory;
6262

63-
private boolean async;
64-
65-
private boolean asyncEvents;
63+
private final IMqttActionListener mqttPublishActionListener = new MqttPublishActionListener();
6664

6765
private volatile IMqttAsyncClient client;
6866

@@ -113,29 +111,6 @@ public MqttPahoMessageHandler(ClientManager<IMqttAsyncClient, MqttConnectOptions
113111
this.clientFactory = factory;
114112
}
115113

116-
/**
117-
* Set to true if you don't want to block when sending messages. Default false.
118-
* When true, message sent/delivered events will be published for reception
119-
* by a suitably configured 'ApplicationListener' or an event
120-
* inbound-channel-adapter.
121-
* @param async true for async.
122-
* @since 4.1
123-
*/
124-
public void setAsync(boolean async) {
125-
this.async = async;
126-
}
127-
128-
/**
129-
* When {@link #setAsync(boolean)} is true, setting this to true enables
130-
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
131-
* to be emitted. Default false.
132-
* @param asyncEvents the asyncEvents.
133-
* @since 4.1
134-
*/
135-
public void setAsyncEvents(boolean asyncEvents) {
136-
this.asyncEvents = asyncEvents;
137-
}
138-
139114
@Override
140115
public MqttConnectOptions getConnectionInfo() {
141116
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
@@ -236,31 +211,19 @@ protected void publish(String topic, Object mqttMessage, Message<?> message) {
236211
Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
237212
try {
238213
IMqttDeliveryToken token = checkConnection()
239-
.publish(topic, (MqttMessage) mqttMessage);
240-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
241-
if (!this.async) {
214+
.publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener);
215+
if (!isAsync()) {
242216
token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync)
243217
}
244-
else if (this.asyncEvents && applicationEventPublisher != null) {
245-
applicationEventPublisher.publishEvent(
246-
new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
247-
getClientInstance()));
218+
else {
219+
messageSentEvent(message, topic, token.getMessageId());
248220
}
249221
}
250222
catch (MqttException e) {
251223
throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', e);
252224
}
253225
}
254226

255-
private void sendDeliveryComplete(IMqttDeliveryToken token) {
256-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
257-
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
258-
applicationEventPublisher.publishEvent(
259-
new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(),
260-
getClientInstance()));
261-
}
262-
}
263-
264227
@Override
265228
public void connectionLost(Throwable cause) {
266229
this.lock.lock();
@@ -293,7 +256,24 @@ public void messageArrived(String topic, MqttMessage message) {
293256

294257
@Override
295258
public void deliveryComplete(IMqttDeliveryToken token) {
296-
sendDeliveryComplete(token);
259+
260+
}
261+
262+
private final class MqttPublishActionListener implements IMqttActionListener {
263+
264+
MqttPublishActionListener() {
265+
}
266+
267+
@Override
268+
public void onSuccess(IMqttToken asyncActionToken) {
269+
sendDeliveryCompleteEvent(asyncActionToken.getMessageId());
270+
}
271+
272+
@Override
273+
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
274+
sendFailedDeliveryEvent(asyncActionToken.getMessageId(), exception);
275+
}
276+
297277
}
298278

299279
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 26 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
2222
import org.eclipse.paho.mqttv5.client.IMqttToken;
23+
import org.eclipse.paho.mqttv5.client.MqttActionListener;
2324
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
2425
import org.eclipse.paho.mqttv5.client.MqttCallback;
2526
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
@@ -36,8 +37,6 @@
3637
import org.springframework.integration.mqtt.core.ClientManager;
3738
import org.springframework.integration.mqtt.core.MqttComponent;
3839
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
39-
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
40-
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
4140
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
4241
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
4342
import org.springframework.integration.mqtt.support.MqttMessageConverter;
@@ -62,15 +61,13 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler<IMqttAs
6261

6362
private final MqttConnectionOptions connectionOptions;
6463

64+
private final MqttActionListener mqttPublishActionListener = new MqttPublishActionListener();
65+
6566
private IMqttAsyncClient mqttClient;
6667

6768
@Nullable
6869
private MqttClientPersistence persistence;
6970

70-
private boolean async;
71-
72-
private boolean asyncEvents;
73-
7471
private HeaderMapper<MqttProperties> headerMapper = new MqttHeaderMapper();
7572

7673
public Mqttv5PahoMessageHandler(String url, String clientId) {
@@ -118,28 +115,6 @@ public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
118115
this.headerMapper = headerMapper;
119116
}
120117

121-
/**
122-
* Set to true if you don't want to block when sending messages. Default false.
123-
* When true, message sent/delivered events will be published for reception
124-
* by a suitably configured 'ApplicationListener' or an event
125-
* inbound-channel-adapter.
126-
* @param async true for async.
127-
* @see #setAsyncEvents(boolean)
128-
*/
129-
public void setAsync(boolean async) {
130-
this.async = async;
131-
}
132-
133-
/**
134-
* When {@link #setAsync(boolean)} is true, setting this to true enables
135-
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
136-
* to be emitted. Default false.
137-
* @param asyncEvents the asyncEvents.
138-
*/
139-
public void setAsyncEvents(boolean asyncEvents) {
140-
this.asyncEvents = asyncEvents;
141-
}
142-
143118
@Override
144119
protected void onInit() {
145120
super.onInit();
@@ -268,34 +243,23 @@ protected void publish(String topic, Object mqttMessage, Message<?> message) {
268243
if (!this.mqttClient.isConnected()) {
269244
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
270245
}
271-
IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
272-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
273-
if (!this.async) {
246+
IMqttToken token =
247+
this.mqttClient.publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener);
248+
if (!isAsync()) {
274249
token.waitForCompletion(completionTimeout); // NOSONAR (sync)
275250
}
276-
else if (this.asyncEvents && applicationEventPublisher != null) {
277-
applicationEventPublisher.publishEvent(
278-
new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
279-
getClientInstance()));
251+
else {
252+
messageSentEvent(message, topic, token.getMessageId());
280253
}
281254
}
282255
catch (MqttException ex) {
283256
throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', ex);
284257
}
285258
}
286259

287-
private void sendDeliveryComplete(IMqttToken token) {
288-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
289-
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
290-
applicationEventPublisher.publishEvent(
291-
new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(),
292-
getClientInstance()));
293-
}
294-
}
295-
296260
@Override
297261
public void deliveryComplete(IMqttToken token) {
298-
sendDeliveryComplete(token);
262+
299263
}
300264

301265
@Override
@@ -330,4 +294,21 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) {
330294

331295
}
332296

297+
private final class MqttPublishActionListener implements MqttActionListener {
298+
299+
MqttPublishActionListener() {
300+
}
301+
302+
@Override
303+
public void onSuccess(IMqttToken asyncActionToken) {
304+
sendDeliveryCompleteEvent(asyncActionToken.getMessageId());
305+
}
306+
307+
@Override
308+
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
309+
sendFailedDeliveryEvent(asyncActionToken.getMessageId(), exception);
310+
}
311+
312+
}
313+
333314
}

0 commit comments

Comments
 (0)