Skip to content

Commit 2a05612

Browse files
garyrussellartembilan
authored andcommitted
GH-1158: Fix TopicPartitionInitialOff Deprecation
Fixes #1158 Revert "GH-1158: Deprecate TopicPartitionInitialOffset" Previous commit caused a breaking API change. Polish streams; move utility conversion methods to deprecated class Switch to varargs in container factory Code style polishing
1 parent a68d508 commit 2a05612

13 files changed

+405
-119
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 54 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,10 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
217217
public void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) {
218218
if (replyTemplate instanceof ReplyingKafkaOperations) {
219219
this.logger.warn(
220-
"The 'replyTemplate' should not be an implementation of 'ReplyingKafkaOperations'; "
221-
+ "such implementations are for client-side request/reply operations; here we "
222-
+ "are simply sending a reply to an incoming request so the reply container will "
223-
+ "never be used and will consume unnecessary resources.");
220+
"The 'replyTemplate' should not be an implementation of 'ReplyingKafkaOperations'; "
221+
+ "such implementations are for client-side request/reply operations; here we "
222+
+ "are simply sending a reply to an incoming request so the reply container will "
223+
+ "never be used and will consume unnecessary resources.");
224224
}
225225
this.replyTemplate = replyTemplate;
226226
}
@@ -314,7 +314,7 @@ public void afterPropertiesSet() {
314314
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
315315
C instance = createContainerInstance(endpoint);
316316
JavaUtils.INSTANCE
317-
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
317+
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
318318
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
319319
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
320320
}
@@ -327,14 +327,14 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
327327

328328
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
329329
JavaUtils.INSTANCE
330-
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy)
331-
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
332-
.acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
333-
.acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
334-
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
335-
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener)
336-
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
337-
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer);
330+
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy)
331+
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
332+
.acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
333+
.acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
334+
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
335+
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener)
336+
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
337+
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer);
338338
}
339339

340340
/**
@@ -356,13 +356,13 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
356356
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
357357
"messageListener", "ackCount", "ackTime");
358358
JavaUtils.INSTANCE
359-
.acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor)
360-
.acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(),
361-
properties::setAckCount)
362-
.acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
363-
properties::setAckTime)
364-
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
365-
.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
359+
.acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor)
360+
.acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(),
361+
properties::setAckCount)
362+
.acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
363+
properties::setAckTime)
364+
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
365+
.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
366366
if (endpoint.getAutoStartup() != null) {
367367
instance.setAutoStartup(endpoint.getAutoStartup());
368368
}
@@ -371,54 +371,66 @@ else if (this.autoStartup != null) {
371371
}
372372
instance.setRecordInterceptor(this.recordInterceptor);
373373
JavaUtils.INSTANCE
374-
.acceptIfNotNull(this.phase, instance::setPhase)
375-
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
376-
.acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
377-
.acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
374+
.acceptIfNotNull(this.phase, instance::setPhase)
375+
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
376+
.acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
377+
.acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
378378
.acceptIfNotNull(endpoint.getConsumerProperties(),
379379
instance.getContainerProperties()::setConsumerProperties);
380380
}
381381

382+
/**
383+
* {@inheritDoc}
384+
* @deprecated in favor of {@link #createContainer(TopicPartitionOffset[])}
385+
*/
386+
@Deprecated
387+
@Override
388+
public C createContainer(Collection<org.springframework.kafka.support.TopicPartitionInitialOffset> topicPartitions) {
389+
return createContainer(topicPartitions.stream()
390+
.map(org.springframework.kafka.support.TopicPartitionInitialOffset::toTPO)
391+
.toArray(TopicPartitionOffset[]::new));
392+
}
393+
382394
@Override
383-
public C createContainer(final Collection<TopicPartitionOffset> topicPartitions) {
395+
public C createContainer(TopicPartitionOffset... topicsAndPartitions) {
384396
KafkaListenerEndpoint endpoint = new KafkaListenerEndpointAdapter() {
385397

386-
@Override
387-
public Collection<TopicPartitionOffset> getTopicPartitions() {
388-
return topicPartitions;
389-
}
398+
@Override
399+
public TopicPartitionOffset[] getTopicPartitionsToAssign() {
400+
return Arrays.copyOf(topicsAndPartitions, topicsAndPartitions.length);
401+
}
390402

391-
};
403+
};
392404
C container = createContainerInstance(endpoint);
393405
initializeContainer(container, endpoint);
394406
return container;
395407
}
396408

