Skip to content

Commit 1da0006

Browse files
committed
* Refactor FluxMessageChannel to use ReplayProcessor for subscribedSignal.
This one is used `delaySubscription` for the upstream publishers * Use a `AtomicBoolean` for subscription state since `doOnSubscribe()` is called before `EmitterProcessor` adds subscribers for its `downstreams` * Use `publishOn(Schedulers.boundedElastic())` for upstream publishers to avoid blocking over there when our `EmitterProcessor` doesn't have enough demand * Refactor reactive tests to have a subscription into the `FluxMessageChannel` earlier than emission happens for it
1 parent 66480b6 commit 1da0006

File tree

2 files changed

+27
-22
lines changed

2 files changed

+27
-22
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package org.springframework.integration.channel;
1818

19-
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.concurrent.atomic.AtomicBoolean;
2020

2121
import org.reactivestreams.Publisher;
2222
import org.reactivestreams.Subscriber;
@@ -27,66 +27,71 @@
2727
import reactor.core.publisher.EmitterProcessor;
2828
import reactor.core.publisher.Flux;
2929
import reactor.core.publisher.FluxSink;
30-
import reactor.core.publisher.Mono;
30+
import reactor.core.publisher.ReplayProcessor;
31+
import reactor.core.scheduler.Schedulers;
3132

3233
/**
3334
* The {@link AbstractMessageChannel} implementation for the
3435
* Reactive Streams {@link Publisher} based on the Project Reactor {@link Flux}.
3536
*
3637
* @author Artem Bilan
3738
* @author Gary Russell
39+
* @author Sergei Egorov
3840
*
3941
* @since 5.0
4042
*/
4143
public class FluxMessageChannel extends AbstractMessageChannel
4244
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
4345

44-
private final AtomicInteger subscribed = new AtomicInteger();
46+
private final EmitterProcessor<Message<?>> processor;
4547

46-
private final EmitterProcessor<Message<?>> flux;
48+
private final FluxSink<Message<?>> sink;
4749

48-
private final EmitterProcessor<Integer> subscriptionDelay = EmitterProcessor.create(false);
50+
private final AtomicBoolean subscribed = new AtomicBoolean();
4951

50-
private FluxSink<Message<?>> sink;
52+
private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create(1);
5153

5254
public FluxMessageChannel() {
53-
this.flux = EmitterProcessor.create(1, false);
54-
this.sink = this.flux.sink();
55+
this.processor = EmitterProcessor.create(1, false);
56+
this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
57+
this.subscribedSignal.doOnNext(this.subscribed::set).subscribe();
5558
}
5659

5760
@Override
5861
protected boolean doSend(Message<?> message, long timeout) {
59-
Assert.state(this.subscribed.get() > 0,
62+
Assert.state(this.subscribed.get(),
6063
() -> "The [" + this + "] doesn't have subscribers to accept messages");
6164
this.sink.next(message);
6265
return true;
6366
}
6467

6568
@Override
6669
public void subscribe(Subscriber<? super Message<?>> subscriber) {
67-
this.flux.doFinally((signal) -> this.subscribed.decrementAndGet())
68-
.retry()
70+
this.processor.doOnSubscribe((s) -> this.subscribedSignal.onNext(true))
71+
.doFinally((s) -> this.subscribedSignal.onNext(this.processor.hasDownstreams()))
6972
.subscribe(subscriber);
70-
this.subscriptionDelay.onNext(this.subscribed.incrementAndGet());
7173
}
7274

7375
@Override
7476
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
7577
Flux.from(publisher)
76-
.handle((message, sink) -> sink.next(send(message)))
77-
.onErrorContinue((throwable, event) ->
78-
logger.warn("Error during processing event: " + event, throwable))
79-
.delaySubscription(
80-
Mono.fromSupplier(this.subscribed::get)
81-
.filter((subscribers) -> subscribers > 0)
82-
.switchIfEmpty(this.subscriptionDelay.next()))
78+
.delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next())
79+
.publishOn(Schedulers.boundedElastic())
80+
.doOnNext((message) -> {
81+
try {
82+
send(message);
83+
}
84+
catch (Exception e) {
85+
logger.warn("Error during processing event: " + message, e);
86+
}
87+
})
8388
.subscribe();
8489
}
8590

8691
@Override
8792
public void destroy() {
88-
this.subscriptionDelay.onComplete();
89-
this.flux.onComplete();
93+
this.subscribedSignal.onNext(false);
94+
this.processor.onComplete();
9095
super.destroy();
9196
}
9297

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ void testFluxMessageChannelCleanUp() throws InterruptedException {
141141

142142
flowRegistration.destroy();
143143

144-
assertThat(TestUtils.getPropertyValue(flux, "flux", EmitterProcessor.class).isTerminated()).isTrue();
144+
assertThat(TestUtils.getPropertyValue(flux, "processor", EmitterProcessor.class).isTerminated()).isTrue();
145145
}
146146

147147
@Configuration

0 commit comments

Comments
 (0)