Skip to content

Commit e1cebaf

Browse files
committed
GH-9558: Expose BarrierSpec.discardChannel & triggerTimeout
Fixes: #9558 Issue link: #9558 **Auto-cherry-pick to `6.3.x` & `6.2.x`**
1 parent 15e914b commit e1cebaf

File tree

4 files changed

+72
-5
lines changed

4 files changed

+72
-5
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/BarrierMessageHandler.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import org.springframework.integration.handler.DiscardingMessageHandler;
2828
import org.springframework.integration.handler.MessageTriggerAction;
2929
import org.springframework.integration.store.SimpleMessageGroup;
30+
import org.springframework.lang.Nullable;
3031
import org.springframework.messaging.Message;
3132
import org.springframework.messaging.MessageChannel;
3233
import org.springframework.messaging.MessageHandlingException;
@@ -170,7 +171,8 @@ public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGr
170171
}
171172

172173
/**
173-
* Set the name of the channel to which late arriving trigger messages are sent.
174+
* Set the name of the channel to which late arriving trigger messages are sent,
175+
* or request message does not arrive in time.
174176
* @param discardChannelName the discard channel.
175177
* @since 5.0
176178
*/
@@ -179,7 +181,8 @@ public void setDiscardChannelName(String discardChannelName) {
179181
}
180182

181183
/**
182-
* Set the channel to which late arriving trigger messages are sent.
184+
* Set the channel to which late arriving trigger messages are sent,
185+
* or request message does not arrive in time.
183186
* @param discardChannel the discard channel.
184187
* @since 5.0
185188
*/
@@ -188,8 +191,11 @@ public void setDiscardChannel(MessageChannel discardChannel) {
188191
}
189192

190193
/**
194+
* Return the discard message channel for trigger action message.
195+
* @return a discard message channel.
191196
* @since 5.0
192197
*/
198+
@Nullable
193199
@Override
194200
public MessageChannel getDiscardChannel() {
195201
String channelName = this.discardChannelName;

spring-integration-core/src/main/java/org/springframework/integration/dsl/BarrierSpec.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@
2525
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
2626
import org.springframework.integration.aggregator.MessageGroupProcessor;
2727
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
28+
import org.springframework.lang.Nullable;
29+
import org.springframework.messaging.MessageChannel;
2830
import org.springframework.util.Assert;
2931

3032
/**
@@ -43,6 +45,15 @@ public class BarrierSpec extends ConsumerEndpointSpec<BarrierSpec, BarrierMessag
4345
private CorrelationStrategy correlationStrategy =
4446
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);
4547

48+
@Nullable
49+
private MessageChannel discardChannel;
50+
51+
@Nullable
52+
private String discardChannelName;
53+
54+
@Nullable
55+
private Long triggerTimeout;
56+
4657
protected BarrierSpec(long timeout) {
4758
super(null);
4859
this.timeout = timeout;
@@ -60,9 +71,57 @@ public BarrierSpec correlationStrategy(CorrelationStrategy correlationStrategy)
6071
return this;
6172
}
6273

74+
/**
75+
* Set the channel to which late arriving trigger messages are sent,
76+
* or request message does not arrive in time.
77+
* @param discardChannel the message channel for discarded triggers.
78+
* @return the spec
79+
* @since 6.2.10
80+
*/
81+
public BarrierSpec discardChannel(@Nullable MessageChannel discardChannel) {
82+
this.discardChannel = discardChannel;
83+
return this;
84+
}
85+
86+
/**
87+
* Set the channel bean name to which late arriving trigger messages are sent,
88+
* or request message does not arrive in time.
89+
* @param discardChannelName the message channel for discarded triggers.
90+
* @return the spec
91+
* @since 6.2.10
92+
*/
93+
public BarrierSpec discardChannel(@Nullable String discardChannelName) {
94+
this.discardChannelName = discardChannelName;
95+
return this;
96+
}
97+
98+
/**
99+
* Set the timeout in milliseconds when waiting for a request message.
100+
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
101+
* @return the spec
102+
* @since 6.2.10
103+
*/
104+
public BarrierSpec triggerTimeout(long triggerTimeout) {
105+
this.triggerTimeout = triggerTimeout;
106+
return this;
107+
}
108+
63109
@Override
64110
public Tuple2<ConsumerEndpointFactoryBean, BarrierMessageHandler> doGet() {
65-
this.handler = new BarrierMessageHandler(this.timeout, this.outputProcessor, this.correlationStrategy);
111+
if (this.triggerTimeout == null) {
112+
this.handler = new BarrierMessageHandler(this.timeout, this.outputProcessor, this.correlationStrategy);
113+
}
114+
else {
115+
this.handler =
116+
new BarrierMessageHandler(this.timeout, this.triggerTimeout, this.outputProcessor,
117+
this.correlationStrategy);
118+
}
119+
if (this.discardChannel != null) {
120+
this.handler.setDiscardChannel(this.discardChannel);
121+
}
122+
else if (this.discardChannelName != null) {
123+
this.handler.setDiscardChannelName(this.discardChannelName);
124+
}
66125
return super.doGet();
67126
}
68127

spring-integration-core/src/test/java/org/springframework/integration/dsl/correlation/CorrelationHandlerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ public IntegrationFlow barrierFlow() {
300300
return f -> f
301301
.barrier(10000, b -> b
302302
.correlationStrategy(new HeaderAttributeCorrelationStrategy(BARRIER))
303+
.discardChannel("nullChannel")
303304
.outputProcessor(g ->
304305
g.getMessages()
305306
.stream()

src/reference/antora/modules/ROOT/pages/barrier.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,6 @@ XML::
7575
Depending on which one has a message arrive first, either the thread sending a message to `in` or the thread sending a message to `release` waits for up to ten seconds until the other message arrives.
7676
When the message is released, the `out` channel is sent a message that combines the result of invoking the custom `MessageGroupProcessor` bean, named `myOutputProcessor`.
7777
If the main thread times out and a trigger arrives later, you can configure a discard channel to which the late trigger is sent.
78+
The trigger message is also discarded if request message does not arrive in time.
7879

7980
For an example of this component, see the https://github.com/spring-projects/spring-integration-samples/tree/main/basic/barrier[barrier sample application].

0 commit comments

Comments
 (0)