Skip to content

Commit 9f53b80

Browse files
garyrussellartembilan
authored andcommitted
GH-2931: AMQP De-Batching as List<?> Payload
Resolves #2931
1 parent e05b6f7 commit 9f53b80

File tree

7 files changed

+202
-3
lines changed

7 files changed

+202
-3
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616

1717
package org.springframework.integration.amqp.inbound;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
1921
import java.util.Map;
2022
import java.util.concurrent.atomic.AtomicInteger;
2123

2224
import org.springframework.amqp.core.AcknowledgeMode;
2325
import org.springframework.amqp.core.Message;
26+
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
27+
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
2428
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
2529
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
2630
import org.springframework.amqp.support.AmqpHeaders;
@@ -69,6 +73,8 @@ public class AmqpInboundChannelAdapter extends MessageProducerSupport implements
6973

7074
private RecoveryCallback<? extends Object> recoveryCallback;
7175

76+
private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);
77+
7278
public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) {
7379
Assert.notNull(listenerContainer, "listenerContainer must not be null");
7480
Assert.isNull(listenerContainer.getMessageListener(),
@@ -115,6 +121,16 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
115121
this.recoveryCallback = recoveryCallback;
116122
}
117123

124+
/**
125+
* Set a batching strategy to use when de-batching messages.
126+
* Default is {@link SimpleBatchingStrategy}.
127+
* @param batchingStrategy the strategy.
128+
* @since 5.2
129+
*/
130+
public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
131+
Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
132+
this.batchingStrategy = batchingStrategy;
133+
}
118134

119135
@Override
120136
public String getComponentType() {
@@ -239,7 +255,16 @@ private void createAndSend(Message message, Channel channel) {
239255
}
240256

241257
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
242-
Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
258+
Object payload;
259+
if (AmqpInboundChannelAdapter.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
260+
List<Object> payloads = new ArrayList<>();
261+
AmqpInboundChannelAdapter.this.batchingStrategy.deBatch(message, fragment -> payloads
262+
.add(AmqpInboundChannelAdapter.this.messageConverter.fromMessage(fragment)));
263+
payload = payloads;
264+
}
265+
else {
266+
payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
267+
}
243268
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
244269
.toHeadersFromRequest(message.getMessageProperties());
245270
if (isManualAck()) {

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.amqp.inbound;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
1921
import java.util.Map;
2022
import java.util.concurrent.atomic.AtomicInteger;
2123

@@ -25,6 +27,8 @@
2527
import org.springframework.amqp.core.Message;
2628
import org.springframework.amqp.core.MessagePostProcessor;
2729
import org.springframework.amqp.core.MessageProperties;
30+
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
31+
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
2832
import org.springframework.amqp.rabbit.core.RabbitTemplate;
2933
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
3034
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
@@ -81,6 +85,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {
8185

8286
private RecoveryCallback<? extends Object> recoveryCallback;
8387

88+
private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);
89+
8490
public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) {
8591
this(listenerContainer, new RabbitTemplate(listenerContainer.getConnectionFactory()), false);
8692
}
@@ -175,6 +181,17 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
175181
this.recoveryCallback = recoveryCallback;
176182
}
177183

