Skip to content

GH-2452: Redeclare manual entities automatically #2453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.amqp.core;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.springframework.lang.Nullable;
Expand All @@ -27,6 +29,7 @@
* @author Mark Pollack
* @author Dave Syer
* @author Gary Russell
* @author Artem Bilan
*/
public interface AmqpAdmin {

Expand Down Expand Up @@ -126,6 +129,15 @@ public interface AmqpAdmin {
@Nullable
QueueInformation getQueueInfo(String queueName);

/**
* Return the manually declared AMQP objects.
* @return the manually declared AMQP objects.
* @since 2.4.13
*/
default Map<String, Declarable> getManualDeclarables() {
return Collections.emptyMap();
}

/**
* Initialize the admin.
* @since 2.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -729,6 +729,16 @@ public void resetAllManualDeclarations() {
this.manualDeclarables.clear();
}

/**
* Return the manually declared AMQP objects.
* @return the manually declared AMQP objects.
* @since 2.4.13
*/
@Override
public Map<String, Declarable> getManualDeclarables() {
return Collections.unmodifiableMap(this.manualDeclarables);
}

private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,
Collection<Binding> contextBindings) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ protected final String getBeanName() {
return this.beanName;
}

@Nullable
protected final ApplicationContext getApplicationContext() {
return this.applicationContext;
}
Expand Down Expand Up @@ -1975,14 +1976,20 @@ protected synchronized void redeclareElementsIfNecessary() {
}

private void attemptDeclarations(AmqpAdmin admin) {
ApplicationContext context = this.getApplicationContext();
ApplicationContext context = getApplicationContext();
if (context != null) {
Set<String> queueNames = getQueueNamesAsSet();
Collection<Queue> queueBeans = new LinkedHashSet<>(
Collection<Queue> queues = new LinkedHashSet<>(
context.getBeansOfType(Queue.class, false, false).values());
Map<String, Declarables> declarables = context.getBeansOfType(Declarables.class, false, false);
declarables.values().forEach(dec -> queueBeans.addAll(dec.getDeclarablesByType(Queue.class)));
for (Queue queue : queueBeans) {
declarables.values().forEach(dec -> queues.addAll(dec.getDeclarablesByType(Queue.class)));
admin.getManualDeclarables()
.values()
.stream()
.filter(Queue.class::isInstance)
.map(Queue.class::cast)
.forEach(queues::add);
for (Queue queue : queues) {
if (isMismatchedQueuesFatal() || (queueNames.contains(queue.getName()) &&
admin.getQueueProperties(queue.getName()) == null)) {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -539,16 +539,20 @@ public void testRecoverDeletedQueueNoAutoDeclare(BrokerRunningSupport brokerRunn
private void testRecoverDeletedQueueGuts(boolean autoDeclare, BrokerRunningSupport brokerRunning) throws Exception {
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
GenericApplicationContext context = new GenericApplicationContext();
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
if (autoDeclare) {
GenericApplicationContext context = new GenericApplicationContext();
context.getBeanFactory().registerSingleton("foo", new Queue(Q1));
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
rabbitAdmin.setApplicationContext(context);
context.getBeanFactory().registerSingleton("admin", rabbitAdmin);
context.refresh();
container.setApplicationContext(context);
}
container.setAutoDeclare(autoDeclare);
else {
rabbitAdmin.setRedeclareManualDeclarations(true);
rabbitAdmin.declareQueue(new Queue(Q1));
}
rabbitAdmin.setApplicationContext(context);
context.refresh();
container.setApplicationContext(context);

container.setAmqpAdmin(rabbitAdmin);
container.setQueueNames(Q1, Q2);
container.setConsumersPerQueue(2);
container.setConsumersPerQueue(2);
Expand All @@ -565,11 +569,6 @@ private void testRecoverDeletedQueueGuts(boolean autoDeclare, BrokerRunningSuppo
assertThat(consumersOnQueue(Q2, 2)).isTrue();
assertThat(activeConsumerCount(container, 2)).isTrue();
assertThat(restartConsumerCount(container, 2)).isTrue();
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
if (!autoDeclare) {
Thread.sleep(2000);
rabbitAdmin.declareQueue(new Queue(Q1));
}
assertThat(consumersOnQueue(Q1, 2)).isTrue();
assertThat(consumersOnQueue(Q2, 2)).isTrue();
assertThat(activeConsumerCount(container, 4)).isTrue();
Expand Down