Skip to content

Commit 18e4209

Browse files
garyrussellartembilan
authored andcommitted
GH-1474: Fix BatchingStrategy Propagation
See #1474 (Does not resolve because 2 issues are reported there). `BatchingStrategy` was not set by container factory. **cherry-pick to 2.4.x**
1 parent d4fc77f commit 18e4209

File tree

4 files changed

+27
-7
lines changed

4 files changed

+27
-7
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,8 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
369369
.acceptIfNotNull(this.phase, instance::setPhase)
370370
.acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors)
371371
.acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled)
372-
.acceptIfNotNull(this.messageAckListener, instance::setMessageAckListener);
372+
.acceptIfNotNull(this.messageAckListener, instance::setMessageAckListener)
373+
.acceptIfNotNull(this.batchingStrategy, instance::setBatchingStrategy);
373374
if (this.batchListener && this.deBatchingEnabled == null) {
374375
// turn off container debatching by default for batch listeners
375376
instance.setDeBatchingEnabled(false);
@@ -378,7 +379,7 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
378379
javaUtils
379380
.acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor)
380381
.acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode)
381-
.acceptIfNotNull(this.batchingStrategy, endpoint::setBatchingStrategy);
382+
.acceptIfNotNull(endpoint.getBatchingStrategy(), instance::setBatchingStrategy);
382383
instance.setListenerId(endpoint.getId());
383384
endpoint.setBatchListener(this.batchListener);
384385
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractRabbitListenerEndpoint.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -308,6 +308,7 @@ public void setBatchListener(boolean batchListener) {
308308
this.batchListener = batchListener;
309309
}
310310

311+
@Override
311312
@Nullable
312313
public BatchingStrategy getBatchingStrategy() {
313314
return this.batchingStrategy;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/RabbitListenerEndpoint.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -114,7 +114,6 @@ default TaskExecutor getTaskExecutor() {
114114
* @since 2.2
115115
*/
116116
default void setBatchListener(boolean batchListener) {
117-
// NOSONAR empty
118117
}
119118

120119
/**
@@ -124,7 +123,16 @@ default void setBatchListener(boolean batchListener) {
124123
* @see #setBatchListener(boolean)
125124
*/
126125
default void setBatchingStrategy(BatchingStrategy batchingStrategy) {
127-
// NOSONAR empty
126+
}
127+
128+
/**
129+
* Return this endpoint's batching strategy, or null.
130+
* @return the strategy.
131+
* @since 2.4.7
132+
*/
133+
@Nullable
134+
default BatchingStrategy getBatchingStrategy() {
135+
return null;
128136
}
129137

130138
/**

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,13 +28,15 @@
2828

2929
import org.springframework.amqp.core.AcknowledgeMode;
3030
import org.springframework.amqp.core.MessagePostProcessor;
31+
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
3132
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
3233
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
3334
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
3435
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
3536
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
3637
import org.springframework.amqp.support.converter.MessageConverter;
3738
import org.springframework.amqp.support.converter.SimpleMessageConverter;
39+
import org.springframework.amqp.utils.test.TestUtils;
3840
import org.springframework.beans.DirectFieldAccessor;
3941
import org.springframework.scheduling.TaskScheduler;
4042
import org.springframework.transaction.PlatformTransactionManager;
@@ -70,12 +72,17 @@ public void createSimpleContainer() {
7072
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
7173
endpoint.setMessageListener(this.messageListener);
7274
endpoint.setQueueNames("myQueue");
75+
BatchingStrategy bs1 = mock(BatchingStrategy.class);
76+
this.factory.setBatchingStrategy(bs1);
77+
BatchingStrategy bs2 = mock(BatchingStrategy.class);
78+
endpoint.setBatchingStrategy(bs2);
7379

7480
SimpleMessageListenerContainer container = this.factory.createListenerContainer(endpoint);
7581

7682
assertBasicConfig(container);
7783
assertThat(container.getMessageListener()).isEqualTo(messageListener);
7884
assertThat(container.getQueueNames()[0]).isEqualTo("myQueue");
85+
assertThat(TestUtils.getPropertyValue(container, "batchingStrategy")).isSameAs(bs2);
7986
}
8087

8188
@Test
@@ -89,6 +96,8 @@ public void createContainerFullConfig() {
8996
this.factory.setTaskExecutor(executor);
9097
this.factory.setTransactionManager(transactionManager);
9198
this.factory.setBatchSize(10);
99+
BatchingStrategy bs1 = mock(BatchingStrategy.class);
100+
this.factory.setBatchingStrategy(bs1);
92101
this.factory.setConcurrentConsumers(2);
93102
this.factory.setMaxConcurrentConsumers(5);
94103
this.factory.setStartConsumerMinInterval(2000L);
@@ -115,6 +124,7 @@ public void createContainerFullConfig() {
115124
SimpleMessageListenerContainer container = this.factory.createListenerContainer(endpoint);
116125

117126
assertBasicConfig(container);
127+
assertThat(TestUtils.getPropertyValue(container, "batchingStrategy")).isSameAs(bs1);
118128
DirectFieldAccessor fieldAccessor = new DirectFieldAccessor(container);
119129
assertThat(fieldAccessor.getPropertyValue("taskExecutor")).isSameAs(executor);
120130
assertThat(fieldAccessor.getPropertyValue("transactionManager")).isSameAs(transactionManager);

0 commit comments

Comments
 (0)