184+
/**
185+
* Set a batching strategy to use when de-batching messages.
186+
* Default is {@link SimpleBatchingStrategy}.
187+
* @param batchingStrategy the strategy.
188+
* @since 5.2
189+
*/
190+
public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
191+
Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
192+
this.batchingStrategy = batchingStrategy;
193+
}
194+
178195
@Override
179196
public String getComponentType() {
180197
return "amqp:inbound-gateway";
@@ -286,7 +303,15 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
286303
boolean isManualAck = AmqpInboundGateway.this.messageListenerContainer
287304
.getAcknowledgeMode() == AcknowledgeMode.MANUAL;
288305
try {
289-
payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message);
306+
if (AmqpInboundGateway.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
307+
List<Object> payloads = new ArrayList<>();
308+
AmqpInboundGateway.this.batchingStrategy.deBatch(message, fragment -> payloads
309+
.add(AmqpInboundGateway.this.amqpMessageConverter.fromMessage(fragment)));
310+
payload = payloads;
311+
}
312+
else {
313+
payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message);
314+
}
290315
headers = AmqpInboundGateway.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
291316
if (isManualAck) {
292317
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818

1919
import java.io.IOException;
2020
import java.nio.charset.StandardCharsets;
21+
import java.util.ArrayList;
22+
import java.util.List;
2123
import java.util.Map;
2224

2325
import org.apache.commons.logging.Log;
2426
import org.apache.commons.logging.LogFactory;
2527

2628
import org.springframework.amqp.core.MessageProperties;
29+
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
30+
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
2731
import org.springframework.amqp.rabbit.connection.Connection;
2832
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
2933
import org.springframework.amqp.rabbit.connection.RabbitUtils;
@@ -71,6 +75,8 @@ public class AmqpMessageSource extends AbstractMessageSource<Object> {
7175

7276
private boolean rawMessageHeader;
7377

78+
private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);
79+
7480
public AmqpMessageSource(ConnectionFactory connectionFactory, String queue) {
7581
this(connectionFactory, new AmqpAckCallbackFactory(), queue);
7682
}
@@ -151,6 +157,21 @@ public void setRawMessageHeader(boolean rawMessageHeader) {
151157
this.rawMessageHeader = rawMessageHeader;
152158
}
153159

160+
protected BatchingStrategy getBatchingStrategy() {
161+
return this.batchingStrategy;
162+
}
163+
164+
/**
165+
* Set a batching strategy to use when de-batching messages.
166+
* Default is {@link SimpleBatchingStrategy}.
167+
* @param batchingStrategy the strategy.
168+
* @since 5.2
169+
*/
170+
public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
171+
Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
172+
this.batchingStrategy = batchingStrategy;
173+
}
174+
154175
@Override
155176
public String getComponentType() {
156177
return "amqp:message-source";
@@ -174,7 +195,16 @@ protected AbstractIntegrationMessageBuilder<Object> doReceive() {
174195
messageProperties.setConsumerQueue(this.queue);
175196
Map<String, Object> headers = this.headerMapper.toHeadersFromRequest(messageProperties);
176197
org.springframework.amqp.core.Message amqpMessage = new org.springframework.amqp.core.Message(resp.getBody(), messageProperties);
177-
Object payload = this.messageConverter.fromMessage(amqpMessage);
198+
Object payload;
199+
if (this.batchingStrategy.canDebatch(messageProperties)) {
200+
List<Object> payloads = new ArrayList<>();
201+
this.batchingStrategy.deBatch(amqpMessage, fragment -> payloads
202+
.add(this.messageConverter.fromMessage(fragment)));
203+
payload = payloads;
204+
}
205+
else {
206+
payload = this.messageConverter.fromMessage(amqpMessage);
207+
}
178208
AbstractIntegrationMessageBuilder<Object> builder = getMessageBuilderFactory().withPayload(payload)
179209
.copyHeaders(headers)
180210
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, callback);

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424
import static org.mockito.Mockito.times;
2525
import static org.mockito.Mockito.verify;
2626

27+
import java.util.List;
2728
import java.util.concurrent.ExecutorService;
2829

2930
import org.junit.Test;
3031

32+
import org.springframework.amqp.core.MessageProperties;
33+
import org.springframework.amqp.rabbit.batch.MessageBatch;
34+
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
3135
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
3236
import org.springframework.amqp.support.AmqpHeaders;
3337
import org.springframework.integration.StaticMessageHeaderAccessor;
@@ -123,4 +127,39 @@ private void testNackOrRequeue(boolean requeue) throws Exception {
123127
verify(connection).close(30000);
124128
}
125129

130+
@SuppressWarnings({ "unchecked" })
131+
@Test
132+
public void testBatch() throws Exception {
133+
SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L);
134+
MessageProperties messageProperties = new MessageProperties();
135+
messageProperties.setContentType("text/plain");
136+
org.springframework.amqp.core.Message message =
137+
new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties);
138+
bs.addToBatch("foo", "bar", message);
139+
message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties);
140+
MessageBatch batched = bs.addToBatch("foo", "bar", message);
141+
142+
Channel channel = mock(Channel.class);
143+
willReturn(true).given(channel).isOpen();
144+
Envelope envelope = new Envelope(123L, false, "ex", "rk");
145+
BasicProperties props = new BasicProperties.Builder()
146+
.headers(batched.getMessage().getMessageProperties().getHeaders())
147+
.contentType("text/plain")
148+
.build();
149+
GetResponse getResponse = new GetResponse(envelope, props, batched.getMessage().getBody(), 0);
150+
willReturn(getResponse).given(channel).basicGet("foo", false);
151+
Connection connection = mock(Connection.class);
152+
willReturn(true).given(connection).isOpen();
153+
willReturn(channel).given(connection).createChannel();
154+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
155+
willReturn(connection).given(connectionFactory).newConnection((ExecutorService) isNull(), anyString());
156+
157+
CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
158+
AmqpMessageSource source = new AmqpMessageSource(ccf, "foo");
159+
Message<?> received = source.receive();
160+
assertThat(received).isNotNull();
161+
assertThat(((List<String>) received.getPayload())).contains("test1", "test2");
162+
}
163+
164+
126165
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.mockito.Mockito.spy;
2828
import static org.mockito.Mockito.when;
2929

30+
import java.util.List;
3031
import java.util.Map;
3132
import java.util.concurrent.CountDownLatch;
3233
import java.util.concurrent.TimeUnit;
@@ -36,6 +37,8 @@
3637

