-
Notifications
You must be signed in to change notification settings - Fork 1.1k
INT-4542: CorrMH: Use popSequenceDetails properly #2591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -653,7 +654,8 @@ protected void forceComplete(MessageGroup group) { | |||
* groups. A longer timeout for empty groups can be enabled by | |||
* setting minimumTimeoutForEmptyGroups. | |||
*/ | |||
removeGroup = lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups); | |||
removeGroup = lastModifiedNow <= (System | |||
.currentTimeMillis() - this.minimumTimeoutForEmptyGroups); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks odd; move System to the next line too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. Some fault of my IDEA code reformatting.
Will fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still weird indentation.
.copyHeaders(message.getHeaders()); | ||
} | ||
result = messageBuilder.popSequenceDetails(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I look at this, I wonder if it is completely valid to do this unconditionally. Consider
splitter -> custom-splitter -> custom-aggregator -> aggregator
If the custom splitter doesn't push sequence details, we will remove the real ones from the first splitter.
Perhaps we need a new flag alwaysPopSequenceDetails
(true in 5.1, false in 5.0) - even in the abstract output processor.
Or, add some metadata (seqInfoId
) to the sequence details and only pop if the splitter and aggregator are configured with the same metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. Sounds like we just need to be sure that our aggregation is based on the sequenceDetails
. Because custom splitter
may not be based on the applySequence
.
So, maybe just an extra sequenceApplied
header? But then how to understand on which level it has not been used?..
Or... I just don't see the whole picture and need to sleep with this...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After sleeping with this, I came with the conclusion that your suggestion about popSequenceDetails(true|false)
is the way to go.
We won't pollute messages with some extra header which may be really out of control and I don't want to tie an aggregator with the splitter.
We may do a condition based on the default HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
and SimpleSequenceSizeReleaseStrategy
, but that still doesn't come as a robust solution since we can use defaults, but populate sequence header some other way, not via splitter.
From here I think we just need to add such a popSequenceDetails
option to the correlation handler, make it true
by default (looks like most cases, since we haven't had issues so far), modify a AbstractAggregatingMessageGroupProcessor
to return a AbstractIntegrationMessageBuilder
instead of the whole message and do conditional .popSequenceDetails()
already in the AbstractCorrelatingMessageHandler
.
Saying that I don't think that we are good for back-porting to 5.0.x
: looks like this is going to be some breaking change.
Let me know WDYT?
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call it simply popSequence
with the documentation explaining it undoes the nearest upstream applySequence
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Will fix soon.
JIRA: https://jira.spring.io/browse/INT-4542 Perform a `MessageBuilder.popSequenceDetails()` for those `MessageGroupProcessor` results which are not `Message` or `Collection<Message>` **Cherry-pick to 5.0.x**
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also needs a migration guide indicating that the sequence is now popped by default for all MGPs.
* Perform a {@link MessageBuilder#popSequenceDetails()} for output message or not. | ||
* Default to true. | ||
* This option plays an opposite role to the | ||
* {@link org.springframework.integration.splitter.AbstractMessageSplitter#setApplySequence(boolean)}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not just splitter - router, pubsub etc; maybe just say "removes the sequence information added by the nearest upstream component with applySequence=true
(for example splitter).
@@ -653,7 +654,8 @@ protected void forceComplete(MessageGroup group) { | |||
* groups. A longer timeout for empty groups can be enabled by | |||
* setting minimumTimeoutForEmptyGroups. | |||
*/ | |||
removeGroup = lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups); | |||
removeGroup = lastModifiedNow <= (System | |||
.currentTimeMillis() - this.minimumTimeoutForEmptyGroups); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still weird indentation.
assertNotNull(bestQuoteMessage); | ||
Object payload = bestQuoteMessage.getPayload(); | ||
assertThat(payload, instanceOf(String.class)); | ||
// assertThat(((List<?>) payload).size(), greaterThanOrEqualTo(1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this here? The payload is a String.
@@ -103,6 +103,13 @@ This method is invoked for aggregating messages as follows: | |||
|
|||
NOTE: In the interest of code simplicity and promoting best practices such as low coupling, testability, and others, the preferred way of implementing the aggregation logic is through a POJO and using the XML or annotation support for configuring it in the application. | |||
|
|||
Starting with version 5.1, after processing message group, an `AbstractCorrelatingMessageHandler` performs a `MessageBuilder.popSequenceDetails()` message headers modification for the proper splitter-aggregator scenario with several nested levels. | |||
It is done only if the message group release result is not a message or collection of messages. | |||
In that case a target `MessageGroupProcessor` is responsible for the `MessageBuilder.popSequenceDetails()` call during those messages building. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/during those messages building/while building those messages/
Starting with version 5.1, after processing message group, an `AbstractCorrelatingMessageHandler` performs a `MessageBuilder.popSequenceDetails()` message headers modification for the proper splitter-aggregator scenario with several nested levels. | ||
It is done only if the message group release result is not a message or collection of messages. | ||
In that case a target `MessageGroupProcessor` is responsible for the `MessageBuilder.popSequenceDetails()` call during those messages building. | ||
This functionality can be controlled by newly introduced `popSequence` `boolean` property, so the `MessageBuilder.popSequenceDetails()` can be disabled in some scenarios when correlation details have not been populated by the standard splitter. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/by newly introduced/by a new/
It is done only if the message group release result is not a message or collection of messages. | ||
In that case a target `MessageGroupProcessor` is responsible for the `MessageBuilder.popSequenceDetails()` call during those messages building. | ||
This functionality can be controlled by newly introduced `popSequence` `boolean` property, so the `MessageBuilder.popSequenceDetails()` can be disabled in some scenarios when correlation details have not been populated by the standard splitter. | ||
This property, essentially, undoes what has been done with the nearest upstream `applySequence = true` in the `AbstractMessageSplitter`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...by the nearest...
…ce()` * Fix Docs typos and language * finish the `RouterTests.testNestedScatterGather()`
We're no longer cherry-picking; correct? |
No, we are not. There is a workaround with the custom Migration Guide note has been added: https://github.com/spring-projects/spring-integration/wiki/Spring-Integration-5.0-to-5.1-Migration-Guide#aggregator-changes |
JIRA: https://jira.spring.io/browse/INT-4542
Perform a
MessageBuilder.popSequenceDetails()
for thoseMessageGroupProcessor
results which are notMessage
or
Collection<Message>