Skip to content

Commit 393c257

Browse files
committed
* Add AbstractCorrelatingMessageHandler.popSequenceDetails property
1 parent 757cd46 commit 393c257

14 files changed

+250
-157
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,22 +93,19 @@ else if (payload instanceof AbstractIntegrationMessageBuilder) {
9393
builder = getMessageBuilderFactory().withPayload(payload);
9494
}
9595

96-
return builder.copyHeadersIfAbsent(headers)
97-
.popSequenceDetails()
98-
.build();
96+
return builder.copyHeadersIfAbsent(headers);
9997
}
10098

10199
/**
102100
* This default implementation simply returns all headers that have no conflicts among the group. An absent header
103101
* on one or more Messages within the group is not considered a conflict. Subclasses may override this method with
104102
* more advanced conflict-resolution strategies if necessary.
105-
*
106103
* @param group The message group.
107104
* @return The aggregated headers.
108105
*/
109106
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
110-
Map<String, Object> aggregatedHeaders = new HashMap<String, Object>();
111-
Set<String> conflictKeys = new HashSet<String>();
107+
Map<String, Object> aggregatedHeaders = new HashMap<>();
108+
Set<String> conflictKeys = new HashSet<>();
112109
for (Message<?> message : group.getMessages()) {
113110
for (Entry<String, Object> entry : message.getHeaders().entrySet()) {
114111
String key = entry.getKey();

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 67 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.integration.store.SimpleMessageGroup;
5353
import org.springframework.integration.store.SimpleMessageStore;
5454
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
55+
import org.springframework.integration.support.MessageBuilder;
5556
import org.springframework.integration.support.locks.DefaultLockRegistry;
5657
import org.springframework.integration.support.locks.LockRegistry;
5758
import org.springframework.integration.util.UUIDConverter;
@@ -141,6 +142,8 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
141142

142143
private boolean expireGroupsUponTimeout = true;
143144

145+
private boolean popSequenceDetails = true;
146+
144147
private volatile boolean running;
145148

146149
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
@@ -218,6 +221,69 @@ public void setOutputProcessor(MessageGroupProcessor outputProcessor) {
218221
this.outputProcessor = outputProcessor;
219222
}
220223

224+
public void setDiscardChannel(MessageChannel discardChannel) {
225+
Assert.notNull(discardChannel, "'discardChannel' cannot be null");
226+
this.discardChannel = discardChannel;
227+
}
228+
229+
public void setDiscardChannelName(String discardChannelName) {
230+
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty");
231+
this.discardChannelName = discardChannelName;
232+
}
233+
234+
235+
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
236+
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
237+
}
238+
239+
/**
240+
* By default, when a MessageGroupStoreReaper is configured to expire partial
241+
* groups, empty groups are also removed. Empty groups exist after a group
242+
* is released normally. This is to enable the detection and discarding of
243+
* late-arriving messages. If you wish to expire empty groups on a longer
244+
* schedule than expiring partial groups, set this property. Empty groups will
245+
* then not be removed from the MessageStore until they have not been modified
246+
* for at least this number of milliseconds.
247+
* @param minimumTimeoutForEmptyGroups The minimum timeout.
248+
*/
249+
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
250+
this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups;
251+
}
252+
253+
/**
254+
* Set {@code releasePartialSequences} on an underlying default
255+
* {@link SequenceSizeReleaseStrategy}. Ignored for other release strategies.
256+
* @param releasePartialSequences true to allow release.
257+
*/
258+
public void setReleasePartialSequences(boolean releasePartialSequences) {
259+
if (!this.releaseStrategySet && releasePartialSequences) {
260+
setReleaseStrategy(new SequenceSizeReleaseStrategy());
261+
}
262+
this.releasePartialSequences = releasePartialSequences;
263+
}
264+
265+
/**
266+
* Expire (completely remove) a group if it is completed due to timeout.
267+
* Default true
268+
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set
269+
* @since 4.1
270+
*/
271+
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
272+
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
273+
}
274+
275+
/**
276+
* Perform a {@link MessageBuilder#popSequenceDetails()} for output message or not.
277+
* Default to true.
278+
* This option play an opposite role to the
279+
* {@link org.springframework.integration.splitter.AbstractMessageSplitter#setApplySequence(boolean)}.
280+
* @param popSequenceDetails the boolean flag to use.
281+
* @since 5.1
282+
*/
283+
public void setPopSequenceDetails(boolean popSequenceDetails) {
284+
this.popSequenceDetails = popSequenceDetails;
285+
}
286+
221287
@Override
222288
public void setTaskScheduler(TaskScheduler taskScheduler) {
223289
super.setTaskScheduler(taskScheduler);
@@ -286,57 +352,6 @@ private MessageGroupProcessor createGroupTimeoutProcessor() {
286352
return processor;
287353
}
288354

289-
public void setDiscardChannel(MessageChannel discardChannel) {
290-
Assert.notNull(discardChannel, "'discardChannel' cannot be null");
291-
this.discardChannel = discardChannel;
292-
}
293-
294-
public void setDiscardChannelName(String discardChannelName) {
295-
Assert.hasText(discardChannelName, "'discardChannelName' must not be empty");
296-
this.discardChannelName = discardChannelName;
297-
}
298-
299-
300-
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
301-
this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
302-
}
303-
304-
/**
305-
* By default, when a MessageGroupStoreReaper is configured to expire partial
306-
* groups, empty groups are also removed. Empty groups exist after a group
307-
* is released normally. This is to enable the detection and discarding of
308-
* late-arriving messages. If you wish to expire empty groups on a longer
309-
* schedule than expiring partial groups, set this property. Empty groups will
310-
* then not be removed from the MessageStore until they have not been modified
311-
* for at least this number of milliseconds.
312-
* @param minimumTimeoutForEmptyGroups The minimum timeout.
313-
*/
314-
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
315-
this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups;
316-
}
317-
318-
/**
319-
* Set {@code releasePartialSequences} on an underlying default
320-
* {@link SequenceSizeReleaseStrategy}. Ignored for other release strategies.
321-
* @param releasePartialSequences true to allow release.
322-
*/
323-
public void setReleasePartialSequences(boolean releasePartialSequences) {
324-
if (!this.releaseStrategySet && releasePartialSequences) {
325-
setReleaseStrategy(new SequenceSizeReleaseStrategy());
326-
}
327-
this.releasePartialSequences = releasePartialSequences;
328-
}
329-
330-
/**
331-
* Expire (completely remove) a group if it is completed due to timeout.
332-
* Default true
333-
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set
334-
* @since 4.1
335-
*/
336-
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
337-
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
338-
}
339-
340355
@Override
341356
public String getComponentType() {
342357
return "aggregator";
@@ -759,7 +774,7 @@ protected Collection<Message<?>> completeGroup(Message<?> message, Object correl
759774
partialSequence = (Collection<Message<?>>) result;
760775
}
761776

762-
if (partialSequence == null && !(result instanceof Message<?>)) {
777+
if (this.popSequenceDetails && partialSequence == null && !(result instanceof Message<?>)) {
763778
AbstractIntegrationMessageBuilder<?> messageBuilder;
764779
if (result instanceof AbstractIntegrationMessageBuilder<?>) {
765780
messageBuilder = (AbstractIntegrationMessageBuilder<?>) result;

spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,6 +40,8 @@
4040
* {@link FactoryBean} to create an {@link AggregatingMessageHandler}.
4141
*
4242
* @author Gary Russell
43+
* @author Artem Bilan
44+
*
4345
* @since 4.2
4446
*
4547
*/
@@ -85,6 +87,8 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
8587

8688
private Boolean expireGroupsUponTimeout;
8789

90+
private Boolean popSequenceDetails;
91+
8892
public void setProcessorBean(Object processorBean) {
8993
this.processorBean = processorBean;
9094
}
@@ -165,6 +169,10 @@ public void setExpireGroupsUponTimeout(Boolean expireGroupsUponTimeout) {
165169
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
166170
}
167171

172+
public void setPopSequenceDetails(Boolean popSequenceDetails) {
173+
this.popSequenceDetails = popSequenceDetails;
174+
}
175+
168176
@Override
169177
protected AggregatingMessageHandler createHandler() {
170178
MessageGroupProcessor outputProcessor;
@@ -253,6 +261,10 @@ protected AggregatingMessageHandler createHandler() {
253261
aggregator.setExpireGroupsUponTimeout(this.expireGroupsUponTimeout);
254262
}
255263

264+
if (this.popSequenceDetails != null) {
265+
aggregator.setPopSequenceDetails(this.popSequenceDetails);
266+
}
267+
256268
return aggregator;
257269
}
258270

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,6 +83,8 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad
8383
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "empty-group-min-timeout",
8484
"minimumTimeoutForEmptyGroups");
8585

86+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "pop-sequence-details");
87+
8688
BeanDefinition expressionDef =
8789
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("group-timeout",
8890
"group-timeout-expression", parserContext, element, false);

spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
import org.springframework.integration.expression.ValueExpression;
3535
import org.springframework.integration.store.MessageGroup;
3636
import org.springframework.integration.store.MessageGroupStore;
37+
import org.springframework.integration.support.MessageBuilder;
3738
import org.springframework.integration.support.locks.LockRegistry;
3839
import org.springframework.messaging.MessageChannel;
3940
import org.springframework.scheduling.TaskScheduler;
@@ -53,7 +54,7 @@ public abstract class CorrelationHandlerSpec<S extends CorrelationHandlerSpec<S,
5354
H extends AbstractCorrelatingMessageHandler>
5455
extends ConsumerEndpointSpec<S, H> {
5556

56-
private final List<Advice> forceReleaseAdviceChain = new LinkedList<Advice>();
57+
private final List<Advice> forceReleaseAdviceChain = new LinkedList<>();
5758

5859
protected CorrelationHandlerSpec(H messageHandler) {
5960
super(messageHandler);
@@ -314,4 +315,16 @@ public S lockRegistry(LockRegistry lockRegistry) {
314315
return _this();
315316
}
316317

318+
/**
319+
* Perform a {@link MessageBuilder#popSequenceDetails()} for output message or not.
320+
* @param popSequenceDetails the boolean flag to use.
321+
* @return the endpoint spec.
322+
* @since 5.1
323+
* @see AbstractCorrelatingMessageHandler#setPopSequenceDetails(boolean)
324+
*/
325+
public S popSequenceDetails(boolean popSequenceDetails) {
326+
this.handler.setPopSequenceDetails(popSequenceDetails);
327+
return _this();
328+
}
329+
317330
}

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration-5.1.xsd

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3829,6 +3829,19 @@
38293829
<xsd:union memberTypes="xsd:boolean xsd:string" />
38303830
</xsd:simpleType>
38313831
</xsd:attribute>
3832+
<xsd:attribute name="pop-sequence-details" default="true">
3833+
<xsd:annotation>
3834+
<xsd:documentation>
3835+
Boolean flag specifying, if the 'MessageBuilder.popSequenceDetails()' should be called
3836+
for the output message. Plays the opposite role to the
3837+
'AbstractMessageSplitter.setApplySequence()'.
3838+
Defaults to 'true'.
3839+
</xsd:documentation>
3840+
</xsd:annotation>
3841+
<xsd:simpleType>
3842+
<xsd:union memberTypes="xsd:boolean xsd:string" />
3843+
</xsd:simpleType>
3844+
</xsd:attribute>
38323845
<xsd:attribute name="lock-registry" type="xsd:string">
38333846
<xsd:annotation>
38343847
<xsd:appinfo>

spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.junit.Test;
3939

4040
import org.springframework.beans.DirectFieldAccessor;
41+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
4142
import org.springframework.integration.channel.QueueChannel;
4243
import org.springframework.integration.store.MessageGroup;
4344
import org.springframework.integration.store.MessageGroupStore;
@@ -150,7 +151,7 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
150151
}
151152

152153
@Test // INT-2833
153-
public void testReaperReapsAnEmptyGroup() throws Exception {
154+
public void testReaperReapsAnEmptyGroup() {
154155
final MessageGroupStore groupStore = new SimpleMessageStore();
155156
AggregatingMessageHandler handler = new AggregatingMessageHandler(group -> group, groupStore);
156157

@@ -447,4 +448,32 @@ public void testDontReapMessageOfOtherHandler() {
447448
assertEquals(1, handler2DiscardChannel.getQueueSize());
448449
}
449450

451+
@Test
452+
public void testNoPopSequenceDetails() {
453+
MessageGroupProcessor mgp = new DefaultAggregatingMessageGroupProcessor();
454+
AggregatingMessageHandler handler = new AggregatingMessageHandler(mgp);
455+
handler.setReleaseStrategy(group -> true);
456+
handler.setPopSequenceDetails(false);
457+
QueueChannel outputChannel = new QueueChannel();
458+
handler.setOutputChannel(outputChannel);
459+
460+
Message<?> testMessage = MessageBuilder.withPayload("foo")
461+
.setCorrelationId(1)
462+
.setSequenceNumber(1)
463+
.setSequenceSize(1)
464+
.pushSequenceDetails(2, 2, 2)
465+
.build();
466+
467+
handler.handleMessage(testMessage);
468+
469+
Message<?> receive = outputChannel.receive(10_000);
470+
471+
assertNotNull(receive);
472+
473+
assertEquals(2, receive.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID));
474+
assertEquals(2, receive.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER));
475+
assertEquals(2, receive.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE));
476+
assertTrue(receive.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS));
477+
}
478+
450479
}

0 commit comments

Comments
 (0)