Skip to content

Commit 50452f8

Browse files
committed
spring-projectsGH-2478 Handle conversion exception in AsyncRabbitTemplate
Previously, conversion errors in AsyncRabbitTemplate lead to AmqpReplyTimeoutException
1 parent d7058bb commit 50452f8

File tree

2 files changed

+39
-5
lines changed

2 files changed

+39
-5
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,7 @@
5050
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5151
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
5252
import org.springframework.amqp.support.converter.MessageConverter;
53+
import org.springframework.amqp.support.converter.MessageConversionException;
5354
import org.springframework.amqp.support.converter.SmartMessageConverter;
5455
import org.springframework.amqp.utils.JavaUtils;
5556
import org.springframework.beans.factory.BeanNameAware;
@@ -603,12 +604,16 @@ public void onMessage(Message message, Channel channel) {
603604
if (future instanceof RabbitConverterFuture) {
604605
MessageConverter messageConverter = this.template.getMessageConverter();
605606
RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
606-
Object converted = rabbitFuture.getReturnType() != null
607+
try {
608+
Object converted = rabbitFuture.getReturnType() != null
607609
&& messageConverter instanceof SmartMessageConverter smart
608610
? smart.fromMessage(message,
609611
rabbitFuture.getReturnType())
610612
: messageConverter.fromMessage(message);
611-
rabbitFuture.complete(converted);
613+
rabbitFuture.complete(converted);
614+
} catch (MessageConversionException e) {
615+
rabbitFuture.completeExceptionally(e);
616+
}
612617
}
613618
else {
614619
((RabbitMessageFuture) future).complete(message);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5555
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
5656
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
57+
import org.springframework.amqp.support.converter.MessageConversionException;
5758
import org.springframework.amqp.support.converter.SimpleMessageConverter;
5859
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
5960
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
@@ -301,7 +302,7 @@ public void testMessageWithConfirmDirect() throws Exception {
301302
assertThat(confirm.get(10, TimeUnit.SECONDS)).isTrue();
302303
checkMessageResult(future, "SLEEP");
303304
}
304-
305+
305306
@SuppressWarnings("unchecked")
306307
@Test
307308
@DirtiesContext
@@ -386,6 +387,34 @@ public void testStopCancelled() throws Exception {
386387
assertThat(callback.result).isNull();
387388
}
388389

390+
@Test
391+
public void testConversionException() throws InterruptedException {
392+
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
393+
connectionFactory.setChannelCacheSize(1);
394+
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
395+
rabbitTemplate.setMessageConverter(new SimpleMessageConverter(){
396+
@Override
397+
public Object fromMessage(Message message) throws MessageConversionException {
398+
throw new MessageConversionException("Failed to convert message");
399+
}
400+
});
401+
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
402+
asyncRabbitTemplate.start();
403+
404+
RabbitConverterFuture<String> replyFuture = asyncRabbitTemplate.convertSendAndReceive("conversionException");
405+
406+
CountDownLatch cdl = new CountDownLatch(1);
407+
replyFuture.whenComplete((result, ex) -> {
408+
cdl.countDown();
409+
});
410+
assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue();
411+
assertThat(replyFuture).isCompletedExceptionally();
412+
413+
asyncRabbitTemplate.stop();
414+
connectionFactory.destroy();
415+
}
416+
417+
389418
@Test
390419
void ctorCoverage() {
391420
AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");
@@ -603,7 +632,7 @@ public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connecti
603632
}
604633
else if ("noReply".equals(message)) {
605634
return null;
606-
}
635+
}
607636
return message.toUpperCase();
608637
});
609638

0 commit comments

Comments
 (0)