Skip to content

Commit 42c5435

Browse files
garyrussellartembilan
authored andcommitted
AMQP-834: AnonymousQueue: Master Locator
JIRA: https://jira.spring.io/browse/AMQP-834 Resolves #818 Locate anonymous (auto-delete, exclusive) queues on the local node.
1 parent ee75eaf commit 42c5435

File tree

7 files changed

+85
-12
lines changed

7 files changed

+85
-12
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/AnonymousQueue.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -94,6 +94,9 @@ public AnonymousQueue(org.springframework.amqp.core.NamingStrategy namingStrateg
9494
*/
9595
public AnonymousQueue(org.springframework.amqp.core.NamingStrategy namingStrategy, Map<String, Object> arguments) {
9696
super(namingStrategy.generateName(), false, true, true, arguments);
97+
if (!getArguments().containsKey(X_QUEUE_MASTER_LOCATOR)) {
98+
setMasterLocator("client-local");
99+
}
97100
}
98101

99102
/**

spring-amqp/src/main/java/org/springframework/amqp/core/Queue.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package org.springframework.amqp.core;
1818

19+
import java.util.HashMap;
1920
import java.util.Map;
2021

22+
import org.springframework.lang.Nullable;
2123
import org.springframework.util.Assert;
2224
import org.springframework.util.StringUtils;
2325

@@ -30,6 +32,12 @@
3032
*/
3133
public class Queue extends AbstractDeclarable {
3234

35+
/**
36+
* Argument key for the master locator.
37+
* @since 2.1
38+
*/
39+
public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
40+
3341
private final String name;
3442

3543
private final boolean durable;
@@ -90,7 +98,7 @@ public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete
9098
this.durable = durable;
9199
this.exclusive = exclusive;
92100
this.autoDelete = autoDelete;
93-
this.arguments = arguments;
101+
this.arguments = arguments != null ? arguments : new HashMap<>();
94102
}
95103

96104
/**
@@ -153,6 +161,20 @@ public String getActualName() {
153161
return this.actualName;
154162
}
155163

164+
/**
165+
* Set the master locator strategy argument for this queue.
166+
* @param locator the locator; null to clear the argument.
167+
* @since 2.1
168+
*/
169+
public final void setMasterLocator(@Nullable String locator) {
170+
if (locator == null) {
171+
this.arguments.remove(X_QUEUE_MASTER_LOCATOR);
172+
}
173+
else {
174+
this.arguments.put(X_QUEUE_MASTER_LOCATOR, locator);
175+
}
176+
}
177+
156178
@Override
157179
public String toString() {
158180
return "Queue [name=" + this.name + ", durable=" + this.durable + ", autoDelete=" + this.autoDelete

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminDeclarationTests.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -82,7 +82,8 @@ public void testUnconditional() throws Exception {
8282
Channel channel = mock(Channel.class);
8383
when(cf.createConnection()).thenReturn(conn);
8484
when(conn.createChannel(false)).thenReturn(channel);
85-
when(channel.queueDeclare("foo", true, false, false, null)).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
85+
when(channel.queueDeclare("foo", true, false, false, new HashMap<>()))
86+
.thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
8687
final AtomicReference<ConnectionListener> listener = new AtomicReference<ConnectionListener>();
8788
doAnswer(invocation -> {
8889
listener.set((ConnectionListener) invocation.getArguments()[0]);
@@ -102,7 +103,7 @@ public void testUnconditional() throws Exception {
102103
assertNotNull(listener.get());
103104
listener.get().onCreate(conn);
104105

105-
verify(channel).queueDeclare("foo", true, false, false, null);
106+
verify(channel).queueDeclare("foo", true, false, false, new HashMap<>());
106107
verify(channel).exchangeDeclare("bar", "direct", true, false, false, new HashMap<String, Object>());
107108
verify(channel).queueBind("foo", "bar", "foo", null);
108109
}
@@ -163,7 +164,8 @@ public void testUnconditionalWithExplicitFactory() throws Exception {
163164
Channel channel = mock(Channel.class);
164165
when(cf.createConnection()).thenReturn(conn);
165166
when(conn.createChannel(false)).thenReturn(channel);
166-
when(channel.queueDeclare("foo", true, false, false, null)).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
167+
when(channel.queueDeclare("foo", true, false, false, new HashMap<>()))
168+
.thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
167169
final AtomicReference<ConnectionListener> listener = new AtomicReference<ConnectionListener>();
168170
doAnswer(invocation -> {
169171
listener.set(invocation.getArgument(0));
@@ -186,7 +188,7 @@ public void testUnconditionalWithExplicitFactory() throws Exception {
186188
assertNotNull(listener.get());
187189
listener.get().onCreate(conn);
188190

189-
verify(channel).queueDeclare("foo", true, false, false, null);
191+
verify(channel).queueDeclare("foo", true, false, false, new HashMap<>());
190192
verify(channel).exchangeDeclare("bar", "direct", true, false, false, new HashMap<String, Object>());
191193
verify(channel).queueBind("foo", "bar", "foo", null);
192194
}
@@ -270,7 +272,7 @@ public void testSkipBecauseShouldntDeclare() throws Exception {
270272
public void testJavaConfig() throws Exception {
271273
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
272274
Config.listener1.onCreate(Config.conn1);
273-
verify(Config.channel1).queueDeclare("foo", true, false, false, null);
275+
verify(Config.channel1).queueDeclare("foo", true, false, false, new HashMap<>());
274276
verify(Config.channel1).exchangeDeclare("bar", "direct", true, false, true, new HashMap<String, Object>());
275277
verify(Config.channel1).queueBind("foo", "bar", "foo", null);
276278

@@ -357,7 +359,7 @@ public ConnectionFactory cf1() throws IOException {
357359
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
358360
when(connectionFactory.createConnection()).thenReturn(conn1);
359361
when(conn1.createChannel(false)).thenReturn(channel1);
360-
when(channel1.queueDeclare("foo", true, false, false, null))
362+
when(channel1.queueDeclare("foo", true, false, false, new HashMap<>()))
361363
.thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
362364
doAnswer(invocation -> {
363365
listener1 = invocation.getArgument(0);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@
8989
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
9090
import com.rabbitmq.client.Channel;
9191
import com.rabbitmq.client.DefaultConsumer;
92+
import com.rabbitmq.http.client.Client;
93+
import com.rabbitmq.http.client.domain.QueueInfo;
9294

9395
/**
9496
* @author Mark Pollack
@@ -107,7 +109,7 @@ public class RabbitAdminTests {
107109
public ExpectedException exception = ExpectedException.none();
108110

109111
@Rule
110-
public BrokerRunning brokerIsRunning = BrokerRunning.isRunning();
112+
public BrokerRunning brokerIsRunning = BrokerRunning.isBrokerAndManagementRunning();
111113

112114
@Test
113115
public void testSettingOfNullConnectionFactory() {
@@ -372,6 +374,36 @@ public void testRetry() throws Exception {
372374
ctx.close();
373375
}
374376

377+
@Test
378+
public void testMasterLocator() throws Exception {
379+
CachingConnectionFactory cf = new CachingConnectionFactory(brokerIsRunning.getConnectionFactory());
380+
RabbitAdmin admin = new RabbitAdmin(cf);
381+
AnonymousQueue queue = new AnonymousQueue();
382+
admin.declareQueue(queue);
383+
Client client = new Client("http://guest:guest@localhost:15672/api");
384+
QueueInfo info = client.getQueue("?", queue.getName());
385+
int n = 0;
386+
while (n++ < 100 && info == null) {
387+
Thread.sleep(100);
388+
info = client.getQueue("/", queue.getName());
389+
}
390+
assertNotNull(info);
391+
assertThat(info.getArguments().get(Queue.X_QUEUE_MASTER_LOCATOR), equalTo("client-local"));
392+
393+
queue = new AnonymousQueue();
394+
queue.setMasterLocator(null);
395+
admin.declareQueue(queue);
396+
info = client.getQueue("?", queue.getName());
397+
n = 0;
398+
while (n++ < 100 && info == null) {
399+
Thread.sleep(100);
400+
info = client.getQueue("/", queue.getName());
401+
}
402+
assertNotNull(info);
403+
assertNull(info.getArguments().get(Queue.X_QUEUE_MASTER_LOCATOR));
404+
cf.destroy();
405+
}
406+
375407
@Configuration
376408
public static class Config {
377409

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.junit.Assert.fail;
2727
import static org.mockito.ArgumentMatchers.any;
2828
import static org.mockito.ArgumentMatchers.anyBoolean;
29+
import static org.mockito.ArgumentMatchers.anyMap;
2930
import static org.mockito.ArgumentMatchers.anyString;
3031
import static org.mockito.ArgumentMatchers.isNull;
3132
import static org.mockito.BDDMockito.given;
@@ -350,7 +351,7 @@ public void testNestedTxBinding() throws Exception {
350351
given(connection.isOpen()).willReturn(true);
351352
given(connection.createChannel()).willReturn(channel1, channel2);
352353
DeclareOk dok = new DeclareOk("foo", 0, 0);
353-
willReturn(dok).given(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
354+
willReturn(dok).given(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), anyMap());
354355
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
355356
ccf.setExecutor(mock(ExecutorService.class));
356357
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
@@ -368,7 +369,7 @@ public void testNestedTxBinding() throws Exception {
368369
});
369370
});
370371
verify(channel1).txSelect();
371-
verify(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
372+
verify(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), anyMap());
372373
assertThat(((ChannelProxy) templateChannel.get()).getTargetChannel(), equalTo(channel1));
373374
verify(channel1).txCommit();
374375
}

src/reference/asciidoc/amqp.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3612,6 +3612,8 @@ A `<rabbit:queue/>` with an empty, or missing, `name` attribute will always crea
36123612

36133613
See <<anonymous-queue>> to understand why `AnonymousQueue` is preferred over broker-generated queue names, as well as
36143614
how to control the format of the name.
3615+
Starting with version 2.1, anonymous queues are declared with argument `x-queue-master-locator` set to `client-local` by default.
3616+
This ensures that the queue will be declared on the node the application is connected to.
36153617
Declarative queues must have fixed names because they might be referenced elsewhere in the context, for example, in a
36163618
listener:
36173619

@@ -3724,6 +3726,10 @@ With Spring Framework 3.2 and later, this can be declared a little more succinct
37243726
</rabbit:queue>
37253727
----
37263728

3729+
When using Java configuration, the `x-queue-master-locator` is supported as a first class property via the `setMasterLocator()` method on the `Queue` class.
3730+
Starting with version 2.1, anonymous queues are declared with this property set to `client-local` by default.
3731+
This ensures that the queue will be declared on the node the application is connected to.
3732+
37273733
IMPORTANT: The RabbitMQ broker will not allow declaration of a queue with mismatched arguments.
37283734
For example, if a `queue` already exists with no `time to live` argument, and you attempt to declare it with, say, `key="x-message-ttl" value="100"`, an exception will be thrown.
37293735

@@ -4148,6 +4154,10 @@ The third creates names like `custom.gen-MRBv9sqISkuCiPfOYfpo4g`.
41484154

41494155
Of course, you can provide your own naming strategy bean.
41504156

4157+
Starting with version 2.1, anonymous queues are declared with argument `x-queue-master-locator` set to `client-local` by default.
4158+
This ensures that the queue will be declared on the node the application is connected to.
4159+
To revert to the previous behavior, call `queue.setMasterLocator(null)` after constructing the instance.
4160+
41514161
[[broker-events]]
41524162
==== Broker Event Listener
41534163

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ Users are discouraged from using the old mechanism of declaring `<Collection<Que
4343
By default, the old mechanism is disabled.
4444
See <<collection-declaration>> for more information.
4545

46+
`AnonymousQueue` s are now declared with `x-queue-master-locator` set to `client-local` by default, to ensure the queues are created on the node the application is connected to.
47+
See <<broker-configuration>> for more information.
48+
4649
===== RabbitTemplate Changes
4750

4851
The `RabbitTemplate` now can be configured with the `noLocalReplyConsumer` option to control a `noLocal` flag for reply consumers in the `sendAndReceive()` operations.

0 commit comments

Comments
 (0)