3738
import org.springframework.amqp.core.AcknowledgeMode;
3839
import org.springframework.amqp.core.MessageProperties;
40+
import org.springframework.amqp.rabbit.batch.MessageBatch;
41+
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
3942
import org.springframework.amqp.rabbit.connection.Connection;
4043
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
4144
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -377,6 +380,54 @@ public void testRetryWithinOnMessageGateway() throws Exception {
377380
assertThat(errors.receive(0)).isNull();
378381
}
379382

383+
@SuppressWarnings({ "unchecked" })
384+
@Test
385+
public void testBatchdAdapter() throws Exception {
386+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
387+
container.setDeBatchingEnabled(false);
388+
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
389+
QueueChannel out = new QueueChannel();
390+
adapter.setOutputChannel(out);
391+
adapter.afterPropertiesSet();
392+
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
393+
SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L);
394+
MessageProperties messageProperties = new MessageProperties();
395+
messageProperties.setContentType("text/plain");
396+
org.springframework.amqp.core.Message message =
397+
new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties);
398+
bs.addToBatch("foo", "bar", message);
399+
message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties);
400+
MessageBatch batched = bs.addToBatch("foo", "bar", message);
401+
listener.onMessage(batched.getMessage(), null);
402+
Message<?> received = out.receive();
403+
assertThat(received).isNotNull();
404+
assertThat(((List<String>) received.getPayload())).contains("test1", "test2");
405+
}
406+
407+
@SuppressWarnings({ "unchecked" })
408+
@Test
409+
public void testBatchGateway() throws Exception {
410+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
411+
container.setDeBatchingEnabled(false);
412+
AmqpInboundGateway adapter = new AmqpInboundGateway(container);
413+
QueueChannel out = new QueueChannel();
414+
adapter.setRequestChannel(out);
415+
adapter.afterPropertiesSet();
416+
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
417+
SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L);
418+
MessageProperties messageProperties = new MessageProperties();
419+
messageProperties.setContentType("text/plain");
420+
org.springframework.amqp.core.Message message =
421+
new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties);
422+
bs.addToBatch("foo", "bar", message);
423+
message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties);
424+
MessageBatch batched = bs.addToBatch("foo", "bar", message);
425+
listener.onMessage(batched.getMessage(), null);
426+
Message<?> received = out.receive();
427+
assertThat(received).isNotNull();
428+
assertThat(((List<String>) received.getPayload())).contains("test1", "test2");
429+
}
430+
380431
public static class Foo {
381432

382433
private String bar;

src/reference/asciidoc/amqp.adoc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,22 @@ public class AmqpJavaApplication {
277277
----
278278
====
279279

280+
[[amqp-debatching]]
281+
==== Batched Messages
282+
283+
See https://docs.spring.io/spring-amqp/docs/current/reference/html/#template-batching[the Spring AMQP Documentation] for more information about batched messages.
284+
285+
To produce batched messages with Spring Integration, simply configure the outbound endpoint with a `BatchingRabbitTemplate`.
286+
287+
When receiving batched messages, by default, the listener containers extract each fragment message and the adapter will produce a `Message<?>` for each fragment.
288+
Starting with version 5.2, if the container's `deBatchingEnabled` property is set to `false`, the de-batching is performed by the adapter instead, and a single `Message<List<?>>` is produced with the payload being a list of the fragment payloads (after conversion if appropriate).
289+
290+
The default `BatchingStrategy` is the `SimpleBatchingStrategy`, but this can be overridden on the adapter.
291+
280292
=== Polled Inbound Channel Adapter
281293

294+
==== Overview
295+
282296
Version 5.0.1 introduced a polled channel adapter, letting you fetch individual messages on demand -- for example, with a `MessageSourcePollingTemplate` or a poller.
283297
See <<deferred-acks-message-source>> for more information.
284298

@@ -315,6 +329,13 @@ public IntegrationFlow flow() {
315329
----
316330
====
317331

332+
[[amqp-polled-debatching]]
333+
==== Batched Messages
334+
335+
See <<amqp-debatching>>.
336+
337+
For the polled adapter, there is no listener container, batched messages are always debatched (if the `BatchingStrategy` supports doing so).
338+
318339
[[amqp-inbound-gateway]]
319340
=== Inbound Gateway
320341

@@ -453,6 +474,11 @@ public class AmqpJavaApplication {
453474
----
454475
====
455476

477+
[[amqp-gatewway-debatching]]
478+
==== Batched Messages
479+
480+
See <<amqp-debatching>>.
481+
456482
[[amqp-inbound-ack]]
457483
=== Inbound Endpoint Acknowledge Mode
458484

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ See <<splitter>> for more information.
4040
The outbound endpoints can now be configured to synthesize a "nack" if no publisher confirm is received within a timeout.
4141
See <<amqp-outbound-endpoints>> for more information.
4242

43+
The inbound channel adapter can now receive batched messages as a `List<?>` payload instead of receiving a discrete message for each batch fragment.
44+
See <<amqp-debatching>> for more information.
45+
4346
[[x5.2-file]]
4447
==== File Changes
4548

0 commit comments

Comments
 (0)