diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/AmqpAdmin.java b/spring-amqp/src/main/java/org/springframework/amqp/core/AmqpAdmin.java index 62d8566d34..91156cbd4c 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/AmqpAdmin.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/AmqpAdmin.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.amqp.core; +import java.util.Collections; +import java.util.Map; import java.util.Properties; import org.springframework.lang.Nullable; @@ -27,6 +29,7 @@ * @author Mark Pollack * @author Dave Syer * @author Gary Russell + * @author Artem Bilan */ public interface AmqpAdmin { @@ -126,6 +129,15 @@ public interface AmqpAdmin { @Nullable QueueInformation getQueueInfo(String queueName); + /** + * Return the manually declared AMQP objects. + * @return the manually declared AMQP objects. + * @since 2.4.13 + */ + default Map getManualDeclarables() { + return Collections.emptyMap(); + } + /** * Initialize the admin. * @since 2.1 diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java index 3aa2684720..ed1b3bf846 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -729,6 +729,16 @@ public void resetAllManualDeclarations() { this.manualDeclarables.clear(); } + /** + * Return the manually declared AMQP objects. + * @return the manually declared AMQP objects. + * @since 2.4.13 + */ + @Override + public Map getManualDeclarables() { + return Collections.unmodifiableMap(this.manualDeclarables); + } + private void processDeclarables(Collection contextExchanges, Collection contextQueues, Collection contextBindings) { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index 0426a6d35a..7bf4483ddd 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -618,6 +618,7 @@ protected final String getBeanName() { return this.beanName; } + @Nullable protected final ApplicationContext getApplicationContext() { return this.applicationContext; } @@ -1975,14 +1976,20 @@ protected synchronized void redeclareElementsIfNecessary() { } private void attemptDeclarations(AmqpAdmin admin) { - ApplicationContext context = this.getApplicationContext(); + ApplicationContext context = getApplicationContext(); if (context != null) { Set queueNames = getQueueNamesAsSet(); - Collection queueBeans = new LinkedHashSet<>( + Collection queues = new LinkedHashSet<>( context.getBeansOfType(Queue.class, false, false).values()); Map declarables = context.getBeansOfType(Declarables.class, false, false); - declarables.values().forEach(dec -> queueBeans.addAll(dec.getDeclarablesByType(Queue.class))); - for (Queue queue : queueBeans) { + declarables.values().forEach(dec -> queues.addAll(dec.getDeclarablesByType(Queue.class))); + admin.getManualDeclarables() + .values() + .stream() + .filter(Queue.class::isInstance) + .map(Queue.class::cast) + .forEach(queues::add); + for (Queue queue : queues) { if (isMismatchedQueuesFatal() || (queueNames.contains(queue.getName()) && admin.getQueueProperties(queue.getName()) == null)) { if (logger.isDebugEnabled()) { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java index 27b1824b94..71e1403ddf 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -539,16 +539,20 @@ public void testRecoverDeletedQueueNoAutoDeclare(BrokerRunningSupport brokerRunn private void testRecoverDeletedQueueGuts(boolean autoDeclare, BrokerRunningSupport brokerRunning) throws Exception { CachingConnectionFactory cf = new CachingConnectionFactory("localhost"); DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf); + GenericApplicationContext context = new GenericApplicationContext(); + RabbitAdmin rabbitAdmin = new RabbitAdmin(cf); if (autoDeclare) { - GenericApplicationContext context = new GenericApplicationContext(); context.getBeanFactory().registerSingleton("foo", new Queue(Q1)); - RabbitAdmin rabbitAdmin = new RabbitAdmin(cf); - rabbitAdmin.setApplicationContext(context); - context.getBeanFactory().registerSingleton("admin", rabbitAdmin); - context.refresh(); - container.setApplicationContext(context); } - container.setAutoDeclare(autoDeclare); + else { + rabbitAdmin.setRedeclareManualDeclarations(true); + rabbitAdmin.declareQueue(new Queue(Q1)); + } + rabbitAdmin.setApplicationContext(context); + context.refresh(); + container.setApplicationContext(context); + + container.setAmqpAdmin(rabbitAdmin); container.setQueueNames(Q1, Q2); container.setConsumersPerQueue(2); container.setConsumersPerQueue(2); @@ -565,11 +569,6 @@ private void testRecoverDeletedQueueGuts(boolean autoDeclare, BrokerRunningSuppo assertThat(consumersOnQueue(Q2, 2)).isTrue(); assertThat(activeConsumerCount(container, 2)).isTrue(); assertThat(restartConsumerCount(container, 2)).isTrue(); - RabbitAdmin rabbitAdmin = new RabbitAdmin(cf); - if (!autoDeclare) { - Thread.sleep(2000); - rabbitAdmin.declareQueue(new Queue(Q1)); - } assertThat(consumersOnQueue(Q1, 2)).isTrue(); assertThat(consumersOnQueue(Q2, 2)).isTrue(); assertThat(activeConsumerCount(container, 4)).isTrue();