Skip to content

Commit 0baa447

Browse files
garyrussellartembilan
authored andcommitted
Pull Up Consumer Properties
- move `consumerProperties` from `ContainerProperties` to `ConsumerProperties` - rename to `kafkaConsumerProperties` - deprecate old accessors - allows overriding factory consumer properties in SIK message source * Fix typo
1 parent 4f8fa13 commit 0baa447

File tree

7 files changed

+88
-16
lines changed

7 files changed

+88
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ else if (this.autoStartup != null) {
376376
.acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
377377
.acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
378378
.acceptIfNotNull(endpoint.getConsumerProperties(),
379-
instance.getContainerProperties()::setConsumerProperties);
379+
instance.getContainerProperties()::setKafkaConsumerProperties);
380380
}
381381

382382
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020
import java.util.Arrays;
21+
import java.util.Properties;
2122
import java.util.regex.Pattern;
2223

2324
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -94,6 +95,8 @@ public class ConsumerProperties {
9495

9596
private LogIfLevelEnabled.Level commitLogLevel = LogIfLevelEnabled.Level.DEBUG;
9697

98+
private Properties kafkaConsumerProperties = new Properties();
99+
97100
/**
98101
* Create properties for a container that will subscribe to the specified topics.
99102
* @param topics the topics.
@@ -270,6 +273,35 @@ public void setCommitLogLevel(LogIfLevelEnabled.Level commitLogLevel) {
270273
this.commitLogLevel = commitLogLevel;
271274
}
272275

276+
/**
277+
* Get the consumer properties that will be merged with the consumer properties
278+
* provided by the consumer factory; properties here will supersede any with the same
279+
* name(s) in the consumer factory.
280+
* {@code group.id} and {@code client.id} are ignored.
281+
* @return the properties.
282+
* @see org.apache.kafka.clients.consumer.ConsumerConfig
283+
* @see #setGroupId(String)
284+
* @see #setClientId(String)
285+
*/
286+
public Properties getKafkaConsumerProperties() {
287+
return this.kafkaConsumerProperties;
288+
}
289+
290+
/**
291+
* Set the consumer properties that will be merged with the consumer properties
292+
* provided by the consumer factory; properties here will supersede any with the same
293+
* name(s) in the consumer factory.
294+
* {@code group.id} and {@code client.id} are ignored.
295+
* @param kafkaConsumerProperties the properties.
296+
* @see org.apache.kafka.clients.consumer.ConsumerConfig
297+
* @see #setGroupId(String)
298+
* @see #setClientId(String)
299+
*/
300+
public void setKafkaConsumerProperties(Properties kafkaConsumerProperties) {
301+
Assert.notNull(kafkaConsumerProperties, "'kafkaConsumerProperties' cannot be null");
302+
this.kafkaConsumerProperties = kafkaConsumerProperties;
303+
}
304+
273305
@Override
274306
public String toString() {
275307
return "ConsumerProperties ["
@@ -287,7 +319,8 @@ protected final String renderProperties() {
287319
: "")
288320
+ (this.commitCallback != null ? ", commitCallback=" + this.commitCallback : "")
289321
+ ", syncCommits=" + this.syncCommits
290-
+ (this.syncCommitTimeout != null ? ", syncCommitTimeout=" + this.syncCommitTimeout : "");
322+
+ (this.syncCommitTimeout != null ? ", syncCommitTimeout=" + this.syncCommitTimeout : "")
323+
+ (this.kafkaConsumerProperties.size() > 0 ? ", properties=" + this.kafkaConsumerProperties : "");
291324
}
292325

293326
private String renderTopics() {

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,6 @@ public enum AckMode {
169169

170170
private long idleBetweenPolls;
171171

172-
private Properties consumerProperties = new Properties();
173-
174172
/**
175173
* Create properties for a container that will subscribe to the specified topics.
176174
* @param topics the topics.
@@ -295,7 +293,7 @@ public void setShutdownTimeout(long shutdownTimeout) {
295293
* <ul>
296294
* <li>this property</li>
297295
* <li>{@code ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} in
298-
* {@link #setConsumerProperties(Properties)}</li>
296+
* {@link #setKafkaConsumerProperties(Properties)}</li>
299297
* <li>{@code ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} in the consumer factory
300298
* properties</li>
301299
* <li>60 seconds</li>
@@ -498,9 +496,11 @@ public void setMissingTopicsFatal(boolean missingTopicsFatal) {
498496
* @see org.apache.kafka.clients.consumer.ConsumerConfig
499497
* @see #setGroupId(String)
500498
* @see #setClientId(String)
499+
* @deprecated in favor of {@link #getKafkaConsumerProperties()}.
501500
*/
501+
@Deprecated
502502
public Properties getConsumerProperties() {
503-
return this.consumerProperties;
503+
return getKafkaConsumerProperties();
504504
}
505505

506506
/**
@@ -513,10 +513,11 @@ public Properties getConsumerProperties() {
513513
* @see org.apache.kafka.clients.consumer.ConsumerConfig
514514
* @see #setGroupId(String)
515515
* @see #setClientId(String)
516+
* @deprecated in favor of {@link #setKafkaConsumerProperties(Properties)}.
516517
*/
518+
@Deprecated
517519
public void setConsumerProperties(Properties consumerProperties) {
518-
Assert.notNull(consumerProperties, "'consumerProperties' cannot be null");
519-
this.consumerProperties = consumerProperties;
520+
setKafkaConsumerProperties(consumerProperties);
520521
}
521522

522523
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
534534

535535
@SuppressWarnings(UNCHECKED)
536536
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
537-
Properties consumerProperties = new Properties(this.containerProperties.getConsumerProperties());
537+
Properties consumerProperties = new Properties(this.containerProperties.getKafkaConsumerProperties());
538538
this.autoCommit = determineAutoCommit(consumerProperties);
539539
this.consumer =
540540
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
@@ -676,7 +676,7 @@ private Duration determineSyncCommitTimeout() {
676676
return this.containerProperties.getSyncCommitTimeout();
677677
}
678678
else {
679-
Object timeout = this.containerProperties.getConsumerProperties()
679+
Object timeout = this.containerProperties.getKafkaConsumerProperties()
680680
.get(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
681681
if (timeout == null) {
682682
timeout = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties()

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public void testNoOverridesWhenCreatingConsumer() {
8686
DefaultKafkaConsumerFactory<String, String> target =
8787
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
8888

89+
@Override
8990
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
9091
configPassedToKafkaConsumer.putAll(configProps);
9192
return null;
@@ -107,6 +108,7 @@ public void testPropertyOverridesWhenCreatingConsumer() {
107108
DefaultKafkaConsumerFactory<String, String> target =
108109
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
109110

111+
@Override
110112
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
111113
configPassedToKafkaConsumer.putAll(configProps);
112114
return null;
@@ -124,6 +126,7 @@ public void testSuffixOnExistingClientIdWhenCreatingConsumer() {
124126
DefaultKafkaConsumerFactory<String, String> target =
125127
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
126128

129+
@Override
127130
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
128131
configPassedToKafkaConsumer.putAll(configProps);
129132
return null;
@@ -139,6 +142,7 @@ public void testSuffixWithoutExistingClientIdWhenCreatingConsumer() {
139142
DefaultKafkaConsumerFactory<String, String> target =
140143
new DefaultKafkaConsumerFactory<String, String>(Collections.emptyMap()) {
141144

145+
@Override
142146
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
143147
configPassedToKafkaConsumer.putAll(configProps);
144148
return null;
@@ -155,6 +159,7 @@ public void testPrefixOnExistingClientIdWhenCreatingConsumer() {
155159
DefaultKafkaConsumerFactory<String, String> target =
156160
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
157161

162+
@Override
158163
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
159164
configPassedToKafkaConsumer.putAll(configProps);
160165
return null;
@@ -170,6 +175,7 @@ public void testPrefixWithoutExistingClientIdWhenCreatingConsumer() {
170175
DefaultKafkaConsumerFactory<String, String> target =
171176
new DefaultKafkaConsumerFactory<String, String>(Collections.emptyMap()) {
172177

178+
@Override
173179
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
174180
configPassedToKafkaConsumer.putAll(configProps);
175181
return null;
@@ -186,6 +192,7 @@ public void testSuffixAndPrefixOnExistingClientIdWhenCreatingConsumer() {
186192
DefaultKafkaConsumerFactory<String, String> target =
187193
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
188194

195+
@Override
189196
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
190197
configPassedToKafkaConsumer.putAll(configProps);
191198
return null;
@@ -204,6 +211,7 @@ public void testSuffixAndPrefixOnOverridenClientIdWhenCreatingConsumer() {
204211
DefaultKafkaConsumerFactory<String, String> target =
205212
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
206213

214+
@Override
207215
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
208216
configPassedToKafkaConsumer.putAll(configProps);
209217
return null;
@@ -221,6 +229,7 @@ public void testOverriddenGroupIdWhenCreatingConsumer() {
221229
DefaultKafkaConsumerFactory<String, String> target =
222230
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
223231

232+
@Override
224233
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
225234
configPassedToKafkaConsumer.putAll(configProps);
226235
return null;
@@ -236,6 +245,7 @@ public void testOverriddenGroupIdWithoutExistingGroupIdWhenCreatingConsumer() {
236245
DefaultKafkaConsumerFactory<String, String> target =
237246
new DefaultKafkaConsumerFactory<String, String>(Collections.emptyMap()) {
238247

248+
@Override
239249
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
240250
configPassedToKafkaConsumer.putAll(configProps);
241251
return null;
@@ -254,6 +264,7 @@ public void testOverriddenGroupIdOnOverriddenPropertyWhenCreatingConsumer() {
254264
DefaultKafkaConsumerFactory<String, String> target =
255265
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
256266

267+
@Override
257268
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
258269
configPassedToKafkaConsumer.putAll(configProps);
259270
return null;
@@ -263,6 +274,26 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
263274
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("overridden");
264275
}
265276

277+
@Test
278+
public void testOverriddenMaxPollRecordsOnly() {
279+
Map<String, Object> originalConfig = Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "original");
280+
Properties overrides = new Properties();
281+
overrides.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
282+
final Map<String, Object> configPassedToKafkaConsumer = new HashMap<>();
283+
DefaultKafkaConsumerFactory<String, String> target =
284+
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
285+
286+
@Override
287+
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
288+
configPassedToKafkaConsumer.putAll(configProps);
289+
return null;
290+
}
291+
292+
};
293+
target.createConsumer(null, null, null, overrides);
294+
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo("2");
295+
}
296+
266297
@SuppressWarnings("unchecked")
267298
@Test
268299
public void testNestedTxProducerIsCached() throws Exception {

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
236236
});
237237
Properties consumerProperties = new Properties();
238238
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
239-
containerProps.setConsumerProperties(consumerProperties);
239+
containerProps.setKafkaConsumerProperties(consumerProperties);
240240
final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
241241
final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch(2);
242242
containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@@ -571,7 +571,7 @@ public void testListenerException() throws Exception {
571571
});
572572
Properties consumerProperties = new Properties();
573573
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
574-
containerProps.setConsumerProperties(consumerProperties);
574+
containerProps.setKafkaConsumerProperties(consumerProperties);
575575

576576
ConcurrentMessageListenerContainer<Integer, String> container =
577577
new ConcurrentMessageListenerContainer<>(cf, containerProps);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,7 @@ public void testRecordAckMock() throws Exception {
575575
ContainerProperties containerProps = new ContainerProperties(topicPartition);
576576
containerProps.setGroupId("grp");
577577
containerProps.setAckMode(AckMode.RECORD);
578+
containerProps.setMissingTopicsFatal(false);
578579
final CountDownLatch latch = new CountDownLatch(2);
579580
MessageListener<Integer, String> messageListener = spy(
580581
new MessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes
@@ -728,6 +729,7 @@ public void testNonResponsiveConsumerEvent() throws Exception {
728729
containerProps.setPollTimeout(10);
729730
containerProps.setMonitorInterval(1);
730731
containerProps.setMessageListener(mock(MessageListener.class));
732+
containerProps.setMissingTopicsFatal(false);
731733
KafkaMessageListenerContainer<Integer, String> container =
732734
new KafkaMessageListenerContainer<>(cf, containerProps);
733735
final CountDownLatch latch = new CountDownLatch(1);
@@ -761,6 +763,7 @@ public void testNonResponsiveConsumerEventNotIssuedWithActiveConsumer() throws E
761763
containerProps.setPollTimeout(100);
762764
containerProps.setMonitorInterval(1);
763765
containerProps.setMessageListener(mock(MessageListener.class));
766+
containerProps.setMissingTopicsFatal(false);
764767
KafkaMessageListenerContainer<Integer, String> container =
765768
new KafkaMessageListenerContainer<>(cf, containerProps);
766769
final AtomicInteger eventCounter = new AtomicInteger();
@@ -1288,7 +1291,7 @@ public ConsumerRecords<Integer, String> poll(Duration timeout) {
12881291
Properties defaultProperties = new Properties();
12891292
defaultProperties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "42");
12901293
Properties consumerProperties = new Properties(defaultProperties);
1291-
container1Props.setConsumerProperties(consumerProperties);
1294+
container1Props.setKafkaConsumerProperties(consumerProperties);
12921295
CountDownLatch stubbingComplete1 = new CountDownLatch(1);
12931296
KafkaMessageListenerContainer<Integer, String> container1 = spyOnContainer(
12941297
new KafkaMessageListenerContainer<>(cf, container1Props), stubbingComplete1);
@@ -1317,7 +1320,7 @@ public ConsumerRecords<Integer, String> poll(Duration timeout) {
13171320
logger.info("defined part: " + message);
13181321
latch2.countDown();
13191322
});
1320-
container2Props.setConsumerProperties(consumerProperties);
1323+
container2Props.setKafkaConsumerProperties(consumerProperties);
13211324
CountDownLatch stubbingComplete2 = new CountDownLatch(1);
13221325
KafkaMessageListenerContainer<Integer, String> container2 = spyOnContainer(
13231326
new KafkaMessageListenerContainer<>(cf, container2Props), stubbingComplete2);
@@ -2124,7 +2127,7 @@ public void testPauseResume() throws Exception {
21242127
containerProps.setMissingTopicsFatal(false);
21252128
Properties consumerProps = new Properties();
21262129
consumerProps.setProperty(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "42000");
2127-
containerProps.setConsumerProperties(consumerProps);
2130+
containerProps.setKafkaConsumerProperties(consumerProps);
21282131
containerProps.setSyncCommitTimeout(Duration.ofSeconds(41)); // wins
21292132
KafkaMessageListenerContainer<Integer, String> container =
21302133
new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2183,6 +2186,7 @@ public void testInitialSeek() throws Exception {
21832186
containerProps.setAckMode(AckMode.RECORD);
21842187
containerProps.setClientId("clientId");
21852188
containerProps.setMessageListener((MessageListener) r -> { });
2189+
containerProps.setMissingTopicsFatal(false);
21862190
KafkaMessageListenerContainer<Integer, String> container =
21872191
new KafkaMessageListenerContainer<>(cf, containerProps);
21882192
container.start();
@@ -2324,13 +2328,15 @@ public void testAckModeCount() throws Exception {
23242328
containerProps.setAckMode(AckMode.COUNT);
23252329
containerProps.setAckCount(3);
23262330
containerProps.setClientId("clientId");
2331+
containerProps.setMissingTopicsFatal(false);
23272332
AtomicInteger recordCount = new AtomicInteger();
23282333
containerProps.setMessageListener((MessageListener) r -> {
23292334
recordCount.incrementAndGet();
23302335
});
23312336
Properties consumerProps = new Properties();
23322337
consumerProps.setProperty(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "42000"); // wins
2333-
containerProps.setConsumerProperties(consumerProps);
2338+
containerProps.setKafkaConsumerProperties(consumerProps);
2339+
containerProps.setMissingTopicsFatal(false);
23342340
KafkaMessageListenerContainer<Integer, String> container =
23352341
new KafkaMessageListenerContainer<>(cf, containerProps);
23362342
container.start();
@@ -2374,6 +2380,7 @@ public void testCommitErrorHandlerCalled() throws Exception {
23742380
containerProps.setIdleEventInterval(100L);
23752381
containerProps.setMessageListener((MessageListener) r -> {
23762382
});
2383+
containerProps.setMissingTopicsFatal(false);
23772384
KafkaMessageListenerContainer<Integer, String> container =
23782385
new KafkaMessageListenerContainer<>(cf, containerProps);
23792386
final CountDownLatch ehl = new CountDownLatch(1);

0 commit comments

Comments
 (0)