Skip to content

Commit 094781e

Browse files
authored
GH-1465: Add Routing Key Strategy to SuperStream
To support spring-cloud-stream. * Only include the ordinal in the queue name, not the whole routing key.
1 parent 322605d commit 094781e

File tree

2 files changed

+33
-9
lines changed

2 files changed

+33
-9
lines changed

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.Collection;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.function.BiFunction;
24+
import java.util.stream.Collectors;
2325
import java.util.stream.IntStream;
2426

2527
import org.springframework.amqp.core.Binding;
@@ -28,6 +30,7 @@
2830
import org.springframework.amqp.core.Declarables;
2931
import org.springframework.amqp.core.DirectExchange;
3032
import org.springframework.amqp.core.Queue;
33+
import org.springframework.util.Assert;
3134

3235
/**
3336
* Create Super Stream Topology {@link Declarable}s.
@@ -44,16 +47,33 @@ public class SuperStream extends Declarables {
4447
* @param partitions the number of partitions.
4548
*/
4649
public SuperStream(String name, int partitions) {
47-
super(declarables(name, partitions));
50+
this(name, partitions, (q, i) -> IntStream.range(0, i)
51+
.mapToObj(String::valueOf)
52+
.collect(Collectors.toList()));
4853
}
4954

50-
private static Collection<Declarable> declarables(String name, int partitions) {
55+
/**
56+
* Create a Super Stream with the provided parameters.
57+
* @param name the stream name.
58+
* @param partitions the number of partitions.
59+
* @param routingKeyStrategy a strategy to determine routing keys to use for the
60+
* partitions. The first parameter is the queue name, the second the number of
61+
* partitions, the returned list must have a size equal to the partitions.
62+
*/
63+
public SuperStream(String name, int partitions, BiFunction<String, Integer, List<String>> routingKeyStrategy) {
64+
super(declarables(name, partitions, routingKeyStrategy));
65+
}
66+
67+
private static Collection<Declarable> declarables(String name, int partitions,
68+
BiFunction<String, Integer, List<String>> routingKeyStrategy) {
69+
5170
List<Declarable> declarables = new ArrayList<>();
52-
String[] rks = IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new);
71+
List<String> rks = routingKeyStrategy.apply(name, partitions);
72+
Assert.state(rks.size() == partitions, () -> "Expected " + partitions + " routing keys, not " + rks.size());
5373
declarables.add(new DirectExchange(name, true, false, Map.of("x-super-stream", true)));
5474
for (int i = 0; i < partitions; i++) {
55-
String rk = rks[i];
56-
Queue q = new Queue(name + "-" + rk, true, false, false, Map.of("x-queue-type", "stream"));
75+
String rk = rks.get(i);
76+
Queue q = new Queue(name + "-" + i, true, false, false, Map.of("x-queue-type", "stream"));
5777
declarables.add(q);
5878
declarables.add(new Binding(q.getName(), DestinationType.QUEUE, name, rk,
5979
Map.of("x-stream-partition-order", i)));

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.CountDownLatch;
2626
import java.util.concurrent.TimeUnit;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.IntStream;
2729

2830
import org.junit.jupiter.api.Test;
2931

@@ -69,9 +71,9 @@ void superStream(@Autowired ApplicationContext context, @Autowired RabbitTemplat
6971
container2.start();
7072
StreamListenerContainer container3 = context.getBean(StreamListenerContainer.class, env, "three");
7173
container3.start();
72-
template.convertAndSend("ss.sac.test", "0", "foo");
73-
template.convertAndSend("ss.sac.test", "1", "bar");
74-
template.convertAndSend("ss.sac.test", "2", "baz");
74+
template.convertAndSend("ss.sac.test", "rk-0", "foo");
75+
template.convertAndSend("ss.sac.test", "rk-1", "bar");
76+
template.convertAndSend("ss.sac.test", "rk-2", "baz");
7577
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
7678
assertThat(config.messages.keySet()).contains("one", "two", "three");
7779
assertThat(config.info).contains("one:foo", "two:bar", "three:baz");
@@ -112,7 +114,9 @@ RabbitTemplate template(ConnectionFactory cf) {
112114

113115
@Bean
114116
SuperStream superStream() {
115-
return new SuperStream("ss.sac.test", 3);
117+
return new SuperStream("ss.sac.test", 3, (q, i) -> IntStream.range(0, i)
118+
.mapToObj(j -> "rk-" + j)
119+
.collect(Collectors.toList()));
116120
}
117121

118122
@Bean

0 commit comments

Comments
 (0)