diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttMessageNotDeliveredEvent.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttMessageNotDeliveredEvent.java new file mode 100644 index 00000000000..4b1320b66e6 --- /dev/null +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttMessageNotDeliveredEvent.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt.event; + +import java.io.Serial; + +/** + * An event emitted (when using aysnc) when the client indicates the message + * was not delivered on publish operation. + * + * @author Artem Bilan + * + * @since 6.4 + * + */ +public class MqttMessageNotDeliveredEvent extends MqttMessageDeliveryEvent { + + @Serial + private static final long serialVersionUID = 8983514811627569920L; + + private final Throwable exception; + + public MqttMessageNotDeliveredEvent(Object source, int messageId, String clientId, + int clientInstance, Throwable exception) { + + super(source, messageId, clientId, clientInstance); + this.exception = exception; + } + + public Throwable getException() { + return this.exception; + } + +} diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java index 4b43a6722c9..fc1a1e36440 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,9 @@ import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor; import org.springframework.integration.handler.MessageProcessor; import org.springframework.integration.mqtt.core.ClientManager; +import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; +import org.springframework.integration.mqtt.event.MqttMessageNotDeliveredEvent; +import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.mqtt.support.MqttMessageConverter; import org.springframework.integration.support.management.ManageableLifecycle; @@ -76,6 +79,10 @@ public abstract class AbstractMqttMessageHandler extends AbstractMessageHa private final ClientManager clientManager; + private boolean async; + + private boolean asyncEvents; + private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT; private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT; @@ -319,6 +326,32 @@ protected ClientManager getClientManager() { return this.clientManager; } + /** + * Set to true if you don't want to block when sending messages. Default false. + * When true, message sent/delivered events will be published for reception + * by a suitably configured 'ApplicationListener' or an event + * inbound-channel-adapter. + * @param async true for async. + * @see #setAsyncEvents(boolean) + */ + public void setAsync(boolean async) { + this.async = async; + } + + protected boolean isAsync() { + return this.async; + } + + /** + * When {@link #setAsync(boolean)} is true, setting this to true enables + * publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent} + * to be emitted. Default false. + * @param asyncEvents the asyncEvents. + */ + public void setAsyncEvents(boolean asyncEvents) { + this.asyncEvents = asyncEvents; + } + @Override protected void onInit() { super.onInit(); @@ -372,6 +405,31 @@ protected void handleMessageInternal(Message message) { publish(topic, mqttMessage, message); } + protected void messageSentEvent(Message message, String topic, int messageId) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (this.async && this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new MqttMessageSentEvent(this, message, topic, messageId, getClientId(), + getClientInstance())); + } + } + + protected void sendDeliveryCompleteEvent(int messageId) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (this.async && this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new MqttMessageDeliveredEvent(this, messageId, getClientId(), getClientInstance())); + } + } + + protected void sendFailedDeliveryEvent(int messageId, Throwable exception) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (this.async && this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new MqttMessageNotDeliveredEvent(this, messageId, getClientId(), getClientInstance(), exception)); + } + } + protected abstract void publish(String topic, Object mqttMessage, Message message); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java index 3a565d76cc4..0e33d996cea 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java @@ -16,8 +16,10 @@ package org.springframework.integration.mqtt.outbound; +import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -29,8 +31,6 @@ import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoComponent; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; -import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; -import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttMessageConverter; import org.springframework.integration.mqtt.support.MqttUtils; @@ -60,9 +60,7 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler message) { Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'"); try { IMqttDeliveryToken token = checkConnection() - .publish(topic, (MqttMessage) mqttMessage); - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (!this.async) { + .publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener); + if (!isAsync()) { token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync) } - else if (this.asyncEvents && applicationEventPublisher != null) { - applicationEventPublisher.publishEvent( - new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(), - getClientInstance())); + else { + messageSentEvent(message, topic, token.getMessageId()); } } catch (MqttException e) { @@ -252,15 +224,6 @@ else if (this.asyncEvents && applicationEventPublisher != null) { } } - private void sendDeliveryComplete(IMqttDeliveryToken token) { - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (this.async && this.asyncEvents && applicationEventPublisher != null) { - applicationEventPublisher.publishEvent( - new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(), - getClientInstance())); - } - } - @Override public void connectionLost(Throwable cause) { this.lock.lock(); @@ -293,7 +256,24 @@ public void messageArrived(String topic, MqttMessage message) { @Override public void deliveryComplete(IMqttDeliveryToken token) { - sendDeliveryComplete(token); + + } + + private final class MqttPublishActionListener implements IMqttActionListener { + + MqttPublishActionListener() { + } + + @Override + public void onSuccess(IMqttToken asyncActionToken) { + sendDeliveryCompleteEvent(asyncActionToken.getMessageId()); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + sendFailedDeliveryEvent(asyncActionToken.getMessageId(), exception); + } + } } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java index 60ce021d45b..3b0af69ff2a 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java @@ -20,6 +20,7 @@ import org.eclipse.paho.mqttv5.client.IMqttAsyncClient; import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttActionListener; import org.eclipse.paho.mqttv5.client.MqttAsyncClient; import org.eclipse.paho.mqttv5.client.MqttCallback; import org.eclipse.paho.mqttv5.client.MqttClientPersistence; @@ -36,8 +37,6 @@ import org.springframework.integration.mqtt.core.ClientManager; import org.springframework.integration.mqtt.core.MqttComponent; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; -import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; -import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent; import org.springframework.integration.mqtt.support.MqttHeaderMapper; import org.springframework.integration.mqtt.support.MqttMessageConverter; @@ -62,15 +61,13 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler headerMapper = new MqttHeaderMapper(); public Mqttv5PahoMessageHandler(String url, String clientId) { @@ -118,28 +115,6 @@ public void setHeaderMapper(HeaderMapper headerMapper) { this.headerMapper = headerMapper; } - /** - * Set to true if you don't want to block when sending messages. Default false. - * When true, message sent/delivered events will be published for reception - * by a suitably configured 'ApplicationListener' or an event - * inbound-channel-adapter. - * @param async true for async. - * @see #setAsyncEvents(boolean) - */ - public void setAsync(boolean async) { - this.async = async; - } - - /** - * When {@link #setAsync(boolean)} is true, setting this to true enables - * publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent} - * to be emitted. Default false. - * @param asyncEvents the asyncEvents. - */ - public void setAsyncEvents(boolean asyncEvents) { - this.asyncEvents = asyncEvents; - } - @Override protected void onInit() { super.onInit(); @@ -268,15 +243,13 @@ protected void publish(String topic, Object mqttMessage, Message message) { if (!this.mqttClient.isConnected()) { this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout); } - IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage); - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (!this.async) { + IMqttToken token = + this.mqttClient.publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener); + if (!isAsync()) { token.waitForCompletion(completionTimeout); // NOSONAR (sync) } - else if (this.asyncEvents && applicationEventPublisher != null) { - applicationEventPublisher.publishEvent( - new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(), - getClientInstance())); + else { + messageSentEvent(message, topic, token.getMessageId()); } } catch (MqttException ex) { @@ -284,18 +257,9 @@ else if (this.asyncEvents && applicationEventPublisher != null) { } } - private void sendDeliveryComplete(IMqttToken token) { - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (this.async && this.asyncEvents && applicationEventPublisher != null) { - applicationEventPublisher.publishEvent( - new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(), - getClientInstance())); - } - } - @Override public void deliveryComplete(IMqttToken token) { - sendDeliveryComplete(token); + } @Override @@ -330,4 +294,21 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) { } + private final class MqttPublishActionListener implements MqttActionListener { + + MqttPublishActionListener() { + } + + @Override + public void onSuccess(IMqttToken asyncActionToken) { + sendDeliveryCompleteEvent(asyncActionToken.getMessageId()); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + sendFailedDeliveryEvent(asyncActionToken.getMessageId(), exception); + } + + } + } diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java index 611fe10e37c..921e47089a6 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java @@ -17,6 +17,8 @@ package org.springframework.integration.mqtt; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,6 +39,7 @@ import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.Mqttv3ClientManager; import org.springframework.integration.mqtt.core.Mqttv5ClientManager; +import org.springframework.integration.mqtt.event.MqttMessageDeliveryEvent; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; @@ -49,6 +52,7 @@ import org.springframework.messaging.PollableChannel; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; /** * @author Artem Vozhdayenko @@ -92,6 +96,7 @@ void testV5ClientManagerRuntime() throws Exception { Mqttv5ConfigRuntime.subscribedLatch); } + @SuppressWarnings("unchecked") private void testSubscribeAndPublish(Class configClass, String topicName, CountDownLatch subscribedLatch) throws Exception { @@ -115,6 +120,12 @@ private void testSubscribeAndPublish(Class configClass, String topicName, Cou else { assertThat(payload).isEqualTo(testPayload.getBytes(StandardCharsets.UTF_8)); } + + if (ctx.containsBean("deliveryEvents")) { + List deliveryEvents = ctx.getBean("deliveryEvents", List.class); + // MqttMessageSentEvent and MqttMessageDeliveredEvent + await().untilAsserted(() -> assertThat(deliveryEvents).hasSize(2)); + } } } @@ -164,6 +175,16 @@ public void onSubscribed(MqttSubscribedEvent e) { subscribedLatch.countDown(); } + @EventListener + void mqttEvents(MqttMessageDeliveryEvent event) { + deliveryEvents().add(event); + } + + @Bean + List deliveryEvents() { + return new ArrayList<>(); + } + @Bean public Mqttv3ClientManager mqttv3ClientManager() { MqttConnectOptions connectionOptions = new MqttConnectOptions(); @@ -174,7 +195,10 @@ public Mqttv3ClientManager mqttv3ClientManager() { @Bean public IntegrationFlow mqttOutFlow(Mqttv3ClientManager mqttv3ClientManager) { - return f -> f.handle(new MqttPahoMessageHandler(mqttv3ClientManager)); + MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(mqttv3ClientManager); + mqttPahoMessageHandler.setAsync(true); + mqttPahoMessageHandler.setAsyncEvents(true); + return f -> f.handle(mqttPahoMessageHandler); } @Bean @@ -257,6 +281,7 @@ public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx) var clientManager = ctx.getBean(Mqttv3ClientManager.class); return new MqttPahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME); } + } @Configuration @@ -272,6 +297,16 @@ public void onSubscribed(MqttSubscribedEvent e) { subscribedLatch.countDown(); } + @EventListener + void mqttEvents(MqttMessageDeliveryEvent event) { + deliveryEvents().add(event); + } + + @Bean + List deliveryEvents() { + return new ArrayList<>(); + } + @Bean public Mqttv5ClientManager mqttv5ClientManager() { return new Mqttv5ClientManager(MosquittoContainerTest.mqttUrl(), "client-manager-client-id-v5"); @@ -280,7 +315,10 @@ public Mqttv5ClientManager mqttv5ClientManager() { @Bean @ServiceActivator(inputChannel = "mqttOutFlow.input") public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler(Mqttv5ClientManager mqttv5ClientManager) { - return new Mqttv5PahoMessageHandler(mqttv5ClientManager); + Mqttv5PahoMessageHandler mqttPahoMessageHandler = new Mqttv5PahoMessageHandler(mqttv5ClientManager); + mqttPahoMessageHandler.setAsync(true); + mqttPahoMessageHandler.setAsyncEvents(true); + return mqttPahoMessageHandler; } @Bean @@ -358,10 +396,13 @@ public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx) var clientManager = ctx.getBean(Mqttv5ClientManager.class); return new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME); } + } interface MessageDrivenChannelAdapterFactory { + MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx); + } record ClientV3Disconnector(Mqttv3ClientManager clientManager) { diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java index 8d14687a0ca..2a3588b67ba 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java @@ -182,7 +182,7 @@ public void testOutboundOptionsApplied() throws Exception { assertThat(new String(message.getPayload())).isEqualTo("Hello, world!"); publishCalled.set(true); return deliveryToken; - }).given(client).publish(anyString(), any(MqttMessage.class)); + }).given(client).publish(anyString(), any(), any(), any()); handler.handleMessage(new GenericMessage<>("Hello, world!")); @@ -204,7 +204,7 @@ void testClientManagerIsNotConnectedAndClosedInHandler() throws Exception { given(clientManager.getClient()).willReturn(client); var deliveryToken = mock(MqttDeliveryToken.class); - given(client.publish(anyString(), any(MqttMessage.class))).willReturn(deliveryToken); + given(client.publish(anyString(), any(), any(), any())).willReturn(deliveryToken); var handler = new MqttPahoMessageHandler(clientManager); handler.setDefaultTopic("mqtt-foo"); @@ -218,7 +218,7 @@ void testClientManagerIsNotConnectedAndClosedInHandler() throws Exception { // then verify(client, never()).connect(any(MqttConnectOptions.class)); - verify(client).publish(anyString(), any(MqttMessage.class)); + verify(client).publish(anyString(), any(), any(), any()); verify(client, never()).disconnect(); verify(client, never()).disconnect(anyLong()); verify(client, never()).close(); diff --git a/src/reference/antora/modules/ROOT/pages/mqtt.adoc b/src/reference/antora/modules/ROOT/pages/mqtt.adoc index 3e0154a6262..ce0ba68ea13 100644 --- a/src/reference/antora/modules/ROOT/pages/mqtt.adoc +++ b/src/reference/antora/modules/ROOT/pages/mqtt.adoc @@ -399,6 +399,7 @@ Certain application events are published by the adapters. For the MQTT v5 Paho client, this event is also emitted when the server performs a normal disconnection, in which case the `cause` of the lost connection is `null`. * `MqttMessageSentEvent` - published by the outbound adapter when a message has been sent, if running in asynchronous mode. * `MqttMessageDeliveredEvent` - published by the outbound adapter when the client indicates that a message has been delivered, if running in asynchronous mode. +* `MqttMessageNotDeliveredEvent` - published by the outbound adapter when the client indicates that a message has not been delivered, if running in asynchronous mode. * `MqttSubscribedEvent` - published by the inbound adapter after subscribing to the topics. These events can be received by an `ApplicationListener` or with an `@EventListener` method. diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index c3f3c50a62c..d92bd74f9f1 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -72,5 +72,6 @@ See xref:sftp/session-factory.adoc[SFTP Session Factory] for more information. === MQTT Support Changes Multiple instances of `MqttPahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageDrivenChannelAdapter` can now be added at runtime using corresponding `ClientManager` through `IntegrationFlowContext` +Also a `MqttMessageNotDeliveredEvent` event has been introduced to emit when action callback reacts to the delivery failure. See xref:mqtt.adoc[MQTT Support] for more information.