Skip to content

INT-4542: CorrMH: Use popSequenceDetails properly #2591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,22 +93,19 @@ else if (payload instanceof AbstractIntegrationMessageBuilder) {
builder = getMessageBuilderFactory().withPayload(payload);
}

return builder.copyHeadersIfAbsent(headers)
.popSequenceDetails()
.build();
return builder.copyHeadersIfAbsent(headers);
}

/**
* This default implementation simply returns all headers that have no conflicts among the group. An absent header
* on one or more Messages within the group is not considered a conflict. Subclasses may override this method with
* more advanced conflict-resolution strategies if necessary.
*
* @param group The message group.
* @return The aggregated headers.
*/
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
Map<String, Object> aggregatedHeaders = new HashMap<String, Object>();
Set<String> conflictKeys = new HashSet<String>();
Map<String, Object> aggregatedHeaders = new HashMap<>();
Set<String> conflictKeys = new HashSet<>();
for (Message<?> message : group.getMessages()) {
for (Entry<String, Object> entry : message.getHeaders().entrySet()) {
String key = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.springframework.integration.store.MessageStore;
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.util.UUIDConverter;
Expand Down Expand Up @@ -140,6 +142,8 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP

private boolean expireGroupsUponTimeout = true;

private boolean popSequence = true;

private volatile boolean running;

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
Expand Down Expand Up @@ -217,6 +221,69 @@ public void setOutputProcessor(MessageGroupProcessor outputProcessor) {
this.outputProcessor = outputProcessor;
}

public void setDiscardChannel(MessageChannel discardChannel) {
Assert.notNull(discardChannel, "'discardChannel' cannot be null");
this.discardChannel = discardChannel;
}

public void setDiscardChannelName(String discardChannelName) {
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty");
this.discardChannelName = discardChannelName;
}


public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
}

/**
* By default, when a MessageGroupStoreReaper is configured to expire partial
* groups, empty groups are also removed. Empty groups exist after a group
* is released normally. This is to enable the detection and discarding of
* late-arriving messages. If you wish to expire empty groups on a longer
* schedule than expiring partial groups, set this property. Empty groups will
* then not be removed from the MessageStore until they have not been modified
* for at least this number of milliseconds.
* @param minimumTimeoutForEmptyGroups The minimum timeout.
*/
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups;
}

/**
* Set {@code releasePartialSequences} on an underlying default
* {@link SequenceSizeReleaseStrategy}. Ignored for other release strategies.
* @param releasePartialSequences true to allow release.
*/
public void setReleasePartialSequences(boolean releasePartialSequences) {
if (!this.releaseStrategySet && releasePartialSequences) {
setReleaseStrategy(new SequenceSizeReleaseStrategy());
}
this.releasePartialSequences = releasePartialSequences;
}

/**
* Expire (completely remove) a group if it is completed due to timeout.
* Default true
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set
* @since 4.1
*/
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
}

/**
* Perform a {@link MessageBuilder#popSequenceDetails()} for output message or not.
* Default to true.
* This option removes the sequence information added by the nearest upstream component with
* {@code applySequence=true} (for example splitter).
* @param popSequence the boolean flag to use.
* @since 5.1
*/
public void setPopSequence(boolean popSequence) {
this.popSequence = popSequence;
}

@Override
public void setTaskScheduler(TaskScheduler taskScheduler) {
super.setTaskScheduler(taskScheduler);
Expand Down Expand Up @@ -285,57 +352,6 @@ private MessageGroupProcessor createGroupTimeoutProcessor() {
return processor;
}

public void setDiscardChannel(MessageChannel discardChannel) {
Assert.notNull(discardChannel, "'discardChannel' cannot be null");
this.discardChannel = discardChannel;
}

public void setDiscardChannelName(String discardChannelName) {
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty");
this.discardChannelName = discardChannelName;
}


public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
}

