-
Notifications
You must be signed in to change notification settings - Fork 1.1k
INT-4542: CorrMH: Use popSequenceDetails properly #2591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
ba7405b
f337141
9d75acf
81f6e0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,8 @@ | |
import org.springframework.integration.store.MessageStore; | ||
import org.springframework.integration.store.SimpleMessageGroup; | ||
import org.springframework.integration.store.SimpleMessageStore; | ||
import org.springframework.integration.support.AbstractIntegrationMessageBuilder; | ||
import org.springframework.integration.support.MessageBuilder; | ||
import org.springframework.integration.support.locks.DefaultLockRegistry; | ||
import org.springframework.integration.support.locks.LockRegistry; | ||
import org.springframework.integration.util.UUIDConverter; | ||
|
@@ -140,6 +142,8 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP | |
|
||
private boolean expireGroupsUponTimeout = true; | ||
|
||
private boolean popSequence = true; | ||
|
||
private volatile boolean running; | ||
|
||
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, | ||
|
@@ -217,6 +221,69 @@ public void setOutputProcessor(MessageGroupProcessor outputProcessor) { | |
this.outputProcessor = outputProcessor; | ||
} | ||
|
||
public void setDiscardChannel(MessageChannel discardChannel) { | ||
Assert.notNull(discardChannel, "'discardChannel' cannot be null"); | ||
this.discardChannel = discardChannel; | ||
} | ||
|
||
public void setDiscardChannelName(String discardChannelName) { | ||
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty"); | ||
this.discardChannelName = discardChannelName; | ||
} | ||
|
||
|
||
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) { | ||
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry; | ||
} | ||
|
||
/** | ||
* By default, when a MessageGroupStoreReaper is configured to expire partial | ||
* groups, empty groups are also removed. Empty groups exist after a group | ||
* is released normally. This is to enable the detection and discarding of | ||
* late-arriving messages. If you wish to expire empty groups on a longer | ||
* schedule than expiring partial groups, set this property. Empty groups will | ||
* then not be removed from the MessageStore until they have not been modified | ||
* for at least this number of milliseconds. | ||
* @param minimumTimeoutForEmptyGroups The minimum timeout. | ||
*/ | ||
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) { | ||
this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups; | ||
} | ||
|
||
/** | ||
* Set {@code releasePartialSequences} on an underlying default | ||
* {@link SequenceSizeReleaseStrategy}. Ignored for other release strategies. | ||
* @param releasePartialSequences true to allow release. | ||
*/ | ||
public void setReleasePartialSequences(boolean releasePartialSequences) { | ||
if (!this.releaseStrategySet && releasePartialSequences) { | ||
setReleaseStrategy(new SequenceSizeReleaseStrategy()); | ||
} | ||
this.releasePartialSequences = releasePartialSequences; | ||
} | ||
|
||
/** | ||
* Expire (completely remove) a group if it is completed due to timeout. | ||
* Default true | ||
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set | ||
* @since 4.1 | ||
*/ | ||
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) { | ||
this.expireGroupsUponTimeout = expireGroupsUponTimeout; | ||
} | ||
|
||
/** | ||
* Perform a {@link MessageBuilder#popSequenceDetails()} for output message or not. | ||
* Default to true. | ||
* This option plays an opposite role to the | ||
* {@link org.springframework.integration.splitter.AbstractMessageSplitter#setApplySequence(boolean)}. | ||
* @param popSequence the boolean flag to use. | ||
* @since 5.1 | ||
*/ | ||
public void setPopSequence(boolean popSequence) { | ||
this.popSequence = popSequence; | ||
} | ||
|
||
@Override | ||
public void setTaskScheduler(TaskScheduler taskScheduler) { | ||
super.setTaskScheduler(taskScheduler); | ||
|
@@ -285,57 +352,6 @@ private MessageGroupProcessor createGroupTimeoutProcessor() { | |
return processor; | ||
} | ||
|
||
public void setDiscardChannel(MessageChannel discardChannel) { | ||
Assert.notNull(discardChannel, "'discardChannel' cannot be null"); | ||
this.discardChannel = discardChannel; | ||
} | ||
|
||
public void setDiscardChannelName(String discardChannelName) { | ||
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty"); | ||
this.discardChannelName = discardChannelName; | ||
} | ||
|
||
|
||
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) { | ||
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry; | ||
} | ||
|
||
/** | ||
* By default, when a MessageGroupStoreReaper is configured to expire partial | ||
* groups, empty groups are also removed. Empty groups exist after a group | ||
* is released normally. This is to enable the detection and discarding of | ||
* late-arriving messages. If you wish to expire empty groups on a longer | ||
* schedule than expiring partial groups, set this property. Empty groups will | ||
* then not be removed from the MessageStore until they have not been modified | ||
* for at least this number of milliseconds. | ||
* @param minimumTimeoutForEmptyGroups The minimum timeout. | ||
*/ | ||
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) { | ||
this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups; | ||
} | ||
|
||
/** | ||
* Set {@code releasePartialSequences} on an underlying default | ||
* {@link SequenceSizeReleaseStrategy}. Ignored for other release strategies. | ||
* @param releasePartialSequences true to allow release. | ||
*/ | ||
public void setReleasePartialSequences(boolean releasePartialSequences) { | ||
if (!this.releaseStrategySet && releasePartialSequences) { | ||
setReleaseStrategy(new SequenceSizeReleaseStrategy()); | ||
} | ||
this.releasePartialSequences = releasePartialSequences; | ||
} | ||
|
||
/** | ||
* Expire (completely remove) a group if it is completed due to timeout. | ||
* Default true | ||
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set | ||
* @since 4.1 | ||
*/ | ||
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) { | ||
this.expireGroupsUponTimeout = expireGroupsUponTimeout; | ||
} | ||
|
||
@Override | ||
public String getComponentType() { | ||
return "aggregator"; | ||
|
@@ -653,7 +669,8 @@ protected void forceComplete(MessageGroup group) { | |
* groups. A longer timeout for empty groups can be enabled by | ||
* setting minimumTimeoutForEmptyGroups. | ||
*/ | ||
removeGroup = lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups); | ||
removeGroup = lastModifiedNow <= (System | ||
.currentTimeMillis() - this.minimumTimeoutForEmptyGroups); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks odd; move System to the next line too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm. Some fault of my IDEA code reformatting. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still weird indentation. |
||
if (removeGroup && this.logger.isDebugEnabled()) { | ||
this.logger.debug("Removing empty group: " + correlationKey); | ||
} | ||
|
@@ -753,10 +770,24 @@ protected Collection<Message<?>> completeGroup(Message<?> message, Object correl | |
Object result = this.outputProcessor.processMessageGroup(group); | ||
Collection<Message<?>> partialSequence = null; | ||
if (result instanceof Collection<?>) { | ||
this.verifyResultCollectionConsistsOfMessages((Collection<?>) result); | ||
verifyResultCollectionConsistsOfMessages((Collection<?>) result); | ||
partialSequence = (Collection<Message<?>>) result; | ||
} | ||
this.sendOutputs(result, message); | ||
|
||
if (this.popSequence && partialSequence == null && !(result instanceof Message<?>)) { | ||
AbstractIntegrationMessageBuilder<?> messageBuilder; | ||
if (result instanceof AbstractIntegrationMessageBuilder<?>) { | ||
messageBuilder = (AbstractIntegrationMessageBuilder<?>) result; | ||
} | ||
else { | ||
messageBuilder = getMessageBuilderFactory() | ||
.withPayload(result) | ||
.copyHeaders(message.getHeaders()); | ||
} | ||
result = messageBuilder.popSequenceDetails(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now I look at this, I wonder if it is completely valid to do this unconditionally. Consider
If the custom splitter doesn't push sequence details, we will remove the real ones from the first splitter. Perhaps we need a new flag Or, add some metadata ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. Sounds like we just need to be sure that our aggregation is based on the Or... I just don't see the whole picture and need to sleep with this... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After sleeping with this, I came with the conclusion that your suggestion about We may do a condition based on the default From here I think we just need to add such a Saying that I don't think that we are good for back-porting to Let me know WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe call it simply There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. Will fix soon. |
||
|
||
sendOutputs(result, message); | ||
return partialSequence; | ||
} | ||
|
||
|
@@ -772,10 +803,8 @@ protected Long obtainGroupTimeout(MessageGroup group) { | |
} | ||
|
||
@Override | ||
public void destroy() throws Exception { | ||
for (ScheduledFuture<?> future : this.expireGroupScheduledFutures.values()) { | ||
future.cancel(true); | ||
} | ||
public void destroy() { | ||
this.expireGroupScheduledFutures.values().forEach(future -> future.cancel(true)); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not just splitter - router, pubsub etc; maybe just say "removes the sequence information added by the nearest upstream component with
applySequence=true
(for example splitter).