Skip to content

Commit f267182

Browse files
committed
* Use delaySubscription() for subscribing publishers in the FluxMessageChannel
to wait until this one subscribed. * Use an `EmitterProcessor` to catch subscriptions and pass them as a signal to delayed upstream publishers * Fix `FluxMessageChannelTests.testFluxMessageChannelCleanUp` to verify an actual property instead of removed. * Fix `RSocketOutboundGatewayIntegrationTests` for the proper subscription into a `FluxMessageChannel` before actual interaction with an RSocket gateway. This should help us also to avoid some race conditions in the future
1 parent bf6de6b commit f267182

File tree

2 files changed

+18
-30
lines changed

2 files changed

+18
-30
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.integration.channel;
1818

19-
import java.util.Map;
20-
import java.util.concurrent.ConcurrentHashMap;
2119
import java.util.concurrent.atomic.AtomicInteger;
2220

2321
import org.reactivestreams.Publisher;
@@ -26,11 +24,10 @@
2624
import org.springframework.messaging.Message;
2725
import org.springframework.util.Assert;
2826

29-
import reactor.core.Disposable;
30-
import reactor.core.publisher.ConnectableFlux;
3127
import reactor.core.publisher.EmitterProcessor;
3228
import reactor.core.publisher.Flux;
3329
import reactor.core.publisher.FluxSink;
30+
import reactor.core.publisher.Mono;
3431

3532
/**
3633
* The {@link AbstractMessageChannel} implementation for the
@@ -46,12 +43,10 @@ public class FluxMessageChannel extends AbstractMessageChannel
4643

4744
private final AtomicInteger subscribed = new AtomicInteger();
4845

49-
private final Map<Publisher<? extends Message<?>>, ConnectableFlux<?>> publishers = new ConcurrentHashMap<>();
50-
51-
private final Map<ConnectableFlux<?>, Disposable> disposables = new ConcurrentHashMap<>();
52-
5346
private final EmitterProcessor<Message<?>> flux;
5447

48+
private final EmitterProcessor<Integer> subscriptionDelay = EmitterProcessor.create(false);
49+
5550
private FluxSink<Message<?>> sink;
5651

5752
public FluxMessageChannel() {
@@ -72,34 +67,27 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
7267
this.flux.doFinally((signal) -> this.subscribed.decrementAndGet())
7368
.retry()
7469
.subscribe(subscriber);
75-
this.subscribed.incrementAndGet();
76-
this.publishers.values()
77-
.forEach((connectableFlux) -> this.disposables.put(connectableFlux, connectableFlux.connect()));
70+
this.subscriptionDelay.onNext(this.subscribed.incrementAndGet());
7871
}
7972

8073
@Override
8174
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
82-
ConnectableFlux<?> connectableFlux =
83-
Flux.from(publisher)
84-
.handle((message, sink) -> sink.next(send(message)))
85-
.onErrorContinue((throwable, event) ->
86-
logger.warn("Error during processing event: " + event, throwable))
87-
.doFinally((signal) -> this.disposables.remove(this.publishers.remove(publisher)))
88-
.publish();
89-
90-
this.publishers.put(publisher, connectableFlux);
91-
92-
if (this.subscribed.get() > 0) {
93-
connectableFlux.connect((disposable) -> this.disposables.put(connectableFlux, disposable));
94-
}
75+
Flux.from(publisher)
76+
.handle((message, sink) -> sink.next(send(message)))
77+
.onErrorContinue((throwable, event) ->
78+
logger.warn("Error during processing event: " + event, throwable))
79+
.delaySubscription(
80+
Mono.fromSupplier(this.subscribed::get)
81+
.filter((subscribers) -> subscribers > 0)
82+
.switchIfEmpty(this.subscriptionDelay.next()))
83+
.subscribe();
9584
}
9685

9786
@Override
9887
public void destroy() {
99-
super.destroy();
100-
88+
this.subscriptionDelay.onComplete();
10189
this.flux.onComplete();
102-
this.disposables.values().forEach(Disposable::dispose);
90+
super.destroy();
10391
}
10492

10593
}

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23-
import java.util.Map;
2423
import java.util.concurrent.CountDownLatch;
2524
import java.util.concurrent.TimeUnit;
2625
import java.util.stream.Collectors;
@@ -51,6 +50,7 @@
5150
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5251

5352
import reactor.core.Disposable;
53+
import reactor.core.publisher.EmitterProcessor;
5454
import reactor.core.publisher.Flux;
5555

5656
/**
@@ -139,9 +139,9 @@ void testFluxMessageChannelCleanUp() throws InterruptedException {
139139

140140
assertThat(finishLatch.await(10, TimeUnit.SECONDS)).isTrue();
141141

142-
assertThat(TestUtils.getPropertyValue(flux, "publishers", Map.class).isEmpty()).isTrue();
143-
144142
flowRegistration.destroy();
143+
144+
assertThat(TestUtils.getPropertyValue(flux, "flux", EmitterProcessor.class).isTerminated()).isTrue();
145145
}
146146

147147
@Configuration

0 commit comments

Comments
 (0)