Skip to content

Commit 732f0da

Browse files
garyrussellartembilan
authored andcommitted
GH-1465: Super Stream Support in Template
1 parent f54f8fb commit 732f0da

File tree

3 files changed

+81
-1
lines changed

3 files changed

+81
-1
lines changed

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.rabbit.stream.producer;
1818

1919
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.Function;
2021

2122
import org.springframework.amqp.core.Message;
2223
import org.springframework.amqp.core.MessagePostProcessor;
@@ -52,6 +53,8 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, BeanNameAwa
5253

5354
private final String streamName;
5455

56+
private Function<com.rabbitmq.stream.Message, String> superStreamRouting;
57+
5558
private MessageConverter messageConverter = new SimpleMessageConverter();
5659

5760
private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter();
@@ -80,7 +83,13 @@ public RabbitStreamTemplate(Environment environment, String streamName) {
8083
private synchronized Producer createOrGetProducer() {
8184
if (this.producer == null) {
8285
ProducerBuilder builder = this.environment.producerBuilder();
83-
builder.stream(this.streamName);
86+
if (this.superStreamRouting == null) {
87+
builder.stream(this.streamName);
88+
}
89+
else {
90+
builder.superStream(this.streamName)
91+
.routing(this.superStreamRouting);
92+
}
8493
this.producerCustomizer.accept(this.beanName, builder);
8594
this.producer = builder.build();
8695
if (!this.streamConverterSet) {
@@ -96,6 +105,16 @@ public synchronized void setBeanName(String name) {
96105
this.beanName = name;
97106
}
98107

108+
/**
109+
* Add a routing function, making the stream a super stream.
110+
* @param superStreamRouting the routing function.
111+
* @since 3.0
112+
*/
113+
public void setSuperStreamRouting(Function<com.rabbitmq.stream.Message, String> superStreamRouting) {
114+
this.superStreamRouting = superStreamRouting;
115+
}
116+
117+
99118
/**
100119
* Set a converter for {@link #convertAndSend(Object)} operations.
101120
* @param messageConverter the converter.

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplateTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import static org.mockito.BDDMockito.given;
2323
import static org.mockito.BDDMockito.willAnswer;
2424
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.never;
26+
import static org.mockito.Mockito.verify;
2527

2628
import java.util.concurrent.CompletableFuture;
2729
import java.util.concurrent.ExecutionException;
@@ -114,4 +116,27 @@ void handleConfirm() throws InterruptedException, ExecutionException {
114116
}
115117
}
116118

119+
@Test
120+
void superStream() {
121+
Environment env = mock(Environment.class);
122+
ProducerBuilder pb = mock(ProducerBuilder.class);
123+
given(pb.superStream(any())).willReturn(pb);
124+
given(env.producerBuilder()).willReturn(pb);
125+
Producer producer = mock(Producer.class);
126+
given(pb.build()).willReturn(producer);
127+
try (RabbitStreamTemplate template = new RabbitStreamTemplate(env, "foo")) {
128+
SimpleMessageConverter messageConverter = new SimpleMessageConverter();
129+
template.setMessageConverter(messageConverter);
130+
assertThat(template.messageConverter()).isSameAs(messageConverter);
131+
StreamMessageConverter converter = mock(StreamMessageConverter.class);
132+
given(converter.fromMessage(any())).willReturn(mock(Message.class));
133+
template.setStreamConverter(converter);
134+
template.setSuperStreamRouting(msg -> "bar");
135+
template.convertAndSend("x");
136+
verify(pb).superStream("foo");
137+
verify(pb).routing(any());
138+
verify(pb, never()).stream("foo");
139+
}
140+
}
141+
117142
}

src/reference/asciidoc/stream.adoc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,42 @@ SuperStream superStream() {
180180

181181
The `RabbitAdmin` detects this bean and will declare the exchange (`my.super.stream`) and 3 queues (partitions) - `my.super-stream-n` where `n` is `0`, `1`, `2`, bound with routing keys equal to `n`.
182182

183+
If you also wish to publish over AMQP to the exchange, you can provide custom routing keys:
184+
185+
====
186+
[source, java]
187+
----
188+
@Bean
189+
SuperStream superStream() {
190+
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
191+
.mapToObj(j -> "rk-" + j)
192+
.collect(Collectors.toList()));
193+
}
194+
----
195+
====
196+
197+
The number of keys must equal the number of partitions.
198+
199+
===== Producing to a SuperStream
200+
201+
You must add a `superStreamRoutingFunction` to the `RabbitStreamTemplate`:
202+
203+
====
204+
[source, java]
205+
----
206+
@Bean
207+
RabbitStreamTemplate streamTemplate(Environment env) {
208+
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
209+
template.setSuperStreamRouting(message -> {
210+
// some logic to return a String for the client's hashing algorithm
211+
});
212+
return template;
213+
}
214+
----
215+
====
216+
217+
You can also publish over AMQP, using the `RabbitTemplate`.
218+
183219
===== Consuming Super Streams with Single Active Consumers
184220

185221
Invoke the `superStream` method on the listener container to enable a single active consumer on a super stream.

0 commit comments

Comments
 (0)