Skip to content

Commit 519face

Browse files
garyrussellartembilan
authored andcommitted
GH-1439: Fix Memory Leak with Misconfiguration
Resolves #1439 Do not store pending confirms/returns if `RabbitTemplate` has confirms enabled but the factory does not support confirms. * Test polishing; include a returned message. **cherry-pick to `2.4.x` & `2.3.x`**
1 parent 6ba9596 commit 519face

File tree

2 files changed

+47
-11
lines changed

2 files changed

+47
-11
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2437,17 +2437,22 @@ protected void sendToRabbit(Channel channel, String exchange, String routingKey,
24372437
private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
24382438
if ((this.publisherConfirms || this.confirmCallback != null) && channel instanceof PublisherCallbackChannel) {
24392439

2440-
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
2441-
CorrelationData correlationData = this.correlationDataPostProcessor != null
2442-
? this.correlationDataPostProcessor.postProcess(message, correlationDataArg)
2443-
: correlationDataArg;
24442440
long nextPublishSeqNo = channel.getNextPublishSeqNo();
2445-
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
2446-
publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo,
2447-
new PendingConfirm(correlationData, System.currentTimeMillis()));
2448-
if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
2449-
message.getMessageProperties().setHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY,
2450-
correlationData.getId());
2441+
if (nextPublishSeqNo > 0) {
2442+
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
2443+
CorrelationData correlationData = this.correlationDataPostProcessor != null
2444+
? this.correlationDataPostProcessor.postProcess(message, correlationDataArg)
2445+
: correlationDataArg;
2446+
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
2447+
publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo,
2448+
new PendingConfirm(correlationData, System.currentTimeMillis()));
2449+
if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
2450+
message.getMessageProperties().setHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY,
2451+
correlationData.getId());
2452+
}
2453+
}
2454+
else {
2455+
logger.debug("Factory does not have confirms enabled");
24512456
}
24522457
}
24532458
else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
4848
import java.util.concurrent.atomic.AtomicReference;
4949

5050
import org.apache.commons.logging.Log;
51+
import org.assertj.core.api.InstanceOfAssertFactories;
5152
import org.junit.jupiter.api.AfterEach;
5253
import org.junit.jupiter.api.BeforeEach;
5354
import org.junit.jupiter.api.Test;
@@ -130,6 +131,7 @@ public void create() {
130131
connectionFactoryWithReturnsEnabled.setPort(BrokerTestUtils.getPort());
131132
connectionFactoryWithReturnsEnabled.setPublisherReturns(true);
132133
templateWithReturnsEnabled = new RabbitTemplate(connectionFactoryWithReturnsEnabled);
134+
templateWithReturnsEnabled.setMandatory(true);
133135
connectionFactoryWithConfirmsAndReturnsEnabled = new CachingConnectionFactory();
134136
connectionFactoryWithConfirmsAndReturnsEnabled.setHost("localhost");
135137
connectionFactoryWithConfirmsAndReturnsEnabled.setChannelCacheSize(100);
@@ -320,6 +322,7 @@ public void testPublisherConfirmNotReceived() throws Exception {
320322
Connection mockConnection = mock(Connection.class);
321323
Channel mockChannel = mock(Channel.class);
322324
given(mockChannel.isOpen()).willReturn(true);
325+
given(mockChannel.getNextPublishSeqNo()).willReturn(1L);
323326

324327
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
325328
given(mockConnection.isOpen()).willReturn(true);
@@ -863,4 +866,32 @@ public void testWithFuture() throws Exception {
863866
admin.deleteQueue(queue.getName());
864867
}
865868

869+
@Test
870+
void justReturns() throws InterruptedException {
871+
CorrelationData correlationData = new CorrelationData();
872+
CountDownLatch latch = new CountDownLatch(1);
873+
this.templateWithReturnsEnabled.setReturnsCallback(returned -> {
874+
latch.countDown();
875+
});
876+
this.templateWithReturnsEnabled.setConfirmCallback((correlationData1, ack, cause) -> {
877+
// has callback but factory is not enabled
878+
});
879+
this.templateWithReturnsEnabled.convertAndSend("", ROUTE, "foo", correlationData);
880+
ChannelProxy channel = (ChannelProxy) this.connectionFactoryWithReturnsEnabled.createConnection()
881+
.createChannel(false);
882+
assertThat(channel.getTargetChannel())
883+
.extracting("pendingReturns")
884+
.asInstanceOf(InstanceOfAssertFactories.MAP)
885+
.isEmpty();
886+
assertThat(channel.getTargetChannel())
887+
.extracting("pendingConfirms")
888+
.asInstanceOf(InstanceOfAssertFactories.MAP)
889+
.extracting(map -> map.values().iterator().next())
890+
.asInstanceOf(InstanceOfAssertFactories.MAP)
891+
.isEmpty();
892+
893+
this.templateWithReturnsEnabled.convertAndSend("", "___JUNK___", "foo", correlationData);
894+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
895+
}
896+
866897
}

0 commit comments

Comments
 (0)