Skip to content

GH-2765: Add discardChannel for splitter #2883

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.integration.splitter.DefaultMessageSplitter;
import org.springframework.integration.splitter.ExpressionEvaluatingSplitter;
import org.springframework.integration.splitter.MethodInvokingSplitter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
Expand All @@ -42,6 +43,10 @@ public class SplitterFactoryBean extends AbstractStandardMessageHandlerFactoryBe

private String delimiters;

private MessageChannel discardChannel;

private String discardChannelName;

public void setApplySequence(boolean applySequence) {
this.applySequence = applySequence;
}
Expand All @@ -50,6 +55,14 @@ public void setDelimiters(String delimiters) {
this.delimiters = delimiters;
}

public void setDiscardChannel(MessageChannel discardChannel) {
this.discardChannel = discardChannel;
}

public void setDiscardChannelName(String discardChannelName) {
this.discardChannelName = discardChannelName;
}

@Override
protected MessageHandler createMethodInvokingHandler(Object targetObject, String targetMethodName) {
Assert.notNull(targetObject, "targetObject must not be null");
Expand Down Expand Up @@ -90,6 +103,12 @@ protected MessageHandler createDefaultHandler() {

protected AbstractMessageSplitter configureSplitter(AbstractMessageSplitter splitter) {
postProcessReplyProducer(splitter);
if (this.discardChannel != null) {
splitter.setDiscardChannel(this.discardChannel);
}
else if (StringUtils.hasText(this.discardChannelName)) {
splitter.setDiscardChannelName(this.discardChannelName);
}
return splitter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* @author Mark Fisher
* @author Iwein Fuld
* @author Gary Russell
* @author Artem Bilan
*/
public class SplitterParser extends AbstractDelegatingConsumerEndpointParser {

Expand All @@ -45,6 +46,7 @@ boolean hasDefaultOption() {
void postProcess(BeanDefinitionBuilder builder, Element element, ParserContext parserContext) {
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "apply-sequence");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "delimiters");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "discard-channel", "discardChannelName");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.splitter.DefaultMessageSplitter;
import org.springframework.messaging.MessageChannel;

/**
* A {@link ConsumerEndpointSpec} for a {@link AbstractMessageSplitter} implementations.
Expand All @@ -43,7 +44,7 @@ public final class SplitterEndpointSpec<S extends AbstractMessageSplitter>
*/
public SplitterEndpointSpec<S> applySequence(boolean applySequence) {
this.handler.setApplySequence(applySequence);
return _this();
return this;
}

/**
Expand All @@ -65,4 +66,45 @@ public SplitterEndpointSpec<S> delimiters(String delimiters) {
return this;
}

/**
* Specify a channel where rejected Messages should be sent. If the discard
* channel is null (the default), rejected Messages will be dropped.
* A "Rejected Message" means that split function has returned an empty result (but not null):
* no items to iterate for sending.
* @param discardChannel The discard channel.
* @return the endpoint spec.
* @since 5.2
* @see DefaultMessageSplitter#setDelimiters(String)
*/
public SplitterEndpointSpec<S> discardChannel(MessageChannel discardChannel) {
this.handler.setDiscardChannel(discardChannel);
return this;
}

/**
* Specify a channel bean name where rejected Messages should be sent. If the discard
* channel is null (the default), rejected Messages will be dropped.
* A "Rejected Message" means that split function has returned an empty result (but not null):
* no items to iterate for sending.
* @param discardChannelName The discard channel bean name.
* @return the endpoint spec.
* @since 5.2
* @see DefaultMessageSplitter#setDelimiters(String)
*/
public SplitterEndpointSpec<S> discardChannel(String discardChannelName) {
this.handler.setDiscardChannelName(discardChannelName);
return this;
}

/**
* Configure a subflow to run for discarded messages instead of a
* {@link #discardChannel(MessageChannel)}.
* @param discardFlow the discard flow.
* @return the endpoint spec.
* @since 5.2
*/
public SplitterEndpointSpec<S> discardFlow(IntegrationFlow discardFlow) {
return discardChannel(obtainInputChannelFromFlow(discardFlow));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@

import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.DiscardingMessageHandler;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.json.JacksonPresent;
import org.springframework.integration.util.FunctionIterator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;

import com.fasterxml.jackson.core.TreeNode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* Base class for Message-splitting handlers.
Expand All @@ -47,10 +51,15 @@
* @author Ruslan Stelmachenko
* @author Gary Russell
*/
public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageHandler {
public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageHandler
implements DiscardingMessageHandler {

private boolean applySequence = true;

private MessageChannel discardChannel;

private String discardChannelName;

/**
* Set the applySequence flag to the specified value. Defaults to true.
* @param applySequence true to apply sequence information.
Expand All @@ -59,10 +68,54 @@ public void setApplySequence(boolean applySequence) {
this.applySequence = applySequence;
}

/**
* Specify a channel where rejected Messages should be sent. If the discard
* channel is null (the default), rejected Messages will be dropped.
* A "Rejected Message" means that split function has returned an empty result (but not null):
* no items to iterate for sending.
* @param discardChannel The discard channel.
* @since 5.2
*/
public void setDiscardChannel(MessageChannel discardChannel) {
this.discardChannel = discardChannel;
}

/**
* Specify a channel bean name (resolved to {@link MessageChannel} lazily)
* where rejected Messages should be sent. If the discard
* channel is null (the default), rejected Messages will be dropped.
* A "Rejected Message" means that split function has returned an empty result (but not null):
* no items to iterate for sending.
* @param discardChannelName The discard channel bean name.
* @since 5.2
*/
public void setDiscardChannelName(String discardChannelName) {
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty");
this.discardChannelName = discardChannelName;
}

@Override
public MessageChannel getDiscardChannel() {
if (this.discardChannel == null) {
String channelName = this.discardChannelName;
if (channelName != null) {
this.discardChannel = getChannelResolver().resolveDestination(channelName);
this.discardChannelName = null;
}
}
return this.discardChannel;
}

@Override
protected void doInit() {
Assert.state(!(this.discardChannelName != null && this.discardChannel != null),
"'discardChannelName' and 'discardChannel' are mutually exclusive.");
}

@Override
@SuppressWarnings("unchecked")
protected final Object handleRequestMessage(Message<?> message) {
Object result = this.splitMessage(message);
Object result = splitMessage(message);
// return null if 'null'
if (result == null) {
return null;
Expand Down Expand Up @@ -137,6 +190,10 @@ else if (result instanceof Publisher<?>) {
}

if (iterator != null && !iterator.hasNext()) {
MessageChannel discardingChannel = getDiscardChannel();
if (discardingChannel != null) {
this.messagingTemplate.send(discardingChannel, message);
}
return null;
}

Expand All @@ -154,7 +211,16 @@ else if (result instanceof Publisher<?>) {
object -> createBuilder(object, headers, correlationId, sequenceNumber.getAndIncrement(), sequenceSize);

if (reactive) {
return flux.map(messageBuilderFunction);
return flux
.map(messageBuilderFunction)
.switchIfEmpty(
Mono.defer(() -> {
MessageChannel discardingChannel = getDiscardChannel();
if (discardingChannel != null) {
this.messagingTemplate.send(discardingChannel, message);
}
return Mono.empty();
}));
}
else {
return new FunctionIterator<>(result instanceof AutoCloseable && !result.equals(iterator)
Expand Down Expand Up @@ -195,6 +261,7 @@ protected int obtainSizeIfPossible(Iterator<?> iterator) {

private AbstractIntegrationMessageBuilder<?> createBuilder(Object item, Map<String, Object> headers,
Object correlationId, int sequenceNumber, int sequenceSize) {

AbstractIntegrationMessageBuilder<?> builder;
if (item instanceof Message) {
builder = getMessageBuilderFactory().fromMessage((Message<?>) item);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3669,6 +3669,19 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="discard-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.messaging.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The channel where the splitter will send the messages that return an empty container from
split function.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import org.junit.Test;
Expand All @@ -31,6 +30,7 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.channel.QueueChannel;
Expand All @@ -47,10 +47,10 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.TextNode;

/**
Expand Down Expand Up @@ -90,6 +90,9 @@ public class CorrelationHandlerTests {
@Autowired
private PollableChannel releaseChannel;

@Autowired
private PollableChannel discardChannel;

@Test
public void testSplitterResequencer() {
QueueChannel replyChannel = new QueueChannel();
Expand Down Expand Up @@ -150,17 +153,24 @@ public void testBarrier() {
assertThat(out.getPayload()).isEqualTo("bar");
}

@Test
public void testSplitterDiscard() {
this.splitAggregateInput.send(new GenericMessage<>(new ArrayList<>()));
Message<?> receive = this.discardChannel.receive(10_000);
assertThat(receive)
.isNotNull()
.extracting(Message::getPayload)
.isInstanceOf(ArrayNode.class)
.extracting("_children")
.element(0)
.asList()
.hasSize(0);
}

@Configuration
@EnableIntegration
public static class ContextConfiguration {

@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor();
tpte.setCorePoolSize(50);
return tpte;
}

@Bean
public TestSplitterPojo testSplitterData() {
List<String> first = new ArrayList<>();
Expand All @@ -175,22 +185,22 @@ public TestSplitterPojo testSplitterData() {
}

@Bean
public MessageChannelSpec<?, ?> executorChannel() {
return MessageChannels.executor(taskExecutor());
public MessageChannelSpec<?, ?> executorChannel(TaskExecutor taskExecutor) {
return MessageChannels.executor(taskExecutor);
}

@Bean
@SuppressWarnings("rawtypes")
public IntegrationFlow splitResequenceFlow(MessageChannel executorChannel) {
public IntegrationFlow splitResequenceFlow(MessageChannel executorChannel, TaskExecutor taskExecutor) {
return f -> f.enrichHeaders(s -> s.header("FOO", "BAR"))
.split("testSplitterData", "buildList", c -> c.applySequence(false))
.channel(executorChannel)
.split(Message.class, Message::getPayload, c -> c.applySequence(false))
.channel(MessageChannels.executor(taskExecutor()))
.channel(MessageChannels.executor(taskExecutor))
.split(s -> s
.applySequence(false)
.delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.channel(MessageChannels.executor(taskExecutor))
.<String, Integer>transform(Integer::parseInt)
.enrichHeaders(h ->
h.headerFunction(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, Message::getPayload))
Expand All @@ -203,8 +213,9 @@ public IntegrationFlow splitResequenceFlow(MessageChannel executorChannel) {
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlows.from("splitAggregateInput", true)
.transform(Transformers.toJson(ObjectToJsonTransformer.ResultType.NODE))
.split()
.channel(MessageChannels.executor(taskExecutor()))
.split((splitter) -> splitter
.discardFlow((subFlow) -> subFlow.channel((c) -> c.queue("discardChannel"))))
.channel(MessageChannels.flux())
.resequence()
.aggregate()
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "apply-sequence");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "first-line-as-header");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "discard-channel", "discardChannelName");
return builder;
}

Expand Down
Loading