Skip to content

Commit 88120a6

Browse files
garyrussellartembilan
authored andcommitted
INT-4482: AMQP: Fix Double ErrorMessage
JIRA: https://jira.spring.io/browse/INT-4482 The outer try/catch sends an `ErrorMessage` for all exceptions; it should only do so for `MessageConversionException`. Integration flow exceptions will have been already handled by `MessageProducerSupport`. Also, populate the raw message header consistently - previously it only was populated for flow exceptions. Although the LEFE contains the raw message, it should be in the `ErrorMessage` header for consistency. **cherry-pick to 5.0.x, 4.3.x** * Polishing - PR Comments
1 parent bb5528c commit 88120a6

File tree

4 files changed

+164
-20
lines changed

4 files changed

+164
-20
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
2626
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
2727
import org.springframework.amqp.support.AmqpHeaders;
28+
import org.springframework.amqp.support.converter.MessageConversionException;
2829
import org.springframework.amqp.support.converter.MessageConverter;
2930
import org.springframework.amqp.support.converter.SimpleMessageConverter;
3031
import org.springframework.core.AttributeAccessor;
@@ -200,14 +201,10 @@ protected class Listener implements ChannelAwareMessageListener, RetryListener {
200201
@SuppressWarnings("unchecked")
201202
@Override
202203
public void onMessage(final Message message, final Channel channel) throws Exception {
204+
boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
203205
try {
204-
if (AmqpInboundChannelAdapter.this.retryTemplate == null) {
205-
try {
206-
createAndSend(message, channel);
207-
}
208-
finally {
209-
attributesHolder.remove();
210-
}
206+
if (retryDisabled) {
207+
createAndSend(message, channel);
211208
}
212209
else {
213210
final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel);
@@ -220,15 +217,21 @@ public void onMessage(final Message message, final Channel channel) throws Excep
220217
(RecoveryCallback<Object>) AmqpInboundChannelAdapter.this.recoveryCallback);
221218
}
222219
}
223-
catch (RuntimeException e) {
220+
catch (MessageConversionException e) {
224221
if (getErrorChannel() != null) {
222+
setAttributesIfNecessary(message, null);
225223
getMessagingTemplate().send(getErrorChannel(), buildErrorMessage(null,
226224
new ListenerExecutionFailedException("Message conversion failed", e, message)));
227225
}
228226
else {
229227
throw e;
230228
}
231229
}
230+
finally {
231+
if (retryDisabled) {
232+
attributesHolder.remove();
233+
}
234+
}
232235
}
233236

234237
private void createAndSend(Message message, Channel channel) {

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java

Lines changed: 123 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,16 @@
1616

1717
package org.springframework.integration.amqp.dsl;
1818

19+
import static org.hamcrest.Matchers.instanceOf;
1920
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertNull;
2123
import static org.junit.Assert.assertSame;
24+
import static org.junit.Assert.assertThat;
2225
import static org.junit.Assert.assertTrue;
2326

27+
import java.util.concurrent.atomic.AtomicReference;
28+
2429
import org.junit.jupiter.api.AfterAll;
2530
import org.junit.jupiter.api.Test;
2631

@@ -37,21 +42,27 @@
3742
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
3843
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
3944
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
45+
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
46+
import org.springframework.amqp.support.converter.MessageConversionException;
47+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
4048
import org.springframework.beans.factory.annotation.Autowired;
4149
import org.springframework.beans.factory.annotation.Qualifier;
50+
import org.springframework.context.ConfigurableApplicationContext;
4251
import org.springframework.context.Lifecycle;
4352
import org.springframework.context.annotation.Bean;
4453
import org.springframework.context.annotation.Configuration;
4554
import org.springframework.integration.amqp.channel.AbstractAmqpChannel;
4655
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
4756
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
57+
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
4858
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
4959
import org.springframework.integration.channel.QueueChannel;
5060
import org.springframework.integration.config.EnableIntegration;
5161
import org.springframework.integration.dsl.IntegrationFlow;
5262
import org.springframework.integration.dsl.IntegrationFlowBuilder;
5363
import org.springframework.integration.dsl.IntegrationFlows;
5464
import org.springframework.integration.support.MessageBuilder;
65+
import org.springframework.integration.support.StringObjectMapBuilder;
5566
import org.springframework.integration.test.util.TestUtils;
5667
import org.springframework.messaging.Message;
5768
import org.springframework.messaging.MessageChannel;
@@ -67,7 +78,8 @@
6778
*/
6879
@SpringJUnitConfig
6980
@RabbitAvailable(queues = { "amqpOutboundInput", "amqpReplyChannel", "asyncReplies",
70-
"defaultReplyTo", "si.dsl.test", "testTemplateChannelTransacted" })
81+
"defaultReplyTo", "si.dsl.test", "si.dsl.exception.test.dlq",
82+
"si.dsl.conv.exception.test.dlq", "testTemplateChannelTransacted" })
7183
@DirtiesContext
7284
public class AmqpTests {
7385

@@ -92,8 +104,10 @@ public class AmqpTests {
92104
private Lifecycle asyncOutboundGateway;
93105

94106
@AfterAll
95-
public static void tearDown() {
96-
RabbitAvailableCondition.getBrokerRunning().removeTestQueues();
107+
public static void tearDown(ConfigurableApplicationContext context) {
108+
context.stop(); // prevent queues from being redeclared after deletion
109+
RabbitAvailableCondition.getBrokerRunning().removeTestQueues("si.dsl.exception.test",
110+
"si.dsl.conv.exception.test");
97111
}
98112

99113
@Test
@@ -173,6 +187,33 @@ public void testAmqpAsyncOutboundGatewayFlow() throws Exception {
173187
this.asyncOutboundGateway.stop();
174188
}
175189

190+
@Autowired
191+
private AtomicReference<ListenerExecutionFailedException> lefe;
192+
193+
@Autowired
194+
private AtomicReference<?> raw;
195+
196+
197+
@Test
198+
public void testInboundMessagingExceptionFlow() {
199+
this.amqpTemplate.convertAndSend("si.dsl.exception.test", "foo");
200+
assertNotNull(this.amqpTemplate.receive("si.dsl.exception.test.dlq", 30_000));
201+
assertNull(this.lefe.get());
202+
assertNotNull(this.raw.get());
203+
this.raw.set(null);
204+
}
205+
206+
@Test
207+
public void testInboundConversionExceptionFlow() {
208+
this.amqpTemplate.convertAndSend("si.dsl.conv.exception.test", "foo");
209+
assertNotNull(this.amqpTemplate.receive("si.dsl.conv.exception.test.dlq", 30_000));
210+
assertNotNull(this.lefe.get());
211+
assertThat(this.lefe.get().getCause(), instanceOf(MessageConversionException.class));
212+
assertNotNull(this.raw.get());
213+
this.raw.set(null);
214+
this.lefe.set(null);
215+
}
216+
176217
@Autowired
177218
private AbstractAmqpChannel unitChannel;
178219

@@ -277,6 +318,84 @@ public IntegrationFlow amqpInboundFlow(ConnectionFactory rabbitConnectionFactory
277318
.get();
278319
}
279320

321+
@Bean
322+
public AtomicReference<ListenerExecutionFailedException> lefe() {
323+
return new AtomicReference<>();
324+
}
325+
326+
@Bean
327+
public AtomicReference<org.springframework.amqp.core.Message> raw() {
328+
return new AtomicReference<>();
329+
}
330+
331+
@Bean
332+
public Queue exQueue() {
333+
return new Queue("si.dsl.exception.test", true, false, false,
334+
new StringObjectMapBuilder()
335+
.put("x-dead-letter-exchange", "")
336+
.put("x-dead-letter-routing-key", exDLQ().getName())
337+
.get());
338+
}
339+
340+
@Bean
341+
public Queue exDLQ() {
342+
return new Queue("si.dsl.exception.test.dlq");
343+
}
344+
345+
@Bean
346+
public IntegrationFlow inboundWithExceptionFlow(ConnectionFactory cf) {
347+
return IntegrationFlows.from(Amqp.inboundAdapter(cf, exQueue())
348+
.configureContainer(c -> c.defaultRequeueRejected(false))
349+
.errorChannel("errors.input"))
350+
.handle(m -> {
351+
throw new RuntimeException("fail");
352+
})
353+
.get();
354+
}
355+
356+
@Bean
357+
public IntegrationFlow errors() {
358+
return f -> f.handle(m -> {
359+
raw().set(m.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE,
360+
org.springframework.amqp.core.Message.class));
361+
if (m.getPayload() instanceof ListenerExecutionFailedException) {
362+
lefe().set((ListenerExecutionFailedException) m.getPayload());
363+
}
364+
throw (RuntimeException) m.getPayload();
365+
});
366+
}
367+
368+
@Bean
369+
public Queue exConvQueue() {
370+
return new Queue("si.dsl.conv.exception.test", true, false, false,
371+
new StringObjectMapBuilder()
372+
.put("x-dead-letter-exchange", "")
373+
.put("x-dead-letter-routing-key", exConvDLQ().getName())
374+
.get());
375+
}
376+
377+
@Bean
378+
public Queue exConvDLQ() {
379+
return new Queue("si.dsl.conv.exception.test.dlq");
380+
}
381+
382+
@Bean
383+
public IntegrationFlow inboundWithConvExceptionFlow(ConnectionFactory cf) {
384+
return IntegrationFlows.from(Amqp.inboundAdapter(cf, exConvQueue())
385+
.configureContainer(c -> c.defaultRequeueRejected(false))
386+
.messageConverter(new SimpleMessageConverter() {
387+
388+
@Override
389+
public Object fromMessage(org.springframework.amqp.core.Message message)
390+
throws MessageConversionException {
391+
throw new MessageConversionException("fail");
392+
}
393+
394+
})
395+
.errorChannel("errors.input"))
396+
.get();
397+
}
398+
280399
@Bean
281400
public Queue asyncReplies() {
282401
return new Queue("asyncReplies");

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -239,17 +239,11 @@ public void testAdapterConversionError() throws Exception {
239239
adapter.setOutputChannel(outputChannel);
240240
QueueChannel errorChannel = new QueueChannel();
241241
adapter.setErrorChannel(errorChannel);
242-
adapter.setMessageConverter(new MessageConverter() {
243-
244-
@Override
245-
public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties)
246-
throws MessageConversionException {
247-
throw new MessageConversionException("intended");
248-
}
242+
adapter.setMessageConverter(new SimpleMessageConverter() {
249243

250244
@Override
251245
public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
252-
return null;
246+
throw new MessageConversionException("intended");
253247
}
254248

255249
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support;
18+
19+
/**
20+
* A map builder creating a map with String keys and values.
21+
*
22+
* @author Gary Russell
23+
*
24+
* @since 5.0.6
25+
*/
26+
public class StringObjectMapBuilder extends MapBuilder<StringObjectMapBuilder, String, Object> {
27+
28+
}

0 commit comments

Comments
 (0)