Skip to content

Commit e066b8c

Browse files
committed
* Use flux.onComplete() instead of iteration over subscribers
* Change `subscribers` list into just `AtomicInteger` count marker * fix `DefaultSplitterTests` according a new logic in the `FluxMessageChannel`
1 parent b7d848b commit e066b8c

File tree

2 files changed

+28
-35
lines changed

2 files changed

+28
-35
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616

1717
package org.springframework.integration.channel;
1818

19-
import java.util.ArrayList;
20-
import java.util.List;
2119
import java.util.Map;
2220
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.atomic.AtomicInteger;
2322

2423
import org.reactivestreams.Publisher;
2524
import org.reactivestreams.Subscriber;
@@ -45,7 +44,7 @@
4544
public class FluxMessageChannel extends AbstractMessageChannel
4645
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
4746

48-
private final List<Subscriber<? super Message<?>>> subscribers = new ArrayList<>();
47+
private final AtomicInteger subscribed = new AtomicInteger();
4948

5049
private final Map<Publisher<? extends Message<?>>, ConnectableFlux<?>> publishers = new ConcurrentHashMap<>();
5150

@@ -62,20 +61,18 @@ public FluxMessageChannel() {
6261

6362
@Override
6463
protected boolean doSend(Message<?> message, long timeout) {
65-
Assert.state(this.subscribers.size() > 0,
64+
Assert.state(this.subscribed.get() > 0,
6665
() -> "The [" + this + "] doesn't have subscribers to accept messages");
6766
this.sink.next(message);
6867
return true;
6968
}
7069

7170
@Override
7271
public void subscribe(Subscriber<? super Message<?>> subscriber) {
73-
this.subscribers.add(subscriber);
74-
75-
this.flux.doFinally((signal) -> this.subscribers.remove(subscriber))
72+
this.flux.doFinally((signal) -> this.subscribed.decrementAndGet())
7673
.retry()
7774
.subscribe(subscriber);
78-
75+
this.subscribed.incrementAndGet();
7976
this.publishers.values()
8077
.forEach((connectableFlux) -> this.disposables.put(connectableFlux, connectableFlux.connect()));
8178
}
@@ -92,7 +89,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
9289

9390
this.publishers.put(publisher, connectableFlux);
9491

95-
if (!this.subscribers.isEmpty()) {
92+
if (this.subscribed.get() > 0) {
9693
connectableFlux.connect((disposable) -> this.disposables.put(connectableFlux, disposable));
9794
}
9895
}
@@ -101,7 +98,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
10198
public void destroy() {
10299
super.destroy();
103100

104-
this.subscribers.forEach(Subscriber::onComplete);
101+
this.flux.onComplete();
105102
this.disposables.values().forEach(Disposable::dispose);
106103
}
107104

spring-integration-core/src/test/java/org/springframework/integration/splitter/DefaultSplitterTests.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,20 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.time.Duration;
2122
import java.util.Arrays;
2223
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.stream.Stream;
2526

26-
import org.junit.Test;
27-
import org.reactivestreams.Subscriber;
27+
import org.junit.jupiter.api.Test;
2828

2929
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3030
import org.springframework.integration.channel.DirectChannel;
3131
import org.springframework.integration.channel.FluxMessageChannel;
3232
import org.springframework.integration.channel.QueueChannel;
3333
import org.springframework.integration.endpoint.EventDrivenConsumer;
3434
import org.springframework.integration.support.MessageBuilder;
35-
import org.springframework.integration.test.util.TestUtils;
3635
import org.springframework.messaging.Message;
3736
import org.springframework.messaging.support.GenericMessage;
3837

@@ -45,10 +44,10 @@
4544
* @author Gunnar Hillert
4645
* @author Artem Bilan
4746
*/
48-
public class DefaultSplitterTests {
47+
class DefaultSplitterTests {
4948

5049
@Test
51-
public void splitMessageWithArrayPayload() throws Exception {
50+
void splitMessageWithArrayPayload() {
5251
String[] payload = new String[] { "x", "y", "z" };
5352
Message<String[]> message = MessageBuilder.withPayload(payload).build();
5453
QueueChannel replyChannel = new QueueChannel();
@@ -69,7 +68,7 @@ public void splitMessageWithArrayPayload() throws Exception {
6968
}
7069

7170
@Test
72-
public void splitMessageWithCollectionPayload() throws Exception {
71+
void splitMessageWithCollectionPayload() {
7372
List<String> payload = Arrays.asList("x", "y", "z");
7473
Message<List<String>> message = MessageBuilder.withPayload(payload).build();
7574
QueueChannel replyChannel = new QueueChannel();
@@ -90,7 +89,7 @@ public void splitMessageWithCollectionPayload() throws Exception {
9089
}
9190

9291
@Test
93-
public void correlationIdCopiedFromMessageId() {
92+
void correlationIdCopiedFromMessageId() {
9493
Message<String> message = MessageBuilder.withPayload("test").build();
9594
DirectChannel inputChannel = new DirectChannel();
9695
QueueChannel outputChannel = new QueueChannel(1);
@@ -105,7 +104,7 @@ public void correlationIdCopiedFromMessageId() {
105104
}
106105

107106
@Test
108-
public void splitMessageWithEmptyCollectionPayload() throws Exception {
107+
void splitMessageWithEmptyCollectionPayload() {
109108
Message<List<String>> message = MessageBuilder.withPayload(Collections.<String>emptyList()).build();
110109
QueueChannel replyChannel = new QueueChannel();
111110
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
@@ -116,7 +115,7 @@ public void splitMessageWithEmptyCollectionPayload() throws Exception {
116115
}
117116

118117
@Test
119-
public void splitStream() {
118+
void splitStream() {
120119
Message<?> message = new GenericMessage<>(
121120
Stream.generate(Math::random)
122121
.limit(10));
@@ -133,7 +132,7 @@ public void splitStream() {
133132
}
134133

135134
@Test
136-
public void splitFlux() {
135+
void splitFlux() {
137136
Message<?> message = new GenericMessage<>(
138137
Flux
139138
.generate(() -> 0,
@@ -159,7 +158,7 @@ public void splitFlux() {
159158
}
160159

161160
@Test
162-
public void splitArrayPayloadReactive() {
161+
void splitArrayPayloadReactive() {
163162
Message<?> message = new GenericMessage<>(new String[] { "x", "y", "z" });
164163
FluxMessageChannel replyChannel = new FluxMessageChannel();
165164
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
@@ -174,14 +173,13 @@ public void splitArrayPayloadReactive() {
174173

175174
StepVerifier.create(testFlux)
176175
.expectNext("x", "y", "z")
177-
.then(() ->
178-
((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
179-
.onComplete())
180-
.verifyComplete();
176+
.expectNoEvent(Duration.ofMillis(100))
177+
.thenCancel()
178+
.verify(Duration.ofSeconds(1));
181179
}
182180

183181
@Test
184-
public void splitStreamReactive() {
182+
void splitStreamReactive() {
185183
Message<?> message = new GenericMessage<>(Stream.of("x", "y", "z"));
186184
FluxMessageChannel replyChannel = new FluxMessageChannel();
187185
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
@@ -196,14 +194,13 @@ public void splitStreamReactive() {
196194

197195
StepVerifier.create(testFlux)
198196
.expectNext("x", "y", "z")
199-
.then(() ->
200-
((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
201-
.onComplete())
202-
.verifyComplete();
197+
.expectNoEvent(Duration.ofMillis(100))
198+
.thenCancel()
199+
.verify(Duration.ofSeconds(1));
203200
}
204201

205202
@Test
206-
public void splitFluxReactive() {
203+
void splitFluxReactive() {
207204
Message<?> message = new GenericMessage<>(Flux.just("x", "y", "z"));
208205
FluxMessageChannel replyChannel = new FluxMessageChannel();
209206
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
@@ -218,10 +215,9 @@ public void splitFluxReactive() {
218215

219216
StepVerifier.create(testFlux)
220217
.expectNext("x", "y", "z")
221-
.then(() ->
222-
((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
223-
.onComplete())
224-
.verifyComplete();
218+
.expectNoEvent(Duration.ofMillis(100))
219+
.thenCancel()
220+
.verify(Duration.ofSeconds(1));
225221
}
226222

227223
}

0 commit comments

Comments
 (0)