From 9ec24e448082673b34c69f19e4202c3f844a14ef Mon Sep 17 00:00:00 2001 From: renyansong <534446014@qq.com> Date: Wed, 3 Aug 2022 22:01:58 +0800 Subject: [PATCH 1/4] SimpleMessageListenerContainer.AsyncMessageProcessingConsumer countdown when the container is not active --- .../amqp/rabbit/listener/SimpleMessageListenerContainer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 201a371dba..d5c9e91d81 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -1192,6 +1192,7 @@ private FatalListenerStartupException getStartupException() throws InterruptedEx @Override // NOSONAR - complexity - many catch blocks public void run() { // NOSONAR - line count if (!isActive()) { + this.start.countDown(); return; } From 5402399cf7c62bf0dd0d70f96f2ce8e737ee8887 Mon Sep 17 00:00:00 2001 From: renyansong <534446014@qq.com> Date: Tue, 9 Aug 2022 15:01:22 +0800 Subject: [PATCH 2/4] test for AsyncMessageProcessingConsumer countdown when the container is not active --- .../SimpleMessageListenerContainerTests.java | 56 ++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index c5f09aaadc..3565485723 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -86,6 +86,7 @@ import org.springframework.amqp.utils.test.TestUtils; import org.springframework.aop.support.AopUtils; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; @@ -731,6 +732,30 @@ void filterMppNoDoubleAck() throws Exception { verifyNoMoreInteractions(listener); } + @Test + void testWithConsumerStartWhenNotActive() { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + Connection connection = mock(Connection.class); + Channel channel = mock(Channel.class); + given(connectionFactory.createConnection()).willReturn(connection); + given(connection.createChannel(false)).willReturn(channel); + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + // overwrite task execute. shutdown container before task execute. + TestExecutor testExecutor = new TestExecutor(container); + container.setTaskExecutor(testExecutor); + container.start(); + + // then add queue for trigger container shutdown + container.addQueueNames("bar"); + + // valid the 'start' countdown is 0. lastTask is AsyncMessageProcessingConsumer + Runnable lastTask = testExecutor.getLastTask(); + CountDownLatch start = TestUtils.getPropertyValue(lastTask, "start", CountDownLatch.class); + + assertThat(start.getCount()).isEqualTo(0L); + } + private Answer messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container, final boolean cancel, final CountDownLatch latch) { return invocation -> { @@ -784,4 +809,33 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc } + @SuppressWarnings("serial") + private class TestExecutor extends SimpleAsyncTaskExecutor { + + private final SimpleMessageListenerContainer simpleMessageListenerContainer; + + private int shutdownCount = 0; + + private Runnable lastTask = null; + + private TestExecutor(SimpleMessageListenerContainer simpleMessageListenerContainer) { + this.simpleMessageListenerContainer = simpleMessageListenerContainer; + } + + public Runnable getLastTask() { + return lastTask; + } + + @Override + public void execute(Runnable task) { + // skip the first execution + if (++shutdownCount > 1) { + lastTask = task; + // before execute, shutdown the container for test + this.simpleMessageListenerContainer.shutdown(); + } + super.execute(task); + } + } + } From 7205ddfcbe1b0a567921dd9bedeb78d1173f5c68 Mon Sep 17 00:00:00 2001 From: renyansongno1 <45755446+renyansongno1@users.noreply.github.com> Date: Wed, 10 Aug 2022 10:09:02 +0800 Subject: [PATCH 3/4] change TestExecutor for the static and final Co-authored-by: Artem Bilan --- .../rabbit/listener/SimpleMessageListenerContainerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 3565485723..857820274e 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -810,7 +810,7 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc } @SuppressWarnings("serial") - private class TestExecutor extends SimpleAsyncTaskExecutor { + private static final class TestExecutor extends SimpleAsyncTaskExecutor { private final SimpleMessageListenerContainer simpleMessageListenerContainer; From f9b851e2ff2876ff0fdc3f6d54ce73a36934cf15 Mon Sep 17 00:00:00 2001 From: renyansong <534446014@qq.com> Date: Wed, 10 Aug 2022 10:11:06 +0800 Subject: [PATCH 4/4] add author --- .../amqp/rabbit/listener/SimpleMessageListenerContainer.java | 1 + .../rabbit/listener/SimpleMessageListenerContainerTests.java | 1 + 2 files changed, 2 insertions(+) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index d5c9e91d81..4bfef6db04 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -78,6 +78,7 @@ * @author Artem Bilan * @author Alex Panchenko * @author Mat Jaggard + * @author Yansong Ren * * @since 1.0 */ diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 3565485723..5df7ea1236 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -107,6 +107,7 @@ * @author Gary Russell * @author Artem Bilan * @author Mohammad Hewedy + * @author Yansong Ren */ public class SimpleMessageListenerContainerTests {