Skip to content

Commit 5852583

Browse files
artembilangaryrussell
authored andcommitted
Add IntegrationFlowDefinition.nullChannel() (#2555)
* Add `IntegrationFlowDefinition.nullChannel()` When a `NullChannel` is used in the middle of the flow, it may be not so obvious why our flow is stopped after accidentally added the next endpoint * For convenient add a terminal `nullChannel()` operator into the `IntegrationFlowDefinition` * * Add WARN about `NullChannel` subscription from the endpoints
1 parent 6c379d7 commit 5852583

File tree

4 files changed

+50
-3
lines changed

4 files changed

+50
-3
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2961,6 +2961,17 @@ public <T> Publisher<Message<T>> toReactivePublisher() {
29612961
return new PublisherIntegrationFlow<>(this.integrationComponents, publisher);
29622962
}
29632963

2964+
/**
2965+
* Add a {@value IntegrationContextUtils#NULL_CHANNEL_BEAN_NAME} bean into this flow
2966+
* definition as a terminal operator.
2967+
* @return The {@link IntegrationFlow} instance based on this definition.
2968+
* @since 5.1
2969+
*/
2970+
public IntegrationFlow nullChannel() {
2971+
return channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)
2972+
.get();
2973+
}
2974+
29642975
@SuppressWarnings("unchecked")
29652976
private <S extends ConsumerEndpointSpec<S, ? extends MessageHandler>> B register(S endpointSpec,
29662977
Consumer<S> endpointConfigurer) {

spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.springframework.context.Lifecycle;
2727
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
28+
import org.springframework.integration.channel.NullChannel;
2829
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
2930
import org.springframework.integration.core.MessageProducer;
3031
import org.springframework.integration.router.MessageRouter;
@@ -62,6 +63,10 @@ public class PollingConsumer extends AbstractPollingEndpoint implements Integrat
6263
public PollingConsumer(PollableChannel inputChannel, MessageHandler handler) {
6364
Assert.notNull(inputChannel, "inputChannel must not be null");
6465
Assert.notNull(handler, "handler must not be null");
66+
if (inputChannel instanceof NullChannel && logger.isWarnEnabled()) {
67+
logger.warn("The polling from the NullChannel does not have any effects: " +
68+
"it doesn't forward messages sent to it. A NullChannel is the end of the flow.");
69+
}
6570
this.inputChannel = inputChannel;
6671
this.handler = handler;
6772
if (this.inputChannel instanceof ExecutorChannelInterceptorAware) {

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import org.springframework.context.Lifecycle;
2626
import org.springframework.integration.channel.MessageChannelReactiveUtils;
2727
import org.springframework.integration.channel.MessagePublishingErrorHandler;
28+
import org.springframework.integration.channel.NullChannel;
2829
import org.springframework.integration.core.MessageProducer;
2930
import org.springframework.integration.router.MessageRouter;
3031
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
@@ -41,6 +42,7 @@
4142

4243
/**
4344
* @author Artem Bilan
45+
*
4446
* @since 5.0
4547
*/
4648
public class ReactiveStreamsConsumer extends AbstractEndpoint implements IntegrationConsumer {
@@ -72,6 +74,11 @@ public ReactiveStreamsConsumer(MessageChannel inputChannel, final Subscriber<Mes
7274
Assert.notNull(inputChannel, "'inputChannel' must not be null");
7375
Assert.notNull(subscriber, "'subscriber' must not be null");
7476

77+
if (inputChannel instanceof NullChannel && logger.isWarnEnabled()) {
78+
logger.warn("The consuming from the NullChannel does not have any effects: " +
79+
"it doesn't forward messages sent to it. A NullChannel is the end of the flow.");
80+
}
81+
7582
this.publisher = MessageChannelReactiveUtils.toPublisher(inputChannel);
7683
this.subscriber = subscriber;
7784
this.lifecycleDelegate = subscriber instanceof Lifecycle ? (Lifecycle) subscriber : null;

spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.springframework.integration.annotation.ServiceActivator;
5757
import org.springframework.integration.channel.DirectChannel;
5858
import org.springframework.integration.channel.FixedSubscriberChannel;
59+
import org.springframework.integration.channel.NullChannel;
5960
import org.springframework.integration.channel.QueueChannel;
6061
import org.springframework.integration.config.EnableIntegration;
6162
import org.springframework.integration.context.IntegrationContextUtils;
@@ -481,6 +482,23 @@ public void testDedicatedPollingThreadFlow() throws InterruptedException {
481482
assertEquals("dedicatedTaskScheduler-1", threadNameReference.get());
482483
}
483484

485+
@Autowired
486+
private MessageChannel flowWithNullChannelInput;
487+
488+
@Autowired
489+
private NullChannel nullChannel;
490+
491+
@Test
492+
public void testNullChannelInTheEndOfFlow() {
493+
this.nullChannel.setCountsEnabled(true);
494+
495+
this.flowWithNullChannelInput.send(new GenericMessage<>("foo"));
496+
497+
assertEquals(1, this.nullChannel.getSendCount());
498+
499+
this.nullChannel.setCountsEnabled(false);
500+
}
501+
484502
@MessagingGateway
485503
public interface ControlBusGateway {
486504

@@ -495,7 +513,7 @@ public static class SupplierContextConfiguration1 {
495513
@Bean
496514
public IntegrationFlow supplierFlow() {
497515
return IntegrationFlows.from(() -> "foo")
498-
.<String, String>transform(p -> p.toUpperCase())
516+
.<String, String>transform(String::toUpperCase)
499517
.channel("suppliedChannel")
500518
.get();
501519
}
@@ -594,7 +612,7 @@ public IntegrationFlow flow2() {
594612
.channel("foo")
595613
.fixedSubscriberChannel()
596614
.<String, Integer>transform(Integer::parseInt)
597-
.<Integer, Foo>transform(i -> new Foo(i))
615+
.transform(Foo::new)
598616
.transform(new PayloadSerializingTransformer(),
599617
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
600618
.channel(MessageChannels.queue(new SimpleMessageStore(), "fooQueue"))
@@ -841,6 +859,12 @@ public TaskScheduler dedicatedTaskScheduler() {
841859
return new ThreadPoolTaskScheduler();
842860
}
843861

862+
@Bean
863+
public IntegrationFlow flowWithNullChannel() {
864+
return IntegrationFlows.from("flowWithNullChannelInput")
865+
.nullChannel();
866+
}
867+
844868
}
845869

846870
@Service

0 commit comments

Comments
 (0)