Skip to content

Commit ca23176

Browse files
garyrussellartembilan
authored andcommitted
Rename ReactiveChannel
- as discussed last week TODO: - should we take the channel outside of the `AbstractMessageChannel` hierarchy? - avoid blocking interceptors - we would lose channel metrics though - rename `ReactiveConsumer` ? Polishing some missed renaming
1 parent cd4964e commit ca23176

File tree

11 files changed

+69
-63
lines changed

11 files changed

+69
-63
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/ReactiveChannel.java renamed to spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,34 +32,34 @@
3232
import reactor.core.publisher.FluxSink;
3333

3434
/**
35+
* The {@link AbstractMessageChannel} implementation for the
36+
* Reactive Streams {@link Publisher} based on the Project Reactor {@link FluxProcessor}.
37+
*
3538
* @author Artem Bilan
3639
* @author Gary Russell
3740
*
3841
* @since 5.0
3942
*/
40-
public class ReactiveChannel extends AbstractMessageChannel
41-
implements Publisher<Message<?>>, ReactiveSubscribableChannel {
43+
public class FluxMessageChannel extends AbstractMessageChannel
44+
implements Publisher<Message<?>>, FluxSubscribableChannel {
4245

4346
private final List<Subscriber<? super Message<?>>> subscribers = new ArrayList<>();
4447

4548
private final List<Publisher<Message<?>>> publishers = new CopyOnWriteArrayList<>();
4649

4750
private final FluxProcessor<Message<?>, Message<?>> processor;
4851

49-
private final Flux<Message<?>> flux;
50-
5152
private final FluxSink<Message<?>> sink;
5253

5354
private volatile boolean upstreamSubscribed;
5455

55-
public ReactiveChannel() {
56+
public FluxMessageChannel() {
5657
this(DirectProcessor.create());
5758
}
5859

59-
public ReactiveChannel(FluxProcessor<Message<?>, Message<?>> processor) {
60+
public FluxMessageChannel(FluxProcessor<Message<?>, Message<?>> processor) {
6061
Assert.notNull(processor, "'processor' must not be null");
6162
this.processor = processor;
62-
this.flux = Flux.from(processor);
6363
this.sink = processor.sink();
6464
}
6565

@@ -73,7 +73,7 @@ protected boolean doSend(Message<?> message, long timeout) {
7373
public void subscribe(Subscriber<? super Message<?>> subscriber) {
7474
this.subscribers.add(subscriber);
7575

76-
this.flux.doOnCancel(() -> ReactiveChannel.this.subscribers.remove(subscriber))
76+
this.processor.doOnCancel(() -> FluxMessageChannel.this.subscribers.remove(subscriber))
7777
.subscribe(subscriber);
7878

7979
if (!this.upstreamSubscribed) {
@@ -82,7 +82,7 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
8282
}
8383

8484
@Override
85-
public void subscribeTo(Publisher<Message<?>> publisher) {
85+
public void subscribeTo(Flux<Message<?>> publisher) {
8686
this.publishers.add(publisher);
8787
if (!this.subscribers.isEmpty()) {
8888
doSubscribeTo(publisher);
@@ -91,11 +91,11 @@ public void subscribeTo(Publisher<Message<?>> publisher) {
9191

9292
private void doSubscribeTo(Publisher<Message<?>> publisher) {
9393
Flux.from(publisher)
94-
.doOnSubscribe(s -> ReactiveChannel.this.upstreamSubscribed = true)
94+
.doOnSubscribe(s -> FluxMessageChannel.this.upstreamSubscribed = true)
9595
.doOnComplete(() -> {
96-
ReactiveChannel.this.publishers.remove(publisher);
97-
if (ReactiveChannel.this.publishers.isEmpty()) {
98-
ReactiveChannel.this.upstreamSubscribed = false;
96+
FluxMessageChannel.this.publishers.remove(publisher);
97+
if (FluxMessageChannel.this.publishers.isEmpty()) {
98+
FluxMessageChannel.this.upstreamSubscribed = false;
9999
}
100100
})
101101
.subscribe(this.processor);
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@
1616

1717
package org.springframework.integration.channel;
1818

19-
import org.reactivestreams.Publisher;
20-
2119
import org.springframework.messaging.Message;
2220

21+
import reactor.core.publisher.Flux;
22+
2323
/**
2424
* @author Artem Bilan
25+
* @author Gary Russell
2526
*
2627
* @since 5.0
2728
*/
28-
public interface ReactiveSubscribableChannel {
29+
public interface FluxSubscribableChannel {
2930

30-
void subscribeTo(Publisher<Message<?>> publisher);
31+
void subscribeTo(Flux<Message<?>> publisher);
3132

3233
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121

2222
import org.springframework.integration.dsl.channel.DirectChannelSpec;
2323
import org.springframework.integration.dsl.channel.ExecutorChannelSpec;
24+
import org.springframework.integration.dsl.channel.FluxMessageChannelSpec;
2425
import org.springframework.integration.dsl.channel.MessageChannels;
2526
import org.springframework.integration.dsl.channel.PriorityChannelSpec;
2627
import org.springframework.integration.dsl.channel.PublishSubscribeChannelSpec;
2728
import org.springframework.integration.dsl.channel.QueueChannelSpec;
28-
import org.springframework.integration.dsl.channel.ReactiveChannelSpec;
2929
import org.springframework.integration.dsl.channel.RendezvousChannelSpec;
3030
import org.springframework.integration.store.ChannelMessageStore;
3131
import org.springframework.integration.store.PriorityCapableChannelMessageStore;
@@ -132,20 +132,20 @@ public ExecutorChannelSpec executor(String id, Executor executor) {
132132
}
133133

134134

135-
public ReactiveChannelSpec reactive() {
136-
return MessageChannels.reactive();
135+
public FluxMessageChannelSpec flux() {
136+
return MessageChannels.flux();
137137
}
138138

139-
public ReactiveChannelSpec reactive(String id) {
140-
return MessageChannels.reactive(id);
139+
public FluxMessageChannelSpec flux(String id) {
140+
return MessageChannels.flux(id);
141141
}
142142

143-
public ReactiveChannelSpec reactive(FluxProcessor<Message<?>, Message<?>> processor) {
144-
return MessageChannels.reactive(processor);
143+
public FluxMessageChannelSpec flux(FluxProcessor<Message<?>, Message<?>> processor) {
144+
return MessageChannels.flux(processor);
145145
}
146146

147-
public ReactiveChannelSpec reactive(String id, FluxProcessor<Message<?>, Message<?>> processor) {
148-
return MessageChannels.reactive(id, processor);
147+
public FluxMessageChannelSpec flux(String id, FluxProcessor<Message<?>, Message<?>> processor) {
148+
return MessageChannels.flux(id, processor);
149149
}
150150

151151
Channels() {

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838
import org.springframework.integration.channel.ChannelInterceptorAware;
3939
import org.springframework.integration.channel.DirectChannel;
4040
import org.springframework.integration.channel.FixedSubscriberChannel;
41+
import org.springframework.integration.channel.FluxMessageChannel;
4142
import org.springframework.integration.channel.MessageChannelReactiveUtils;
42-
import org.springframework.integration.channel.ReactiveChannel;
4343
import org.springframework.integration.channel.interceptor.WireTap;
4444
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
4545
import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean;
@@ -2550,7 +2550,7 @@ public <T> Publisher<Message<T>> toReactivePublisher() {
25502550
publisher = MessageChannelReactiveUtils.toPublisher(channelForPublisher);
25512551
}
25522552
else {
2553-
MessageChannel reactiveChannel = new ReactiveChannel();
2553+
MessageChannel reactiveChannel = new FluxMessageChannel();
25542554
publisher = (Publisher<Message<T>>) reactiveChannel;
25552555
channel(reactiveChannel);
25562556
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.reactivestreams.Publisher;
2222

2323
import org.springframework.integration.channel.DirectChannel;
24-
import org.springframework.integration.channel.ReactiveChannel;
24+
import org.springframework.integration.channel.FluxMessageChannel;
2525
import org.springframework.integration.core.MessageSource;
2626
import org.springframework.integration.dsl.channel.MessageChannelSpec;
2727
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
@@ -35,10 +35,13 @@
3535
import org.springframework.messaging.MessageChannel;
3636
import org.springframework.util.Assert;
3737

38+
import reactor.core.publisher.Flux;
39+
3840
/**
3941
* The central factory for fluent {@link IntegrationFlowBuilder} API.
4042
*
4143
* @author Artem Bilan
44+
* @author Gary Russell
4245
*
4346
* @since 5.0
4447
*
@@ -299,15 +302,15 @@ protected void onInit() {
299302
}
300303

301304
/**
302-
* Populate a {@link ReactiveChannel} to the {@link IntegrationFlowBuilder} chain
305+
* Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain
303306
* and subscribe it to the provided {@link Publisher}.
304307
* @param publisher the {@link Publisher} to subscribe to.
305308
* @return new {@link IntegrationFlowBuilder}.
306309
*/
307-
public static IntegrationFlowBuilder from(Publisher<Message<?>> publisher) {
308-
ReactiveChannel reactiveChannel = new ReactiveChannel();
310+
public static IntegrationFlowBuilder from(Flux<Message<?>> publisher) {
311+
FluxMessageChannel reactiveChannel = new FluxMessageChannel();
309312
reactiveChannel.subscribeTo(publisher);
310-
return from((MessageChannel) reactiveChannel);
313+
return from(reactiveChannel);
311314
}
312315

313316
private static IntegrationFlowBuilder from(MessagingGatewaySupport inboundGateway,
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package org.springframework.integration.dsl.channel;
1818

19-
import org.springframework.integration.channel.ReactiveChannel;
19+
import org.springframework.integration.channel.FluxMessageChannel;
2020
import org.springframework.messaging.Message;
2121

2222
import reactor.core.publisher.FluxProcessor;
@@ -27,14 +27,14 @@
2727
*
2828
* @since 5.0
2929
*/
30-
public class ReactiveChannelSpec extends MessageChannelSpec<ReactiveChannelSpec, ReactiveChannel> {
30+
public class FluxMessageChannelSpec extends MessageChannelSpec<FluxMessageChannelSpec, FluxMessageChannel> {
3131

32-
ReactiveChannelSpec() {
33-
this.channel = new ReactiveChannel();
32+
FluxMessageChannelSpec() {
33+
this.channel = new FluxMessageChannel();
3434
}
3535

36-
ReactiveChannelSpec(FluxProcessor<Message<?>, Message<?>> processor) {
37-
this.channel = new ReactiveChannel(processor);
36+
FluxMessageChannelSpec(FluxProcessor<Message<?>, Message<?>> processor) {
37+
this.channel = new FluxMessageChannel(processor);
3838
}
3939

4040
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/MessageChannels.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,22 @@ public static <S extends PublishSubscribeChannelSpec<S>> PublishSubscribeChannel
126126
return MessageChannels.<S>publishSubscribe(executor).id(id);
127127
}
128128

129-
public static ReactiveChannelSpec reactive() {
130-
return new ReactiveChannelSpec();
129+
public static FluxMessageChannelSpec flux() {
130+
return new FluxMessageChannelSpec();
131131
}
132132

133-
public static ReactiveChannelSpec reactive(String id) {
134-
return reactive().id(id);
133+
public static FluxMessageChannelSpec flux(String id) {
134+
return flux()
135+
.id(id);
135136
}
136137

137-
public static ReactiveChannelSpec reactive(FluxProcessor<Message<?>, Message<?>> processor) {
138-
return new ReactiveChannelSpec(processor);
138+
public static FluxMessageChannelSpec flux(String id, FluxProcessor<Message<?>, Message<?>> processor) {
139+
return flux(processor)
140+
.id(id);
139141
}
140142

141-
public static ReactiveChannelSpec reactive(String id, FluxProcessor<Message<?>, Message<?>> processor) {
142-
return reactive(processor).id(id);
143+
public static FluxMessageChannelSpec flux(FluxProcessor<Message<?>, Message<?>> processor) {
144+
return new FluxMessageChannelSpec(processor);
143145
}
144146

145147
private MessageChannels() {

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.reactivestreams.Publisher;
2525

2626
import org.springframework.integration.IntegrationMessageHeaderAccessor;
27-
import org.springframework.integration.channel.ReactiveSubscribableChannel;
27+
import org.springframework.integration.channel.FluxSubscribableChannel;
2828
import org.springframework.integration.core.MessageProducer;
2929
import org.springframework.integration.core.MessagingTemplate;
3030
import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
@@ -190,7 +190,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
190190
}
191191

192192
if (this.async && (reply instanceof ListenableFuture<?> || reply instanceof Publisher<?>)) {
193-
if (reply instanceof ListenableFuture<?> || !(getOutputChannel() instanceof ReactiveSubscribableChannel)) {
193+
if (reply instanceof ListenableFuture<?> || !(getOutputChannel() instanceof FluxSubscribableChannel)) {
194194
ListenableFuture<?> future;
195195
if (reply instanceof ListenableFuture<?>) {
196196
future = (ListenableFuture<?>) reply;
@@ -235,7 +235,7 @@ public void onFailure(Throwable ex) {
235235
});
236236
}
237237
else {
238-
((ReactiveSubscribableChannel) getOutputChannel())
238+
((FluxSubscribableChannel) getOutputChannel())
239239
.subscribeTo(Flux.from((Publisher<?>) reply)
240240
.map(result -> createOutputMessage(result, requestHeaders)));
241241
}
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import org.springframework.context.annotation.Bean;
3737
import org.springframework.context.annotation.Configuration;
3838
import org.springframework.integration.annotation.ServiceActivator;
39+
import org.springframework.integration.channel.FluxMessageChannel;
3940
import org.springframework.integration.channel.MessageChannelReactiveUtils;
4041
import org.springframework.integration.channel.QueueChannel;
41-
import org.springframework.integration.channel.ReactiveChannel;
4242
import org.springframework.integration.config.EnableIntegration;
4343
import org.springframework.messaging.Message;
4444
import org.springframework.messaging.MessageChannel;
@@ -57,10 +57,10 @@
5757
*/
5858
@RunWith(SpringRunner.class)
5959
@DirtiesContext
60-
public class ReactiveChannelTests {
60+
public class FluxMessageChannelTests {
6161

6262
@Autowired
63-
private MessageChannel reactiveChannel;
63+
private MessageChannel fluxMessageChannel;
6464

6565
@Autowired
6666
private MessageChannel queueChannel;
@@ -69,11 +69,11 @@ public class ReactiveChannelTests {
6969
private PollableChannel errorChannel;
7070

7171
@Test
72-
public void testReactiveMessageChannel() throws InterruptedException {
72+
public void testFluxMessageChannel() throws InterruptedException {
7373
QueueChannel replyChannel = new QueueChannel();
7474

7575
for (int i = 0; i < 10; i++) {
76-
this.reactiveChannel.send(MessageBuilder.withPayload(i).setReplyChannel(replyChannel).build());
76+
this.fluxMessageChannel.send(MessageBuilder.withPayload(i).setReplyChannel(replyChannel).build());
7777
}
7878

7979
for (int i = 0; i < 9; i++) {
@@ -116,11 +116,11 @@ public QueueChannel errorChannel() {
116116
}
117117

118118
@Bean
119-
public MessageChannel reactiveChannel() {
120-
return new ReactiveChannel();
119+
public MessageChannel fluxMessageChannel() {
120+
return new FluxMessageChannel();
121121
}
122122

123-
@ServiceActivator(inputChannel = "reactiveChannel")
123+
@ServiceActivator(inputChannel = "fluxMessageChannel")
124124
public String handle(int payload) {
125125
if (payload == 5) {
126126
throw new IllegalStateException("intentional");

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveConsumerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
4848
import org.springframework.integration.channel.DirectChannel;
4949
import org.springframework.integration.channel.QueueChannel;
50-
import org.springframework.integration.channel.ReactiveChannel;
50+
import org.springframework.integration.channel.FluxMessageChannel;
5151
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
5252
import org.springframework.integration.endpoint.ReactiveConsumer;
5353
import org.springframework.integration.handler.MethodInvokingMessageHandler;
@@ -67,7 +67,7 @@ public class ReactiveConsumerTests {
6767

6868
@Test
6969
public void testReactiveConsumerReactiveChannel() throws InterruptedException {
70-
ReactiveChannel testChannel = new ReactiveChannel(EmitterProcessor.create(false));
70+
FluxMessageChannel testChannel = new FluxMessageChannel(EmitterProcessor.create(false));
7171

7272
List<Message<?>> result = new LinkedList<>();
7373
CountDownLatch stopLatch = new CountDownLatch(2);
@@ -224,7 +224,7 @@ public void testReactiveConsumerPollableChannel() throws InterruptedException {
224224

225225
@Test
226226
public void testReactiveConsumerViaConsumerEndpointFactoryBean() throws Exception {
227-
ReactiveChannel testChannel = new ReactiveChannel();
227+
FluxMessageChannel testChannel = new FluxMessageChannel();
228228

229229
List<Message<?>> result = new LinkedList<>();
230230
CountDownLatch stopLatch = new CountDownLatch(3);

0 commit comments

Comments
 (0)