/**
* By default, when a MessageGroupStoreReaper is configured to expire partial
* groups, empty groups are also removed. Empty groups exist after a group
* is released normally. This is to enable the detection and discarding of
* late-arriving messages. If you wish to expire empty groups on a longer
* schedule than expiring partial groups, set this property. Empty groups will
* then not be removed from the MessageStore until they have not been modified
* for at least this number of milliseconds.
* @param minimumTimeoutForEmptyGroups The minimum timeout.
*/
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups;
}

/**
* Set {@code releasePartialSequences} on an underlying default
* {@link SequenceSizeReleaseStrategy}. Ignored for other release strategies.
* @param releasePartialSequences true to allow release.
*/
public void setReleasePartialSequences(boolean releasePartialSequences) {
if (!this.releaseStrategySet && releasePartialSequences) {
setReleaseStrategy(new SequenceSizeReleaseStrategy());
}
this.releasePartialSequences = releasePartialSequences;
}

/**
* Expire (completely remove) a group if it is completed due to timeout.
* Default true
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set
* @since 4.1
*/
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
}

@Override
public String getComponentType() {
return "aggregator";
Expand Down Expand Up @@ -413,7 +429,8 @@ protected EvaluationContext getEvaluationContext() {
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
Assert.state(correlationKey != null, "Null correlation not allowed. Maybe the CorrelationStrategy is failing?");
Assert.state(correlationKey != null,
"Null correlation not allowed. Maybe the CorrelationStrategy is failing?");

if (this.logger.isDebugEnabled()) {
this.logger.debug("Handling message with correlationKey [" + correlationKey + "]: " + message);
Expand Down Expand Up @@ -653,7 +670,8 @@ protected void forceComplete(MessageGroup group) {
* groups. A longer timeout for empty groups can be enabled by
* setting minimumTimeoutForEmptyGroups.
*/
removeGroup = lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
removeGroup =
lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
if (removeGroup && this.logger.isDebugEnabled()) {
this.logger.debug("Removing empty group: " + correlationKey);
}
Expand Down Expand Up @@ -753,10 +771,24 @@ protected Collection<Message<?>> completeGroup(Message<?> message, Object correl
Object result = this.outputProcessor.processMessageGroup(group);
Collection<Message<?>> partialSequence = null;
if (result instanceof Collection<?>) {
this.verifyResultCollectionConsistsOfMessages((Collection<?>) result);
verifyResultCollectionConsistsOfMessages((Collection<?>) result);
partialSequence = (Collection<Message<?>>) result;
}
this.sendOutputs(result, message);

if (this.popSequence && partialSequence == null && !(result instanceof Message<?>)) {
AbstractIntegrationMessageBuilder<?> messageBuilder;
if (result instanceof AbstractIntegrationMessageBuilder<?>) {
messageBuilder = (AbstractIntegrationMessageBuilder<?>) result;
}
else {
messageBuilder = getMessageBuilderFactory()
.withPayload(result)
.copyHeaders(message.getHeaders());
}
result = messageBuilder.popSequenceDetails();
}
Copy link
Contributor

@garyrussell garyrussell Oct 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I look at this, I wonder if it is completely valid to do this unconditionally. Consider

splitter -> custom-splitter -> custom-aggregator -> aggregator

If the custom splitter doesn't push sequence details, we will remove the real ones from the first splitter.

Perhaps we need a new flag alwaysPopSequenceDetails (true in 5.1, false in 5.0) - even in the abstract output processor.

Or, add some metadata (seqInfoId) to the sequence details and only pop if the splitter and aggregator are configured with the same metadata.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Sounds like we just need to be sure that our aggregation is based on the sequenceDetails. Because custom splitter may not be based on the applySequence.
So, maybe just an extra sequenceApplied header? But then how to understand on which level it has not been used?..

Or... I just don't see the whole picture and need to sleep with this...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After sleeping with this, I came with the conclusion that your suggestion about popSequenceDetails(true|false) is the way to go.
We won't pollute messages with some extra header which may be really out of control and I don't want to tie an aggregator with the splitter.

We may do a condition based on the default HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) and SimpleSequenceSizeReleaseStrategy, but that still doesn't come as a robust solution since we can use defaults, but populate sequence header some other way, not via splitter.

From here I think we just need to add such a popSequenceDetails option to the correlation handler, make it true by default (looks like most cases, since we haven't had issues so far), modify a AbstractAggregatingMessageGroupProcessor to return a AbstractIntegrationMessageBuilder instead of the whole message and do conditional .popSequenceDetails() already in the AbstractCorrelatingMessageHandler.

Saying that I don't think that we are good for back-porting to 5.0.x: looks like this is going to be some breaking change.

Let me know WDYT?
Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call it simply popSequence with the documentation explaining it undoes the nearest upstream applySequence?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Will fix soon.


sendOutputs(result, message);
return partialSequence;
}

Expand All @@ -772,10 +804,8 @@ protected Long obtainGroupTimeout(MessageGroup group) {
}

@Override
public void destroy() throws Exception {
for (ScheduledFuture<?> future : this.expireGroupScheduledFutures.values()) {
future.cancel(true);
}
public void destroy() {
this.expireGroupScheduledFutures.values().forEach(future -> future.cancel(true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,8 @@
* {@link FactoryBean} to create an {@link AggregatingMessageHandler}.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.2
*
*/
Expand Down Expand Up @@ -85,6 +87,8 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe

private Boolean expireGroupsUponTimeout;

private Boolean popSequence;

public void setProcessorBean(Object processorBean) {
this.processorBean = processorBean;
}
Expand Down Expand Up @@ -165,6 +169,10 @@ public void setExpireGroupsUponTimeout(Boolean expireGroupsUponTimeout) {
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
}

public void setPopSequence(Boolean popSequence) {
this.popSequence = popSequence;
}

@Override
protected AggregatingMessageHandler createHandler() {
MessageGroupProcessor outputProcessor;
Expand Down Expand Up @@ -253,6 +261,10 @@ protected AggregatingMessageHandler createHandler() {
aggregator.setExpireGroupsUponTimeout(this.expireGroupsUponTimeout);
}

if (this.popSequence != null) {
aggregator.setPopSequence(this.popSequence);
}

return aggregator;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,8 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "empty-group-min-timeout",
"minimumTimeoutForEmptyGroups");

IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "pop-sequence");

BeanDefinition expressionDef =
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("group-timeout",
"group-timeout-expression", parserContext, element, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,7 @@
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -53,7 +54,7 @@ public abstract class CorrelationHandlerSpec<S extends CorrelationHandlerSpec<S,
H extends AbstractCorrelatingMessageHandler>
extends ConsumerEndpointSpec<S, H> {

private final List<Advice> forceReleaseAdviceChain = new LinkedList<Advice>();
private final List<Advice> forceReleaseAdviceChain = new LinkedList<>();

protected CorrelationHandlerSpec(H messageHandler) {
super(messageHandler);
Expand Down Expand Up @@ -314,4 +315,16 @@ public S lockRegistry(LockRegistry lockRegistry) {
return _this();
}

/**
* Perform a {@link MessageBuilder#popSequenceDetails()} for output message or not.
* @param popSequence the boolean flag to use.
* @return the endpoint spec.
* @since 5.1
* @see AbstractCorrelatingMessageHandler#setPopSequence(boolean)
*/
public S popSequence(boolean popSequence) {
this.handler.setPopSequence(popSequence);
return _this();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3829,6 +3829,19 @@
<xsd:union memberTypes="xsd:boolean xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="pop-sequence" default="true">
<xsd:annotation>
<xsd:documentation>
Boolean flag specifying, if the 'MessageBuilder.popSequenceDetails()' should be called
for the output message. Plays the opposite role to the
'AbstractMessageSplitter.setApplySequence()'.
Defaults to 'true'.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="lock-registry" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
Expand Down
Loading