Skip to content

Commit fd94955

Browse files
committed
INT-4379: JMS OG Shutdown reply container on stop
JIRA: https://jira.spring.io/browse/INT-4379 - shutdown the container when the gateway is stopped Also, improve test suite - at the end of the tests, hundreds of threads are running, some caused by the above but others because `TaskExecutor`s are not shut down - reduce the number of iterations in the JMS pipeline tests to speed things up - change more tests to extend `ActiveMQMultiContextTests`, to keep a single broker up __cherry-pick to 4.3.x__ (perhaps just the gateway fix)
1 parent 7e263aa commit fd94955

File tree

10 files changed

+87
-38
lines changed

10 files changed

+87
-38
lines changed

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsOutboundGateway.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -157,6 +157,8 @@ public class JmsOutboundGateway extends AbstractReplyProducingMessageHandler imp
157157

158158
private volatile long idleReplyContainerTimeout;
159159

160+
private volatile boolean wasStopped;
161+
160162
private ScheduledFuture<?> idleTask;
161163

162164
/**
@@ -686,6 +688,10 @@ public void start() {
686688
if (this.replyContainer != null) {
687689
TaskScheduler taskScheduler = getTaskScheduler();
688690
if (this.idleReplyContainerTimeout <= 0) {
691+
if (this.wasStopped) {
692+
this.replyContainer.initialize();
693+
this.wasStopped = false;
694+
}
689695
this.replyContainer.start();
690696
}
691697
else {
@@ -705,7 +711,8 @@ public void start() {
705711
public void stop() {
706712
synchronized (this.lifeCycleMonitor) {
707713
if (this.replyContainer != null) {
708-
this.replyContainer.stop();
714+
this.replyContainer.shutdown();
715+
this.wasStopped = true;
709716
this.deleteDestinationIfTemporary(this.replyContainer.getDestination());
710717
if (this.reaper != null) {
711718
this.reaper.cancel(false);

spring-integration-jms/src/test/java/org/springframework/integration/jms/JmsOutboundGatewayTests.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.ArrayList;
3030
import java.util.List;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.Executors;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334

@@ -67,7 +68,7 @@
6768
*
6869
* @since 2.2.4
6970
*/
70-
public class JmsOutboundGatewayTests {
71+
public class JmsOutboundGatewayTests extends ActiveMQMultiContextTests {
7172

7273
private final Log logger = LogFactory.getLog(this.getClass());
7374

@@ -100,8 +101,9 @@ public void testReplyContainerRecovery() throws Exception {
100101
gateway.setUseReplyContainer(true);
101102
ReplyContainerProperties replyContainerProperties = new ReplyContainerProperties();
102103
final List<Throwable> errors = new ArrayList<Throwable>();
104+
ExecutorService exec = Executors.newFixedThreadPool(10);
103105
ErrorHandlingTaskExecutor errorHandlingTaskExecutor =
104-
new ErrorHandlingTaskExecutor(Executors.newFixedThreadPool(10), t -> {
106+
new ErrorHandlingTaskExecutor(exec, t -> {
105107
errors.add(t);
106108
throw new RuntimeException(t);
107109
});
@@ -157,6 +159,7 @@ public void testReplyContainerRecovery() throws Exception {
157159
}
158160
finally {
159161
gateway.stop();
162+
exec.shutdownNow();
160163
}
161164
}
162165

@@ -176,7 +179,8 @@ public void testConnectionBreakOnReplyMessageIdCorrelation() throws Exception {
176179
gateway.setReceiveTimeout(60000);
177180
gateway.afterPropertiesSet();
178181
gateway.start();
179-
Executors.newSingleThreadExecutor().execute(() -> gateway.handleMessage(new GenericMessage<String>("foo")));
182+
ExecutorService exec = Executors.newSingleThreadExecutor();
183+
exec.execute(() -> gateway.handleMessage(new GenericMessage<String>("foo")));
180184
CachingConnectionFactory connectionFactory2 = new CachingConnectionFactory(
181185
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"));
182186
JmsTemplate template = new JmsTemplate(connectionFactory2);
@@ -197,6 +201,7 @@ public void testConnectionBreakOnReplyMessageIdCorrelation() throws Exception {
197201
gateway.stop();
198202
connectionFactory1.destroy();
199203
connectionFactory2.destroy();
204+
exec.shutdownNow();
200205
}
201206

202207
@Test
@@ -216,7 +221,8 @@ public void testConnectionBreakOnReplyCustomCorrelation() throws Exception {
216221
gateway.setCorrelationKey("JMSCorrelationID");
217222
gateway.afterPropertiesSet();
218223
gateway.start();
219-
Executors.newSingleThreadExecutor().execute(() -> gateway.handleMessage(new GenericMessage<String>("foo")));
224+
ExecutorService exec = Executors.newSingleThreadExecutor();
225+
exec.execute(() -> gateway.handleMessage(new GenericMessage<String>("foo")));
220226
CachingConnectionFactory connectionFactory2 = new CachingConnectionFactory(
221227
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"));
222228
JmsTemplate template = new JmsTemplate(connectionFactory2);
@@ -239,6 +245,7 @@ public void testConnectionBreakOnReplyCustomCorrelation() throws Exception {
239245
gateway.stop();
240246
connectionFactory1.destroy();
241247
connectionFactory2.destroy();
248+
exec.shutdownNow();
242249
}
243250

244251
}

spring-integration-jms/src/test/java/org/springframework/integration/jms/JmsOutboundInsideChainTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,11 +36,12 @@
3636
* //INT-2275
3737
*
3838
* @author Artem Bilan
39+
* @author Gary Russell
3940
*/
4041
@RunWith(SpringJUnit4ClassRunner.class)
4142
@ContextConfiguration
4243
@DirtiesContext
43-
public class JmsOutboundInsideChainTests {
44+
public class JmsOutboundInsideChainTests extends ActiveMQMultiContextTests {
4445

4546
@Autowired
4647
private MessageChannel outboundChainChannel;

spring-integration-jms/src/test/java/org/springframework/integration/jms/OutboundGatewayConnectionTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import static org.mockito.Mockito.when;
2323

2424
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.Executors;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicReference;
@@ -80,7 +81,8 @@ public void testContainerWithDestBrokenConnection() throws Exception {
8081
final AtomicReference<Object> reply = new AtomicReference<Object>();
8182
final CountDownLatch latch1 = new CountDownLatch(1);
8283
final CountDownLatch latch2 = new CountDownLatch(1);
83-
Executors.newSingleThreadExecutor().execute(() -> {
84+
ExecutorService exec = Executors.newSingleThreadExecutor();
85+
exec.execute(() -> {
8486
latch1.countDown();
8587
try {
8688
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
@@ -107,7 +109,7 @@ public void testContainerWithDestBrokenConnection() throws Exception {
107109

108110
final CountDownLatch latch3 = new CountDownLatch(1);
109111
final CountDownLatch latch4 = new CountDownLatch(1);
110-
Executors.newSingleThreadExecutor().execute(() -> {
112+
exec.execute(() -> {
111113
latch3.countDown();
112114
try {
113115
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
@@ -131,6 +133,7 @@ public void testContainerWithDestBrokenConnection() throws Exception {
131133
broker.stop();
132134

133135
scheduler.destroy();
136+
exec.shutdownNow();
134137
}
135138

136139
}

spring-integration-jms/src/test/java/org/springframework/integration/jms/OutboundGatewayFunctionTests.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.Mockito.when;
2424

2525
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.ExecutorService;
2627
import java.util.concurrent.Executors;
2728
import java.util.concurrent.TimeUnit;
2829
import java.util.concurrent.atomic.AtomicReference;
@@ -56,7 +57,7 @@
5657
* @since 2.2
5758
*
5859
*/
59-
public class OutboundGatewayFunctionTests {
60+
public class OutboundGatewayFunctionTests extends ActiveMQMultiContextTests {
6061

6162
private static Destination requestQueue1 = new ActiveMQQueue("request1");
6263

@@ -102,7 +103,8 @@ public void testContainerWithDest() throws Exception {
102103
final AtomicReference<Object> reply = new AtomicReference<Object>();
103104
final CountDownLatch latch1 = new CountDownLatch(1);
104105
final CountDownLatch latch2 = new CountDownLatch(1);
105-
Executors.newSingleThreadExecutor().execute(() -> {
106+
ExecutorService exec = Executors.newSingleThreadExecutor();
107+
exec.execute(() -> {
106108
latch1.countDown();
107109
try {
108110
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
@@ -124,6 +126,7 @@ public void testContainerWithDest() throws Exception {
124126

125127
gateway.stop();
126128
scheduler.destroy();
129+
exec.shutdown();
127130
}
128131

129132
@Test
@@ -145,7 +148,8 @@ public void testContainerWithDestNoCorrelation() throws Exception {
145148
final AtomicReference<Object> reply = new AtomicReference<Object>();
146149
final CountDownLatch latch1 = new CountDownLatch(1);
147150
final CountDownLatch latch2 = new CountDownLatch(1);
148-
Executors.newSingleThreadExecutor().execute(() -> {
151+
ExecutorService exec = Executors.newSingleThreadExecutor();
152+
exec.execute(() -> {
149153
latch1.countDown();
150154
try {
151155
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
@@ -170,6 +174,7 @@ public void testContainerWithDestNoCorrelation() throws Exception {
170174

171175
gateway.stop();
172176
scheduler.destroy();
177+
exec.shutdownNow();
173178
}
174179

175180
@Test
@@ -192,7 +197,8 @@ public void testContainerWithDestName() throws Exception {
192197
final AtomicReference<Object> reply = new AtomicReference<Object>();
193198
final CountDownLatch latch1 = new CountDownLatch(1);
194199
final CountDownLatch latch2 = new CountDownLatch(1);
195-
Executors.newSingleThreadExecutor().execute(() -> {
200+
ExecutorService exec = Executors.newSingleThreadExecutor();
201+
exec.execute(() -> {
196202
latch1.countDown();
197203
try {
198204
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
@@ -214,6 +220,7 @@ public void testContainerWithDestName() throws Exception {
214220

215221
gateway.stop();
216222
scheduler.destroy();
223+
exec.shutdownNow();
217224
}
218225

219226
@Test
@@ -235,7 +242,8 @@ public void testContainerWithDestNameNoCorrelation() throws Exception {
235242
final AtomicReference<Object> reply = new AtomicReference<>();
236243
final CountDownLatch latch1 = new CountDownLatch(1);
237244
final CountDownLatch latch2 = new CountDownLatch(1);
238-
Executors.newSingleThreadExecutor().execute(() -> {
245+
ExecutorService exec = Executors.newSingleThreadExecutor();
246+
exec.execute(() -> {
239247
latch1.countDown();
240248
try {
241249
reply.set(gateway.handleRequestMessage(new GenericMessage<>("foo")));
@@ -260,6 +268,7 @@ public void testContainerWithDestNameNoCorrelation() throws Exception {
260268

261269
gateway.stop();
262270
scheduler.destroy();
271+
exec.shutdownNow();
263272
}
264273

265274
@Test
@@ -282,7 +291,8 @@ public void testContainerWithTemporary() throws Exception {
282291
final AtomicReference<Object> reply = new AtomicReference<>();
283292
final CountDownLatch latch1 = new CountDownLatch(1);
284293
final CountDownLatch latch2 = new CountDownLatch(1);
285-
Executors.newSingleThreadExecutor().execute(() -> {
294+
ExecutorService exec = Executors.newSingleThreadExecutor();
295+
exec.execute(() -> {
286296
latch1.countDown();
287297
try {
288298
reply.set(gateway.handleRequestMessage(new GenericMessage<>("foo")));
@@ -304,6 +314,7 @@ public void testContainerWithTemporary() throws Exception {
304314

305315
gateway.stop();
306316
scheduler.destroy();
317+
exec.shutdownNow();
307318
}
308319

309320
@Test
@@ -324,7 +335,8 @@ public void testContainerWithTemporaryNoCorrelation() throws Exception {
324335
final AtomicReference<Object> reply = new AtomicReference<>();
325336
final CountDownLatch latch1 = new CountDownLatch(1);
326337
final CountDownLatch latch2 = new CountDownLatch(1);
327-
Executors.newSingleThreadExecutor().execute(() -> {
338+
ExecutorService exec = Executors.newSingleThreadExecutor();
339+
exec.execute(() -> {
328340
latch1.countDown();
329341
try {
330342
reply.set(gateway.handleRequestMessage(new GenericMessage<>("foo")));
@@ -349,6 +361,7 @@ public void testContainerWithTemporaryNoCorrelation() throws Exception {
349361

350362
gateway.stop();
351363
scheduler.destroy();
364+
exec.shutdownNow();
352365
}
353366

354367
@Test
@@ -371,7 +384,8 @@ public void testLazyContainerWithDest() throws Exception {
371384
gateway.setReceiveTimeout(20000);
372385
gateway.afterPropertiesSet();
373386
gateway.start();
374-
Executors.newSingleThreadExecutor().execute(() -> {
387+
ExecutorService exec = Executors.newSingleThreadExecutor();
388+
exec.execute(() -> {
375389
JmsTemplate template = new JmsTemplate();
376390
template.setConnectionFactory(getConnectionFactory());
377391
template.setReceiveTimeout(20000);
@@ -393,6 +407,7 @@ public void testLazyContainerWithDest() throws Exception {
393407
gateway.stop();
394408
assertFalse(container.isRunning());
395409
scheduler.destroy();
410+
exec.shutdownNow();
396411
}
397412

398413
private void receiveAndSend(JmsTemplate template) {

spring-integration-jms/src/test/java/org/springframework/integration/jms/PollableJmsChannelTests.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import java.util.ArrayList;
2929
import java.util.List;
3030
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.Executors;
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.atomic.AtomicReference;
@@ -59,7 +60,7 @@
5960
* @author Gunnar Hillert
6061
* @author Artem Bilan
6162
*/
62-
public class PollableJmsChannelTests {
63+
public class PollableJmsChannelTests extends ActiveMQMultiContextTests {
6364

6465
private ActiveMQConnectionFactory connectionFactory;
6566

@@ -188,7 +189,8 @@ public void qos() throws Exception {
188189
assertTrue(sent1);
189190
final AtomicReference<javax.jms.Message> message = new AtomicReference<javax.jms.Message>();
190191
final CountDownLatch latch1 = new CountDownLatch(1);
191-
Executors.newSingleThreadExecutor().execute(() -> {
192+
ExecutorService exec = Executors.newSingleThreadExecutor();
193+
exec.execute(() -> {
192194
message.set(receiver.receive(queue));
193195
latch1.countDown();
194196
});
@@ -201,7 +203,7 @@ public void qos() throws Exception {
201203
final CountDownLatch latch2 = new CountDownLatch(1);
202204
boolean sent2 = channel.send(MessageBuilder.withPayload("foo").setPriority(6).build());
203205
assertTrue(sent2);
204-
Executors.newSingleThreadExecutor().execute(() -> {
206+
exec.execute(() -> {
205207
message.set(receiver.receive(queue));
206208
latch2.countDown();
207209
});
@@ -210,6 +212,7 @@ public void qos() throws Exception {
210212
assertEquals(6, message.get().getJMSPriority());
211213
assertTrue(message.get().getJMSExpiration() <= System.currentTimeMillis() + ttl);
212214
assertTrue(message.get().toString().contains("persistent = false"));
215+
exec.shutdownNow();
213216
}
214217

215218
@Test

spring-integration-jms/src/test/java/org/springframework/integration/jms/SubscribableJmsChannelTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -68,7 +68,7 @@
6868
* @author Artem Bilan
6969
* @since 2.0
7070
*/
71-
public class SubscribableJmsChannelTests {
71+
public class SubscribableJmsChannelTests extends ActiveMQMultiContextTests {
7272

7373
private static final int TIMEOUT = 30000;
7474

0 commit comments

Comments
 (0)