Description
In what version(s) of Spring AMQP are you seeing this issue?
spring-rabbit-2.4.6
Describe the bug
I need to send and receive messages in batches, and ran into two problems:
- When a batched message is de-batched, SimpleBatchingStrategy.deBatch() reuses the same MessageProperties instance for every fragment when it sets AmqpHeaders.LAST_IN_BATCH. As a result, every message in the batch ends up with amqp_lastInBatch=true.
- When SimpleRabbitListenerContainerFactory creates a container instance, the batching strategy is set only on the endpoint, not on the container instance, so de-batching still uses the default strategy: new SimpleBatchingStrategy(0, 0, 0L)
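The first problem is plain object aliasing and can be illustrated without Spring. The sketch below is only an illustration under assumptions, not the library's code: `Props` and `Fragment` are hypothetical stand-ins for MessageProperties and Message, and the flags are read only after the whole batch has been collected, as a batch listener would do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for MessageProperties / Message; not Spring AMQP types.
class SharedPropertiesDemo {

    static final class Props {
        boolean lastInBatch;

        Props copy() {
            Props p = new Props();
            p.lastInBatch = this.lastInBatch;
            return p;
        }
    }

    static final class Fragment {
        final int index;
        final Props props;

        Fragment(int index, Props props) {
            this.index = index;
            this.props = props;
        }
    }

    // Every fragment references the same Props instance, so the flag set
    // for the last fragment is visible through all of them.
    static List<Boolean> flagsWithSharedProps(int count) {
        Props shared = new Props();
        List<Fragment> fragments = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            if (i == count - 1) {
                shared.lastInBatch = true;
            }
            fragments.add(new Fragment(i, shared));
        }
        return flags(fragments);
    }

    // The last fragment gets its own copy before the flag is set.
    static List<Boolean> flagsWithCopiedProps(int count) {
        Props shared = new Props();
        List<Fragment> fragments = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Props props = shared;
            if (i == count - 1) {
                props = shared.copy();
                props.lastInBatch = true;
            }
            fragments.add(new Fragment(i, props));
        }
        return flags(fragments);
    }

    // Reads the flags after all fragments exist, like a batch listener.
    private static List<Boolean> flags(List<Fragment> fragments) {
        List<Boolean> result = new ArrayList<>();
        for (Fragment f : fragments) {
            result.add(f.props.lastInBatch);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(flagsWithSharedProps(3)); // [true, true, true]
        System.out.println(flagsWithCopiedProps(3)); // [false, false, true]
    }
}
```

With a shared instance all three fragments report the flag as true, which matches the amqp_lastInBatch=true seen on every message in this report.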
To Reproduce
- Enable BatchingRabbitTemplate
- Send multiple messages
- Enable consumerBatchEnabled on the listener container factory
- Receive messages in batches:
  - The listener cannot tell which message is the last one in the batch
  - The configured batching strategy is not applied
Expected behavior
- AmqpHeaders.LAST_IN_BATCH is set correctly (true only on the last message of a batch).
- A custom batching strategy takes effect.
The code for the overrides I used as a workaround is at the end.
Sample
Imported dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>2.7.1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
Java:
Binding configuration
@Configuration
@Slf4j
public class RabbitConfigAutoByAnnotation {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public Queue queueAnnotation() {
        Map<String, Object> queueArguments = new HashMap<>(1);
        queueArguments.put("x-single-active-consumer", true);
        return new Queue(RabbitConstant.QUEUE_ANNOTATION, true, false, false, queueArguments);
    }

    @Bean
    public DirectExchange exchangeAnnotation() {
        return new DirectExchange(RabbitConstant.EXCHANGE_ANNOTATION, true, false, null);
    }

    @Bean
    public Binding workbenchDataBinding() {
        return BindingBuilder.bind(queueAnnotation()).to(exchangeAnnotation()).with(RabbitConstant.ROUTING_KEY_ANNOTATION);
    }
}
RabbitBatchConfig.java
@Configuration
@Slf4j
public class RabbitBatchConfig {

    /**
     * Maximum number of messages collected into one batch
     */
    final int BATCH_SIZE = 7;

    /**
     * Maximum buffer size (in bytes) for one batch send
     */
    final int BUFFER_LIMIT = 16 * 1024 * 1024;

    /**
     * Maximum time (in milliseconds) to wait while collecting a batch
     */
    final long TIMEOUT = 10 * 1000L;

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean("batchingStrategy")
    public SimpleBatchingStrategy batchingStrategy() {
        return new SimpleBatchingStrategy(BATCH_SIZE, BUFFER_LIMIT, TIMEOUT);
    }

    @Bean
    public BatchingRabbitTemplate batchingRabbitTemplate(SimpleBatchingStrategy batchingStrategy, ConnectionFactory connectionFactory) {
        BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(connectionFactory, batchingStrategy, new ConcurrentTaskScheduler());
        batchingRabbitTemplate.setMessageConverter(jsonMessageConverter());
        return batchingRabbitTemplate;
    }

    @Bean(name = "myBatchContainerFactory")
    public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(SimpleBatchingStrategy batchingStrategy, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setBatchingStrategy(batchingStrategy);
        factory.setConnectionFactory(connectionFactory);
        factory.setBatchListener(true);
        factory.setConsumerBatchEnabled(true);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setBatchSize(4);
        factory.setReceiveTimeout(5000L);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
RabbitConstant.java
public class RabbitConstant {
    public static final String QUEUE_ANNOTATION = "demo.fenhy.reliable.annotation.queue";
    public static final String EXCHANGE_ANNOTATION = "demo.fenhy.reliable.annotation.exchange";
    public static final String ROUTING_KEY_ANNOTATION = "demo.fenhy.reliable.annotation.routing-key";
}
user
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private long id;
    private String name;
    private int age;
}
producer
@GetMapping("p0")
public String p0(@RequestParam(defaultValue = "0") int type) {
    // Build a random user and hand it to the batching template.
    User user = User.builder().id(FAKER.number().randomNumber()).age(FAKER.number().numberBetween(1, 32)).name(FAKER.name().name()).build();
    batchingRabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_ANNOTATION,
            RabbitConstant.ROUTING_KEY_ANNOTATION,
            user);
    log.info("p0消息投递完毕"); // "p0 message delivered"
    return "p0-test";
}
consumer
@RabbitListener(queues = RabbitConstant.QUEUE_ANNOTATION, containerFactory = "myBatchContainerFactory")
public void p2(List<org.springframework.messaging.Message<User>> messages, Channel channel) throws IOException {
    log.info("开始批量消费=================size:{}", messages.size()); // "batch consumption started"
    messages.forEach(message -> {
        MessageHeaders headers = message.getHeaders();
        long deliverTag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString());
        boolean isLast = Boolean.parseBoolean(headers.get(AmqpHeaders.LAST_IN_BATCH).toString());
        // Ack only when the message is flagged as the last one in its batch.
        if (isLast) {
            try {
                channel.basicAck(deliverTag, false);
            } catch (IOException e) {
                log.error("basicAck failed for deliveryTag {}", deliverTag, e);
            }
        }
        // "consumer p2 ... received message"
        log.info("消费者 p2 messaging deliveryTag:{}, isLast:{}, 接收消息:{}", deliverTag, isLast, message);
    });
    log.info("结束批量消费========================"); // "batch consumption finished"
}
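Delivering 5 messages produces the error below. Because of the AmqpHeaders.LAST_IN_BATCH bug, every message in the batch has amqp_lastInBatch=true, so the listener acks delivery tag 2 more than once and the broker closes the channel with PRECONDITION_FAILED - unknown delivery tag 2.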
2022-07-25 17:28:59.381 [http-nio-8080-exec-1] INFO o.a.c.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
2022-07-25 17:28:59.381 [http-nio-8080-exec-1] INFO org.springframework.web.servlet.DispatcherServlet - Initializing Servlet 'dispatcherServlet'
2022-07-25 17:28:59.382 [http-nio-8080-exec-1] INFO org.springframework.web.servlet.DispatcherServlet - Completed initialization in 1 ms
2022-07-25 17:28:59.434 [http-nio-8080-exec-1] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:29:00.134 [http-nio-8080-exec-3] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:29:00.493 [http-nio-8080-exec-4] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:29:00.942 [http-nio-8080-exec-5] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:29:01.733 [http-nio-8080-exec-6] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:29:11.748 [pool-5-thread-1] DEBUG o.s.a.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,2)
2022-07-25 17:29:11.749 [pool-5-thread-1] DEBUG o.s.amqp.rabbit.core.BatchingRabbitTemplate - Executing callback RabbitTemplate$$Lambda$815/949582174 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@203e705e Shared Rabbit Connection: SimpleConnection@45c80312 [delegate=amqp://[email protected]:5672/, localPort= 61016]
2022-07-25 17:29:11.750 [pool-5-thread-1] DEBUG o.s.amqp.rabbit.core.BatchingRabbitTemplate - Publishing message [(Body:'[B@47516dd6(byte[205])' MessageProperties [headers={amqp_batchSize=5, springBatchFormat=lengthHeader4, __TypeId__=org.fenhy.demo.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=35, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [demo.fenhy.reliable.annotation.exchange], routingKey = [demo.fenhy.reliable.annotation.routing-key]
2022-07-25 17:29:11.761 [pool-3-thread-5] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-xZNo8sCDs9gvvJNy-CYfHw' with deliveryTag: '2' in Consumer@5627cb29: tags=[[amq.ctag-xZNo8sCDs9gvvJNy-CYfHw]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@203e705e Shared Rabbit Connection: SimpleConnection@45c80312 [delegate=amqp://[email protected]:5672/, localPort= 61016], acknowledgeMode=MANUAL local queue size=0
2022-07-25 17:29:11.762 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@3873c73c(byte[205])' MessageProperties [headers={amqp_batchSize=5, springBatchFormat=lengthHeader4, __TypeId__=org.fenhy.demo.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=demo.fenhy.reliable.annotation.exchange, receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, deliveryTag=2, consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, consumerQueue=demo.fenhy.reliable.annotation.queue])
2022-07-25 17:29:16.771 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] DEBUG o.s.a.r.l.a.BatchMessagingMessageListenerAdapter - Processing [GenericMessage [payload=[GenericMessage [payload=User(id=4, name=董煜城, age=3), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=34571103-03e8-cd15-04f4-feaccc5c982e, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356769}], GenericMessage [payload=User(id=5482, name=廖鹏煊, age=9), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=76d59a4d-57f8-18c7-6912-b7814312b8a5, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356769}], GenericMessage [payload=User(id=73132, name=王致远, age=3), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=3b2ec17a-c001-7bc5-0ad0-7f30c7207bc0, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, 
__TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356769}], GenericMessage [payload=User(id=31, name=余鹏煊, age=14), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=e27ec6ba-b4ef-6524-5ff5-f85974c3f9fa, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356770}], GenericMessage [payload=User(id=37, name=卢建辉, age=7), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=2015a1f9-70b9-df2b-5765-0dd5cbb0fa63, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356770}]], headers={id=5cae78fd-0a11-0e1b-363d-08b49ff47cde, timestamp=1658741356770}]]
2022-07-25 17:29:16.772 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - 开始批量消费=================size:5
2022-07-25 17:29:16.773 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - 消费者 p2 messaging deliveryTag:2, isLast:true, 接收消息:GenericMessage [payload=User(id=4, name=董煜城, age=3), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=34571103-03e8-cd15-04f4-feaccc5c982e, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356769}]
2022-07-25 17:29:16.773 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - 消费者 p2 messaging deliveryTag:2, isLast:true, 接收消息:GenericMessage [payload=User(id=5482, name=廖鹏煊, age=9), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=76d59a4d-57f8-18c7-6912-b7814312b8a5, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356769}]
2022-07-25 17:29:16.773 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - 消费者 p2 messaging deliveryTag:2, isLast:true, 接收消息:GenericMessage [payload=User(id=73132, name=王致远, age=3), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=3b2ec17a-c001-7bc5-0ad0-7f30c7207bc0, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356769}]
2022-07-25 17:29:16.773 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - 消费者 p2 messaging deliveryTag:2, isLast:true, 接收消息:GenericMessage [payload=User(id=31, name=余鹏煊, age=14), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=e27ec6ba-b4ef-6524-5ff5-f85974c3f9fa, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356770}]
2022-07-25 17:29:16.773 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - 消费者 p2 messaging deliveryTag:2, isLast:true, 接收消息:GenericMessage [payload=User(id=37, name=卢建辉, age=7), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=36, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=2015a1f9-70b9-df2b-5765-0dd5cbb0fa63, amqp_consumerTag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw, amqp_lastInBatch=true, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741356770}]
2022-07-25 17:29:16.774 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - 结束批量消费========================
2022-07-25 17:29:16.787 [AMQP Connection 127.0.0.1:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory - Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
2022-07-25 17:29:16.789 [pool-3-thread-6] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Received shutdown signal for consumer tag=amq.ctag-xZNo8sCDs9gvvJNy-CYfHw
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
at java.lang.Thread.run(Thread.java:750)
2022-07-25 17:29:21.784 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] DEBUG o.s.a.r.listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
at java.lang.Thread.run(Thread.java:750)
2022-07-25 17:29:21.785 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO o.s.a.r.listener.SimpleMessageListenerContainer - Restarting Consumer@5627cb29: tags=[[amq.ctag-xZNo8sCDs9gvvJNy-CYfHw]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@203e705e Shared Rabbit Connection: SimpleConnection@45c80312 [delegate=amqp://[email protected]:5672/, localPort= 61016], acknowledgeMode=MANUAL local queue size=0
2022-07-25 17:29:21.785 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@203e705e Shared Rabbit Connection: SimpleConnection@45c80312 [delegate=amqp://[email protected]:5672/, localPort= 61016]
2022-07-25 17:29:21.785 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] DEBUG o.s.a.rabbit.connection.CachingConnectionFactory - Closing cached Channel: AMQChannel(amqp://[email protected]:5672/,1)
2022-07-25 17:29:21.787 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Starting consumer Consumer@1cadb1a1: tags=[[]], channel=null, acknowledgeMode=MANUAL local queue size=0
2022-07-25 17:29:21.803 [pool-3-thread-7] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - ConsumeOK: Consumer@1cadb1a1: tags=[[amq.ctag-DPmOqH4Wg3AyV0jpE_LBsw]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@203e705e Shared Rabbit Connection: SimpleConnection@45c80312 [delegate=amqp://[email protected]:5672/, localPort= 61016], acknowledgeMode=MANUAL local queue size=0
2022-07-25 17:29:21.803 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Started on queue 'demo.fenhy.reliable.annotation.queue' with tag amq.ctag-DPmOqH4Wg3AyV0jpE_LBsw: Consumer@1cadb1a1: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@203e705e Shared Rabbit Connection: SimpleConnection@45c80312 [delegate=amqp://[email protected]:5672/, localPort= 61016], acknowledgeMode=MANUAL local queue size=0
2022-07-25 17:29:21.804 [pool-3-thread-8] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-DPmOqH4Wg3AyV0jpE_LBsw' with deliveryTag: '1' in Consumer@1cadb1a1: tags=[[amq.ctag-DPmOqH4Wg3AyV0jpE_LBsw]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@203e705e Shared Rabbit Connection: SimpleConnection@45c80312 [delegate=amqp://[email protected]:5672/, localPort= 61016], acknowledgeMode=MANUAL local queue size=0
2022-07-25 17:29:21.804 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'{"id":9,"name":"梁涛","age":13}' MessageProperties [headers={__TypeId__=org.fenhy.demo.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=demo.fenhy.reliable.annotation.exchange, receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, deliveryTag=1, consumerTag=amq.ctag-DPmOqH4Wg3AyV0jpE_LBsw, consumerQueue=demo.fenhy.reliable.annotation.queue])
2022-07-25 17:29:26.810 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] DEBUG o.s.a.r.l.a.BatchMessagingMessageListenerAdapter - Processing [GenericMessage [payload=[GenericMessage [payload=User(id=9, name=梁涛, age=13), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_deliveryTag=1, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=true, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, id=b21d033b-5f9a-e931-e003-1b8aea823a1a, amqp_consumerTag=amq.ctag-DPmOqH4Wg3AyV0jpE_LBsw, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741366810}]], headers={id=d1d169ac-fd42-f751-2024-0c07697186e3, timestamp=1658741366810}]]
2022-07-25 17:29:26.811 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] INFO org.fenhy.demo.listener.ManualConsumer - 开始批量消费=================size:1
2022-07-25 17:29:26.811 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] INFO org.fenhy.demo.listener.ManualConsumer - 消费者 p2 messaging deliveryTag:1, isLast:false, 接收消息:GenericMessage [payload=User(id=9, name=梁涛, age=13), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_deliveryTag=1, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=true, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, id=b21d033b-5f9a-e931-e003-1b8aea823a1a, amqp_consumerTag=amq.ctag-DPmOqH4Wg3AyV0jpE_LBsw, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741366810}]
2022-07-25 17:29:26.811 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] INFO org.fenhy.demo.listener.ManualConsumer - 结束批量消费========================
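The PRECONDITION_FAILED in the log above follows from acking the same delivery tag several times: all five de-batched messages carry deliveryTag=2. Independently of the LAST_IN_BATCH fix, a listener could guard against this by acking each distinct tag once. The helper below is a hypothetical sketch of that idea (`AckOncePerTag` and `distinctTags` are names made up for illustration; they are not part of Spring AMQP or the RabbitMQ client).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper; not part of Spring AMQP or the RabbitMQ client.
class AckOncePerTag {

    // Returns each delivery tag exactly once, in first-seen order.
    static List<Long> distinctTags(List<Long> deliveryTags) {
        return new ArrayList<>(new LinkedHashSet<>(deliveryTags));
    }

    public static void main(String[] args) {
        // Five messages de-batched from one delivery, as in the log: all carry tag 2.
        List<Long> tags = Arrays.asList(2L, 2L, 2L, 2L, 2L);
        System.out.println(distinctTags(tags)); // [2]
    }
}
```

In the listener, the tags would be collected from AmqpHeaders.DELIVERY_TAG and channel.basicAck(tag, false) called once for each entry of the returned list.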
After tracing the problem, I overrode createListenerContainer() and deBatch() as shown below. Sending 5 messages again then gives the expected result: only the last message in the batch has amqp_lastInBatch=true.
RabbitBatchConfig.java (with overrides)
@Configuration
@Slf4j
public class RabbitBatchConfig {

    /**
     * Maximum number of messages collected into one batch
     */
    final int BATCH_SIZE = 7;

    /**
     * Maximum buffer size (in bytes) for one batch send
     */
    final int BUFFER_LIMIT = 16 * 1024 * 1024;

    /**
     * Maximum time (in milliseconds) to wait while collecting a batch
     */
    final long TIMEOUT = 10 * 1000L;

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Message is org.springframework.amqp.core.Message; Consumer is java.util.function.Consumer.
    @Bean("batchingStrategy")
    public SimpleBatchingStrategy batchingStrategy() {
        SimpleBatchingStrategy strategy = new SimpleBatchingStrategy(BATCH_SIZE, BUFFER_LIMIT, TIMEOUT) {
            @Override
            public void deBatch(Message message, Consumer<Message> fragmentConsumer) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
                while (byteBuffer.hasRemaining()) {
                    // Each fragment is a 4-byte length followed by the body.
                    int length = byteBuffer.getInt();
                    if (length < 0 || length > byteBuffer.remaining()) {
                        throw new ListenerExecutionFailedException("Bad batched message received",
                                new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
                                message);
                    }
                    byte[] body = new byte[length];
                    byteBuffer.get(body);
                    messageProperties.setContentLength(length);
                    // Caveat - shared MessageProperties.
                    Message fragment = new Message(body, messageProperties);
                    if (!byteBuffer.hasRemaining()) {
                        // Give the last fragment its own MessageProperties so the flag
                        // does not leak into the earlier fragments.
                        MessageProperties lastMessageProperties = new MessageProperties();
                        // Note: in the log for this run the copy is printed with headers={},
                        // so the original headers (amqp_batchSize, __TypeId__) are not carried over.
                        BeanUtils.copyProperties(messageProperties, lastMessageProperties);
                        fragment = new Message(body, lastMessageProperties);
                        lastMessageProperties.setLastInBatch(true);
                        log.info("这是当前批次最后的消息:{}", lastMessageProperties); // "this is the last message of the current batch"
                    }
                    fragmentConsumer.accept(fragment);
                }
            }
        };
        return strategy;
    }

    @Bean
    public BatchingRabbitTemplate batchingRabbitTemplate(SimpleBatchingStrategy batchingStrategy, ConnectionFactory connectionFactory) {
        BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(connectionFactory, batchingStrategy, new ConcurrentTaskScheduler());
        batchingRabbitTemplate.setMessageConverter(jsonMessageConverter());
        return batchingRabbitTemplate;
    }

    @Bean(name = "myBatchContainerFactory")
    public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(SimpleBatchingStrategy batchingStrategy, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory() {
            @Override
            public SimpleMessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint) {
                SimpleMessageListenerContainer instance = super.createListenerContainer(endpoint);
                // Push the configured strategy onto the container instance itself.
                if (endpoint != null) {
                    JavaUtils.INSTANCE.acceptIfNotNull(batchingStrategy, instance::setBatchingStrategy);
                }
                return instance;
            }
        };
        factory.setBatchingStrategy(batchingStrategy);
        factory.setConnectionFactory(connectionFactory);
        factory.setBatchListener(true);
        factory.setConsumerBatchEnabled(true);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setBatchSize(4);
        factory.setReceiveTimeout(5000L);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
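For readers unfamiliar with the layout that the deBatch() loop above walks through, here is a minimal, Spring-free sketch. It assumes each fragment is stored as a 4-byte big-endian length followed by the body, which is what the getInt()/get(body) calls above read; `LengthHeaderBatchDemo`, `encode`, and `decode` are hypothetical names for illustration, not library API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of a length-prefixed batch body; not Spring AMQP code.
class LengthHeaderBatchDemo {

    // Concatenates the bodies, each preceded by its length as a 4-byte int.
    static byte[] encode(List<byte[]> bodies) {
        int size = 0;
        for (byte[] body : bodies) {
            size += Integer.BYTES + body.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (byte[] body : bodies) {
            buffer.putInt(body.length);
            buffer.put(body);
        }
        return buffer.array();
    }

    // Splits a batch body back into fragments, rejecting truncated input.
    static List<byte[]> decode(byte[] batch) {
        ByteBuffer buffer = ByteBuffer.wrap(batch);
        List<byte[]> bodies = new ArrayList<>();
        while (buffer.hasRemaining()) {
            if (buffer.remaining() < Integer.BYTES) {
                throw new IllegalArgumentException("Truncated length header at offset " + buffer.position());
            }
            int length = buffer.getInt();
            if (length < 0 || length > buffer.remaining()) {
                throw new IllegalArgumentException("Insufficient batch data at offset " + buffer.position());
            }
            byte[] body = new byte[length];
            buffer.get(body);
            bodies.add(body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        byte[] batch = encode(Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                "bc".getBytes(StandardCharsets.UTF_8)));
        System.out.println(batch.length);         // 11
        System.out.println(decode(batch).size()); // 2
    }
}
```

The decoder knows it has reached the last fragment only when the buffer is exhausted, which is why the override above checks !byteBuffer.hasRemaining() before setting the flag.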
log
2022-07-25 17:34:20.631 [http-nio-8080-exec-1] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:34:21.081 [http-nio-8080-exec-2] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:34:21.495 [http-nio-8080-exec-3] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:34:21.917 [http-nio-8080-exec-4] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:34:22.305 [http-nio-8080-exec-5] INFO org.fenhy.demo.controller.ProducerController - p0消息投递完毕
2022-07-25 17:34:32.315 [pool-5-thread-1] DEBUG o.s.a.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,2)
2022-07-25 17:34:32.316 [pool-5-thread-1] DEBUG o.s.amqp.rabbit.core.BatchingRabbitTemplate - Executing callback RabbitTemplate$$Lambda$812/1108165996 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@1eb85a47 Shared Rabbit Connection: SimpleConnection@256589a1 [delegate=amqp://[email protected]:5672/, localPort= 61151]
2022-07-25 17:34:32.316 [pool-5-thread-1] DEBUG o.s.amqp.rabbit.core.BatchingRabbitTemplate - Publishing message [(Body:'[B@66eb887a(byte[217])' MessageProperties [headers={amqp_batchSize=5, springBatchFormat=lengthHeader4, __TypeId__=org.fenhy.demo.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=42, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [demo.fenhy.reliable.annotation.exchange], routingKey = [demo.fenhy.reliable.annotation.routing-key]
2022-07-25 17:34:32.325 [pool-3-thread-5] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-SbfyxsJbR4Uzsh27xmSyFA' with deliveryTag: '2' in Consumer@646d810b: tags=[[amq.ctag-SbfyxsJbR4Uzsh27xmSyFA]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@1eb85a47 Shared Rabbit Connection: SimpleConnection@256589a1 [delegate=amqp://[email protected]:5672/, localPort= 61151], acknowledgeMode=MANUAL local queue size=0
2022-07-25 17:34:32.325 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] DEBUG o.s.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@7e117c00(byte[217])' MessageProperties [headers={amqp_batchSize=5, springBatchFormat=lengthHeader4, __TypeId__=org.fenhy.demo.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=demo.fenhy.reliable.annotation.exchange, receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, deliveryTag=2, consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, consumerQueue=demo.fenhy.reliable.annotation.queue])
2022-07-25 17:34:32.330 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.config.RabbitBatchConfig - 这是当前批次最后的消息:MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=42, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=demo.fenhy.reliable.annotation.exchange, receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, deliveryTag=2, consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, consumerQueue=demo.fenhy.reliable.annotation.queue]
2022-07-25 17:34:37.339 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] DEBUG o.s.a.r.l.a.BatchMessagingMessageListenerAdapter - Processing [GenericMessage [payload=[GenericMessage [payload=User(id=6108835, name=傅天宇, age=25), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=30a90c0d-455f-8d3a-e2b3-5e47cb12d94e, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741677337}], GenericMessage [payload=User(id=308500, name=余耀杰, age=19), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=328d899c-8c4a-660d-fc46-c43f683976d0, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741677337}], GenericMessage [payload=User(id=72, name=萧胤祥, age=7), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=d84cc6c0-c472-ca3d-98bd-d3e118a88d01, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741677338}], GenericMessage [payload=User(id=51, name=夏擎宇, age=8), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=d22d0f6e-3d5a-0012-cc83-ba9f2cf8dd04, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741677338}], GenericMessage [payload=User(id=1713878, name=武绍辉, age=18), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, id=9390b058-bd00-164d-0ea8-3c868bbf5e91, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=true, contentType=application/json, timestamp=1658741677338}]], headers={id=0b0af0df-3ccb-9206-e61b-a4b5dd95893d, timestamp=1658741677338}]]
2022-07-25 17:34:37.340 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - Start batch consumption=================size:5
2022-07-25 17:34:37.340 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - Consumer p2 messaging deliveryTag:2, isLast:false, received message: GenericMessage [payload=User(id=6108835, name=傅天宇, age=25), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=30a90c0d-455f-8d3a-e2b3-5e47cb12d94e, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741677337}]
2022-07-25 17:34:37.341 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - Consumer p2 messaging deliveryTag:2, isLast:false, received message: GenericMessage [payload=User(id=308500, name=余耀杰, age=19), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=328d899c-8c4a-660d-fc46-c43f683976d0, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741677337}]
2022-07-25 17:34:37.341 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - Consumer p2 messaging deliveryTag:2, isLast:false, received message: GenericMessage [payload=User(id=72, name=萧胤祥, age=7), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=d84cc6c0-c472-ca3d-98bd-d3e118a88d01, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741677338}]
2022-07-25 17:34:37.341 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - Consumer p2 messaging deliveryTag:2, isLast:false, received message: GenericMessage [payload=User(id=51, name=夏擎宇, age=8), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, amqp_batchSize=5, id=d22d0f6e-3d5a-0012-cc83-ba9f2cf8dd04, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=org.fenhy.demo.entity.User, timestamp=1658741677338}]
2022-07-25 17:34:37.342 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - Consumer p2 messaging deliveryTag:2, isLast:true, received message: GenericMessage [payload=User(id=1713878, name=武绍辉, age=18), headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=demo.fenhy.reliable.annotation.exchange, amqp_contentLength=42, amqp_deliveryTag=2, amqp_consumerQueue=demo.fenhy.reliable.annotation.queue, amqp_redelivered=false, amqp_receivedRoutingKey=demo.fenhy.reliable.annotation.routing-key, amqp_contentEncoding=UTF-8, id=9390b058-bd00-164d-0ea8-3c868bbf5e91, amqp_consumerTag=amq.ctag-SbfyxsJbR4Uzsh27xmSyFA, amqp_lastInBatch=true, contentType=application/json, timestamp=1658741677338}]
2022-07-25 17:34:37.343 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO org.fenhy.demo.listener.ManualConsumer - End batch consumption========================