Skip to content

Commit ee681bd

Browse files
authored
GH-2452: Redeclare manual entities automatically (#2453)
Fixes #2452 The queue might be declared manually by an `AmqpAdmin` and its name simply can be used in the listener container. When we lose a connection, we would like to have just deleted manual anonymous queue to be redeclared. * Introduce `AmqpAdmin.getManualDeclarables()` * Check for queue name presence in the `manualDeclarables` as well from a `AbstractMessageListenerContainer.attemptDeclarations()` * Modify `DirectMessageListenerContainerIntegrationTests.testRecoverDeletedQueueGuts()` to deal with manual declaration and auto-recovery from the container **Cherry-pick to `2.4.x`**
1 parent 5ccf51f commit ee681bd

File tree

4 files changed

+47
-19
lines changed

4 files changed

+47
-19
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/AmqpAdmin.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.amqp.core;
1818

19+
import java.util.Collections;
20+
import java.util.Map;
1921
import java.util.Properties;
2022

2123
import org.springframework.lang.Nullable;
@@ -27,6 +29,7 @@
2729
* @author Mark Pollack
2830
* @author Dave Syer
2931
* @author Gary Russell
32+
* @author Artem Bilan
3033
*/
3134
public interface AmqpAdmin {
3235

@@ -126,6 +129,15 @@ public interface AmqpAdmin {
126129
@Nullable
127130
QueueInformation getQueueInfo(String queueName);
128131

132+
/**
133+
* Return the manually declared AMQP objects.
134+
* @return the manually declared AMQP objects.
135+
* @since 2.4.13
136+
*/
137+
default Map<String, Declarable> getManualDeclarables() {
138+
return Collections.emptyMap();
139+
}
140+
129141
/**
130142
* Initialize the admin.
131143
* @since 2.1

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -729,6 +729,16 @@ public void resetAllManualDeclarations() {
729729
this.manualDeclarables.clear();
730730
}
731731

732+
/**
733+
* Return the manually declared AMQP objects.
734+
* @return the manually declared AMQP objects.
735+
* @since 2.4.13
736+
*/
737+
@Override
738+
public Map<String, Declarable> getManualDeclarables() {
739+
return Collections.unmodifiableMap(this.manualDeclarables);
740+
}
741+
732742
private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,
733743
Collection<Binding> contextBindings) {
734744

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ protected final String getBeanName() {
618618
return this.beanName;
619619
}
620620

621+
@Nullable
621622
protected final ApplicationContext getApplicationContext() {
622623
return this.applicationContext;
623624
}
@@ -1975,14 +1976,20 @@ protected synchronized void redeclareElementsIfNecessary() {
19751976
}
19761977

19771978
private void attemptDeclarations(AmqpAdmin admin) {
1978-
ApplicationContext context = this.getApplicationContext();
1979+
ApplicationContext context = getApplicationContext();
19791980
if (context != null) {
19801981
Set<String> queueNames = getQueueNamesAsSet();
1981-
Collection<Queue> queueBeans = new LinkedHashSet<>(
1982+
Collection<Queue> queues = new LinkedHashSet<>(
19821983
context.getBeansOfType(Queue.class, false, false).values());
19831984
Map<String, Declarables> declarables = context.getBeansOfType(Declarables.class, false, false);
1984-
declarables.values().forEach(dec -> queueBeans.addAll(dec.getDeclarablesByType(Queue.class)));
1985-
for (Queue queue : queueBeans) {
1985+
declarables.values().forEach(dec -> queues.addAll(dec.getDeclarablesByType(Queue.class)));
1986+
admin.getManualDeclarables()
1987+
.values()
1988+
.stream()
1989+
.filter(Queue.class::isInstance)
1990+
.map(Queue.class::cast)
1991+
.forEach(queues::add);
1992+
for (Queue queue : queues) {
19861993
if (isMismatchedQueuesFatal() || (queueNames.contains(queue.getName()) &&
19871994
admin.getQueueProperties(queue.getName()) == null)) {
19881995
if (logger.isDebugEnabled()) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -539,16 +539,20 @@ public void testRecoverDeletedQueueNoAutoDeclare(BrokerRunningSupport brokerRunn
539539
private void testRecoverDeletedQueueGuts(boolean autoDeclare, BrokerRunningSupport brokerRunning) throws Exception {
540540
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
541541
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
542+
GenericApplicationContext context = new GenericApplicationContext();
543+
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
542544
if (autoDeclare) {
543-
GenericApplicationContext context = new GenericApplicationContext();
544545
context.getBeanFactory().registerSingleton("foo", new Queue(Q1));
545-
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
546-
rabbitAdmin.setApplicationContext(context);
547-
context.getBeanFactory().registerSingleton("admin", rabbitAdmin);
548-
context.refresh();
549-
container.setApplicationContext(context);
550546
}
551-
container.setAutoDeclare(autoDeclare);
547+
else {
548+
rabbitAdmin.setRedeclareManualDeclarations(true);
549+
rabbitAdmin.declareQueue(new Queue(Q1));
550+
}
551+
rabbitAdmin.setApplicationContext(context);
552+
context.refresh();
553+
container.setApplicationContext(context);
554+
555+
container.setAmqpAdmin(rabbitAdmin);
552556
container.setQueueNames(Q1, Q2);
553557
container.setConsumersPerQueue(2);
554558
container.setConsumersPerQueue(2);
@@ -565,11 +569,6 @@ private void testRecoverDeletedQueueGuts(boolean autoDeclare, BrokerRunningSuppo
565569
assertThat(consumersOnQueue(Q2, 2)).isTrue();
566570
assertThat(activeConsumerCount(container, 2)).isTrue();
567571
assertThat(restartConsumerCount(container, 2)).isTrue();
568-
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
569-
if (!autoDeclare) {
570-
Thread.sleep(2000);
571-
rabbitAdmin.declareQueue(new Queue(Q1));
572-
}
573572
assertThat(consumersOnQueue(Q1, 2)).isTrue();
574573
assertThat(consumersOnQueue(Q2, 2)).isTrue();
575574
assertThat(activeConsumerCount(container, 4)).isTrue();

0 commit comments

Comments
 (0)