Skip to content
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
builder.addPropertyReference("outputChannel", channelName);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "qos");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "recovery-interval");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "manual-acks");

return builder.getBeanDefinition();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.util.Assert;

/**
* @param <T> MQTT client type
* @param <C> MQTT connection options type (v5 or v3)
*
* @author Artem Vozhdayenko
*
* @since 6.0
*/
public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T, C>, ApplicationEventPublisherAware {

protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

private static final int DEFAULT_MANAGER_PHASE = 0;

private final Set<ConnectCallback> connectCallbacks;

private final String clientId;

private int phase = DEFAULT_MANAGER_PHASE;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;

private String url;

private volatile T client;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just another nit-pick: volatile props go after all others 😄


private String beanName;

AbstractMqttClientManager(String clientId) {
Assert.notNull(clientId, "'clientId' is required");
this.clientId = clientId;
this.connectCallbacks = Collections.synchronizedSet(new HashSet<>());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that I have commented before that this initialization could be done directly on the property declaration.

}

protected void setManualAcks(boolean manualAcks) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that this one has to be public

this.manualAcks = manualAcks;
}

protected String getUrl() {
return this.url;
}

protected void setUrl(String url) {
this.url = url;
}

protected String getClientId() {
return this.clientId;
}

protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}

protected synchronized void setClient(T client) {
this.client = client;
}

protected Set<ConnectCallback> getCallbacks() {
return this.connectCallbacks;
}

@Override
public boolean isManualAcks() {
return this.manualAcks;
}

@Override
public T getClient() {
return this.client;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
Assert.notNull(applicationEventPublisher, "'applicationEventPublisher' cannot be null");
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
public void setBeanName(String name) {
this.beanName = name;
}

@Override
public String getBeanName() {
return this.beanName;
}

/**
* The phase of component autostart in {@link SmartLifecycle}.
* If the custom one is required, note that for the correct behavior it should be less than phase of
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
* The default phase is {@link #DEFAULT_MANAGER_PHASE}.
* @return {@link SmartLifecycle} autostart phase
* @see #setPhase
*/
@Override
public int getPhase() {
return this.phase;
}

@Override
public void addCallback(ConnectCallback connectCallback) {
this.connectCallbacks.add(connectCallback);
}

@Override
public boolean removeCallback(ConnectCallback connectCallback) {
return this.connectCallbacks.remove(connectCallback);
}

public synchronized boolean isRunning() {
return this.client != null;
}

/**
* Sets the phase of component autostart in {@link SmartLifecycle}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* If the custom one is required, note that for the correct behavior it should be less than phase of
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
* @see #getPhase
*/
public void setPhase(int phase) {
this.phase = phase;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.springframework.context.SmartLifecycle;

/**
* A utility abstraction over MQTT client which can be used in any MQTT-related component
* without need to handle generic client callbacks, reconnects etc.
* Using this manager in multiple MQTT integrations will preserve a single connection.
*
* @param <T> MQTT client type
* @param <C> MQTT connection options type (v5 or v3)
*
* @author Artem Vozhdayenko
*
* @since 6.0
*/
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {

T getClient();

boolean isManualAcks();

void addCallback(ConnectCallback connectCallback);

boolean removeCallback(ConnectCallback connectCallback);

/**
* A contract for a custom callback if needed by a usage.
*
* @see org.eclipse.paho.mqttv5.client.MqttCallback#connectComplete
* @see org.eclipse.paho.client.mqttv3.MqttCallbackExtended#connectComplete
*/
interface ConnectCallback {

void connectComplete(boolean isReconnect);

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.util.Assert;

/**
* @author Artem Vozhdayenko
* @since 6.0
*/
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient, MqttConnectOptions>
implements MqttCallbackExtended {

private final MqttPahoClientFactory clientFactory;

public Mqttv3ClientManager(String url, String clientId) {
this(buildDefaultClientFactory(url), clientId);
}

public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we still need to rely on this factory for this our new ClientManager abstraction.
How about just to make this v3 version similar to v5 - based on the MqttConnectionOptions.
And perhaps we need to add one more MqttClientPersistence option to these managers impls.
A setter should be OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄

super(clientId);
Assert.notNull(clientFactory, "'clientFactory' is required");
this.clientFactory = clientFactory;
MqttConnectOptions connectionOptions = clientFactory.getConnectionOptions();
String[] serverURIs = connectionOptions.getServerURIs();
Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'");
setUrl(serverURIs[0]);
if (!connectionOptions.isAutomaticReconnect()) {
logger.info("If this `ClientManager` is used from message-driven channel adapters, " +
"it is recommended to set 'automaticReconnect' MQTT connection option. " +
"Otherwise connection check and reconnect should be done manually.");
}
}

private static MqttPahoClientFactory buildDefaultClientFactory(String url) {
Assert.notNull(url, "'url' is required");
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setServerURIs(new String[]{ url });
connectOptions.setAutomaticReconnect(true);
DefaultMqttPahoClientFactory defaultFactory = new DefaultMqttPahoClientFactory();
defaultFactory.setConnectionOptions(connectOptions);
return defaultFactory;
}

@Override
public synchronized void start() {
if (getClient() == null) {
try {
var client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
client.setManualAcks(isManualAcks());
client.setCallback(this);
setClient(client);
}
catch (MqttException e) {
throw new IllegalStateException("could not start client manager", e);
}
}
try {
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
getClient().connect(options).waitForCompletion(options.getConnectionTimeout());
}
catch (MqttException e) {
logger.error("could not start client manager, client_id=" + getClientId(), e);

// See GH-3822
if (getConnectionInfo().isAutomaticReconnect()) {
try {
getClient().reconnect();
}
catch (MqttException re) {
logger.error("MQTT client failed to connect. Never happens.", re);
}
}
else {
var applicationEventPublisher = getApplicationEventPublisher();
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
}
}
}
}

@Override
public synchronized void stop() {
var client = getClient();
if (client == null) {
return;
}
try {
client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout());
}
catch (MqttException e) {
logger.error("could not disconnect from the client", e);
}
finally {
try {
client.close();
}
catch (MqttException e) {
logger.error("could not close the client", e);
}
setClient(null);
}
}

@Override
public synchronized void connectionLost(Throwable cause) {
logger.error("connection lost, client_id=" + getClientId(), cause);
}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
getCallbacks().forEach(callback -> callback.connectComplete(reconnect));
}

@Override
public void messageArrived(String topic, MqttMessage message) {
// not this manager concern
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// nor this manager concern
}

@Override
public MqttConnectOptions getConnectionInfo() {
return this.clientFactory.getConnectionOptions();
}
}
Loading