Skip to content

Commit 757cd46

Browse files
committed
INT-4542: CorrMH: Use popSequenceDetails properly
JIRA: https://jira.spring.io/browse/INT-4542 Perform a `MessageBuilder.popSequenceDetails()` for those `MessageGroupProcessor` results which are not `Message` or `Collection<Message>` **Cherry-pick to 5.0.x**
1 parent 551f03c commit 757cd46

File tree

3 files changed

+68
-7
lines changed

3 files changed

+68
-7
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.integration.store.MessageStore;
5252
import org.springframework.integration.store.SimpleMessageGroup;
5353
import org.springframework.integration.store.SimpleMessageStore;
54+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
5455
import org.springframework.integration.support.locks.DefaultLockRegistry;
5556
import org.springframework.integration.support.locks.LockRegistry;
5657
import org.springframework.integration.util.UUIDConverter;
@@ -653,7 +654,8 @@ protected void forceComplete(MessageGroup group) {
653654
* groups. A longer timeout for empty groups can be enabled by
654655
* setting minimumTimeoutForEmptyGroups.
655656
*/
656-
removeGroup = lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
657+
removeGroup = lastModifiedNow <= (System
658+
.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
657659
if (removeGroup && this.logger.isDebugEnabled()) {
658660
this.logger.debug("Removing empty group: " + correlationKey);
659661
}
@@ -753,10 +755,24 @@ protected Collection<Message<?>> completeGroup(Message<?> message, Object correl
753755
Object result = this.outputProcessor.processMessageGroup(group);
754756
Collection<Message<?>> partialSequence = null;
755757
if (result instanceof Collection<?>) {
756-
this.verifyResultCollectionConsistsOfMessages((Collection<?>) result);
758+
verifyResultCollectionConsistsOfMessages((Collection<?>) result);
757759
partialSequence = (Collection<Message<?>>) result;
758760
}
759-
this.sendOutputs(result, message);
761+
762+
if (partialSequence == null && !(result instanceof Message<?>)) {
763+
AbstractIntegrationMessageBuilder<?> messageBuilder;
764+
if (result instanceof AbstractIntegrationMessageBuilder<?>) {
765+
messageBuilder = (AbstractIntegrationMessageBuilder<?>) result;
766+
}
767+
else {
768+
messageBuilder = getMessageBuilderFactory()
769+
.withPayload(result)
770+
.copyHeaders(message.getHeaders());
771+
}
772+
result = messageBuilder.popSequenceDetails();
773+
}
774+
775+
sendOutputs(result, message);
760776
return partialSequence;
761777
}
762778

@@ -772,10 +788,8 @@ protected Long obtainGroupTimeout(MessageGroup group) {
772788
}
773789

774790
@Override
775-
public void destroy() throws Exception {
776-
for (ScheduledFuture<?> future : this.expireGroupScheduledFutures.values()) {
777-
future.cancel(true);
778-
}
791+
public void destroy() {
792+
this.expireGroupScheduledFutures.values().forEach(future -> future.cancel(true));
779793
}
780794

781795
@Override

spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Arrays;
3030
import java.util.List;
3131
import java.util.concurrent.atomic.AtomicReference;
32+
import java.util.stream.Collectors;
3233

3334
import org.junit.Test;
3435
import org.junit.runner.RunWith;
@@ -531,6 +532,25 @@ public void testExceptionTypeRouteFlow() {
531532
assertNull(this.messageHandlingExceptionChannel.receive(0));
532533
}
533534

535+
@Autowired
536+
@Qualifier("nestedScatterGatherFlow.input")
537+
private MessageChannel nestedScatterGatherFlowInput;
538+
539+
@Test
540+
public void testNestedScatterGather() {
541+
QueueChannel replyChannel = new QueueChannel();
542+
Message<String> request = MessageBuilder.withPayload("this is a test")
543+
.setReplyChannel(replyChannel)
544+
.build();
545+
this.nestedScatterGatherFlowInput.send(request);
546+
Message<?> bestQuoteMessage = replyChannel.receive(10000);
547+
assertNotNull(bestQuoteMessage);
548+
Object payload = bestQuoteMessage.getPayload();
549+
assertThat(payload, instanceOf(String.class));
550+
// assertThat(((List<?>) payload).size(), greaterThanOrEqualTo(1));
551+
}
552+
553+
534554
@Configuration
535555
@EnableIntegration
536556
@EnableMessageHistory({ "recipientListOrder*", "recipient1*", "recipient2*" })
@@ -773,6 +793,29 @@ public IntegrationFlow scatterGatherFlow() {
773793
.gatherTimeout(10_000));
774794
}
775795

796+
@Bean
797+
public IntegrationFlow nestedScatterGatherFlow() {
798+
return f -> f
799+
.split(s -> s.delimiters(" "))
800+
.scatterGather(
801+
scatterer -> scatterer
802+
.recipientFlow(f1 -> f1.handle((p, h) -> p + " - flow 1"))
803+
.recipientFlow(f2 -> f2.handle((p, h) -> p + " - flow 2"))
804+
.applySequence(true),
805+
gatherer -> gatherer
806+
.outputProcessor(mg -> mg
807+
.getMessages()
808+
.stream()
809+
.map(m -> m.getPayload().toString())
810+
.collect(Collectors.joining(", "))),
811+
scatterGather -> scatterGather.gatherTimeout(10_000))
812+
.aggregate()
813+
.<List<String>, String>transform(source ->
814+
source.stream()
815+
.map(s -> "- " + s)
816+
.collect(Collectors.joining("\n")));
817+
}
818+
776819
}
777820

778821
private static class RoutingTestBean {

src/reference/asciidoc/aggregator.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ This method is invoked for aggregating messages as follows:
103103

104104
NOTE: In the interest of code simplicity and promoting best practices such as low coupling, testability, and others, the preferred way of implementing the aggregation logic is through a POJO and using the XML or annotation support for configuring it in the application.
105105

106+
IMPORTANT: After processing message group, an `AbstractCorrelatingMessageHandler` performs a `MessageBuilder.popSequenceDetails()` message headers modification for the proper splitter-aggregator scenario with several nested levels.
107+
It is done only if the message group release result is not a message or collection of messages.
108+
In that case a target `MessageGroupProcessor` is responsible for the `MessageBuilder.popSequenceDetails()` call during those messages building.
109+
106110
[[agg-message-collection]]
107111
IMPORTANT: The `SimpleMessageGroup.getMessages()` method returns an `unmodifiableCollection`.
108112
Therefore, if your aggregating POJO method has a `Collection<Message>` parameter, the argument passed in is exactly that `Collection` instance and, when you use a `SimpleMessageStore` for the aggregator, that original `Collection<Message>` is cleared after releasing the group.

0 commit comments

Comments
 (0)