Skip to content

Commit a68d508

Browse files
garyrussellartembilan
authored andcommitted
KMLC: Avoid many casts to ConsumerSeekAware
1 parent cb8cc1a commit a68d508

File tree

1 file changed

+18
-11
lines changed

1 file changed

+18
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
403403

404404
private final GenericMessageListener<?> genericListener;
405405

406+
private final ConsumerSeekAware consumerSeekAwareListener;
407+
406408
private final MessageListener<K, V> listener;
407409

408410
private final BatchMessageListener<K, V> batchListener;
@@ -506,6 +508,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
506508

507509
this.transactionTemplate = determineTransactionTemplate();
508510
this.genericListener = listener;
511+
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
509512
subscribeOrAssignTopics(this.consumer);
510513
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
511514
if (listener instanceof BatchMessageListener) {
@@ -565,6 +568,11 @@ else if (listener instanceof MessageListener) {
565568
}
566569
}
567570

571+
@Nullable
572+
private ConsumerSeekAware checkConsumerSeekAware(GenericMessageListener<?> candidate) {
573+
return candidate instanceof ConsumerSeekAware ? (ConsumerSeekAware) candidate : null;
574+
}
575+
568576
boolean isConsumerPaused() {
569577
return this.consumerPaused;
570578
}
@@ -694,17 +702,16 @@ protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler)
694702
}
695703

696704
private void seekPartitions(Collection<TopicPartition> partitions, boolean idle) {
697-
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
705+
this.consumerSeekAwareListener.registerSeekCallback(this);
698706
Map<TopicPartition, Long> current = new HashMap<>();
699707
for (TopicPartition topicPartition : partitions) {
700708
current.put(topicPartition, ListenerConsumer.this.consumer.position(topicPartition));
701709
}
702710
if (idle) {
703-
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onIdleContainer(current, this.seekCallback);
711+
this.consumerSeekAwareListener.onIdleContainer(current, this.seekCallback);
704712
}
705713
else {
706-
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onPartitionsAssigned(current,
707-
this.seekCallback);
714+
this.consumerSeekAwareListener.onPartitionsAssigned(current, this.seekCallback);
708715
}
709716
}
710717

@@ -729,8 +736,8 @@ public boolean isLongLived() {
729736
@Override
730737
public void run() {
731738
this.consumerThread = Thread.currentThread();
732-
if (this.genericListener instanceof ConsumerSeekAware) {
733-
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
739+
if (this.consumerSeekAwareListener != null) {
740+
this.consumerSeekAwareListener.registerSeekCallback(this);
734741
}
735742
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
736743
this.count = 0;
@@ -838,7 +845,7 @@ private void checkIdle() {
838845
publishIdleContainerEvent(now - this.lastReceive, this.isConsumerAwareListener
839846
? this.consumer : null, this.consumerPaused);
840847
this.lastAlertAt = now;
841-
if (this.genericListener instanceof ConsumerSeekAware) {
848+
if (this.consumerSeekAwareListener != null) {
842849
Collection<TopicPartition> partitions = getAssignedPartitions();
843850
if (partitions != null) {
844851
seekPartitions(partitions, true);
@@ -879,8 +886,8 @@ private void wrapUp() {
879886
if (this.errorHandler != null) {
880887
this.errorHandler.clearThreadState();
881888
}
882-
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
883-
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onPartitionsRevoked(partitions);
889+
if (this.consumerSeekAwareListener != null) {
890+
this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
884891
}
885892
this.logger.info(() -> getGroupId() + ": Consumer stopped");
886893
publishConsumerStoppedEvent();
@@ -1797,8 +1804,8 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
17971804
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
17981805
partitions);
17991806
}
1800-
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
1801-
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onPartitionsRevoked(partitions);
1807+
if (ListenerConsumer.this.consumerSeekAwareListener != null) {
1808+
ListenerConsumer.this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
18021809
}
18031810
}
18041811
finally {

0 commit comments

Comments
 (0)