Skip to content

Commit 075d237

Browse files
garyrussellartembilan
authored andcommitted
INT-4527: Delayer Retries and Error Handling
JIRA: https://jira.spring.io/browse/INT-4527 Fixes #2543 - Add error handling within the scope of a transaction - Add `retryDelay` and `maxAttempts` - Include the delivery attempt header in the `ErrorMessage` - Add `transactionalRelease()` methods to the DSL Check context in `ContextRefreshedEvent`. Complete test case; don't schedule for re-release after a successful send after a failure. Use `identityHashCode` for deliveries map key - error flow might change the message `hashCode`. Debug logging Polishing; remove parameter from the `DelayerEndpointSpec.transactionalRelease()` * Polishing some code style
1 parent 68bccdc commit 075d237

File tree

5 files changed

+469
-154
lines changed

5 files changed

+469
-154
lines changed

spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class IntegrationMessageHeaderAccessor extends MessageHeaderAccessor {
6767

6868
public static final String ACKNOWLEDGMENT_CALLBACK = "acknowledgmentCallback";
6969

70-
private Set<String> readOnlyHeaders = new HashSet<String>();
70+
private Set<String> readOnlyHeaders = new HashSet<>();
7171

7272
public IntegrationMessageHeaderAccessor(Message<?> message) {
7373
super(message);
@@ -84,33 +84,33 @@ public IntegrationMessageHeaderAccessor(Message<?> message) {
8484
public void setReadOnlyHeaders(String... readOnlyHeaders) {
8585
Assert.noNullElements(readOnlyHeaders, "'readOnlyHeaders' must not be contain null items.");
8686
if (!ObjectUtils.isEmpty(readOnlyHeaders)) {
87-
this.readOnlyHeaders = new HashSet<String>(Arrays.asList(readOnlyHeaders));
87+
this.readOnlyHeaders = new HashSet<>(Arrays.asList(readOnlyHeaders));
8888
}
8989
}
9090

9191
@Nullable
9292
public Long getExpirationDate() {
93-
return this.getHeader(EXPIRATION_DATE, Long.class);
93+
return getHeader(EXPIRATION_DATE, Long.class);
9494
}
9595

9696
@Nullable
9797
public Object getCorrelationId() {
98-
return this.getHeader(CORRELATION_ID);
98+
return getHeader(CORRELATION_ID);
9999
}
100100

101101
public int getSequenceNumber() {
102-
Number sequenceNumber = this.getHeader(SEQUENCE_NUMBER, Number.class);
102+
Number sequenceNumber = getHeader(SEQUENCE_NUMBER, Number.class);
103103
return (sequenceNumber != null ? sequenceNumber.intValue() : 0);
104104
}
105105

106106
public int getSequenceSize() {
107-
Number sequenceSize = this.getHeader(SEQUENCE_SIZE, Number.class);
107+
Number sequenceSize = getHeader(SEQUENCE_SIZE, Number.class);
108108
return (sequenceSize != null ? sequenceSize.intValue() : 0);
109109
}
110110

111111
@Nullable
112112
public Integer getPriority() {
113-
Number priority = this.getHeader(PRIORITY, Number.class);
113+
Number priority = getHeader(PRIORITY, Number.class);
114114
return (priority != null ? priority.intValue() : null);
115115
}
116116

@@ -133,19 +133,20 @@ public Closeable getCloseableResource() {
133133
* @return the callback.
134134
* @since 5.0.1
135135
*/
136+
@Nullable
136137
public AcknowledgmentCallback getAcknowledgmentCallback() {
137138
return getHeader(ACKNOWLEDGMENT_CALLBACK, AcknowledgmentCallback.class);
138139
}
139140

140141
/**
141-
* When a message-driven enpoint supports retry implicitly, this
142+
* When a message-driven endpoint supports retry implicitly, this
142143
* header is incremented for each delivery attempt.
143144
* @return the delivery attempt.
144145
* @since 5.0.1
145146
*/
146147
@Nullable
147148
public AtomicInteger getDeliveryAttempt() {
148-
return this.getHeader(DELIVERY_ATTEMPT, AtomicInteger.class);
149+
return getHeader(DELIVERY_ATTEMPT, AtomicInteger.class);
149150
}
150151

151152
@SuppressWarnings("unchecked")

spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,19 +27,27 @@
2727
import org.springframework.integration.expression.FunctionExpression;
2828
import org.springframework.integration.handler.DelayHandler;
2929
import org.springframework.integration.store.MessageGroupStore;
30+
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
3031
import org.springframework.messaging.Message;
32+
import org.springframework.messaging.MessageChannel;
33+
import org.springframework.messaging.MessageHandler;
34+
import org.springframework.messaging.support.ErrorMessage;
35+
import org.springframework.transaction.PlatformTransactionManager;
36+
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
37+
import org.springframework.transaction.interceptor.TransactionInterceptor;
3138
import org.springframework.util.Assert;
3239

3340
/**
3441
* A {@link ConsumerEndpointSpec} for a {@link DelayHandler}.
3542
*
3643
* @author Artem Bilan
44+
* @author Gary Russell
3745
*
3846
* @since 5.0
3947
*/
4048
public final class DelayerEndpointSpec extends ConsumerEndpointSpec<DelayerEndpointSpec, DelayHandler> {
4149

42-
private final List<Advice> delayedAdvice = new LinkedList<Advice>();
50+
private final List<Advice> delayedAdvice = new LinkedList<>();
4351

4452
DelayerEndpointSpec(DelayHandler delayHandler) {
4553
super(delayHandler);
@@ -97,6 +105,105 @@ public DelayerEndpointSpec delayExpression(String delayExpression) {
97105
return this;
98106
}
99107

108+
/**
109+
* Set a message channel to which an {@link ErrorMessage} will be sent if sending the
110+
* released message fails. If the error flow returns normally, the release is complete.
111+
* If the error flow throws an exception, the release will be re-attempted.
112+
* If there is a transaction advice on the release task, the error flow is called
113+
* within the transaction.
114+
* @param channel the channel.
115+
* @return the endpoint spec.
116+
* @see #maxAttempts(int)
117+
* @see #retryDelay(long)
118+
* @since 5.0.8
119+
*/
120+
public DelayerEndpointSpec delayedMessageErrorChannel(MessageChannel channel) {
121+
this.handler.setDelayedMessageErrorChannel(channel);
122+
return this;
123+
}
124+
125+
/**
126+
* Set a message channel name to which an {@link ErrorMessage} will be sent if sending
127+
* the released message fails. If the error flow returns normally, the release is
128+
* complete. If the error flow throws an exception, the release will be re-attempted.
129+
* If there is a transaction advice on the release task, the error flow is called
130+
* within the transaction.
131+
* @param channel the channel name.
132+
* @return the endpoint spec.
133+
* @see #maxAttempts(int)
134+
* @see #retryDelay(long)
135+
* @since 5.0.8
136+
*/
137+
public DelayerEndpointSpec delayedMessageErrorChannel(String channel) {
138+
this.handler.setDelayedMessageErrorChannelName(channel);
139+
return this;
140+
}
141+
142+
/**
143+
* Set the maximum number of release attempts for when message release fails.
144+
* Default {@value DelayHandler#DEFAULT_MAX_ATTEMPTS}.
145+
* @param maxAttempts the max attempts.
146+
* @return the endpoint spec.
147+
* @see #retryDelay(long)
148+
* @since 5.0.8
149+
*/
150+
public DelayerEndpointSpec maxAttempts(int maxAttempts) {
151+
this.handler.setMaxAttempts(maxAttempts);
152+
return this;
153+
}
154+
155+
/**
156+
* Set an additional delay to apply when retrying after a release failure.
157+
* Default {@value DelayHandler#DEFAULT_RETRY_DELAY}.
158+
* @param retryDelay the retry delay.
159+
* @return the endpoint spec.
160+
* @see #maxAttempts(int)
161+
* @since 5.0.8
162+
*/
163+
public DelayerEndpointSpec retryDelay(long retryDelay) {
164+
this.handler.setRetryDelay(retryDelay);
165+
return this;
166+
}
167+
168+
/**
169+
* Specify a {@link TransactionInterceptor} {@link Advice} with default
170+
* {@link PlatformTransactionManager} and {@link DefaultTransactionAttribute} for the
171+
* {@link MessageHandler}.
172+
* @return the spec.
173+
* @since 5.0.8
174+
*/
175+
public DelayerEndpointSpec transactionalRelease() {
176+
TransactionInterceptor transactionInterceptor = new TransactionInterceptorBuilder().build();
177+
this.componentsToRegister.put(transactionInterceptor, null);
178+
return delayedAdvice(transactionInterceptor);
179+
}
180+
181+
/**
182+
* Specify a {@link TransactionInterceptor} {@link Advice} for the {@link MessageHandler}.
183+
* @param transactionInterceptor the {@link TransactionInterceptor} to use.
184+
* @return the spec.
185+
* @see TransactionInterceptorBuilder
186+
* @since 5.0.8
187+
*/
188+
public DelayerEndpointSpec transactionalRelease(TransactionInterceptor transactionInterceptor) {
189+
return delayedAdvice(transactionInterceptor);
190+
}
191+
192+
/**
193+
* Specify a {@link TransactionInterceptor} {@link Advice} with the provided
194+
* {@code PlatformTransactionManager} and default {@link DefaultTransactionAttribute}
195+
* for the {@link MessageHandler}.
196+
* @param transactionManager the {@link PlatformTransactionManager} to use.
197+
* @return the spec.
198+
* @since 5.0.8
199+
*/
200+
public DelayerEndpointSpec transactionalRelease(PlatformTransactionManager transactionManager) {
201+
return transactionalRelease(
202+
new TransactionInterceptorBuilder()
203+
.transactionManager(transactionManager)
204+
.build());
205+
}
206+
100207
/**
101208
* Specify the function to determine delay value against {@link Message}.
102209
* Typically used with a Java 8 Lambda expression:

0 commit comments

Comments
 (0)