Skip to content

Commit 55d1289

Browse files
mjaggardgaryrussell
authored andcommitted
GH-1436: Async Stop Containers
Resolves #1436 Allow shutdown to be started but waiting to be completed asynchronously Use Task Executor from parent Update abstract parent to allow running to be set to false
1 parent d660edb commit 55d1289

File tree

2 files changed

+75
-58
lines changed

2 files changed

+75
-58
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -109,6 +109,7 @@
109109
* @author Arnaud Cogoluègnes
110110
* @author Artem Bilan
111111
* @author Mohammad Hewedy
112+
* @author Mat Jaggard
112113
*/
113114
public abstract class AbstractMessageListenerContainer extends RabbitAccessor
114115
implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean,
@@ -1331,10 +1332,14 @@ public void shutdown() {
13311332
throw convertRabbitAccessException(ex);
13321333
}
13331334
finally {
1334-
synchronized (this.lifecycleMonitor) {
1335-
this.running = false;
1336-
this.lifecycleMonitor.notifyAll();
1337-
}
1335+
setNotRunning();
1336+
}
1337+
}
1338+
1339+
protected void setNotRunning() {
1340+
synchronized (this.lifecycleMonitor) {
1341+
this.running = false;
1342+
this.lifecycleMonitor.notifyAll();
13381343
}
13391344
}
13401345

@@ -1420,20 +1425,7 @@ public void stop() {
14201425
throw convertRabbitAccessException(ex);
14211426
}
14221427
finally {
1423-
synchronized (this.lifecycleMonitor) {
1424-
this.running = false;
1425-
this.lifecycleMonitor.notifyAll();
1426-
}
1427-
}
1428-
}
1429-
1430-
@Override
1431-
public void stop(Runnable callback) {
1432-
try {
1433-
stop();
1434-
}
1435-
finally {
1436-
callback.run();
1428+
setNotRunning();
14371429
}
14381430
}
14391431

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 64 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,6 +77,7 @@
7777
* @author Gary Russell
7878
* @author Artem Bilan
7979
* @author Alex Panchenko
80+
* @author Mat Jaggard
8081
*
8182
* @since 1.0
8283
*/
@@ -605,59 +606,83 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process
605606

606607
@Override
607608
protected void doShutdown() {
609+
shutdownAndWaitOrCallback(null);
610+
}
611+
612+
@Override
613+
public void stop(Runnable callback) {
614+
shutdownAndWaitOrCallback(() -> {
615+
setNotRunning();
616+
callback.run();
617+
});
618+
}
619+
620+
private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
608621
Thread thread = this.containerStoppingForAbort.get();
609622
if (thread != null && !thread.equals(Thread.currentThread())) {
610623
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
611624
return;
612625
}
613626

614-
try {
615-
List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
616-
synchronized (this.consumersMonitor) {
617-
if (this.consumers != null) {
618-
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
619-
while (consumerIterator.hasNext()) {
620-
BlockingQueueConsumer consumer = consumerIterator.next();
621-
consumer.basicCancel(true);
622-
canceledConsumers.add(consumer);
623-
consumerIterator.remove();
624-
if (consumer.declaring) {
625-
consumer.thread.interrupt();
626-
}
627+
List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
628+
synchronized (this.consumersMonitor) {
629+
if (this.consumers != null) {
630+
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
631+
while (consumerIterator.hasNext()) {
632+
BlockingQueueConsumer consumer = consumerIterator.next();
633+
consumer.basicCancel(true);
634+
canceledConsumers.add(consumer);
635+
consumerIterator.remove();
636+
if (consumer.declaring) {
637+
consumer.thread.interrupt();
627638
}
628639
}
640+
}
641+
else {
642+
logger.info("Shutdown ignored - container is already stopped");
643+
return;
644+
}
645+
}
646+
647+
Runnable awaitShutdown = () -> {
648+
logger.info("Waiting for workers to finish.");
649+
try {
650+
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
651+
if (finished) {
652+
logger.info("Successfully waited for workers to finish.");
653+
}
629654
else {
630-
logger.info("Shutdown ignored - container is already stopped");
631-
return;
655+
logger.info("Workers not finished.");
656+
if (isForceCloseChannel()) {
657+
canceledConsumers.forEach(consumer -> {
658+
if (logger.isWarnEnabled()) {
659+
logger.warn("Closing channel for unresponsive consumer: " + consumer);
660+
}
661+
consumer.stop();
662+
});
663+
}
632664
}
633665
}
634-
logger.info("Waiting for workers to finish.");
635-
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
636-
if (finished) {
637-
logger.info("Successfully waited for workers to finish.");
666+
catch (InterruptedException e) {
667+
Thread.currentThread().interrupt();
668+
logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
638669
}
639-
else {
640-
logger.info("Workers not finished.");
641-
if (isForceCloseChannel()) {
642-
canceledConsumers.forEach(consumer -> {
643-
if (logger.isWarnEnabled()) {
644-
logger.warn("Closing channel for unresponsive consumer: " + consumer);
645-
}
646-
consumer.stop();
647-
});
648-
}
670+
671+
synchronized (this.consumersMonitor) {
672+
this.consumers = null;
673+
this.cancellationLock.deactivate();
649674
}
650-
}
651-
catch (InterruptedException e) {
652-
Thread.currentThread().interrupt();
653-
logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
654-
}
655675

656-
synchronized (this.consumersMonitor) {
657-
this.consumers = null;
658-
this.cancellationLock.deactivate();
676+
if (callback != null) {
677+
callback.run();
678+
}
679+
};
680+
if (callback == null) {
681+
awaitShutdown.run();
682+
}
683+
else {
684+
getTaskExecutor().execute(awaitShutdown);
659685
}
660-
661686
}
662687

663688
private boolean isActive(BlockingQueueConsumer consumer) {

0 commit comments

Comments
 (0)