Skip to content

Commit 1171c00

Browse files
artembilangaryrussell
authored andcommitted
GH-1134: Fix stop(Runnable) usage
Fixes #1134 We always have to call `callback` in the `stop(Runnable)` implementation independently of the component state **Cherry-pick until 1.1.x to support Spring Boot 1.5.x**
1 parent 96352b1 commit 1171c00

File tree

2 files changed

+16
-8
lines changed

2 files changed

+16
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -248,16 +248,21 @@ public void stop() {
248248

249249
@Override
250250
public void stop(Runnable callback) {
251-
Collection<MessageListenerContainer> listenerContainers = getListenerContainers();
252-
AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback);
253-
for (MessageListenerContainer listenerContainer : listenerContainers) {
254-
if (listenerContainer.isRunning()) {
255-
listenerContainer.stop(aggregatingCallback);
256-
}
257-
else {
258-
aggregatingCallback.run();
251+
Collection<MessageListenerContainer> listenerContainersToStop = getListenerContainers();
252+
if (listenerContainersToStop.size() > 0) {
253+
AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainersToStop.size(), callback);
254+
for (MessageListenerContainer listenerContainer : listenerContainersToStop) {
255+
if (listenerContainer.isRunning()) {
256+
listenerContainer.stop(aggregatingCallback);
257+
}
258+
else {
259+
aggregatingCallback.run();
260+
}
259261
}
260262
}
263+
else {
264+
callback.run();
265+
}
261266
}
262267

263268
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,9 @@ public void stop(Runnable callback) {
326326
if (isRunning()) {
327327
doStop(callback);
328328
}
329+
else {
330+
callback.run();
331+
}
329332
}
330333
}
331334

0 commit comments

Comments
 (0)