Skip to content

Commit b1b3174

Browse files
garyrussellartembilan
authored andcommitted
GH-2471: Close client on outbound adapter
Resolves #2471 # Conflicts: # spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java
1 parent ef9c760 commit b1b3174

File tree

2 files changed

+61
-9
lines changed

2 files changed

+61
-9
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,17 @@ protected void doStop() {
159159

160160
private synchronized IMqttAsyncClient checkConnection() throws MqttException {
161161
if (this.client != null && !this.client.isConnected()) {
162+
this.client.setCallback(null);
162163
this.client.close();
163164
this.client = null;
164165
}
165166
if (this.client == null) {
167+
IMqttAsyncClient client = null;
166168
try {
167169
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
168170
Assert.state(this.getUrl() != null || connectionOptions.getServerURIs() != null,
169171
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
170-
IMqttAsyncClient client = this.clientFactory.getAsyncClientInstance(this.getUrl(), this.getClientId());
172+
client = this.clientFactory.getAsyncClientInstance(this.getUrl(), this.getClientId());
171173
incrementClientInstance();
172174
client.setCallback(this);
173175
client.connect(connectionOptions).waitForCompletion(this.completionTimeout);
@@ -177,6 +179,10 @@ private synchronized IMqttAsyncClient checkConnection() throws MqttException {
177179
}
178180
}
179181
catch (MqttException e) {
182+
if (client != null) {
183+
client.close();
184+
client = null;
185+
}
180186
throw new MessagingException("Failed to connect", e);
181187
}
182188
}
@@ -209,6 +215,13 @@ private void sendDeliveryComplete(IMqttDeliveryToken token) {
209215
@Override
210216
public synchronized void connectionLost(Throwable cause) {
211217
logger.error("Lost connection; will attempt reconnect on next request");
218+
try {
219+
this.client.setCallback(null);
220+
this.client.close();
221+
}
222+
catch (MqttException e) {
223+
// NOSONAR
224+
}
212225
this.client = null;
213226
}
214227

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.junit.Assert.assertSame;
2424
import static org.junit.Assert.assertThat;
2525
import static org.junit.Assert.assertTrue;
26+
import static org.junit.Assert.fail;
2627
import static org.mockito.BDDMockito.given;
2728
import static org.mockito.BDDMockito.willAnswer;
2829
import static org.mockito.BDDMockito.willReturn;
@@ -50,6 +51,7 @@
5051

5152
import org.aopalliance.intercept.MethodInterceptor;
5253
import org.apache.commons.logging.Log;
54+
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
5355
import org.eclipse.paho.client.mqttv3.IMqttClient;
5456
import org.eclipse.paho.client.mqttv3.IMqttToken;
5557
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
@@ -77,6 +79,7 @@
7779
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
7880
import org.springframework.integration.test.util.TestUtils;
7981
import org.springframework.messaging.Message;
82+
import org.springframework.messaging.MessagingException;
8083
import org.springframework.messaging.support.GenericMessage;
8184
import org.springframework.scheduling.TaskScheduler;
8285
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -100,15 +103,32 @@ public class MqttAdapterTests {
100103
}
101104

102105
@Test
103-
public void testCloseOnBadConnect() throws Exception {
106+
public void testCloseOnBadConnectIn() throws Exception {
104107
final IMqttClient client = mock(IMqttClient.class);
105108
willThrow(new MqttException(0)).given(client).connect(any());
106-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
109+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
107110
adapter.start();
108111
verify(client).close();
109112
adapter.stop();
110113
}
111114

115+
@Test
116+
public void testCloseOnBadConnectOut() throws Exception {
117+
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
118+
willThrow(new MqttException(0)).given(client).connect(any());
119+
MqttPahoMessageHandler adapter = buildAdapterOut(client);
120+
adapter.start();
121+
try {
122+
adapter.handleMessage(new GenericMessage<>("foo"));
123+
fail("exception expected");
124+
}
125+
catch (MessagingException e) {
126+
// NOSONAR
127+
}
128+
verify(client).close();
129+
adapter.stop();
130+
}
131+
112132
@Test
113133
public void testOutboundOptionsApplied() throws Exception {
114134
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
@@ -290,7 +310,7 @@ public void testInboundOptionsApplied() throws Exception {
290310
@Test
291311
public void testStopActionDefault() throws Exception {
292312
final IMqttClient client = mock(IMqttClient.class);
293-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, null);
313+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, null);
294314

295315
adapter.start();
296316
adapter.stop();
@@ -300,7 +320,7 @@ public void testStopActionDefault() throws Exception {
300320
@Test
301321
public void testStopActionDefaultNotClean() throws Exception {
302322
final IMqttClient client = mock(IMqttClient.class);
303-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, false, null);
323+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false, null);
304324

305325
adapter.start();
306326
adapter.stop();
@@ -310,7 +330,7 @@ public void testStopActionDefaultNotClean() throws Exception {
310330
@Test
311331
public void testStopActionAlways() throws Exception {
312332
final IMqttClient client = mock(IMqttClient.class);
313-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, false,
333+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false,
314334
ConsumerStopAction.UNSUBSCRIBE_ALWAYS);
315335

316336
adapter.start();
@@ -328,7 +348,7 @@ public void testStopActionAlways() throws Exception {
328348
@Test
329349
public void testStopActionNever() throws Exception {
330350
final IMqttClient client = mock(IMqttClient.class);
331-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
351+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
332352

333353
adapter.start();
334354
adapter.stop();
@@ -338,7 +358,7 @@ public void testStopActionNever() throws Exception {
338358
@Test
339359
public void testReconnect() throws Exception {
340360
final IMqttClient client = mock(IMqttClient.class);
341-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
361+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
342362
adapter.setRecoveryInterval(10);
343363
Log logger = spy(TestUtils.getPropertyValue(adapter, "logger", Log.class));
344364
new DirectFieldAccessor(adapter).setPropertyValue("logger", logger);
@@ -364,7 +384,7 @@ public void testReconnect() throws Exception {
364384
taskScheduler.destroy();
365385
}
366386

367-
private MqttPahoMessageDrivenChannelAdapter buildAdapter(final IMqttClient client, Boolean cleanSession,
387+
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttClient client, Boolean cleanSession,
368388
ConsumerStopAction action) throws MqttException {
369389
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
370390

@@ -392,6 +412,25 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
392412
return adapter;
393413
}
394414

415+
private MqttPahoMessageHandler buildAdapterOut(final IMqttAsyncClient client) throws MqttException {
416+
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
417+
418+
@Override
419+
public IMqttAsyncClient getAsyncClientInstance(String uri, String clientId) throws MqttException {
420+
return client;
421+
}
422+
423+
};
424+
MqttConnectOptions connectOptions = new MqttConnectOptions();
425+
connectOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
426+
factory.setConnectionOptions(connectOptions);
427+
MqttPahoMessageHandler adapter = new MqttPahoMessageHandler("client", factory);
428+
adapter.setDefaultTopic("foo");
429+
adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class));
430+
adapter.afterPropertiesSet();
431+
return adapter;
432+
}
433+
395434
private void verifyUnsubscribe(IMqttClient client) throws Exception {
396435
verify(client).connect(any(MqttConnectOptions.class));
397436
verify(client).subscribe(any(String[].class), any(int[].class));

0 commit comments

Comments
 (0)