Skip to content

Commit fb52b63

Browse files
committed
Use EmitterProcessor in the FluxMessageChannel
The `EmitterProcessor` has a good logic to block upstream producer when its downstream subscriber cannot keep up with overproducing. * Rework `FluxMessageChannel` logic to rely on the `EmitterProcessor` instead of `Flux.create()` * Cancel `FluxMessageChannel` internal subscriptions in the `destroy()` * Fix `ReactiveStreamsTests.testFluxTransform()` for the splitter's delimiter * Ensure in the `FluxMessageChannelTests.testFluxMessageChannel` that we can have several concurrent subscribers to the `FluxMessageChannel`
1 parent ddccb5a commit fb52b63

File tree

3 files changed

+53
-33
lines changed

3 files changed

+53
-33
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import org.springframework.messaging.Message;
2828
import org.springframework.util.Assert;
2929

30+
import reactor.core.Disposable;
3031
import reactor.core.publisher.ConnectableFlux;
32+
import reactor.core.publisher.EmitterProcessor;
3133
import reactor.core.publisher.Flux;
3234
import reactor.core.publisher.FluxSink;
3335

@@ -47,15 +49,15 @@ public class FluxMessageChannel extends AbstractMessageChannel
4749

4850
private final Map<Publisher<? extends Message<?>>, ConnectableFlux<?>> publishers = new ConcurrentHashMap<>();
4951

50-
private final Flux<Message<?>> flux;
52+
private final Map<ConnectableFlux<?>, Disposable> disposables = new ConcurrentHashMap<>();
53+
54+
private final EmitterProcessor<Message<?>> flux;
5155

5256
private FluxSink<Message<?>> sink;
5357

5458
public FluxMessageChannel() {
55-
this.flux =
56-
Flux.<Message<?>>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.IGNORE)
57-
.publish()
58-
.autoConnect();
59+
this.flux = EmitterProcessor.create(1, false);
60+
this.sink = this.flux.sink();
5961
}
6062

6163
@Override
@@ -70,11 +72,12 @@ protected boolean doSend(Message<?> message, long timeout) {
7072
public void subscribe(Subscriber<? super Message<?>> subscriber) {
7173
this.subscribers.add(subscriber);
7274

73-
this.flux.doOnCancel(() -> this.subscribers.remove(subscriber))
75+
this.flux.doFinally((signal) -> this.subscribers.remove(subscriber))
7476
.retry()
7577
.subscribe(subscriber);
7678

77-
this.publishers.values().forEach(ConnectableFlux::connect);
79+
this.publishers.values()
80+
.forEach((connectableFlux) -> this.disposables.put(connectableFlux, connectableFlux.connect()));
7881
}
7982

8083
@Override
@@ -84,14 +87,22 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
8487
.handle((message, sink) -> sink.next(send(message)))
8588
.onErrorContinue((throwable, event) ->
8689
logger.warn("Error during processing event: " + event, throwable))
87-
.doOnComplete(() -> this.publishers.remove(publisher))
90+
.doFinally((signal) -> this.disposables.remove(this.publishers.remove(publisher)))
8891
.publish();
8992

9093
this.publishers.put(publisher, connectableFlux);
9194

9295
if (!this.subscribers.isEmpty()) {
93-
connectableFlux.connect();
96+
connectableFlux.connect((disposable) -> this.disposables.put(connectableFlux, disposable));
9497
}
9598
}
9699

100+
@Override
101+
public void destroy() {
102+
super.destroy();
103+
104+
this.subscribers.forEach(Subscriber::onComplete);
105+
this.disposables.values().forEach(Disposable::dispose);
106+
}
107+
97108
}

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@
2323
import java.util.Map;
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.stream.Collectors;
2627
import java.util.stream.IntStream;
2728

28-
import org.junit.Test;
29-
import org.junit.runner.RunWith;
29+
import org.junit.jupiter.api.Test;
3030

3131
import org.springframework.beans.factory.annotation.Autowired;
3232
import org.springframework.context.annotation.Bean;
3333
import org.springframework.context.annotation.Configuration;
34+
import org.springframework.integration.annotation.BridgeFrom;
3435
import org.springframework.integration.annotation.ServiceActivator;
3536
import org.springframework.integration.channel.FluxMessageChannel;
3637
import org.springframework.integration.channel.MessageChannelReactiveUtils;
@@ -47,24 +48,25 @@
4748
import org.springframework.messaging.support.GenericMessage;
4849
import org.springframework.messaging.support.MessageBuilder;
4950
import org.springframework.test.annotation.DirtiesContext;
50-
import org.springframework.test.context.junit4.SpringRunner;
51+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5152

53+
import reactor.core.Disposable;
5254
import reactor.core.publisher.Flux;
5355

