Skip to content

Commit f5aba94

Browse files
garyrussellartembilan
authored andcommitted
GH-1474: Fix MessageProperties.lastInBatch
Resolves #1474 When consuming the whole debatched batch as a list, all messages had the `lastInBatch` property set to true. Clone the message properties for the last record. **cherry-pick to 2.4.x**
1 parent 18e4209 commit f5aba94

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -140,6 +140,15 @@ public void setHeader(String key, Object value) {
140140
this.headers.put(key, value);
141141
}
142142

143+
/**
144+
* Set headers.
145+
* @param headers the headers.
146+
* @since 2.4.7
147+
*/
148+
public void setHeaders(Map<String, Object> headers) {
149+
this.headers.putAll(headers);
150+
}
151+
143152
/**
144153
* Typed getter for a header.
145154
* @param headerName the header name.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
3030
import org.springframework.amqp.support.AmqpHeaders;
3131
import org.springframework.amqp.support.converter.MessageConversionException;
32+
import org.springframework.beans.BeanUtils;
3233
import org.springframework.util.Assert;
3334

3435
/**
@@ -184,10 +185,16 @@ public void deBatch(Message message, Consumer<Message> fragmentConsumer) {
184185
byte[] body = new byte[length];
185186
byteBuffer.get(body);
186187
messageProperties.setContentLength(length);
187-
// Caveat - shared MessageProperties.
188-
Message fragment = new Message(body, messageProperties);
189-
if (!byteBuffer.hasRemaining()) {
190-
messageProperties.setLastInBatch(true);
188+
// Caveat - shared MessageProperties, except for last
189+
Message fragment;
190+
if (byteBuffer.hasRemaining()) {
191+
fragment = new Message(body, messageProperties);
192+
}
193+
else {
194+
MessageProperties lastProperties = new MessageProperties();
195+
BeanUtils.copyProperties(messageProperties, lastProperties);
196+
lastProperties.setLastInBatch(true);
197+
fragment = new Message(body, lastProperties);
191198
}
192199
fragmentConsumer.accept(fragment);
193200
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -261,8 +261,8 @@ private void testDebatchByContainer(AbstractMessageListenerContainer container,
261261
if (asList) {
262262
container.setMessageListener((BatchMessageListener) messages -> {
263263
received.addAll(messages);
264-
lastInBatch.add(false);
265-
lastInBatch.add(true);
264+
lastInBatch.add(messages.get(0).getMessageProperties().isLastInBatch());
265+
lastInBatch.add(messages.get(1).getMessageProperties().isLastInBatch());
266266
batchSize.set(messages.size());
267267
latch.countDown();
268268
});

0 commit comments

Comments
 (0)