Skip to content

GH-9428: Emit MQTT delivery events even if share client instance #9435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.event;

import java.io.Serial;

/**
* An event emitted (when using aysnc) when the client indicates the message
* was not delivered on publish operation.
*
* @author Artem Bilan
*
* @since 6.4
*
*/
public class MqttMessageNotDeliveredEvent extends MqttMessageDeliveryEvent {

@Serial
private static final long serialVersionUID = 8983514811627569920L;

private final Throwable exception;

public MqttMessageNotDeliveredEvent(Object source, int messageId, String clientId,
int clientInstance, Throwable exception) {

super(source, messageId, clientId, clientInstance);
this.exception = exception;
}

public Throwable getException() {
return this.exception;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,9 @@
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageNotDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.support.management.ManageableLifecycle;
Expand Down Expand Up @@ -76,6 +79,10 @@ public abstract class AbstractMqttMessageHandler<T, C> extends AbstractMessageHa

private final ClientManager<T, C> clientManager;

private boolean async;

private boolean asyncEvents;

private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;

private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;
Expand Down Expand Up @@ -319,6 +326,32 @@ protected ClientManager<T, C> getClientManager() {
return this.clientManager;
}

/**
* Set to true if you don't want to block when sending messages. Default false.
* When true, message sent/delivered events will be published for reception
* by a suitably configured 'ApplicationListener' or an event
* inbound-channel-adapter.
* @param async true for async.
* @see #setAsyncEvents(boolean)
*/
public void setAsync(boolean async) {
this.async = async;
}

protected boolean isAsync() {
return this.async;
}

/**
* When {@link #setAsync(boolean)} is true, setting this to true enables
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
* to be emitted. Default false.
* @param asyncEvents the asyncEvents.
*/
public void setAsyncEvents(boolean asyncEvents) {
this.asyncEvents = asyncEvents;
}

@Override
protected void onInit() {
super.onInit();
Expand Down Expand Up @@ -372,6 +405,31 @@ protected void handleMessageInternal(Message<?> message) {
publish(topic, mqttMessage, message);
}

protected void messageSentEvent(Message<?> message, String topic, int messageId) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageSentEvent(this, message, topic, messageId, getClientId(),
getClientInstance()));
}
}

protected void sendDeliveryCompleteEvent(int messageId) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageDeliveredEvent(this, messageId, getClientId(), getClientInstance()));
}
}

protected void sendFailedDeliveryEvent(int messageId, Throwable exception) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageNotDeliveredEvent(this, messageId, getClientId(), getClientInstance(), exception));
}
}

protected abstract void publish(String topic, Object mqttMessage, Message<?> message);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package org.springframework.integration.mqtt.outbound;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
Expand All @@ -29,8 +31,6 @@
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
Expand Down Expand Up @@ -60,9 +60,7 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler<IMqttAsyn

private final MqttPahoClientFactory clientFactory;

private boolean async;

private boolean asyncEvents;
private final IMqttActionListener mqttPublishActionListener = new MqttPublishActionListener();

private volatile IMqttAsyncClient client;

Expand Down Expand Up @@ -113,29 +111,6 @@ public MqttPahoMessageHandler(ClientManager<IMqttAsyncClient, MqttConnectOptions
this.clientFactory = factory;
}

/**
* Set to true if you don't want to block when sending messages. Default false.
* When true, message sent/delivered events will be published for reception
* by a suitably configured 'ApplicationListener' or an event
* inbound-channel-adapter.
* @param async true for async.
* @since 4.1
*/
public void setAsync(boolean async) {
this.async = async;
}

/**
* When {@link #setAsync(boolean)} is true, setting this to true enables
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
* to be emitted. Default false.
* @param asyncEvents the asyncEvents.
* @since 4.1
*/
public void setAsyncEvents(boolean asyncEvents) {
this.asyncEvents = asyncEvents;
}

@Override
public MqttConnectOptions getConnectionInfo() {
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
Expand Down Expand Up @@ -236,31 +211,19 @@ protected void publish(String topic, Object mqttMessage, Message<?> message) {
Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
try {
IMqttDeliveryToken token = checkConnection()
.publish(topic, (MqttMessage) mqttMessage);
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (!this.async) {
.publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener);
if (!isAsync()) {
token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync)
}
else if (this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
getClientInstance()));
else {
messageSentEvent(message, topic, token.getMessageId());
}
}
catch (MqttException e) {
throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', e);
}
}

private void sendDeliveryComplete(IMqttDeliveryToken token) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(),
getClientInstance()));
}
}

@Override
public void connectionLost(Throwable cause) {
this.lock.lock();
Expand Down Expand Up @@ -293,7 +256,24 @@ public void messageArrived(String topic, MqttMessage message) {

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
sendDeliveryComplete(token);

}

private final class MqttPublishActionListener implements IMqttActionListener {

MqttPublishActionListener() {
}

@Override
public void onSuccess(IMqttToken asyncActionToken) {
sendDeliveryCompleteEvent(asyncActionToken.getMessageId());
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
sendFailedDeliveryEvent(asyncActionToken.getMessageId(), exception);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
Expand All @@ -36,8 +37,6 @@
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.core.MqttComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
Expand All @@ -62,15 +61,13 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler<IMqttAs

private final MqttConnectionOptions connectionOptions;

private final MqttActionListener mqttPublishActionListener = new MqttPublishActionListener();

private IMqttAsyncClient mqttClient;

@Nullable
private MqttClientPersistence persistence;

private boolean async;

private boolean asyncEvents;

private HeaderMapper<MqttProperties> headerMapper = new MqttHeaderMapper();

public Mqttv5PahoMessageHandler(String url, String clientId) {
Expand Down Expand Up @@ -118,28 +115,6 @@ public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
this.headerMapper = headerMapper;
}

/**
* Set to true if you don't want to block when sending messages. Default false.
* When true, message sent/delivered events will be published for reception
* by a suitably configured 'ApplicationListener' or an event
* inbound-channel-adapter.
* @param async true for async.
* @see #setAsyncEvents(boolean)
*/
public void setAsync(boolean async) {
this.async = async;
}

/**
* When {@link #setAsync(boolean)} is true, setting this to true enables
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
* to be emitted. Default false.
* @param asyncEvents the asyncEvents.
*/
public void setAsyncEvents(boolean asyncEvents) {
this.asyncEvents = asyncEvents;
}

@Override
protected void onInit() {
super.onInit();
Expand Down Expand Up @@ -268,34 +243,23 @@ protected void publish(String topic, Object mqttMessage, Message<?> message) {
if (!this.mqttClient.isConnected()) {
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
}
IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (!this.async) {
IMqttToken token =
this.mqttClient.publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener);
if (!isAsync()) {
token.waitForCompletion(completionTimeout); // NOSONAR (sync)
}
else if (this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
getClientInstance()));
else {
messageSentEvent(message, topic, token.getMessageId());
}
}
catch (MqttException ex) {
throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', ex);
}
}

private void sendDeliveryComplete(IMqttToken token) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(),
getClientInstance()));
}
}

@Override
public void deliveryComplete(IMqttToken token) {
sendDeliveryComplete(token);

}

@Override
Expand Down Expand Up @@ -330,4 +294,21 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) {

}

private final class MqttPublishActionListener implements MqttActionListener {

MqttPublishActionListener() {
}

@Override
public void onSuccess(IMqttToken asyncActionToken) {
sendDeliveryCompleteEvent(asyncActionToken.getMessageId());
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
sendFailedDeliveryEvent(asyncActionToken.getMessageId(), exception);
}

}

}
Loading