Skip to content

Commit f21f810

Browse files
garyrussellartembilan
authored andcommitted
INT-4369: Add DeliveryAttempt header
JIRA: https://jira.spring.io/browse/INT-4369 Also add `StaticMessageHeaderAccessor` - avoids object creation when getting type-safe well-known headers. * Docs
1 parent 381d3f9 commit f21f810

File tree

9 files changed

+241
-108
lines changed

9 files changed

+241
-108
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.amqp.inbound;
1818

1919
import java.util.Map;
20+
import java.util.concurrent.atomic.AtomicInteger;
2021

2122
import org.springframework.amqp.core.AcknowledgeMode;
2223
import org.springframework.amqp.core.Message;
@@ -27,13 +28,15 @@
2728
import org.springframework.amqp.support.converter.MessageConverter;
2829
import org.springframework.amqp.support.converter.SimpleMessageConverter;
2930
import org.springframework.core.AttributeAccessor;
31+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3032
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
3133
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
3234
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
3335
import org.springframework.integration.context.OrderlyShutdownCapable;
3436
import org.springframework.integration.endpoint.MessageProducerSupport;
3537
import org.springframework.integration.support.ErrorMessageStrategy;
3638
import org.springframework.integration.support.ErrorMessageUtils;
39+
import org.springframework.integration.support.StaticMessageHeaderAccessor;
3740
import org.springframework.retry.RecoveryCallback;
3841
import org.springframework.retry.RetryCallback;
3942
import org.springframework.retry.RetryContext;
@@ -200,15 +203,18 @@ public void onMessage(final Message message, final Channel channel) throws Excep
200203
try {
201204
if (AmqpInboundChannelAdapter.this.retryTemplate == null) {
202205
try {
203-
processMessage(message, channel);
206+
createAndSend(message, channel);
204207
}
205208
finally {
206209
attributesHolder.remove();
207210
}
208211
}
209212
else {
213+
final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel);
210214
AmqpInboundChannelAdapter.this.retryTemplate.execute(context -> {
211-
processMessage(message, channel);
215+
StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet();
216+
setAttributesIfNecessary(message, toSend);
217+
sendMessage(toSend);
212218
return null;
213219
},
214220
(RecoveryCallback<Object>) AmqpInboundChannelAdapter.this.recoveryCallback);
@@ -225,7 +231,13 @@ public void onMessage(final Message message, final Channel channel) throws Excep
225231
}
226232
}
227233

228-
private void processMessage(Message message, Channel channel) {
234+
private void createAndSend(Message message, Channel channel) {
235+
org.springframework.messaging.Message<Object> messagingMessage = createMessage(message, channel);
236+
setAttributesIfNecessary(message, messagingMessage);
237+
sendMessage(messagingMessage);
238+
}
239+
240+
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
229241
Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
230242
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
231243
.toHeadersFromRequest(message.getMessageProperties());
@@ -234,12 +246,14 @@ private void processMessage(Message message, Channel channel) {
234246
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
235247
headers.put(AmqpHeaders.CHANNEL, channel);
236248
}
249+
if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
250+
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
251+
}
237252
final org.springframework.messaging.Message<Object> messagingMessage = getMessageBuilderFactory()
238253
.withPayload(payload)
239254
.copyHeaders(headers)
240255
.build();
241-
setAttributesIfNecessary(message, messagingMessage);
242-
sendMessage(messagingMessage);
256+
return messagingMessage;
243257
}
244258

245259
@Override

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.amqp.inbound;
1818

1919
import java.util.Map;
20+
import java.util.concurrent.atomic.AtomicInteger;
2021

