Skip to content

Commit d44a8d0

Browse files
authored
GH-2482: Fix Async Container Stop
- Stop with callback did not reset `active` flag (both containers) - DMLC did not release the `cancellationLock` - Also restore `@LongRunning`
1 parent 6539f9c commit d44a8d0

File tree

5 files changed

+27
-9
lines changed

5 files changed

+27
-9
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,9 +1276,19 @@ public void initialize() {
12761276
}
12771277

12781278
/**
1279-
* Stop the shared Connection, call {@link #doShutdown()}, and close this container.
1279+
* Stop the shared Connection, call {@link #shutdown(Runnable)}, and close this
1280+
* container.
12801281
*/
12811282
public void shutdown() {
1283+
shutdown(null);
1284+
}
1285+
1286+
/**
1287+
* Stop the shared Connection, call {@link #shutdownAndWaitOrCallback(Runnable)}, and
1288+
* close this container.
1289+
* @param callback an optional {@link Runnable} to call when the stop is complete.
1290+
*/
1291+
public void shutdown(@Nullable Runnable callback) {
12821292
synchronized (this.lifecycleMonitor) {
12831293
if (!isActive()) {
12841294
logger.debug("Shutdown ignored - container is not active already");
@@ -1293,7 +1303,7 @@ public void shutdown() {
12931303

12941304
// Shut down the invokers.
12951305
try {
1296-
doShutdown();
1306+
shutdownAndWaitOrCallback(callback);
12971307
}
12981308
catch (Exception ex) {
12991309
throw convertRabbitAccessException(ex);
@@ -1331,10 +1341,7 @@ protected void doShutdown() {
13311341

13321342
@Override
13331343
public void stop(Runnable callback) {
1334-
shutdownAndWaitOrCallback(() -> {
1335-
setNotRunning();
1336-
callback.run();
1337-
});
1344+
shutdown(callback);
13381345
}
13391346

13401347
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1338,14 +1338,14 @@ void cancelConsumer(final String eventMessage) {
13381338

13391339
private void finalizeConsumer() {
13401340
closeChannel();
1341-
DirectMessageListenerContainer.this.cancellationLock.release(this);
13421341
consumerRemoved(this);
13431342
}
13441343

13451344
private void closeChannel() {
13461345
RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
13471346
RabbitUtils.closeChannel(getChannel());
13481347
RabbitUtils.closeConnection(this.connection);
1348+
DirectMessageListenerContainer.this.cancellationLock.release(this);
13491349
}
13501350

13511351
@Override

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ public void onMessage(Message message) {
801801
}
802802

803803
@Test
804-
void forceStop() {
804+
void forceStop() throws InterruptedException {
805805
CountDownLatch latch1 = new CountDownLatch(1);
806806
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
807807
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
@@ -812,6 +812,7 @@ void forceStop() {
812812
try {
813813
container.setQueueNames(Q3);
814814
container.setForceStop(true);
815+
container.setShutdownTimeout(20_000L);
815816
template.convertAndSend(Q3, "one");
816817
template.convertAndSend(Q3, "two");
817818
template.convertAndSend(Q3, "three");
@@ -828,14 +829,21 @@ void forceStop() {
828829
assertThat(queueInfo).isNotNull();
829830
assertThat(queueInfo.getMessageCount()).isEqualTo(0);
830831
});
832+
CountDownLatch latch2 = new CountDownLatch(1);
833+
long t1 = System.currentTimeMillis();
831834
container.stop(() -> {
835+
latch2.countDown();
832836
});
833837
latch1.countDown();
838+
assertThat(System.currentTimeMillis() - t1).isLessThan(5_000L);
834839
await().untilAsserted(() -> {
835840
QueueInformation queueInfo = admin.getQueueInfo(Q3);
836841
assertThat(queueInfo).isNotNull();
837842
assertThat(queueInfo.getMessageCount()).isEqualTo(4);
838843
});
844+
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
845+
assertThat(container.isActive()).isFalse();
846+
assertThat(container.isRunning()).isFalse();
839847
}
840848
finally {
841849
container.stop();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.springframework.amqp.rabbit.core.RabbitAdmin;
6868
import org.springframework.amqp.rabbit.core.RabbitTemplate;
6969
import org.springframework.amqp.rabbit.junit.BrokerTestUtils;
70+
import org.springframework.amqp.rabbit.junit.LongRunning;
7071
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
7172
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
7273
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
@@ -95,7 +96,7 @@
9596
*/
9697
@RabbitAvailable(queues = { SimpleMessageListenerContainerIntegration2Tests.TEST_QUEUE,
9798
SimpleMessageListenerContainerIntegration2Tests.TEST_QUEUE_1 })
98-
//@LongRunning
99+
@LongRunning
99100
public class SimpleMessageListenerContainerIntegration2Tests {
100101

101102
public static final String TEST_QUEUE = "test.queue.SimpleMessageListenerContainerIntegration2Tests";

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ public void testCallbackIsRunOnStopAlsoWhenNoConsumerIsActive() throws Interrupt
437437
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
438438

439439
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
440+
ReflectionTestUtils.setField(container, "active", Boolean.TRUE);
440441

441442
final CountDownLatch countDownLatch = new CountDownLatch(1);
442443
container.stop(countDownLatch::countDown);
@@ -449,6 +450,7 @@ public void testCallbackIsRunOnStopAlsoWhenContainerIsStoppingForAbort() throws
449450

450451
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
451452
ReflectionTestUtils.setField(container, "containerStoppingForAbort", new AtomicReference<>(new Thread()));
453+
ReflectionTestUtils.setField(container, "active", Boolean.TRUE);
452454

453455
final CountDownLatch countDownLatch = new CountDownLatch(1);
454456
container.stop(countDownLatch::countDown);

0 commit comments

Comments
 (0)