Skip to content

Use EmitterProcessor in the FluxMessageChannel #3104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,79 @@

package org.springframework.integration.channel;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Schedulers;

/**
* The {@link AbstractMessageChannel} implementation for the
* Reactive Streams {@link Publisher} based on the Project Reactor {@link Flux}.
*
* @author Artem Bilan
* @author Gary Russell
* @author Sergei Egorov
*
* @since 5.0
*/
public class FluxMessageChannel extends AbstractMessageChannel
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {

private final List<Subscriber<? super Message<?>>> subscribers = new ArrayList<>();

private final Map<Publisher<? extends Message<?>>, ConnectableFlux<?>> publishers = new ConcurrentHashMap<>();
private final EmitterProcessor<Message<?>> processor;

private final Flux<Message<?>> flux;
private final FluxSink<Message<?>> sink;

private FluxSink<Message<?>> sink;
private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create(1);

public FluxMessageChannel() {
this.flux =
Flux.<Message<?>>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.IGNORE)
.publish()
.autoConnect();
this.processor = EmitterProcessor.create(1, false);
this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
}

@Override
protected boolean doSend(Message<?> message, long timeout) {
Assert.state(this.subscribers.size() > 0,
Assert.state(this.processor.hasDownstreams(),
() -> "The [" + this + "] doesn't have subscribers to accept messages");
this.sink.next(message);
return true;
}

@Override
public void subscribe(Subscriber<? super Message<?>> subscriber) {
this.subscribers.add(subscriber);

this.flux.doOnCancel(() -> this.subscribers.remove(subscriber))
.retry()
this.processor
.doFinally((s) -> this.subscribedSignal.onNext(this.processor.hasDownstreams()))
.subscribe(subscriber);

this.publishers.values().forEach(ConnectableFlux::connect);
this.subscribedSignal.onNext(this.processor.hasDownstreams());
}

@Override
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
ConnectableFlux<?> connectableFlux =
Flux.from(publisher)
.handle((message, sink) -> sink.next(send(message)))
.onErrorContinue((throwable, event) ->
logger.warn("Error during processing event: " + event, throwable))
.doOnComplete(() -> this.publishers.remove(publisher))
.publish();

this.publishers.put(publisher, connectableFlux);
Flux.from(publisher)
.delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next())
.publishOn(Schedulers.boundedElastic())
.doOnNext((message) -> {
try {
send(message);
}
catch (Exception e) {
logger.warn("Error during processing event: " + message, e);
}
})
.subscribe();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the proper way would be to have a subscribeTo() to return Mono<Void> instead to let that caller to subscribe or compose further its reactive flow.
This way we would defer an emission from this source publisher as late as possible.

But I'd like to revise that stuff in a separate PR.

Thanks

}

if (!this.subscribers.isEmpty()) {
connectableFlux.connect();
}
@Override
public void destroy() {
this.subscribedSignal.onNext(false);
this.processor.onComplete();
super.destroy();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
Expand All @@ -47,24 +47,26 @@
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

/**
* @author Artem Bilan
*
* @since 5.0
*/
@RunWith(SpringRunner.class)
@SpringJUnitConfig
@DirtiesContext
public class FluxMessageChannelTests {

@Autowired
private MessageChannel fluxMessageChannel;

@Autowired
private MessageChannel queueChannel;
private QueueChannel queueChannel;

@Autowired
private PollableChannel errorChannel;
Expand All @@ -73,7 +75,7 @@ public class FluxMessageChannelTests {
private IntegrationFlowContext integrationFlowContext;

@Test
public void testFluxMessageChannel() {
void testFluxMessageChannel() {
QueueChannel replyChannel = new QueueChannel();

for (int i = 0; i < 10; i++) {
Expand All @@ -90,28 +92,35 @@ public void testFluxMessageChannel() {
Message<?> error = this.errorChannel.receive(0);
assertThat(error).isNotNull();
assertThat(((MessagingException) error.getPayload()).getFailedMessage().getPayload()).isEqualTo(5);

List<Message<?>> messages = this.queueChannel.clear();
assertThat(messages).extracting((message) -> (Integer) message.getPayload())
.containsAll(IntStream.range(0, 10).boxed().collect(Collectors.toList()));
}

@Test
public void testMessageChannelReactiveAdaptation() throws InterruptedException {
void testMessageChannelReactiveAdaptation() throws InterruptedException {
CountDownLatch done = new CountDownLatch(2);
List<String> results = new ArrayList<>();

Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
.map(Message::getPayload)
.map(String::toUpperCase)
.doOnNext(results::add)
.subscribe(v -> done.countDown());
Disposable disposable =
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
.map(Message::getPayload)
.map(String::toUpperCase)
.doOnNext(results::add)
.subscribe(v -> done.countDown());

this.queueChannel.send(new GenericMessage<>("foo"));
this.queueChannel.send(new GenericMessage<>("bar"));

assertThat(done.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(results).containsExactly("FOO", "BAR");

disposable.dispose();
}

@Test
public void testFluxMessageChannelCleanUp() throws InterruptedException {
void testFluxMessageChannelCleanUp() throws InterruptedException {
FluxMessageChannel flux = MessageChannels.flux().get();

CountDownLatch finishLatch = new CountDownLatch(1);
Expand All @@ -130,9 +139,9 @@ public void testFluxMessageChannelCleanUp() throws InterruptedException {

assertThat(finishLatch.await(10, TimeUnit.SECONDS)).isTrue();

assertThat(TestUtils.getPropertyValue(flux, "publishers", Map.class).isEmpty()).isTrue();

flowRegistration.destroy();

assertThat(TestUtils.getPropertyValue(flux, "processor", EmitterProcessor.class).isTerminated()).isTrue();
}

@Configuration
Expand All @@ -158,6 +167,7 @@ public String handle(int payload) {
}

@Bean
@BridgeFrom("fluxMessageChannel")
public MessageChannel queueChannel() {
return new QueueChannel();
}
Expand Down