2122
import org.springframework.amqp.core.AcknowledgeMode;
2223
import org.springframework.amqp.core.Address;
@@ -32,12 +33,14 @@
3233
import org.springframework.amqp.support.converter.MessageConverter;
3334
import org.springframework.amqp.support.converter.SimpleMessageConverter;
3435
import org.springframework.core.AttributeAccessor;
36+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3537
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
3638
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
3739
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
3840
import org.springframework.integration.gateway.MessagingGatewaySupport;
3941
import org.springframework.integration.support.ErrorMessageStrategy;
4042
import org.springframework.integration.support.ErrorMessageUtils;
43+
import org.springframework.integration.support.StaticMessageHeaderAccessor;
4144
import org.springframework.retry.RecoveryCallback;
4245
import org.springframework.retry.RetryCallback;
4346
import org.springframework.retry.RetryContext;
@@ -255,23 +258,29 @@ protected class Listener implements ChannelAwareMessageListener, RetryListener {
255258
public void onMessage(final Message message, final Channel channel) throws Exception {
256259
if (AmqpInboundGateway.this.retryTemplate == null) {
257260
try {
258-
doOnMessage(message, channel);
261+
org.springframework.messaging.Message<Object> converted = convert(message, channel);
262+
if (converted != null) {
263+
process(message, converted);
264+
}
259265
}
260266
finally {
261267
attributesHolder.remove();
262268
}
263269
}
264270
else {
265-
AmqpInboundGateway.this.retryTemplate.execute(context -> {
266-
doOnMessage(message, channel);
271+
org.springframework.messaging.Message<Object> converted = convert(message, channel);
272+
if (converted != null) {
273+
AmqpInboundGateway.this.retryTemplate.execute(context -> {
274+
StaticMessageHeaderAccessor.getDeliveryAttempt(converted).incrementAndGet();
275+
process(message, converted);
267276
return null;
268277
},
269278
(RecoveryCallback<Object>) AmqpInboundGateway.this.recoveryCallback);
279+
}
270280
}
271281
}
272282

273-
private void doOnMessage(Message message, Channel channel) {
274-
boolean error = false;
283+
private org.springframework.messaging.Message<Object> convert(Message message, Channel channel) {
275284
Map<String, Object> headers = null;
276285
Object payload = null;
277286
try {
@@ -281,6 +290,9 @@ private void doOnMessage(Message message, Channel channel) {
281290
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
282291
headers.put(AmqpHeaders.CHANNEL, channel);
283292
}
293+
if (AmqpInboundGateway.this.retryTemplate != null) {
294+
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
295+
}
284296
}
285297
catch (RuntimeException e) {
286298
if (getErrorChannel() != null) {
@@ -290,60 +302,60 @@ private void doOnMessage(Message message, Channel channel) {
290302
else {
291303
throw e;
292304
}
293-
error = true;
305+
return null;
294306
}
295-
296-
if (!error) {
297-
org.springframework.messaging.Message<Object> messagingMessage = getMessageBuilderFactory()
307+
return getMessageBuilderFactory()
298308
.withPayload(payload)
299309
.copyHeaders(headers)
300310
.build();
301-
setAttributesIfNecessary(message, messagingMessage);
302-
final org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(messagingMessage);
303-
if (reply != null) {
304-
Address replyTo;
305-
String replyToProperty = message.getMessageProperties().getReplyTo();
306-
if (replyToProperty != null) {
307-
replyTo = new Address(replyToProperty);
308-
}
309-
else {
310-
replyTo = AmqpInboundGateway.this.defaultReplyTo;
311-
}
311+
}
312312

313-
MessagePostProcessor messagePostProcessor =
314-
message1 -> {
315-
MessageProperties messageProperties = message1.getMessageProperties();
316-
String contentEncoding = messageProperties.getContentEncoding();
317-
long contentLength = messageProperties.getContentLength();
318-
String contentType = messageProperties.getContentType();
319-
AmqpInboundGateway.this.headerMapper.fromHeadersToReply(reply.getHeaders(),
320-
messageProperties);
321-
// clear the replyTo from the original message since we are using it now
322-
messageProperties.setReplyTo(null);
323-
// reset the content-* properties as determined by the MessageConverter
324-
if (StringUtils.hasText(contentEncoding)) {
325-
messageProperties.setContentEncoding(contentEncoding);
326-
}
327-
messageProperties.setContentLength(contentLength);
328-
if (contentType != null) {
329-
messageProperties.setContentType(contentType);
330-
}
331-
return message1;
332-
};
333-
334-
if (replyTo != null) {
335-
AmqpInboundGateway.this.amqpTemplate.convertAndSend(replyTo.getExchangeName(),
336-
replyTo.getRoutingKey(), reply.getPayload(), messagePostProcessor);
313+
private void process(Message message, org.springframework.messaging.Message<Object> messagingMessage) {
314+
setAttributesIfNecessary(message, messagingMessage);
315+
final org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(messagingMessage);
316+
if (reply != null) {
317+
Address replyTo;
318+
String replyToProperty = message.getMessageProperties().getReplyTo();
319+
if (replyToProperty != null) {
320+
replyTo = new Address(replyToProperty);
321+
}
322+
else {
323+
replyTo = AmqpInboundGateway.this.defaultReplyTo;
324+
}
325+
326+
MessagePostProcessor messagePostProcessor =
327+
message1 -> {
328+
MessageProperties messageProperties = message1.getMessageProperties();
329+
String contentEncoding = messageProperties.getContentEncoding();
330+
long contentLength = messageProperties.getContentLength();
331+
String contentType = messageProperties.getContentType();
332+
AmqpInboundGateway.this.headerMapper.fromHeadersToReply(reply.getHeaders(),
333+
messageProperties);
334+
// clear the replyTo from the original message since we are using it now
335+
messageProperties.setReplyTo(null);
336+
// reset the content-* properties as determined by the MessageConverter
337+
if (StringUtils.hasText(contentEncoding)) {
338+
messageProperties.setContentEncoding(contentEncoding);
339+
}
340+
messageProperties.setContentLength(contentLength);
341+
if (contentType != null) {
342+
messageProperties.setContentType(contentType);
343+
}
344+
return message1;
345+
};
346+
347+
if (replyTo != null) {
348+
AmqpInboundGateway.this.amqpTemplate.convertAndSend(replyTo.getExchangeName(),
349+
replyTo.getRoutingKey(), reply.getPayload(), messagePostProcessor);
350+
}
351+
else {
352+
if (!AmqpInboundGateway.this.amqpTemplateExplicitlySet) {
353+
throw new IllegalStateException("There is no 'replyTo' message property " +
354+
"and the `defaultReplyTo` hasn't been configured.");
337355
}
338356
else {
339-
if (!AmqpInboundGateway.this.amqpTemplateExplicitlySet) {
340-
throw new IllegalStateException("There is no 'replyTo' message property " +
341-
"and the `defaultReplyTo` hasn't been configured.");
342-
}
343-
else {
344-
AmqpInboundGateway.this.amqpTemplate.convertAndSend(reply.getPayload(),
345-
messagePostProcessor);
346-
}
357+
AmqpInboundGateway.this.amqpTemplate.convertAndSend(reply.getPayload(),
358+
messagePostProcessor);
347359
}
348360
}
349361
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.hamcrest.CoreMatchers.containsString;
2020
import static org.hamcrest.CoreMatchers.instanceOf;
2121
import static org.hamcrest.CoreMatchers.notNullValue;
22+
import static org.hamcrest.Matchers.equalTo;
2223
import static org.junit.Assert.assertEquals;
2324
import static org.junit.Assert.assertFalse;
2425
import static org.junit.Assert.assertNotEquals;
@@ -66,6 +67,7 @@
6667
import org.springframework.integration.json.ObjectToJsonTransformer;
6768
import org.springframework.integration.mapping.support.JsonHeaders;
6869
import org.springframework.integration.support.MessageBuilder;
70+
import org.springframework.integration.support.StaticMessageHeaderAccessor;
6971
import org.springframework.integration.transformer.MessageTransformingHandler;
7072
import org.springframework.integration.transformer.Transformer;
7173
import org.springframework.messaging.Message;
@@ -307,9 +309,11 @@ public void testRetryWithinOnMessageAdapter() throws Exception {
307309
Message<?> errorMessage = errors.receive(0);
308310
assertNotNull(errorMessage);
309311
assertThat(errorMessage.getPayload(), instanceOf(MessagingException.class));
310-
assertThat(((MessagingException) errorMessage.getPayload()).getMessage(), containsString("Dispatcher has no"));
312+
MessagingException payload = (MessagingException) errorMessage.getPayload();
313+
assertThat(payload.getMessage(), containsString("Dispatcher has no"));
314+
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(payload.getFailedMessage()).get(), equalTo(3));
311315
org.springframework.amqp.core.Message amqpMessage = errorMessage.getHeaders()
312-
.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, org.springframework.amqp.core.Message.class);
316+
.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, org.springframework.amqp.core.Message.class);
313317
assertThat(amqpMessage, notNullValue());
314318
assertNull(errors.receive(0));
315319
}
@@ -332,9 +336,11 @@ public void testRetryWithinOnMessageGateway() throws Exception {
332336
Message<?> errorMessage = errors.receive(0);
333337
assertNotNull(errorMessage);
334338
assertThat(errorMessage.getPayload(), instanceOf(MessagingException.class));
335-
assertThat(((MessagingException) errorMessage.getPayload()).getMessage(), containsString("Dispatcher has no"));
339+
MessagingException payload = (MessagingException) errorMessage.getPayload();
340+
assertThat(payload.getMessage(), containsString("Dispatcher has no"));
341+
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(payload.getFailedMessage()).get(), equalTo(3));
336342
org.springframework.amqp.core.Message amqpMessage = errorMessage.getHeaders()
337-
.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, org.springframework.amqp.core.Message.class);
343+
.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, org.springframework.amqp.core.Message.class);
338344
assertThat(amqpMessage, notNullValue());
339345
assertNull(errors.receive(0));
340346
}

spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import java.util.HashSet;
2323
import java.util.Map;
2424
import java.util.Set;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526

27+
import org.springframework.lang.Nullable;
2628
import org.springframework.messaging.Message;
2729
import org.springframework.messaging.MessageHeaders;
2830
import org.springframework.messaging.support.MessageHeaderAccessor;
@@ -35,7 +37,7 @@
3537
*
3638
* @author Andy Wilkinson
3739
* @author Artem Bilan
38-
* @author Gary Russel
40+
* @author Gary Russell
3941
*
4042
* @since 4.0
4143
*
@@ -60,6 +62,8 @@ public class IntegrationMessageHeaderAccessor extends MessageHeaderAccessor {
6062

6163
public static final String CLOSEABLE_RESOURCE = "closeableResource";
6264

65+
public static final String DELIVERY_ATTEMPT = "deliveryAttempt";
66+
6367
private Set<String> readOnlyHeaders = new HashSet<String>();
6468

6569
public IntegrationMessageHeaderAccessor(Message<?> message) {
@@ -81,10 +85,12 @@ public void setReadOnlyHeaders(String... readOnlyHeaders) {
8185
}
8286
}
8387

88+
@Nullable
8489
public Long getExpirationDate() {
8590
return this.getHeader(EXPIRATION_DATE, Long.class);
8691
}
8792

93+
@Nullable
8894
public Object getCorrelationId() {
8995
return this.getHeader(CORRELATION_ID);
9096
}
@@ -99,6 +105,7 @@ public int getSequenceSize() {
99105
return (sequenceSize != null ? sequenceSize.intValue() : 0);
100106
}
101107

108+
@Nullable
102109
public Integer getPriority() {
103110
Number priority = this.getHeader(PRIORITY, Number.class);
104111
return (priority != null ? priority.intValue() : null);
@@ -113,11 +120,24 @@ public Integer getPriority() {
113120
* @return the {@link Closeable}.
114121
* @since 4.3
115122
*/
123+
@Nullable
116124
public Closeable getCloseableResource() {
117125
return this.getHeader(CLOSEABLE_RESOURCE, Closeable.class);
118126
}
119127

128+
/**
129+
* When a message-driven enpoint supports retry implicitly, this
130+
* header is incremented for each delivery attempt.
131+
* @return the delivery attempt.
132+
* @since 5.0.1
133+
*/
134+
@Nullable
135+
public AtomicInteger getDeliveryAttempt() {
136+
return this.getHeader(DELIVERY_ATTEMPT, AtomicInteger.class);
137+
}
138+
120139
@SuppressWarnings("unchecked")
140+
@Nullable
121141
public <T> T getHeader(String key, Class<T> type) {
122142
Object value = getHeader(key);
123143
if (value == null) {

0 commit comments

Comments
 (0)