Skip to content

Commit 490ae1b

Browse files
artembilangaryrussell
authored andcommitted
Some various polishing
* Add `RotatingServerAdvice.StandardRotationPolicy.getCurrent()` method for better end-user experience when this class is extended * Add NPE protection into the MQTT Channel Adapters. Some code style polishing for them **Cherry-pick to 5.0.x**
1 parent 74f6c75 commit 490ae1b

File tree

3 files changed

+34
-23
lines changed

3 files changed

+34
-23
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
*
3737
* @author Gary Russell
3838
* @author Michael Forstner
39+
* @author Artem Bilan
3940
*
4041
* @since 5.0.7
4142
*
@@ -118,7 +119,7 @@ public static class StandardRotationPolicy implements RotationPolicy {
118119

119120
protected final Log logger = LogFactory.getLog(getClass());
120121

121-
private final DelegatingSessionFactory<?> factory;
122+
protected final DelegatingSessionFactory<?> factory;
122123

123124
private final List<KeyDirectory> keyDirectories = new ArrayList<>();
124125

@@ -170,6 +171,10 @@ protected boolean isFair() {
170171
return this.fair;
171172
}
172173

174+
protected KeyDirectory getCurrent() {
175+
return this.current;
176+
}
177+
173178
@Override
174179
public void beforeReceive(MessageSource<?> source) {
175180
if (this.fair || !this.initialized) {

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ protected synchronized void doStop() {
162162
if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
163163
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
164164
&& this.cleanSession)) {
165+
165166
this.client.unsubscribe(getTopic());
166167
}
167168
}
@@ -249,8 +250,8 @@ private synchronized void connectAndSubscribe() throws MqttException {
249250
if (grantedQos[i] != requestedQos[i]) {
250251
if (logger.isWarnEnabled()) {
251252
logger.warn("Granted QOS different to Requested QOS; topics: " + Arrays.toString(topics)
252-
+ " requested: " + Arrays.toString(requestedQos)
253-
+ " granted: " + Arrays.toString(grantedQos));
253+
+ " requested: " + Arrays.toString(requestedQos)
254+
+ " granted: " + Arrays.toString(grantedQos));
254255
}
255256
break;
256257
}
@@ -294,7 +295,8 @@ private synchronized void cancelReconnect() {
294295
}
295296
}
296297

297-
private void scheduleReconnect() {
298+
private synchronized void scheduleReconnect() {
299+
cancelReconnect();
298300
try {
299301
this.reconnectFuture = getTaskScheduler().schedule(() -> {
300302
try {
@@ -324,12 +326,14 @@ public synchronized void connectionLost(Throwable cause) {
324326
if (isRunning()) {
325327
this.logger.error("Lost connection: " + cause.getMessage() + "; retrying...");
326328
this.connected = false;
327-
try {
328-
this.client.setCallback(null);
329-
this.client.close();
330-
}
331-
catch (MqttException e) {
332-
// NOSONAR
329+
if (this.client != null) {
330+
try {
331+
this.client.setCallback(null);
332+
this.client.close();
333+
}
334+
catch (MqttException e) {
335+
// NOSONAR
336+
}
333337
}
334338
this.client = null;
335339
scheduleReconnect();
@@ -340,7 +344,7 @@ public synchronized void connectionLost(Throwable cause) {
340344
}
341345

342346
@Override
343-
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
347+
public void messageArrived(String topic, MqttMessage mqttMessage) {
344348
Message<?> message = this.getConverter().toMessage(topic, mqttMessage);
345349
try {
346350
sendMessage(message);

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
*
4040
* @author Gary Russell
4141
* @author Artem Bilan
42+
*
4243
* @since 4.0
4344
*
4445
*/
@@ -51,13 +52,13 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler
5152

5253
private final MqttPahoClientFactory clientFactory;
5354

54-
private volatile IMqttAsyncClient client;
55-
5655
private volatile boolean async;
5756

5857
private volatile boolean asyncEvents;
5958

60-
private volatile ApplicationEventPublisher applicationEventPublisher;
59+
private ApplicationEventPublisher applicationEventPublisher;
60+
61+
private volatile IMqttAsyncClient client;
6162

6263
/**
6364
* Use this constructor for a single url (although it may be overridden
@@ -181,7 +182,6 @@ private synchronized IMqttAsyncClient checkConnection() throws MqttException {
181182
catch (MqttException e) {
182183
if (client != null) {
183184
client.close();
184-
client = null;
185185
}
186186
throw new MessagingException("Failed to connect", e);
187187
}
@@ -215,18 +215,20 @@ private void sendDeliveryComplete(IMqttDeliveryToken token) {
215215
@Override
216216
public synchronized void connectionLost(Throwable cause) {
217217
logger.error("Lost connection; will attempt reconnect on next request");
218-
try {
219-
this.client.setCallback(null);
220-
this.client.close();
221-
}
222-
catch (MqttException e) {
223-
// NOSONAR
218+
if (this.client != null) {
219+
try {
220+
this.client.setCallback(null);
221+
this.client.close();
222+
}
223+
catch (MqttException e) {
224+
// NOSONAR
225+
}
226+
this.client = null;
224227
}
225-
this.client = null;
226228
}
227229

228230
@Override
229-
public void messageArrived(String topic, MqttMessage message) throws Exception {
231+
public void messageArrived(String topic, MqttMessage message) {
230232

231233
}
232234

0 commit comments

Comments
 (0)