Skip to content

Commit c98d505

Browse files
garyrussellartembilan
authored andcommitted
INT-4371: Polishing - raw message header; docs
JIRA: https://jira.spring.io/browse/INT-4371 * Polishing - DSL
1 parent 30450c4 commit c98d505

File tree

4 files changed

+67
-3
lines changed

4 files changed

+67
-3
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundPolledChannelAdapterSpec.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,9 @@ public AmqpInboundPolledChannelAdapterSpec messageConverter(MessageConverter mes
6565
return this;
6666
}
6767

68+
public AmqpInboundPolledChannelAdapterSpec rawMessageHeader(boolean rawMessageHeader) {
69+
this.target.setRawMessageHeader(rawMessageHeader);
70+
return this;
71+
}
72+
6873
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.context.MessageSource;
3333
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3434
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
35+
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
3536
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
3637
import org.springframework.integration.endpoint.AbstractMessageSource;
3738
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
@@ -66,6 +67,8 @@ public class AmqpMessageSource extends AbstractMessageSource<Object> {
6667

6768
private MessageConverter messageConverter = new SimpleMessageConverter();
6869

70+
private boolean rawMessageHeader;
71+
6972
public AmqpMessageSource(ConnectionFactory connectionFactory, String queue) {
7073
this(connectionFactory, new AmqpAckCallbackFactory(), queue);
7174
}
@@ -132,6 +135,20 @@ public void setMessageConverter(MessageConverter messageConverter) {
132135
this.messageConverter = messageConverter;
133136
}
134137

138+
protected boolean isRawMessageHeader() {
139+
return this.rawMessageHeader;
140+
}
141+
142+
/**
143+
* Set to true to include the raw spring-amqp message as a header
144+
* with key {@link AmqpMessageHeaderErrorMessageStrategy#AMQP_RAW_MESSAGE},
145+
* enabling callers to have access to the message to process errors.
146+
* @param rawMessageHeader true to include the header.
147+
*/
148+
public void setRawMessageHeader(boolean rawMessageHeader) {
149+
this.rawMessageHeader = rawMessageHeader;
150+
}
151+
135152
@Override
136153
public String getComponentType() {
137154
return "amqp:message-source";
@@ -153,11 +170,15 @@ protected AbstractIntegrationMessageBuilder<Object> doReceive() {
153170
MessageProperties messageProperties = this.propertiesConverter.toMessageProperties(resp.getProps(),
154171
resp.getEnvelope(), StandardCharsets.UTF_8.name());
155172
Map<String, Object> headers = this.headerMapper.toHeadersFromRequest(messageProperties);
156-
Object payload = this.messageConverter
157-
.fromMessage(new org.springframework.amqp.core.Message(resp.getBody(), messageProperties));
158-
return getMessageBuilderFactory().withPayload(payload)
173+
org.springframework.amqp.core.Message amqpMessage = new org.springframework.amqp.core.Message(resp.getBody(), messageProperties);
174+
Object payload = this.messageConverter.fromMessage(amqpMessage);
175+
AbstractIntegrationMessageBuilder<Object> builder = getMessageBuilderFactory().withPayload(payload)
159176
.copyHeaders(headers)
160177
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, callback);
178+
if (this.rawMessageHeader) {
179+
builder.setHeader(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage);
180+
}
181+
return builder;
161182
}
162183
catch (IOException e) {
163184
RabbitUtils.closeChannel(channel);

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.amqp.inbound;
1818

19+
import static org.hamcrest.Matchers.instanceOf;
20+
import static org.junit.Assert.assertThat;
1921
import static org.mockito.ArgumentMatchers.anyString;
2022
import static org.mockito.ArgumentMatchers.isNull;
2123
import static org.mockito.BDDMockito.willReturn;
@@ -30,6 +32,7 @@
3032
import org.junit.Test;
3133

3234
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
35+
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
3336
import org.springframework.integration.support.AcknowledgmentCallback.Status;
3437
import org.springframework.integration.support.StaticMessageHeaderAccessor;
3538
import org.springframework.messaging.Message;
@@ -65,7 +68,10 @@ public void testAck() throws Exception {
6568

6669
CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
6770
AmqpMessageSource source = new AmqpMessageSource(ccf, "foo");
71+
source.setRawMessageHeader(true);
6872
Message<?> received = source.receive();
73+
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE),
74+
instanceOf(org.springframework.amqp.core.Message.class));
6975
// make sure channel is not cached
7076
org.springframework.amqp.rabbit.connection.Connection conn = ccf.createConnection();
7177
Channel notCached = conn.createChannel(false); // should not have been "closed"

src/reference/asciidoc/amqp.adoc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,38 @@ public class AmqpJavaApplication {
301301
}
302302
----
303303

304+
=== Polled Inbound Channel Adapter
305+
306+
Starting with _version 5.0.1_, a polled channel adapter is provided, allowing fetching individual messages on-demand, for example with a `MessageSourcePollingTemplate` or a poller.
307+
See <<deferred-acks-message-source>> for more information.
308+
309+
It does not currently have XML configuration.
310+
311+
[source, java]
312+
----
313+
@Bean
314+
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
315+
return new AmpqpMessageSource(connectionFactory, "someQueue");
316+
}
317+
----
318+
319+
Refer to the javadocs for configuration properties.
320+
321+
With the Java DSL:
322+
323+
[source, java]
324+
----
325+
@Bean
326+
public IntegrationFlow flow() {
327+
return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
328+
e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
329+
.handle(p -> {
330+
...
331+
})
332+
.get();
333+
}
334+
----
335+
304336
[[amqp-inbound-gateway]]
305337
=== Inbound Gateway
306338

0 commit comments

Comments
 (0)