Skip to content

Commit 5d0fa19

Browse files
committed
* Refactor FluxMessageChannel to use ReplayProcessor for subscribedSignal.
This one is used `delaySubscription` for the upstream publishers * Use a `AtomicBoolean` for subscription state since `doOnSubscribe()` is called before `EmitterProcessor` adds subscribers for its `downstreams` * Use `publishOn(Schedulers.boundedElastic())` for upstream publishers to avoid blocking over there when our `EmitterProcessor` doesn't have enough demand * Refactor reactive tests to have a subscription into the `FluxMessageChannel` earlier than emission happens for it
1 parent 3462140 commit 5d0fa19

File tree

5 files changed

+96
-78
lines changed

5 files changed

+96
-78
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package org.springframework.integration.channel;
1818

19-
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.concurrent.atomic.AtomicBoolean;
2020

2121
import org.reactivestreams.Publisher;
2222
import org.reactivestreams.Subscriber;
@@ -27,66 +27,71 @@
2727
import reactor.core.publisher.EmitterProcessor;
2828
import reactor.core.publisher.Flux;
2929
import reactor.core.publisher.FluxSink;
30-
import reactor.core.publisher.Mono;
30+
import reactor.core.publisher.ReplayProcessor;
31+
import reactor.core.scheduler.Schedulers;
3132

3233
/**
3334
* The {@link AbstractMessageChannel} implementation for the
3435
* Reactive Streams {@link Publisher} based on the Project Reactor {@link Flux}.
3536
*
3637
* @author Artem Bilan
3738
* @author Gary Russell
39+
* @author Sergei Egorov
3840
*
3941
* @since 5.0
4042
*/
4143
public class FluxMessageChannel extends AbstractMessageChannel
4244
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
4345

44-
private final AtomicInteger subscribed = new AtomicInteger();
46+
private final EmitterProcessor<Message<?>> processor;
4547

46-
private final EmitterProcessor<Message<?>> flux;
48+
private final FluxSink<Message<?>> sink;
4749

48-
private final EmitterProcessor<Integer> subscriptionDelay = EmitterProcessor.create(false);
50+
private final AtomicBoolean subscribed = new AtomicBoolean();
4951

50-
private FluxSink<Message<?>> sink;
52+
private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create(1);
5153

5254
public FluxMessageChannel() {
53-
this.flux = EmitterProcessor.create(1, false);
54-
this.sink = this.flux.sink();
55+
this.processor = EmitterProcessor.create(1, false);
56+
this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
57+
this.subscribedSignal.doOnNext(this.subscribed::set).subscribe();
5558
}
5659

5760
@Override
5861
protected boolean doSend(Message<?> message, long timeout) {
59-
Assert.state(this.subscribed.get() > 0,
62+
Assert.state(this.subscribed.get(),
6063
() -> "The [" + this + "] doesn't have subscribers to accept messages");
6164
this.sink.next(message);
6265
return true;
6366
}
6467

6568
@Override
6669
public void subscribe(Subscriber<? super Message<?>> subscriber) {
67-
this.flux.doFinally((signal) -> this.subscribed.decrementAndGet())
68-
.retry()
70+
this.processor.doOnSubscribe((s) -> this.subscribedSignal.onNext(true))
71+
.doFinally((s) -> this.subscribedSignal.onNext(this.processor.hasDownstreams()))
6972
.subscribe(subscriber);
70-
this.subscriptionDelay.onNext(this.subscribed.incrementAndGet());
7173
}
7274

7375
@Override
7476
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
7577
Flux.from(publisher)
76-
.handle((message, sink) -> sink.next(send(message)))
77-
.onErrorContinue((throwable, event) ->
78-
logger.warn("Error during processing event: " + event, throwable))
79-
.delaySubscription(
80-
Mono.fromSupplier(this.subscribed::get)
81-
.filter((subscribers) -> subscribers > 0)
82-
.switchIfEmpty(this.subscriptionDelay.next()))
78+
.delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next())
79+
.publishOn(Schedulers.boundedElastic())
80+
.doOnNext((message) -> {
81+
try {
82+
send(message);
83+
}
84+
catch (Exception e) {
85+
logger.warn("Error during processing event: " + message, e);
86+
}
87+
})
8388
.subscribe();
8489
}
8590

