Skip to content

Commit 3cb1a74

Browse files
mjaggardgaryrussell
authored andcommitted
GH-1436: Async Stop Containers
Resolves #1436 Allow shutdown to be started but waiting to be completed asynchronously Use Task Executor from parent Update abstract parent to allow running to be set to false
1 parent d4374a6 commit 3cb1a74

File tree

2 files changed

+75
-58
lines changed

2 files changed

+75
-58
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -108,6 +108,7 @@
108108
* @author Arnaud Cogoluègnes
109109
* @author Artem Bilan
110110
* @author Mohammad Hewedy
111+
* @author Mat Jaggard
111112
*/
112113
public abstract class AbstractMessageListenerContainer extends RabbitAccessor
113114
implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean,
@@ -1330,10 +1331,14 @@ public void shutdown() {
13301331
throw convertRabbitAccessException(ex);
13311332
}
13321333
finally {
1333-
synchronized (this.lifecycleMonitor) {
1334-
this.running = false;
1335-
this.lifecycleMonitor.notifyAll();
1336-
}
1334+
setNotRunning();
1335+
}
1336+
}
1337+
1338+
protected void setNotRunning() {
1339+
synchronized (this.lifecycleMonitor) {
1340+
this.running = false;
1341+
this.lifecycleMonitor.notifyAll();
13371342
}
13381343
}
13391344

@@ -1419,20 +1424,7 @@ public void stop() {
14191424
throw convertRabbitAccessException(ex);
14201425
}
14211426
finally {
1422-
synchronized (this.lifecycleMonitor) {
1423-
this.running = false;
1424-
this.lifecycleMonitor.notifyAll();
1425-
}
1426-
}
1427-
}
1428-
1429-
@Override
1430-
public void stop(Runnable callback) {
1431-
try {
1432-
stop();
1433-
}
1434-
finally {
1435-
callback.run();
1427+
setNotRunning();
14361428
}
14371429
}
14381430

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 64 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,6 +77,7 @@
7777
* @author Gary Russell
7878
* @author Artem Bilan
7979
* @author Alex Panchenko
80+
* @author Mat Jaggard
8081
*
8182
* @since 1.0
8283
*/
@@ -605,59 +606,83 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process
605606

606607
@Override
607608
protected void doShutdown() {
609+
shutdownAndWaitOrCallback(null);
610+
}
611+
612+
@Override
613+
public void stop(Runnable callback) {
614+
shutdownAndWaitOrCallback(() -> {
615+
setNotRunning();
616+
callback.run();
617+
});
618+
}
619+
620+
private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
608621
Thread thread = this.containerStoppingForAbort.get();
609622
if (thread != null && !thread.equals(Thread.currentThread())) {
610623
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
611624
return;
612625
}
613626

614-
try {
615-
List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
616-
synchronized (this.consumersMonitor) {
617-
if (this.consumers != null) {
618-
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
619-
while (consumerIterator.hasNext()) {
620-
BlockingQueueConsumer consumer = consumerIterator.next();
621-
consumer.basicCancel(true);
622-
canceledConsumers.add(consumer);
623-
consumerIterator.remove();
624-
if (consumer.declaring) {
625-
consumer.thread.interrupt();
626-
}
627+
List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
628+
synchronized (this.consumersMonitor) {
629+
if (this.consumers != null) {
630+
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
631+
while (consumerIterator.hasNext()) {
632+
BlockingQueueConsumer consumer = consumerIterator.next();
633+
consumer.basicCancel(true);
634+
canceledConsumers.add(consumer);
635+
consumerIterator.remove();
636+
if (consumer.declaring) {
637+
consumer.thread.interrupt();
627638
}
628639
}
640+
}
641+
else {
642+
logger.info("Shutdown ignored - container is already stopped");
643+
return;
644+
}
645+
}
646+
647+
Runnable awaitShutdown = () -> {
648+
logger.info("Waiting for workers to finish.");
649+
try {
650+
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
651+
if (finished) {
652+
logger.info("Successfully waited for workers to finish.");
653+
}
629654
else {
630-
logger.info("Shutdown ignored - container is already stopped");
631-
return;
655+
logger.info("Workers not finished.");
656+
if (isForceCloseChannel()) {
657+
canceledConsumers.forEach(consumer -> {
658+
if (logger.isWarnEnabled()) {
659+
logger.warn("Closing channel for unresponsive consumer: " + consumer);
660+
}
661+
consumer.stop();
662+
});
663+
}
632664
}
633665
}
634-
logger.info("Waiting for workers to finish.");
635-
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
636-
if (finished) {
637-
logger.info("Successfully waited for workers to finish.");
666+
catch (InterruptedException e) {
667+
Thread.currentThread().interrupt();
668+
logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
638669
}
639-
else {
640-
logger.info("Workers not finished.");
641-
if (isForceCloseChannel()) {
642-
canceledConsumers.forEach(consumer -> {
643-
if (logger.isWarnEnabled()) {
644-
logger.warn("Closing channel for unresponsive consumer: " + consumer);
645-
}
646-
consumer.stop();
647-
});
648-
}
670+
671+
synchronized (this.consumersMonitor) {
672+
this.consumers = null;
673+
this.cancellationLock.deactivate();
649674
}
650-
}
651-
catch (InterruptedException e) {
652-
Thread.currentThread().interrupt();
653-
logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
654-
}
655675

656-
synchronized (this.consumersMonitor) {
657-
this.consumers = null;
658-
this.cancellationLock.deactivate();
676+
if (callback != null) {
677+
callback.run();
678+
}
679+
};
680+
if (callback == null) {
681+
awaitShutdown.run();
682+
}
683+
else {
684+
getTaskExecutor().execute(awaitShutdown);
659685
}
660-
661686
}
662687

663688
private boolean isActive(BlockingQueueConsumer consumer) {

0 commit comments

Comments
 (0)