Skip to content

Commit 02f0386

Browse files
garyrussellartembilan
authored andcommitted
GH-1465: Part II - Super Stream SAC
Resolves #1465 Add support for single active consumers on super streams. Stop containers in test. Use Snapshot Repo Use snapshot repo; use TestContainers.
1 parent d4e0f5c commit 02f0386

File tree

6 files changed

+181
-4
lines changed

6 files changed

+181
-4
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ ext {
5858
micrometerVersion = '1.10.0-SNAPSHOT'
5959
micrometerTracingVersion = '1.0.0-SNAPSHOT'
6060
mockitoVersion = '4.5.1'
61-
rabbitmqStreamVersion = '0.4.0'
61+
rabbitmqStreamVersion = '0.7.0'
6262
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.13.1'
6363
rabbitmqHttpClientVersion = '3.12.1'
6464
reactorVersion = '2020.0.18'
@@ -105,7 +105,7 @@ allprojects {
105105
maven { url 'https://repo.spring.io/libs-milestone' }
106106
if (version.endsWith('-SNAPSHOT')) {
107107
maven { url 'https://repo.spring.io/libs-snapshot' }
108-
maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
108+
// maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
109109
}
110110
// maven { url 'https://repo.spring.io/libs-staging-local' }
111111
}

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,19 @@ public void setQueueNames(String... queueNames) {
9696
this.builder.stream(queueNames[0]);
9797
}
9898

99+
/**
100+
* Enable Single Active Consumer on a Super Stream.
101+
* @param superStream the stream.
102+
* @param name the consumer name.
103+
* @since 3.0
104+
*/
105+
public void superStream(String superStream, String name) {
106+
Assert.notNull(superStream, "'superStream' cannot be null");
107+
this.builder.superStream(superStream)
108+
.singleActiveConsumer()
109+
.name(name);
110+
}
111+
99112
/**
100113
* Get a {@link StreamMessageConverter} used to convert a
101114
* {@link com.rabbitmq.stream.Message} to a
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.amqp.core.Declarables;
31+
import org.springframework.amqp.core.DirectExchange;
32+
import org.springframework.amqp.core.Message;
33+
import org.springframework.amqp.core.Queue;
34+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
35+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
36+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
37+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
38+
import org.springframework.beans.factory.annotation.Autowired;
39+
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
40+
import org.springframework.context.ApplicationContext;
41+
import org.springframework.context.annotation.Bean;
42+
import org.springframework.context.annotation.Configuration;
43+
import org.springframework.context.annotation.Scope;
44+
import org.springframework.rabbit.stream.config.SuperStream;
45+
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
46+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
47+
48+
import com.rabbitmq.stream.Address;
49+
import com.rabbitmq.stream.Environment;
50+
import com.rabbitmq.stream.OffsetSpecification;
51+
52+
/**
53+
* @author Gary Russell
54+
* @since 3.0
55+
*
56+
*/
57+
@SpringJUnitConfig
58+
public class SuperStreamSACTests extends AbstractIntegrationTests {
59+
60+
@Test
61+
void superStream(@Autowired ApplicationContext context, @Autowired RabbitTemplate template,
62+
@Autowired Environment env, @Autowired Config config, @Autowired RabbitAdmin admin,
63+
@Autowired Declarables declarables) throws InterruptedException {
64+
65+
template.getConnectionFactory().createConnection();
66+
StreamListenerContainer container1 = context.getBean(StreamListenerContainer.class, env, "one");
67+
container1.start();
68+
StreamListenerContainer container2 = context.getBean(StreamListenerContainer.class, env, "two");
69+
container2.start();
70+
StreamListenerContainer container3 = context.getBean(StreamListenerContainer.class, env, "three");
71+
container3.start();
72+
template.convertAndSend("ss.sac.test", "0", "foo");
73+
template.convertAndSend("ss.sac.test", "1", "bar");
74+
template.convertAndSend("ss.sac.test", "2", "baz");
75+
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
76+
assertThat(config.messages.keySet()).contains("one", "two", "three");
77+
assertThat(config.info).contains("one:foo", "two:bar", "three:baz");
78+
container1.stop();
79+
container2.stop();
80+
container3.stop();
81+
clean(admin, declarables);
82+
}
83+
84+
private void clean(RabbitAdmin admin, Declarables declarables) {
85+
declarables.getDeclarablesByType(Queue.class).forEach(queue -> admin.deleteQueue(queue.getName()));
86+
declarables.getDeclarablesByType(DirectExchange.class).forEach(ex -> admin.deleteExchange(ex.getName()));
87+
}
88+
89+
@Configuration
90+
public static class Config {
91+
92+
final List<String> info = new ArrayList<>();
93+
94+
final Map<String, Message> messages = new ConcurrentHashMap<>();
95+
96+
final CountDownLatch latch = new CountDownLatch(3);
97+
98+
@Bean
99+
CachingConnectionFactory cf() {
100+
return new CachingConnectionFactory("localhost", amqpPort());
101+
}
102+
103+
@Bean
104+
RabbitAdmin admin(ConnectionFactory cf) {
105+
return new RabbitAdmin(cf);
106+
}
107+
108+
@Bean
109+
RabbitTemplate template(ConnectionFactory cf) {
110+
return new RabbitTemplate(cf);
111+
}
112+
113+
@Bean
114+
SuperStream superStream() {
115+
return new SuperStream("ss.sac.test", 3);
116+
}
117+
118+
@Bean
119+
static Environment environment() {
120+
return Environment.builder()
121+
.addressResolver(add -> new Address("localhost", streamPort()))
122+
.build();
123+
}
124+
125+
@Bean
126+
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
127+
StreamListenerContainer container(Environment env, String name) {
128+
StreamListenerContainer container = new StreamListenerContainer(env);
129+
container.superStream("ss.sac.test", "test");
130+
container.setupMessageListener(msg -> {
131+
this.messages.put(name, msg);
132+
this.info.add(name + ":" + new String(msg.getBody()));
133+
this.latch.countDown();
134+
});
135+
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
136+
container.setAutoStartup(false);
137+
return container;
138+
}
139+
140+
}
141+
142+
}

src/reference/asciidoc/quick-tour.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ The minimum Spring Framework version dependency is 5.2.0.
3636

3737
The minimum `amqp-client` Java client library version is 5.7.0.
3838

39+
The minimum `stream-client` Java client library for stream queues is 0.7.0.
40+
3941
===== Very, Very Quick
4042

4143
This section offers the fastest introduction.

src/reference/asciidoc/stream.adoc

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,21 @@ The `RabbitAdmin` detects this bean and will declare the exchange (`my.super.str
182182

183183
===== Consuming Super Streams with Single Active Consumers
184184

185-
TBD.
185+
Invoke the `superStream` method on the listener container to enable a single active consumer on a super stream.
186+
187+
====
188+
[source, java]
189+
----
190+
@Bean
191+
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
192+
StreamListenerContainer container(Environment env, String name) {
193+
StreamListenerContainer container = new StreamListenerContainer(env);
194+
container.superStream("ss.sac", "myConsumer");
195+
container.setupMessageListener(msg -> {
196+
...
197+
});
198+
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
199+
return container;
200+
}
201+
----
202+
====

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ See <<async-template>> for more information.
2020
==== Stream Support Changes
2121

2222
`RabbitStreamOperations2` and `RabbitStreamTemplate2` have been deprecated in favor of `RabbitStreamOperations` and `RabbitStreamTemplate` respectively.
23+
24+
Super streams and single active consumers thereon are now supported.
25+
2326
See <<stream-support>> for more information.
2427

2528
==== `@RabbitListener` Changes
@@ -35,4 +38,4 @@ See <<Jackson2JsonMessageConverter-from-message>> for more information.
3538
==== Connection Factory Changes
3639

3740
The default `addressShuffleMode` in `AbstractConnectionFactory` is now `RANDOM`. This results in connecting to a random host when multiple addresses are provided.
38-
See <<cluster>> for more information.
41+
See <<cluster>> for more information.

0 commit comments

Comments
 (0)