Skip to content

Commit 00b910c

Browse files
garyrussellartembilan
authored andcommitted
INT-4371: More polishing - consumer queue header
JIRA: https://jira.spring.io/browse/INT-4371
1 parent c98d505 commit 00b910c

File tree

2 files changed

+4
-0
lines changed

2 files changed

+4
-0
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ protected AbstractIntegrationMessageBuilder<Object> doReceive() {
169169
.createCallback(new AmqpAckInfo(connection, channel, this.transacted, resp));
170170
MessageProperties messageProperties = this.propertiesConverter.toMessageProperties(resp.getProps(),
171171
resp.getEnvelope(), StandardCharsets.UTF_8.name());
172+
messageProperties.setConsumerQueue(this.queue);
172173
Map<String, Object> headers = this.headerMapper.toHeadersFromRequest(messageProperties);
173174
org.springframework.amqp.core.Message amqpMessage = new org.springframework.amqp.core.Message(resp.getBody(), messageProperties);
174175
Object payload = this.messageConverter.fromMessage(amqpMessage);

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.amqp.inbound;
1818

19+
import static org.hamcrest.Matchers.equalTo;
1920
import static org.hamcrest.Matchers.instanceOf;
2021
import static org.junit.Assert.assertThat;
2122
import static org.mockito.ArgumentMatchers.anyString;
@@ -32,6 +33,7 @@
3233
import org.junit.Test;
3334

3435
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
36+
import org.springframework.amqp.support.AmqpHeaders;
3537
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
3638
import org.springframework.integration.support.AcknowledgmentCallback.Status;
3739
import org.springframework.integration.support.StaticMessageHeaderAccessor;
@@ -72,6 +74,7 @@ public void testAck() throws Exception {
7274
Message<?> received = source.receive();
7375
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE),
7476
instanceOf(org.springframework.amqp.core.Message.class));
77+
assertThat(received.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE), equalTo("foo"));
7578
// make sure channel is not cached
7679
org.springframework.amqp.rabbit.connection.Connection conn = ccf.createConnection();
7780
Channel notCached = conn.createChannel(false); // should not have been "closed"

0 commit comments

Comments
 (0)