8691
@Override
8792
public void destroy() {
88-
this.subscriptionDelay.onComplete();
89-
this.flux.onComplete();
93+
this.subscribedSignal.onNext(false);
94+
this.processor.onComplete();
9095
super.destroy();
9196
}
9297

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ void testFluxMessageChannelCleanUp() throws InterruptedException {
141141

142142
flowRegistration.destroy();
143143

144-
assertThat(TestUtils.getPropertyValue(flux, "flux", EmitterProcessor.class).isTerminated()).isTrue();
144+
assertThat(TestUtils.getPropertyValue(flux, "processor", EmitterProcessor.class).isTerminated()).isTrue();
145145
}
146146

147147
@Configuration

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,12 @@ void testPollableReactiveFlow() throws Exception {
152152

153153
@Test
154154
void testFromPublisher() {
155-
Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
156-
.map(v -> v.split(","))
157-
.flatMapIterable(Arrays::asList)
158-
.map(Integer::parseInt)
159-
.map(GenericMessage::new);
155+
Flux<Message<?>> messageFlux =
156+
Flux.just("1,2,3,4")
157+
.map(v -> v.split(","))
158+
.flatMapIterable(Arrays::asList)
159+
.map(Integer::parseInt)
160+
.map(GenericMessage::new);
160161

161162
QueueChannel resultChannel = new QueueChannel();
162163

spring-integration-core/src/test/java/org/springframework/integration/splitter/DefaultSplitterTests.java

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -161,63 +161,74 @@ void splitFlux() {
161161
void splitArrayPayloadReactive() {
162162
Message<?> message = new GenericMessage<>(new String[] { "x", "y", "z" });
163163
FluxMessageChannel replyChannel = new FluxMessageChannel();
164-
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
165-
splitter.setOutputChannel(replyChannel);
166-
167-
splitter.handleMessage(message);
168164

169165
Flux<String> testFlux =
170166
Flux.from(replyChannel)
171167
.map(Message::getPayload)
172168
.cast(String.class);
173169

174-
StepVerifier.create(testFlux)
175-
.expectNext("x", "y", "z")
176-
.expectNoEvent(Duration.ofMillis(100))
177-
.thenCancel()
178-
.verify(Duration.ofSeconds(1));
179-
}
170+
StepVerifier verifier =
171+
StepVerifier.create(testFlux)
172+
.expectNext("x", "y", "z")
173+
.expectNoEvent(Duration.ofMillis(100))
174+
.thenCancel()
175+
.verifyLater();
180176

181-
@Test
182-
void splitStreamReactive() {
183-
Message<?> message = new GenericMessage<>(Stream.of("x", "y", "z"));
184-
FluxMessageChannel replyChannel = new FluxMessageChannel();
185177
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
186178
splitter.setOutputChannel(replyChannel);
187179

188180
splitter.handleMessage(message);
189181

182+
verifier.verify(Duration.ofSeconds(1));
183+
}
184+
185+
@Test
186+
void splitStreamReactive() {
187+
Message<?> message = new GenericMessage<>(Stream.of("x", "y", "z"));
188+
FluxMessageChannel replyChannel = new FluxMessageChannel();
190189
Flux<String> testFlux =
191190
Flux.from(replyChannel)
192191
.map(Message::getPayload)
193192
.cast(String.class);
194193

195-
StepVerifier.create(testFlux)
196-
.expectNext("x", "y", "z")
197-
.expectNoEvent(Duration.ofMillis(100))
198-
.thenCancel()
199-
.verify(Duration.ofSeconds(1));
194+
StepVerifier verifier =
195+
StepVerifier.create(testFlux)
196+
.expectNext("x", "y", "z")
197+
.expectNoEvent(Duration.ofMillis(100))
198+
.thenCancel()
199+
.verifyLater();
200+
201+
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
202+
splitter.setOutputChannel(replyChannel);
203+
204+
splitter.handleMessage(message);
205+
206+
verifier.verify(Duration.ofSeconds(1));
200207
}
201208

202209
@Test
203210
void splitFluxReactive() {
204211
Message<?> message = new GenericMessage<>(Flux.just("x", "y", "z"));
205212
FluxMessageChannel replyChannel = new FluxMessageChannel();
213+
Flux<String> testFlux =
214+
Flux.from(replyChannel)
215+
.map(Message::getPayload)
216+
.cast(String.class);
217+
218+
StepVerifier verifier =
219+
StepVerifier.create(testFlux)
220+
.expectNext("x", "y", "z")
221+
.expectNoEvent(Duration.ofMillis(100))
222+
.thenCancel()
223+
.verifyLater();
224+
206225
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
207226
splitter.setOutputChannel(replyChannel);
208227

209228
splitter.handleMessage(message);
210229

211-
Flux<String> testFlux =
212-
Flux.from(replyChannel)
213-
.map(Message::getPayload)
214-
.cast(String.class);
215230

216-
StepVerifier.create(testFlux)
217-
.expectNext("x", "y", "z")
218-
.expectNoEvent(Duration.ofMillis(100))
219-
.thenCancel()
220-
.verify(Duration.ofSeconds(1));
231+
verifier.verify(Duration.ofSeconds(1));
221232
}
222233

223234
}

spring-integration-file/src/test/java/org/springframework/integration/file/splitter/FileSplitterTests.java

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -265,34 +265,35 @@ void testMarkersJson() throws Exception {
265265
@Test
266266
void testFileSplitterReactive() {
267267
FluxMessageChannel outputChannel = new FluxMessageChannel();
268+
StepVerifier verifier =
269+
StepVerifier.create(outputChannel)
270+
.assertNext(m -> {
271+
assertThat(m.getHeaders())
272+
.containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE)
273+
.containsEntry(FileHeaders.MARKER, "START");
274+
assertThat(m.getPayload()).isInstanceOf(FileMarker.class);
275+
FileMarker fileMarker = (FileMarker) m.getPayload();
276+
assertThat(fileMarker.getMark()).isEqualTo(FileMarker.Mark.START);
277+
assertThat(fileMarker.getFilePath()).isEqualTo(file.getAbsolutePath());
278+
})
279+
.expectNextCount(2)
280+
.assertNext(m -> {
281+
assertThat(m.getHeaders()).containsEntry(FileHeaders.MARKER, "END");
282+
assertThat(m.getPayload()).isInstanceOf(FileMarker.class);
283+
FileMarker fileMarker = (FileMarker) m.getPayload();
284+
assertThat(fileMarker.getMark()).isEqualTo(FileMarker.Mark.END);
285+
assertThat(fileMarker.getFilePath()).isEqualTo(file.getAbsolutePath());
286+
assertThat(fileMarker.getLineCount()).isEqualTo(2);
287+
})
288+
.expectNoEvent(Duration.ofMillis(100))
289+
.thenCancel()
290+
.verifyLater();
268291
FileSplitter splitter = new FileSplitter(true, true);
269292
splitter.setApplySequence(true);
270293
splitter.setOutputChannel(outputChannel);
271294
splitter.handleMessage(new GenericMessage<>(file));
272295

273-
274-
StepVerifier.create(outputChannel)
275-
.assertNext(m -> {
276-
assertThat(m.getHeaders())
277-
.containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE)
278-
.containsEntry(FileHeaders.MARKER, "START");
279-
assertThat(m.getPayload()).isInstanceOf(FileSplitter.FileMarker.class);
280-
FileMarker fileMarker = (FileSplitter.FileMarker) m.getPayload();
281-
assertThat(fileMarker.getMark()).isEqualTo(FileMarker.Mark.START);
282-
assertThat(fileMarker.getFilePath()).isEqualTo(file.getAbsolutePath());
283-
})
284-
.expectNextCount(2)
285-
.assertNext(m -> {
286-
assertThat(m.getHeaders()).containsEntry(FileHeaders.MARKER, "END");
287-
assertThat(m.getPayload()).isInstanceOf(FileSplitter.FileMarker.class);
288-
FileMarker fileMarker = (FileSplitter.FileMarker) m.getPayload();
289-
assertThat(fileMarker.getMark()).isEqualTo(FileMarker.Mark.END);
290-
assertThat(fileMarker.getFilePath()).isEqualTo(file.getAbsolutePath());
291-
assertThat(fileMarker.getLineCount()).isEqualTo(2);
292-
})
293-
.expectNoEvent(Duration.ofMillis(100))
294-
.thenCancel()
295-
.verify(Duration.ofSeconds(1));
296+
verifier.verify(Duration.ofSeconds(1));
296297
}
297298

298299
@Test

0 commit comments

Comments
 (0)