5456
/**
5557
* @author Artem Bilan
5658
*
5759
* @since 5.0
5860
*/
59-
@RunWith(SpringRunner.class)
61+
@SpringJUnitConfig
6062
@DirtiesContext
6163
public class FluxMessageChannelTests {
6264

6365
@Autowired
6466
private MessageChannel fluxMessageChannel;
6567

6668
@Autowired
67-
private MessageChannel queueChannel;
69+
private QueueChannel queueChannel;
6870

6971
@Autowired
7072
private PollableChannel errorChannel;
@@ -73,7 +75,7 @@ public class FluxMessageChannelTests {
7375
private IntegrationFlowContext integrationFlowContext;
7476

7577
@Test
76-
public void testFluxMessageChannel() {
78+
void testFluxMessageChannel() {
7779
QueueChannel replyChannel = new QueueChannel();
7880

7981
for (int i = 0; i < 10; i++) {
@@ -90,28 +92,35 @@ public void testFluxMessageChannel() {
9092
Message<?> error = this.errorChannel.receive(0);
9193
assertThat(error).isNotNull();
9294
assertThat(((MessagingException) error.getPayload()).getFailedMessage().getPayload()).isEqualTo(5);
95+
96+
List<Message<?>> messages = this.queueChannel.clear();
97+
assertThat(messages).extracting((message) -> (Integer) message.getPayload())
98+
.containsAll(IntStream.range(0, 10).boxed().collect(Collectors.toList()));
9399
}
94100

95101
@Test
96-
public void testMessageChannelReactiveAdaptation() throws InterruptedException {
102+
void testMessageChannelReactiveAdaptation() throws InterruptedException {
97103
CountDownLatch done = new CountDownLatch(2);
98104
List<String> results = new ArrayList<>();
99105

100-
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
101-
.map(Message::getPayload)
102-
.map(String::toUpperCase)
103-
.doOnNext(results::add)
104-
.subscribe(v -> done.countDown());
106+
Disposable disposable =
107+
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
108+
.map(Message::getPayload)
109+
.map(String::toUpperCase)
110+
.doOnNext(results::add)
111+
.subscribe(v -> done.countDown());
105112

106113
this.queueChannel.send(new GenericMessage<>("foo"));
107114
this.queueChannel.send(new GenericMessage<>("bar"));
108115

109116
assertThat(done.await(10, TimeUnit.SECONDS)).isTrue();
110117
assertThat(results).containsExactly("FOO", "BAR");
118+
119+
disposable.dispose();
111120
}
112121

113122
@Test
114-
public void testFluxMessageChannelCleanUp() throws InterruptedException {
123+
void testFluxMessageChannelCleanUp() throws InterruptedException {
115124
FluxMessageChannel flux = MessageChannels.flux().get();
116125

117126
CountDownLatch finishLatch = new CountDownLatch(1);
@@ -158,6 +167,7 @@ public String handle(int payload) {
158167
}
159168

160169
@Bean
170+
@BridgeFrom("fluxMessageChannel")
161171
public MessageChannel queueChannel() {
162172
return new QueueChannel();
163173
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.stream.Collectors;
3333

34-
import org.junit.Test;
35-
import org.junit.runner.RunWith;
34+
import org.junit.jupiter.api.Test;
3635
import org.reactivestreams.Publisher;
3736

3837
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,7 +50,7 @@
5150
import org.springframework.messaging.MessageChannel;
5251
import org.springframework.messaging.support.GenericMessage;
5352
import org.springframework.test.annotation.DirtiesContext;
54-
import org.springframework.test.context.junit4.SpringRunner;
53+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5554

5655
import reactor.core.publisher.Flux;
5756

@@ -62,7 +61,7 @@
6261
*
6362
* @since 5.0
6463
*/
65-
@RunWith(SpringRunner.class)
64+
@SpringJUnitConfig
6665
@DirtiesContext
6766
public class ReactiveStreamsTests {
6867

@@ -98,7 +97,7 @@ public class ReactiveStreamsTests {
9897
private Publisher<Message<String>> fixedSubscriberChannelFlow;
9998

10099
@Test
101-
public void testReactiveFlow() throws Exception {
100+
void testReactiveFlow() throws Exception {
102101
List<String> results = new ArrayList<>();
103102
CountDownLatch latch = new CountDownLatch(6);
104103
Flux.from(this.publisher)
@@ -115,7 +114,7 @@ public void testReactiveFlow() throws Exception {
115114
}
116115

117116
@Test
118-
public void testPollableReactiveFlow() throws Exception {
117+
void testPollableReactiveFlow() throws Exception {
119118
this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));
120119

121120
CountDownLatch latch = new CountDownLatch(6);
@@ -152,7 +151,7 @@ public void testPollableReactiveFlow() throws Exception {
152151
}
153152

154153
@Test
155-
public void testFromPublisher() {
154+
void testFromPublisher() {
156155
Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
157156
.map(v -> v.split(","))
158157
.flatMapIterable(Arrays::asList)
@@ -178,11 +177,11 @@ public void testFromPublisher() {
178177
}
179178

180179
@Test
181-
public void testFluxTransform() {
180+
void testFluxTransform() {
182181
QueueChannel resultChannel = new QueueChannel();
183182

184183
IntegrationFlow integrationFlow = f -> f
185-
.split()
184+
.split((splitter) -> splitter.delimiters(","))
186185
.<String, String>fluxTransform(flux -> flux
187186
.map(Message::getPayload)
188187
.map(String::toUpperCase))
@@ -212,7 +211,7 @@ public void testFluxTransform() {
212211
}
213212

214213
@Test
215-
public void singleChannelFlowTest() throws InterruptedException {
214+
void singleChannelFlowTest() throws InterruptedException {
216215
CountDownLatch latch = new CountDownLatch(1);
217216
Flux.from(this.singleChannelFlow)
218217
.map(m -> m.getPayload().toUpperCase())
@@ -224,7 +223,7 @@ public void singleChannelFlowTest() throws InterruptedException {
224223
}
225224

226225
@Test
227-
public void fixedSubscriberChannelFlowTest() throws InterruptedException {
226+
void fixedSubscriberChannelFlowTest() throws InterruptedException {
228227
CountDownLatch latch = new CountDownLatch(1);
229228
Flux.from(this.fixedSubscriberChannelFlow)
230229
.map(m -> m.getPayload().toUpperCase())

0 commit comments

Comments
 (0)