Skip to content

Commit 0f5cfd4

Browse files
artembilangaryrussell
authored andcommitted
INT-4542: CorrMH: Use popSequenceDetails properly (#2591)
* INT-4542: CorrMH: Use popSequenceDetails properly JIRA: https://jira.spring.io/browse/INT-4542 Perform a `MessageBuilder.popSequenceDetails()` for those `MessageGroupProcessor` results which are not `Message` or `Collection<Message>` **Cherry-pick to 5.0.x** * * Add `AbstractCorrelatingMessageHandler.popSequenceDetails` property * * Rename property to just a `popSequence` * * Fix JavaDocs on the `AbstractCorrelatingMessageHandler.setPopSequence()` * Fix Docs typos and language * finish the `RouterTests.testNestedScatterGather()`
1 parent 1595462 commit 0f5cfd4

15 files changed

+336
-163
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,22 +93,19 @@ else if (payload instanceof AbstractIntegrationMessageBuilder) {
9393
builder = getMessageBuilderFactory().withPayload(payload);
9494
}
9595

96-
return builder.copyHeadersIfAbsent(headers)
97-
.popSequenceDetails()
98-
.build();
96+
return builder.copyHeadersIfAbsent(headers);
9997
}
10098

10199
/**
102100
* This default implementation simply returns all headers that have no conflicts among the group. An absent header
103101
* on one or more Messages within the group is not considered a conflict. Subclasses may override this method with
104102
* more advanced conflict-resolution strategies if necessary.
105-
*
106103
* @param group The message group.
107104
* @return The aggregated headers.
108105
*/
109106
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
110-
Map<String, Object> aggregatedHeaders = new HashMap<String, Object>();
111-
Set<String> conflictKeys = new HashSet<String>();
107+
Map<String, Object> aggregatedHeaders = new HashMap<>();
108+
Set<String> conflictKeys = new HashSet<>();
112109
for (Message<?> message : group.getMessages()) {
113110
for (Entry<String, Object> entry : message.getHeaders().entrySet()) {
114111
String key = entry.getKey();

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 89 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.springframework.integration.store.MessageStore;
5252
import org.springframework.integration.store.SimpleMessageGroup;
5353
import org.springframework.integration.store.SimpleMessageStore;
54+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
55+
import org.springframework.integration.support.MessageBuilder;
5456
import org.springframework.integration.support.locks.DefaultLockRegistry;
5557
import org.springframework.integration.support.locks.LockRegistry;
5658
import org.springframework.integration.util.UUIDConverter;
@@ -140,6 +142,8 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
140142

141143
private boolean expireGroupsUponTimeout = true;
142144

145+
private boolean popSequence = true;
146+
143147
private volatile boolean running;
144148

145149
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
@@ -217,6 +221,69 @@ public void setOutputProcessor(MessageGroupProcessor outputProcessor) {
217221
this.outputProcessor = outputProcessor;
218222
}
219223

224+
public void setDiscardChannel(MessageChannel discardChannel) {
225+
Assert.notNull(discardChannel, "'discardChannel' cannot be null");
226+
this.discardChannel = discardChannel;
227+
}
228+
229+
public void setDiscardChannelName(String discardChannelName) {
230+
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty");
231+
this.discardChannelName = discardChannelName;
232+
}
233+
234+
235+
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
236+
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
237+
}
238+
239+
/**
240+
* By default, when a MessageGroupStoreReaper is configured to expire partial
241+
* groups, empty groups are also removed. Empty groups exist after a group
242+
* is released normally. This is to enable the detection and discarding of
243+
* late-arriving messages. If you wish to expire empty groups on a longer
244+
* schedule than expiring partial groups, set this property. Empty groups will
245+
* then not be removed from the MessageStore until they have not been modified
246+
* for at least this number of milliseconds.
247+
* @param minimumTimeoutForEmptyGroups The minimum timeout.
248+
*/
249+
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
250+
this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups;
251+
}
252+
253+
/**
254+
* Set {@code releasePartialSequences} on an underlying default
255+
* {@link SequenceSizeReleaseStrategy}. Ignored for other release strategies.
256+
* @param releasePartialSequences true to allow release.
257+
*/
258+
public void setReleasePartialSequences(boolean releasePartialSequences) {
259+
if (!this.releaseStrategySet && releasePartialSequences) {
260+
setReleaseStrategy(new SequenceSizeReleaseStrategy());
261+
}
262+
this.releasePartialSequences = releasePartialSequences;
263+
}
264+
265+
/**
266+
* Expire (completely remove) a group if it is completed due to timeout.
267+
* Default true
268+
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set
269+
* @since 4.1
270+
*/
271+
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
272+
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
273+
}
274+
275+
/**
276+
* Perform a {@link MessageBuilder#popSequenceDetails()} for output message or not.
277+
* Default to true.
278+
* This option removes the sequence information added by the nearest upstream component with
279+
* {@code applySequence=true} (for example splitter).
280+
* @param popSequence the boolean flag to use.
281+
* @since 5.1
282+
*/
283+
public void setPopSequence(boolean popSequence) {
284+
this.popSequence = popSequence;
285+
}
286+
220287
@Override
221288
public void setTaskScheduler(TaskScheduler taskScheduler) {
222289
super.setTaskScheduler(taskScheduler);
@@ -285,57 +352,6 @@ private MessageGroupProcessor createGroupTimeoutProcessor() {
285352
return processor;
286353
}
287354

288-
public void setDiscardChannel(MessageChannel discardChannel) {
289-
Assert.notNull(discardChannel, "'discardChannel' cannot be null");
290-
this.discardChannel = discardChannel;
291-
}
292-
293-
public void setDiscardChannelName(String discardChannelName) {
294-
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty");
295-
this.discardChannelName = discardChannelName;
296-
}
297-
298-
299-
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
300-
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
301-
}
302-
303-
/**
304-
* By default, when a MessageGroupStoreReaper is configured to expire partial
305-
* groups, empty groups are also removed. Empty groups exist after a group
306-
* is released normally. This is to enable the detection and discarding of
307-
* late-arriving messages. If you wish to expire empty groups on a longer
308-
* schedule than expiring partial groups, set this property. Empty groups will
309-
* then not be removed from the MessageStore until they have not been modified
310-
* for at least this number of milliseconds.
311-
* @param minimumTimeoutForEmptyGroups The minimum timeout.
312-
*/
313-
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
314-
this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups;
315-
}
316-
317-
/**
318-
* Set {@code releasePartialSequences} on an underlying default
319-
* {@link SequenceSizeReleaseStrategy}. Ignored for other release strategies.
320-
* @param releasePartialSequences true to allow release.
321-
*/
322-
public void setReleasePartialSequences(boolean releasePartialSequences) {
323-
if (!this.releaseStrategySet && releasePartialSequences) {
324-
setReleaseStrategy(new SequenceSizeReleaseStrategy());
325-
}
326-
this.releasePartialSequences = releasePartialSequences;
327-
}
328-
329-
/**
330-
* Expire (completely remove) a group if it is completed due to timeout.
331-
* Default true
332-
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set
333-
* @since 4.1
334-
*/
335-
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
336-
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
337-
}
338-
339355
@Override
340356
public String getComponentType() {
341357
return "aggregator";
@@ -413,7 +429,8 @@ protected EvaluationContext getEvaluationContext() {
413429
@Override
414430
protected void handleMessageInternal(Message<?> message) throws Exception {
415431
Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
416-
Assert.state(correlationKey != null, "Null correlation not allowed. Maybe the CorrelationStrategy is failing?");
432+
Assert.state(correlationKey != null,
433+
"Null correlation not allowed. Maybe the CorrelationStrategy is failing?");
417434

418435
if (this.logger.isDebugEnabled()) {
419436
this.logger.debug("Handling message with correlationKey [" + correlationKey + "]: " + message);
@@ -653,7 +670,8 @@ protected void forceComplete(MessageGroup group) {
653670
* groups. A longer timeout for empty groups can be enabled by
654671
* setting minimumTimeoutForEmptyGroups.
655672
*/
656-
removeGroup = lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
673+
removeGroup =
674+
lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
657675
if (removeGroup && this.logger.isDebugEnabled()) {
658676
this.logger.debug("Removing empty group: " + correlationKey);
659677
}
@@ -753,10 +771,24 @@ protected Collection<Message<?>> completeGroup(Message<?> message, Object correl
753771
Object result = this.outputProcessor.processMessageGroup(group);
754772
Collection<Message<?>> partialSequence = null;
755773
if (result instanceof Collection<?>) {
756-
this.verifyResultCollectionConsistsOfMessages((Collection<?>) result);
774+
verifyResultCollectionConsistsOfMessages((Collection<?>) result);
757775
partialSequence = (Collection<Message<?>>) result;
758776
}
759-
this.sendOutputs(result, message);
777+
778+
if (this.popSequence && partialSequence == null && !(result instanceof Message<?>)) {
779+
AbstractIntegrationMessageBuilder<?> messageBuilder;
780+
if (result instanceof AbstractIntegrationMessageBuilder<?>) {
781+
messageBuilder = (AbstractIntegrationMessageBuilder<?>) result;
782+
}
783+
else {
784+
messageBuilder = getMessageBuilderFactory()
785+
.withPayload(result)
786+
.copyHeaders(message.getHeaders());
787+
}
788+
result = messageBuilder.popSequenceDetails();
789+
}
790+
791+
sendOutputs(result, message);
760792
return partialSequence;
761793
}
762794

@@ -772,10 +804,8 @@ protected Long obtainGroupTimeout(MessageGroup group) {
772804
}
773805

774806
@Override
775-
public void destroy() throws Exception {
776-
for (ScheduledFuture<?> future : this.expireGroupScheduledFutures.values()) {
777-
future.cancel(true);
778-
}
807+
public void destroy() {
808+
this.expireGroupScheduledFutures.values().forEach(future -> future.cancel(true));
779809
}
780810

781811
@Override

spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,6 +40,8 @@
4040
* {@link FactoryBean} to create an {@link AggregatingMessageHandler}.
4141
*
4242
* @author Gary Russell
43+
* @author Artem Bilan
44+
*
4345
* @since 4.2
4446
*
4547
*/
@@ -85,6 +87,8 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
8587

8688
private Boolean expireGroupsUponTimeout;
8789

90+
private Boolean popSequence;
91+
8892
public void setProcessorBean(Object processorBean) {
8993
this.processorBean = processorBean;
9094
}
@@ -165,6 +169,10 @@ public void setExpireGroupsUponTimeout(Boolean expireGroupsUponTimeout) {
165169
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
166170
}
167171

172+
public void setPopSequence(Boolean popSequence) {
173+
this.popSequence = popSequence;
174+
}
175+
168176
@Override
169177
protected AggregatingMessageHandler createHandler() {
170178
MessageGroupProcessor outputProcessor;
@@ -253,6 +261,10 @@ protected AggregatingMessageHandler createHandler() {
253261
aggregator.setExpireGroupsUponTimeout(this.expireGroupsUponTimeout);
254262
}
255263

264+
if (this.popSequence != null) {
265+
aggregator.setPopSequence(this.popSequence);
266+
}
267+
256268
return aggregator;
257269
}
258270

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,6 +83,8 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad
8383
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "empty-group-min-timeout",
8484
"minimumTimeoutForEmptyGroups");
8585

86+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "pop-sequence");
87+
8688
BeanDefinition expressionDef =
8789
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("group-timeout",
8890
"group-timeout-expression", parserContext, element, false);

spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
import org.springframework.integration.expression.ValueExpression;
3535
import org.springframework.integration.store.MessageGroup;
3636
import org.springframework.integration.store.MessageGroupStore;
37+
import org.springframework.integration.support.MessageBuilder;
3738
import org.springframework.integration.support.locks.LockRegistry;
3839
import org.springframework.messaging.MessageChannel;
3940
import org.springframework.scheduling.TaskScheduler;
@@ -53,7 +54,7 @@ public abstract class CorrelationHandlerSpec<S extends CorrelationHandlerSpec<S,
5354
H extends AbstractCorrelatingMessageHandler>
5455
extends ConsumerEndpointSpec<S, H> {
5556

56-
private final List<Advice> forceReleaseAdviceChain = new LinkedList<Advice>();
57+
private final List<Advice> forceReleaseAdviceChain = new LinkedList<>();
5758

5859
protected CorrelationHandlerSpec(H messageHandler) {
5960
super(messageHandler);
@@ -314,4 +315,16 @@ public S lockRegistry(LockRegistry lockRegistry) {
314315
return _this();
315316
}
316317

318+
/**
319+
* Perform a {@link MessageBuilder#popSequenceDetails()} for output message or not.
320+
* @param popSequence the boolean flag to use.
321+
* @return the endpoint spec.
322+
* @since 5.1
323+
* @see AbstractCorrelatingMessageHandler#setPopSequence(boolean)
324+
*/
325+
public S popSequence(boolean popSequence) {
326+
this.handler.setPopSequence(popSequence);
327+
return _this();
328+
}
329+
317330
}

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration-5.1.xsd

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3829,6 +3829,19 @@
38293829
<xsd:union memberTypes="xsd:boolean xsd:string" />
38303830
</xsd:simpleType>
38313831
</xsd:attribute>
3832+
<xsd:attribute name="pop-sequence" default="true">
3833+
<xsd:annotation>
3834+
<xsd:documentation>
3835+
Boolean flag specifying, if the 'MessageBuilder.popSequenceDetails()' should be called
3836+
for the output message. Plays the opposite role to the
3837+
'AbstractMessageSplitter.setApplySequence()'.
3838+
Defaults to 'true'.
3839+
</xsd:documentation>
3840+
</xsd:annotation>
3841+
<xsd:simpleType>
3842+
<xsd:union memberTypes="xsd:boolean xsd:string" />
3843+
</xsd:simpleType>
3844+
</xsd:attribute>
38323845
<xsd:attribute name="lock-registry" type="xsd:string">
38333846
<xsd:annotation>
38343847
<xsd:appinfo>

0 commit comments

Comments
 (0)