397409
@Override
398-
public C createContainer(final String... topics) {
410+
public C createContainer(String... topics) {
399411
KafkaListenerEndpoint endpoint = new KafkaListenerEndpointAdapter() {
400412

401-
@Override
402-
public Collection<String> getTopics() {
403-
return Arrays.asList(topics);
404-
}
413+
@Override
414+
public Collection<String> getTopics() {
415+
return Arrays.asList(topics);
416+
}
405417

406-
};
418+
};
407419
C container = createContainerInstance(endpoint);
408420
initializeContainer(container, endpoint);
409421
return container;
410422
}
411423

412424
@Override
413-
public C createContainer(final Pattern topicPattern) {
425+
public C createContainer(Pattern topicPattern) {
414426
KafkaListenerEndpoint endpoint = new KafkaListenerEndpointAdapter() {
415427

416-
@Override
417-
public Pattern getTopicPattern() {
418-
return topicPattern;
419-
}
428+
@Override
429+
public Pattern getTopicPattern() {
430+
return topicPattern;
431+
}
420432

421-
};
433+
};
422434
C container = createContainerInstance(endpoint);
423435
initializeContainer(container, endpoint);
424436
return container;

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.retry.RecoveryCallback;
5252
import org.springframework.retry.support.RetryTemplate;
5353
import org.springframework.util.Assert;
54+
import org.springframework.util.ObjectUtils;
5455

5556
/**
5657
* Base model for a Kafka listener endpoint.
@@ -190,6 +191,25 @@ public Collection<String> getTopics() {
190191
* Either this or 'topic' or 'topicPattern'
191192
* should be provided, but not a mixture.
192193
* @param topicPartitions to set.
194+
* @deprecated in favor of {@link #setTopicPartitions(TopicPartitionOffset...)}.
195+
* @see #setTopics(String...)
196+
* @see #setTopicPattern(Pattern)
197+
*/
198+
@Deprecated
199+
public void setTopicPartitions(org.springframework.kafka.support.TopicPartitionInitialOffset... topicPartitions) {
200+
Assert.notNull(topicPartitions, "'topics' must not be null");
201+
this.topicPartitions.clear();
202+
Arrays.stream(topicPartitions)
203+
.map(org.springframework.kafka.support.TopicPartitionInitialOffset::toTPO)
204+
.forEach(this.topicPartitions::add);
205+
}
206+
207+
/**
208+
* Set the topicPartitions to use.
209+
* Either this or 'topic' or 'topicPattern'
210+
* should be provided, but not a mixture.
211+
* @param topicPartitions to set.
212+
* @since 2.3
193213
* @see #setTopics(String...)
194214
* @see #setTopicPattern(Pattern)
195215
*/
@@ -202,10 +222,11 @@ public void setTopicPartitions(TopicPartitionOffset... topicPartitions) {
202222
/**
203223
* Return the topicPartitions for this endpoint.
204224
* @return the topicPartitions for this endpoint.
225+
* @since 2.3
205226
*/
206227
@Override
207-
public Collection<TopicPartitionOffset> getTopicPartitions() {
208-
return Collections.unmodifiableCollection(this.topicPartitions);
228+
public TopicPartitionOffset[] getTopicPartitionsToAssign() {
229+
return this.topicPartitions.toArray(new TopicPartitionOffset[0]);
209230
}
210231

211232
/**
@@ -414,7 +435,7 @@ public void setConsumerProperties(Properties consumerProperties) {
414435
@Override
415436
public void afterPropertiesSet() {
416437
boolean topicsEmpty = getTopics().isEmpty();
417-
boolean topicPartitionsEmpty = getTopicPartitions().isEmpty();
438+
boolean topicPartitionsEmpty = ObjectUtils.isEmpty(getTopicPartitionsToAssign());
418439
if (!topicsEmpty && !topicPartitionsEmpty) {
419440
throw new IllegalStateException("Topics or topicPartitions must be provided but not both for " + this);
420441
}
@@ -459,11 +480,11 @@ private void setupMessageListener(MessageListenerContainer container, MessageCon
459480
if (this.batchListener) {
460481
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {
461482
this.logger.warn(() -> "Filter strategy ignored when consuming 'ConsumerRecords'"
462-
+ (this.id != null ? " id: " + this.id : ""));
483+
+ (this.id != null ? " id: " + this.id : ""));
463484
}
464485
else {
465486
messageListener = new FilteringBatchMessageListenerAdapter<>(
466-
(BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy, this.ackDiscarded);
487+
(BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy, this.ackDiscarded);
467488
}
468489
}
469490
else {

spring-kafka/src/main/java/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,28 @@ public void setConcurrency(Integer concurrency) {
5959

6060
@Override
6161
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
62-
Collection<TopicPartitionOffset> topicPartitions = endpoint.getTopicPartitions();
63-
if (!topicPartitions.isEmpty()) {
64-
ContainerProperties properties = new ContainerProperties(
65-
topicPartitions.toArray(new TopicPartitionOffset[0]));
66-
return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
62+
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
63+
if (topicPartitions != null && topicPartitions.length > 0) {
64+
ContainerProperties properties = new ContainerProperties(topicPartitions);
65+
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
6766
}
6867
else {
6968
Collection<String> topics = endpoint.getTopics();
7069
if (!topics.isEmpty()) {
7170
ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
72-
return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
71+
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
7372
}
7473
else {
7574
ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
76-
return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
75+
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
7776
}
7877
}
7978
}
8079

8180
@Override
82-
protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
81+
protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance,
82+
KafkaListenerEndpoint endpoint) {
83+
8384
super.initializeContainer(instance, endpoint);
8485
if (endpoint.getConcurrency() != null) {
8586
instance.setConcurrency(endpoint.getConcurrency());

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,23 @@ public interface KafkaListenerContainerFactory<C extends MessageListenerContaine
4646
* Create and configure a container without a listener; used to create containers that
4747
* are not used for KafkaListener annotations. Containers created using this method
4848
* are not added to the listener endpoint registry.
49-
* @param topicPartitions the topicPartitions.
49+
* @param topicPartitions the topicPartitions to assign.
50+
* @deprecated in favor of {@link #createContainer(TopicPartitionOffset[])}.
5051
* @return the container.
5152
* @since 2.2
5253
*/
53-
C createContainer(Collection<TopicPartitionOffset> topicPartitions);
54+
@Deprecated
55+
C createContainer(Collection<org.springframework.kafka.support.TopicPartitionInitialOffset> topicPartitions);
56+
57+
/**
58+
* Create and configure a container without a listener; used to create containers that
59+
* are not used for KafkaListener annotations. Containers created using this method
60+
* are not added to the listener endpoint registry.
61+
* @param topicPartitions the topicPartitions to assign.
62+
* @return the container.
63+
* @since 2.3
64+
*/
65+
C createContainer(TopicPartitionOffset... topicPartitions);
5466

5567
/**
5668
* Create and configure a container without a listener; used to create containers that

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.kafka.config;
1818

19+
import java.util.Arrays;
1920
import java.util.Collection;
21+
import java.util.Collections;
2022
import java.util.Properties;
2123
import java.util.regex.Pattern;
24+
import java.util.stream.Collectors;
2225

2326
import org.springframework.kafka.listener.MessageListenerContainer;
2427
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -67,8 +70,21 @@ public interface KafkaListenerEndpoint {
6770
/**
6871
* Return the topicPartitions for this endpoint.
6972
* @return the topicPartitions for this endpoint.
73+
* @deprecated in favor of {@link #getTopicPartitionsToAssign()}.
7074
*/
71-
Collection<TopicPartitionOffset> getTopicPartitions();
75+
@Deprecated
76+
default Collection<org.springframework.kafka.support.TopicPartitionInitialOffset> getTopicPartitions() {
77+
return Arrays.stream(getTopicPartitionsToAssign())
78+
.map(org.springframework.kafka.support.TopicPartitionInitialOffset::fromTPO)
79+
.collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
80+
}
81+
82+
/**
83+
* Return the topicPartitions for this endpoint.
84+
* @return the topicPartitions for this endpoint.
85+
* @since 2.3
86+
*/
87+
TopicPartitionOffset[] getTopicPartitionsToAssign();
7288

7389
/**
7490
* Return the topicPattern for this endpoint.

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public Collection<String> getTopics() {
5858
}
5959

6060
@Override
61-
public Collection<TopicPartitionOffset> getTopicPartitions() {
62-
return Collections.emptyList();
61+
public TopicPartitionOffset[] getTopicPartitionsToAssign() {
62+
return new TopicPartitionOffset[0];
6363
}
6464

6565
@Override

0 commit comments

Comments
 (0)