Skip to content

Commit 8aa91d1

Browse files
garyrussellartembilan
authored andcommitted
Shutdown test executors in -core
- don't use `ExecutorService` as `@Bean` - spring can't stop them - also add log adjuster for ftp test that sometimes fails
1 parent d890f14 commit 8aa91d1

20 files changed

+137
-62
lines changed

spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,7 +77,8 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
7777
/*
7878
* Runs "reap" when group 'bar' is in completion
7979
*/
80-
Executors.newSingleThreadExecutor().execute(() -> {
80+
ExecutorService exec = Executors.newSingleThreadExecutor();
81+
exec.execute(() -> {
8182
try {
8283
waitReapStartLatch.await(10, TimeUnit.SECONDS);
8384
}
@@ -145,6 +146,7 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
145146
assertEquals(1, ((MessageGroup) outputMessages.get(1).getPayload()).size()); // 'qux'
146147

147148
assertNull(discards.receive(0));
149+
exec.shutdownNow();
148150
}
149151

150152
@Test // INT-2833

spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public void testShouldNotSendPartialResultOnTimeoutByDefault() throws Interrupte
272272
Message<?> message = createMessage(3, "ABC", 2, 1, replyChannel, null);
273273
this.aggregator.handleMessage(message);
274274
this.store.expireMessageGroups(-10000);
275-
Message<?> reply = replyChannel.receive(1000);
275+
Message<?> reply = replyChannel.receive(0);
276276
assertNull("No message should have been sent normally", reply);
277277
Message<?> discardedMessage = discardChannel.receive(1000);
278278
assertNotNull("A message should have been discarded", discardedMessage);

spring-integration-core/src/test/java/org/springframework/integration/aggregator/BarrierMessageHandlerTests.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -132,6 +132,7 @@ public void testRequestBeforeReply() throws Exception {
132132
assertEquals("bar", result.get(1));
133133
assertEquals(0, suspensions.size());
134134
assertEquals(0, inProcess.size());
135+
exec.shutdownNow();
135136
}
136137

137138
@Test
@@ -141,8 +142,8 @@ public void testReplyBeforeRequest() throws Exception {
141142
handler.setOutputChannel(outputChannel);
142143
handler.setBeanFactory(mock(BeanFactory.class));
143144
handler.afterPropertiesSet();
144-
Executors.newSingleThreadExecutor()
145-
.execute(() -> handler.trigger(MessageBuilder.withPayload("bar").setCorrelationId("foo").build()));
145+
ExecutorService exec = Executors.newSingleThreadExecutor();
146+
exec.execute(() -> handler.trigger(MessageBuilder.withPayload("bar").setCorrelationId("foo").build()));
146147
Map<?, ?> suspensions = TestUtils.getPropertyValue(handler, "suspensions", Map.class);
147148
int n = 0;
148149
while (n++ < 100 && suspensions.size() == 0) {
@@ -156,6 +157,7 @@ public void testReplyBeforeRequest() throws Exception {
156157
assertEquals("foo", result.get(0));
157158
assertEquals("bar", result.get(1));
158159
assertEquals(0, suspensions.size());
160+
exec.shutdownNow();
159161
}
160162

161163
@Test
@@ -169,7 +171,8 @@ public void testLateReply() throws Exception {
169171
handler.setBeanFactory(mock(BeanFactory.class));
170172
handler.afterPropertiesSet();
171173
final CountDownLatch latch = new CountDownLatch(1);
172-
Executors.newSingleThreadExecutor().execute(() -> {
174+
ExecutorService exec = Executors.newSingleThreadExecutor();
175+
exec.execute(() -> {
173176
handler.handleMessage(MessageBuilder.withPayload("foo").setCorrelationId("foo").build());
174177
latch.countDown();
175178
});
@@ -190,6 +193,7 @@ public void testLateReply() throws Exception {
190193
assertSame(discard, triggerMessage);
191194
handler.handleMessage(MessageBuilder.withPayload("foo").setCorrelationId("foo").build());
192195
assertEquals(0, suspensions.size());
196+
exec.shutdownNow();
193197
}
194198

195199
@Test
@@ -218,7 +222,8 @@ public void testExceptionReply() throws Exception {
218222
handler.afterPropertiesSet();
219223
final AtomicReference<Exception> exception = new AtomicReference<Exception>();
220224
final CountDownLatch latch = new CountDownLatch(1);
221-
Executors.newSingleThreadExecutor().execute(() -> {
225+
ExecutorService exec = Executors.newSingleThreadExecutor();
226+
exec.execute(() -> {
222227
try {
223228
handler.handleMessage(MessageBuilder.withPayload("foo").setCorrelationId("foo").build());
224229
}
@@ -238,6 +243,7 @@ public void testExceptionReply() throws Exception {
238243
assertTrue(latch.await(10, TimeUnit.SECONDS));
239244
assertSame(exc, exception.get().getCause());
240245
assertEquals(0, suspensions.size());
246+
exec.shutdownNow();
241247
}
242248

243249
@Test

spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageBarrierTests.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ConcurrentHashMap;
2828
import java.util.concurrent.ConcurrentMap;
2929
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.ExecutorService;
3031
import java.util.concurrent.Executors;
3132
import java.util.concurrent.Semaphore;
3233

@@ -89,8 +90,9 @@ public void shouldNotDropMessageOrBlockSendingThread() {
8990
barrier.setReleaseStrategy(trackingReleaseStrategy);
9091
final CountDownLatch start = new CountDownLatch(1);
9192
final CountDownLatch sent = new CountDownLatch(200);
93+
ExecutorService exec = Executors.newSingleThreadExecutor();
9294
for (int i = 0; i < 200; i++) {
93-
sendAsynchronously(barrier, testMessage(), start, sent);
95+
sendAsynchronously(barrier, testMessage(), start, sent, exec);
9496
}
9597
start.countDown();
9698

@@ -106,10 +108,12 @@ public void shouldNotDropMessageOrBlockSendingThread() {
106108
trackingReleaseStrategy.release("foo");
107109
assertThat((barrier.receive()), is(notNullValue()));
108110
}
111+
exec.shutdownNow();
109112
}
110113

111-
private void sendAsynchronously(final MessageHandler handler, final Message<Object> message, final CountDownLatch start, final CountDownLatch sent) {
112-
Executors.newSingleThreadExecutor().execute(() -> {
114+
private void sendAsynchronously(final MessageHandler handler, final Message<Object> message,
115+
final CountDownLatch start, final CountDownLatch sent, ExecutorService exec) {
116+
exec.execute(() -> {
113117
try {
114118
start.await();
115119
}
@@ -119,7 +123,6 @@ private void sendAsynchronously(final MessageHandler handler, final Message<Obje
119123
handler.handleMessage(message);
120124
sent.countDown();
121125
});
122-
123126
}
124127

125128
private Message<Object> testMessage() {

spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import java.util.ArrayList;
2929
import java.util.List;
3030
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.Executors;
3233
import java.util.concurrent.TimeUnit;
3334

@@ -146,7 +147,8 @@ public void shouldNotPruneWhileCompleting() throws Exception {
146147
handler.handleMessage(message1);
147148
bothMessagesHandled.countDown();
148149
storedMessages.add(message1);
149-
Executors.newSingleThreadExecutor().submit(() -> {
150+
ExecutorService exec = Executors.newSingleThreadExecutor();
151+
exec.submit(() -> {
150152
handler.handleMessage(message2);
151153
storedMessages.add(message2);
152154
bothMessagesHandled.countDown();
@@ -155,6 +157,7 @@ public void shouldNotPruneWhileCompleting() throws Exception {
155157
assertTrue(bothMessagesHandled.await(10, TimeUnit.SECONDS));
156158

157159
assertEquals(0, store.expireMessageGroups(10000));
160+
exec.shutdownNow();
158161
}
159162

160163
@Test

spring-integration-core/src/test/java/org/springframework/integration/aggregator/scenarios/AggregatorWithCustomReleaseStrategyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public void validateSequenceSizeHasNoAffectCustomCorrelator() throws Exception {
9292
});
9393
}
9494

95-
assertTrue("Sends failed to complete: " + latch.getCount() + " remain", latch.await(60, TimeUnit.SECONDS));
95+
assertTrue("Sends failed to complete: " + latch.getCount() + " remain", latch.await(120, TimeUnit.SECONDS));
9696

9797
Message<?> message = resultChannel.receive(1000);
9898
int counter = 0;

spring-integration-core/src/test/java/org/springframework/integration/channel/CGLibProxyChannelTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,8 +20,7 @@
2020
import static org.junit.Assert.assertTrue;
2121

2222
import java.util.concurrent.CountDownLatch;
23-
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
23+
import java.util.concurrent.Executor;
2524
import java.util.concurrent.TimeUnit;
2625
import java.util.concurrent.atomic.AtomicReference;
2726

@@ -36,6 +35,7 @@
3635
import org.springframework.messaging.Message;
3736
import org.springframework.messaging.MessageChannel;
3837
import org.springframework.messaging.support.GenericMessage;
38+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
3939
import org.springframework.test.annotation.DirtiesContext;
4040
import org.springframework.test.context.junit4.SpringRunner;
4141

@@ -128,8 +128,8 @@ public ProxyFactoryBean publishSubscribeChannel() {
128128
}
129129

130130
@Bean
131-
public ExecutorService executor() {
132-
return Executors.newCachedThreadPool();
131+
public Executor executor() {
132+
return new ThreadPoolTaskExecutor();
133133
}
134134

135135
private ProxyFactoryBean createProxyFactory(MessageChannel channel) {

spring-integration-core/src/test/java/org/springframework/integration/channel/ExecutorChannelTests.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.CountDownLatch;
3333
import java.util.concurrent.Executor;
3434
import java.util.concurrent.Executors;
35+
import java.util.concurrent.ScheduledExecutorService;
3536
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.atomic.AtomicInteger;
3738

@@ -76,8 +77,9 @@ public void verifyDifferentThread() throws Exception {
7677
@Test
7778
public void roundRobinLoadBalancing() throws Exception {
7879
int numberOfMessages = 11;
79-
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(
80-
Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-")));
80+
ScheduledExecutorService exec = Executors
81+
.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-"));
82+
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(exec);
8183
ExecutorChannel channel = new ExecutorChannel(
8284
taskExecutor, new RoundRobinLoadBalancingStrategy());
8385
CountDownLatch latch = new CountDownLatch(numberOfMessages);
@@ -104,13 +106,15 @@ public void roundRobinLoadBalancing() throws Exception {
104106
assertEquals(4, handler1.count.get());
105107
assertEquals(4, handler2.count.get());
106108
assertEquals(3, handler3.count.get());
109+
exec.shutdownNow();
107110
}
108111

109112
@Test
110113
public void verifyFailoverWithLoadBalancing() throws Exception {
111114
int numberOfMessages = 11;
112-
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(
113-
Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-")));
115+
ScheduledExecutorService exec = Executors
116+
.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-"));
117+
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(exec);
114118
ExecutorChannel channel = new ExecutorChannel(
115119
taskExecutor, new RoundRobinLoadBalancingStrategy());
116120
CountDownLatch latch = new CountDownLatch(numberOfMessages);
@@ -138,13 +142,15 @@ public void verifyFailoverWithLoadBalancing() throws Exception {
138142
assertEquals(0, handler2.count.get());
139143
assertEquals(4, handler1.count.get());
140144
assertEquals(7, handler3.count.get());
145+
exec.shutdownNow();
141146
}
142147

143148
@Test
144149
public void verifyFailoverWithoutLoadBalancing() throws Exception {
145150
int numberOfMessages = 11;
146-
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(
147-
Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-")));
151+
ScheduledExecutorService exec = Executors
152+
.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-"));
153+
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(exec);
148154
ExecutorChannel channel = new ExecutorChannel(taskExecutor, null);
149155
CountDownLatch latch = new CountDownLatch(numberOfMessages);
150156
TestHandler handler1 = new TestHandler(latch);
@@ -169,6 +175,7 @@ public void verifyFailoverWithoutLoadBalancing() throws Exception {
169175
assertEquals(0, handler1.count.get());
170176
assertEquals(0, handler3.count.get());
171177
assertEquals(numberOfMessages, handler2.count.get());
178+
exec.shutdownNow();
172179
}
173180

174181
@Test

spring-integration-core/src/test/java/org/springframework/integration/channel/MixedDispatcherConfigurationScenarioTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.atomic.AtomicBoolean;
3434

35+
import org.junit.After;
3536
import org.junit.Before;
3637
import org.junit.Test;
3738
import org.junit.runner.RunWith;
@@ -40,7 +41,7 @@
4041
import org.mockito.Mockito;
4142
import org.mockito.junit.MockitoJUnitRunner;
4243

43-
import org.springframework.context.ApplicationContext;
44+
import org.springframework.context.ConfigurableApplicationContext;
4445
import org.springframework.context.support.ClassPathXmlApplicationContext;
4546
import org.springframework.integration.MessageRejectedException;
4647
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
@@ -69,7 +70,7 @@ public class MixedDispatcherConfigurationScenarioTests {
6970
@Mock
7071
private List<Exception> exceptionRegistry;
7172

72-
private ApplicationContext ac;
73+
private ConfigurableApplicationContext ac;
7374

7475
@Mock
7576
private MessageHandler handlerA;
@@ -99,6 +100,12 @@ public void initialize() throws Exception {
99100
failed = new AtomicBoolean(false);
100101
}
101102

103+
@After
104+
public void tearDown() {
105+
this.executor.shutdownNow();
106+
this.ac.close();
107+
}
108+
102109
@Test
103110
public void noFailoverNoLoadBalancing() {
104111
DirectChannel channel = (DirectChannel) ac.getBean("noLoadBalancerNoFailover");

spring-integration-core/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,7 +24,6 @@
2424

2525
import java.util.Comparator;
2626
import java.util.concurrent.CountDownLatch;
27-
import java.util.concurrent.Executor;
2827
import java.util.concurrent.ExecutorService;
2928
import java.util.concurrent.Executors;
3029
import java.util.concurrent.TimeUnit;
@@ -38,6 +37,7 @@
3837

3938
/**
4039
* @author Mark Fisher
40+
* @author Gary Russell
4141
*/
4242
public class PriorityChannelTests {
4343

@@ -246,7 +246,7 @@ public void testTimeoutDoesNotElapse() throws InterruptedException {
246246
final PriorityChannel channel = new PriorityChannel(1);
247247
final AtomicBoolean sentSecondMessage = new AtomicBoolean(false);
248248
final CountDownLatch latch = new CountDownLatch(1);
249-
Executor executor = Executors.newSingleThreadScheduledExecutor();
249+
ExecutorService executor = Executors.newSingleThreadScheduledExecutor();
250250
channel.send(new GenericMessage<String>("test-1"));
251251
executor.execute(() -> {
252252
sentSecondMessage.set(channel.send(new GenericMessage<String>("test-2"), 3000));
@@ -262,6 +262,7 @@ public void testTimeoutDoesNotElapse() throws InterruptedException {
262262
Message<?> message2 = channel.receive();
263263
assertNotNull(message2);
264264
assertEquals("test-2", message2.getPayload());
265+
executor.shutdownNow();
265266
}
266267

267268
@Test

0 commit comments

Comments
 (0)