Skip to content

Commit 3620f11

Browse files
kurenchuksergeygaryrussell
authored andcommitted
GH-2540: Add SuperStreamBuilder
Resolves #2540 Usability improvements: New SuperStream builder builder provide a way to configure: - maxAge - maxLength - maxSegmentSize Usability improvements: New SuperStream builder License Usability improvements: New SuperStream builder Fix style tests and add a new one for the super stream builder Usability improvements: New SuperStream builder Covered x-initial-cluster-size. Fixes after review
1 parent 4cf9b5c commit 3620f11

File tree

3 files changed

+369
-6
lines changed

3 files changed

+369
-6
lines changed

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.Collection;
21+
import java.util.HashMap;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.function.BiFunction;
@@ -36,8 +37,8 @@
3637
* Create Super Stream Topology {@link Declarable}s.
3738
*
3839
* @author Gary Russell
40+
* @author Sergei Kurenchuk
3941
* @since 3.0
40-
*
4142
*/
4243
public class SuperStream extends Declarables {
4344

@@ -47,9 +48,22 @@ public class SuperStream extends Declarables {
4748
* @param partitions the number of partitions.
4849
*/
4950
public SuperStream(String name, int partitions) {
51+
this(name, partitions, Map.of());
52+
}
53+
54+
/**
55+
* Create a Super Stream with the provided parameters.
56+
* @param name the stream name.
57+
* @param partitions the number of partitions.
58+
* @param arguments the stream arguments
59+
* @since 3.1
60+
*/
61+
public SuperStream(String name, int partitions, Map<String, Object> arguments) {
5062
this(name, partitions, (q, i) -> IntStream.range(0, i)
5163
.mapToObj(String::valueOf)
52-
.collect(Collectors.toList()));
64+
.collect(Collectors.toList()),
65+
arguments
66+
);
5367
}
5468

5569
/**
@@ -61,19 +75,37 @@ public SuperStream(String name, int partitions) {
6175
* partitions, the returned list must have a size equal to the partitions.
6276
*/
6377
public SuperStream(String name, int partitions, BiFunction<String, Integer, List<String>> routingKeyStrategy) {
64-
super(declarables(name, partitions, routingKeyStrategy));
78+
this(name, partitions, routingKeyStrategy, Map.of());
79+
}
80+
81+
/**
82+
* Create a Super Stream with the provided parameters.
83+
* @param name the stream name.
84+
* @param partitions the number of partitions.
85+
* @param routingKeyStrategy a strategy to determine routing keys to use for the
86+
* partitions. The first parameter is the queue name, the second the number of
87+
* partitions, the returned list must have a size equal to the partitions.
88+
* @param arguments the stream arguments
89+
* @since 3.1
90+
*/
91+
public SuperStream(String name, int partitions, BiFunction<String, Integer, List<String>> routingKeyStrategy, Map<String, Object> arguments) {
92+
super(declarables(name, partitions, routingKeyStrategy, arguments));
6593
}
6694

6795
private static Collection<Declarable> declarables(String name, int partitions,
68-
BiFunction<String, Integer, List<String>> routingKeyStrategy) {
96+
BiFunction<String, Integer, List<String>> routingKeyStrategy,
97+
Map<String, Object> arguments) {
6998

7099
List<Declarable> declarables = new ArrayList<>();
71100
List<String> rks = routingKeyStrategy.apply(name, partitions);
72101
Assert.state(rks.size() == partitions, () -> "Expected " + partitions + " routing keys, not " + rks.size());
73102
declarables.add(new DirectExchange(name, true, false, Map.of("x-super-stream", true)));
103+
104+
Map<String, Object> argumentsCopy = new HashMap<>(arguments);
105+
argumentsCopy.put("x-queue-type", "stream");
74106
for (int i = 0; i < partitions; i++) {
75107
String rk = rks.get(i);
76-
Queue q = new Queue(name + "-" + i, true, false, false, Map.of("x-queue-type", "stream"));
108+
Queue q = new Queue(name + "-" + i, true, false, false, argumentsCopy);
77109
declarables.add(q);
78110
declarables.add(new Binding(q.getName(), DestinationType.QUEUE, name, rk,
79111
Map.of("x-stream-partition-order", i)));
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright 2021-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.config;
18+
19+
import java.util.HashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.function.BiFunction;
23+
24+
import org.springframework.util.StringUtils;
25+
26+
/**
27+
* Builds a Spring AMQP Super Stream using a fluent API.
28+
* Based on <a href="https://www.rabbitmq.com/streams.html">Streams documentation</a>
29+
*
30+
* @author Sergei Kurenchuk
31+
* @since 3.1
32+
*/
33+
public class SuperStreamBuilder {
34+
private final Map<String, Object> arguments = new HashMap<>();
35+
private String name;
36+
private int partitions = -1;
37+
38+
private BiFunction<String, Integer, List<String>> routingKeyStrategy;
39+
40+
/**
41+
* Creates a builder for Super Stream.
42+
* @param name stream name
43+
* @return the builder
44+
*/
45+
public static SuperStreamBuilder superStream(String name) {
46+
SuperStreamBuilder builder = new SuperStreamBuilder();
47+
builder.name(name);
48+
return builder;
49+
}
50+
51+
/**
52+
* Creates a builder for Super Stream.
53+
* @param name stream name
54+
* @param partitions partitions number
55+
* @return the builder
56+
*/
57+
public static SuperStreamBuilder superStream(String name, int partitions) {
58+
return superStream(name).partitions(partitions);
59+
}
60+
61+
/**
62+
* Set the maximum age retention per stream, which will remove the oldest data.
63+
* @param maxAge valid units: Y, M, D, h, m, s. For example: "7D" for a week
64+
* @return the builder
65+
*/
66+
public SuperStreamBuilder maxAge(String maxAge) {
67+
return withArgument("x-max-age", maxAge);
68+
}
69+
70+
/**
71+
* Set the maximum log size as the retention configuration for each stream,
72+
* which will truncate the log based on the data size.
73+
* @param bytes the max total size in bytes
74+
* @return the builder
75+
*/
76+
public SuperStreamBuilder maxLength(int bytes) {
77+
return withArgument("max-length-bytes", bytes);
78+
}
79+
80+
/**
81+
* Set the maximum size limit for segment file.
82+
* @param bytes the max segments size in bytes
83+
* @return the builder
84+
*/
85+
public SuperStreamBuilder maxSegmentSize(int bytes) {
86+
return withArgument("x-stream-max-segment-size-bytes", bytes);
87+
}
88+
89+
/**
90+
* Set initial replication factor for each partition.
91+
* @param count number of nodes per partition
92+
* @return the builder
93+
*/
94+
public SuperStreamBuilder initialClusterSize(int count) {
95+
return withArgument("x-initial-cluster-size", count);
96+
}
97+
98+
/**
99+
* Set extra argument which is not covered by builder's methods.
100+
* @param key argument name
101+
* @param value argument value
102+
* @return the builder
103+
*/
104+
public SuperStreamBuilder withArgument(String key, Object value) {
105+
if ("x-queue-type".equals(key) && !"stream".equals(value)) {
106+
throw new IllegalArgumentException("Changing x-queue-type argument is not permitted");
107+
}
108+
this.arguments.put(key, value);
109+
return this;
110+
}
111+
112+
/**
113+
* Set the stream name.
114+
* @param name the stream name.
115+
* @return the builder
116+
*/
117+
public SuperStreamBuilder name(String name) {
118+
this.name = name;
119+
return this;
120+
}
121+
122+
/**
123+
* Set the partitions number.
124+
* @param partitions the partitions number
125+
* @return the builder
126+
*/
127+
public SuperStreamBuilder partitions(int partitions) {
128+
this.partitions = partitions;
129+
return this;
130+
}
131+
132+
/**
133+
* Set a strategy to determine routing keys to use for the
134+
* partitions. The first parameter is the queue name, the second the number of
135+
* partitions, the returned list must have a size equal to the partitions.
136+
* @param routingKeyStrategy the strategy
137+
* @return the builder
138+
*/
139+
public SuperStreamBuilder routingKeyStrategy(BiFunction<String, Integer, List<String>> routingKeyStrategy) {
140+
this.routingKeyStrategy = routingKeyStrategy;
141+
return this;
142+
}
143+
144+
/**
145+
* Builds a final Super Stream.
146+
* @return the Super Stream instance
147+
*/
148+
public SuperStream build() {
149+
if (!StringUtils.hasText(this.name)) {
150+
throw new IllegalArgumentException("Stream name can't be empty");
151+
}
152+
153+
if (this.partitions <= 0) {
154+
throw new IllegalArgumentException(
155+
String.format("Partitions number should be great then zero. Current value; %d", this.partitions)
156+
);
157+
}
158+
159+
if (this.routingKeyStrategy == null) {
160+
return new SuperStream(this.name, this.partitions, this.arguments);
161+
}
162+
163+
return new SuperStream(this.name, this.partitions, this.routingKeyStrategy, this.arguments);
164+
}
165+
}

0 commit comments

Comments
 (0)