Skip to content

Commit cb8cc1a

Browse files
garyrussellartembilan
authored andcommitted
Add seek helper abstract class
- also handle late arriving time stamp seek requests while `processSeeks()` is already running. * * Properly clean up callbacks for revoked partitions * clear() wiped them for all threads * Also remove the TL when revoked * * Use correct callback with onIdleContainer() * Add test/doc for external seek calls
1 parent 56ab074 commit cb8cc1a

File tree

8 files changed

+283
-11
lines changed

8 files changed

+283
-11
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.Collection;
20+
import java.util.Collections;
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
24+
import org.apache.kafka.common.TopicPartition;
25+
26+
import org.springframework.lang.Nullable;
27+
28+
/**
29+
* Manages the {@link ConsumerSeekAware.ConsumerSeekCallback} s for the listener. If the
30+
* listener subclasses this class, it can easily seek arbitrary topics/partitions without
31+
* having to keep track of the callbacks itself.
32+
*
33+
* @author Gary Russell
34+
* @since 2.3
35+
*
36+
*/
37+
public abstract class AbstractConsumerSeekAware implements ConsumerSeekAware {
38+
39+
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
40+
41+
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
42+
43+
@Override
44+
public void registerSeekCallback(ConsumerSeekCallback callback) {
45+
this.callbackForThread.set(callback);
46+
}
47+
48+
@Override
49+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
50+
ConsumerSeekCallback threadCallback = this.callbackForThread.get();
51+
if (threadCallback != null) {
52+
assignments.keySet().forEach(tp -> this.callbacks.put(tp, threadCallback));
53+
}
54+
}
55+
56+
@Override
57+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
58+
partitions.forEach(tp -> this.callbacks.remove(tp));
59+
this.callbackForThread.remove();
60+
}
61+
62+
/**
63+
* Return the callback for the specified topic/partition.
64+
* @param topicPartition the topic/partition.
65+
* @return the callback (or null if there is no assignment).
66+
*/
67+
@Nullable
68+
protected ConsumerSeekCallback getSeekCallbackFor(TopicPartition topicPartition) {
69+
return this.callbacks.get(topicPartition);
70+
}
71+
72+
/**
73+
* The map of callbacks for all currently assigned partitions.
74+
* @return the map.
75+
*/
76+
protected Map<TopicPartition, ConsumerSeekCallback> getSeekCallbacks() {
77+
return Collections.unmodifiableMap(this.callbacks);
78+
}
79+
80+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,17 @@ default void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consume
5151
// do nothing
5252
}
5353

54+
/**
55+
* When using group management, called when partition assignments are revoked.
56+
* Listeners should discard any callback saved from
57+
* {@link #registerSeekCallback(ConsumerSeekCallback)} on this thread.
58+
* @param partitions the partitions that have been revoked.
59+
* @since 2.3
60+
*/
61+
default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
62+
// do nothing
63+
}
64+
5465
/**
5566
* If the container is configured to emit idle container events, this method is called
5667
* when the container idle event is emitted - allowing a seek operation.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,7 @@ protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler)
694694
}
695695

696696
private void seekPartitions(Collection<TopicPartition> partitions, boolean idle) {
697+
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
697698
Map<TopicPartition, Long> current = new HashMap<>();
698699
for (TopicPartition topicPartition : partitions) {
699700
current.put(topicPartition, ListenerConsumer.this.consumer.position(topicPartition));
@@ -847,9 +848,10 @@ private void checkIdle() {
847848
}
848849
}
849850

850-
public void wrapUp() {
851+
private void wrapUp() {
851852
KafkaUtils.clearConsumerGroupId();
852853
publishConsumerStoppingEvent(this.consumer);
854+
Collection<TopicPartition> partitions = getAssignedPartitions();
853855
if (!this.fatalError) {
854856
if (this.kafkaTxManager == null) {
855857
commitPendingAcks();
@@ -861,7 +863,7 @@ public void wrapUp() {
861863
}
862864
}
863865
else {
864-
closeProducers(getAssignedPartitions());
866+
closeProducers(partitions);
865867
}
866868
}
867869
else {
@@ -877,6 +879,9 @@ public void wrapUp() {
877879
if (this.errorHandler != null) {
878880
this.errorHandler.clearThreadState();
879881
}
882+
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
883+
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onPartitionsRevoked(partitions);
884+
}
880885
this.logger.info(() -> getGroupId() + ": Consumer stopped");
881886
publishConsumerStoppedEvent();
882887
}
@@ -1475,6 +1480,7 @@ private void processSeeks() {
14751480
if (position == null) {
14761481
if (offset.isRelativeToCurrent()) {
14771482
whereTo += this.consumer.position(offset.getTopicPartition());
1483+
whereTo = Math.max(whereTo, 0);
14781484
}
14791485
this.consumer.seek(offset.getTopicPartition(), whereTo);
14801486
}
@@ -1484,6 +1490,13 @@ else if (position.equals(SeekPosition.BEGINNING)) {
14841490
this.consumer.seek(offset.getTopicPartition(), whereTo);
14851491
}
14861492
}
1493+
else if (position.equals(SeekPosition.TIMESTAMP)) {
1494+
// possible late addition since the grouped processing above
1495+
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
1496+
.offsetsForTimes(
1497+
Collections.singletonMap(offset.getTopicPartition(), offset.getOffset()));
1498+
offsetsForTimes.forEach((tp, ot) -> this.consumer.seek(tp, ot.offset()));
1499+
}
14871500
else {
14881501
this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition()));
14891502
if (whereTo != null) {
@@ -1511,6 +1524,7 @@ private void processTimestampSeeks() {
15111524
}
15121525
timestampSeeks.put(tpo.getTopicPartition(), tpo.getOffset());
15131526
seekIterator.remove();
1527+
traceSeek(tpo);
15141528
}
15151529
}
15161530
if (timestampSeeks != null) {
@@ -1783,6 +1797,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
17831797
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
17841798
partitions);
17851799
}
1800+
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
1801+
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onPartitionsRevoked(partitions);
1802+
}
17861803
}
17871804
finally {
17881805
if (ListenerConsumer.this.kafkaTxManager != null) {
@@ -1972,7 +1989,8 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti
19721989
end = consumerToSeek.position(topicPart);
19731990
}
19741991
if (end != null) {
1975-
return end + offset;
1992+
long newOffset = end + offset;
1993+
return newOffset < 0 ? 0 : newOffset;
19761994
}
19771995
return null;
19781996
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDelegatingMessageListenerAdapter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
import java.util.Collection;
1920
import java.util.Map;
2021

2122
import org.apache.commons.logging.LogFactory;
@@ -77,6 +78,13 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
7778
}
7879
}
7980

81+
@Override
82+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
83+
if (this.seekAware != null) {
84+
this.seekAware.onPartitionsRevoked(partitions);
85+
}
86+
}
87+
8088
@Override
8189
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
8290
if (this.seekAware != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,13 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
253253
}
254254
}
255255

256+
@Override
257+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
258+
if (this.bean instanceof ConsumerSeekAware) {
259+
((ConsumerSeekAware) this.bean).onPartitionsRevoked(partitions);
260+
}
261+
}
262+
256263
@Override
257264
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
258265
if (this.bean instanceof ConsumerSeekAware) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.Iterator;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.Set;
36+
import java.util.concurrent.ConcurrentHashMap;
3537
import java.util.concurrent.CountDownLatch;
3638
import java.util.concurrent.TimeUnit;
3739
import java.util.concurrent.atomic.AtomicBoolean;
@@ -76,6 +78,7 @@
7678
import org.springframework.kafka.core.KafkaTemplate;
7779
import org.springframework.kafka.core.ProducerFactory;
7880
import org.springframework.kafka.event.ListenerContainerIdleEvent;
81+
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
7982
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
8083
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
8184
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
@@ -152,7 +155,7 @@ public class EnableKafkaIntegrationTests {
152155
"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
153156
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
154157
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
155-
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart");
158+
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle");
156159

157160
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
158161

@@ -201,6 +204,9 @@ public class EnableKafkaIntegrationTests {
201204
@Autowired
202205
private ConcurrentKafkaListenerContainerFactory<Integer, String> transactionalFactory;
203206

207+
@Autowired
208+
private SeekToLastOnIdleListener seekOnIdleListener;
209+
204210
@Test
205211
public void testAnonymous() {
206212
MessageListenerContainer container = this.registry
@@ -757,6 +763,24 @@ public void testProjection() throws InterruptedException {
757763
assertThat(this.listener.username).isEqualTo("SomeUsername");
758764
}
759765

766+
@SuppressWarnings("unchecked")
767+
@Test
768+
public void testSeekToLastOnIdle() throws InterruptedException {
769+
this.registry.getListenerContainer("seekOnIdle").start();
770+
this.seekOnIdleListener.waitForBalancedAssignment();
771+
this.template.send("seekOnIdle", 0, 0, "foo");
772+
this.template.send("seekOnIdle", 1, 1, "bar");
773+
assertThat(this.seekOnIdleListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
774+
assertThat(this.seekOnIdleListener.latch2.getCount()).isEqualTo(2L);
775+
this.seekOnIdleListener.rewindAllOneRecord();
776+
assertThat(this.seekOnIdleListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
777+
assertThat(this.seekOnIdleListener.latch3.getCount()).isEqualTo(1L);
778+
this.seekOnIdleListener.rewindOnePartitionOneRecord("seekOnIdle", 1);
779+
assertThat(this.seekOnIdleListener.latch3.await(10, TimeUnit.SECONDS)).isTrue();
780+
this.registry.getListenerContainer("seekOnIdle").stop();
781+
assertThat(KafkaTestUtils.getPropertyValue(this.seekOnIdleListener, "callbacks", Map.class)).hasSize(0);
782+
}
783+
760784
@Configuration
761785
@EnableKafka
762786
@EnableTransactionManagement(proxyTargetClass = true)
@@ -933,6 +957,7 @@ public KafkaListenerContainerFactory<?> batchSpyFactory() {
933957
factory.setRecordFilterStrategy(recordFilter());
934958
// always send to the same partition so the replies are in order for the test
935959
factory.setReplyTemplate(partitionZeroReplyingTemplate());
960+
factory.setMissingTopicsFatal(false);
936961
return factory;
937962
}
938963

@@ -968,6 +993,7 @@ public KafkaListenerContainerFactory<?> batchManualFactory2() {
968993
ContainerProperties props = factory.getContainerProperties();
969994
props.setAckMode(AckMode.MANUAL_IMMEDIATE);
970995
props.setIdleEventInterval(100L);
996+
props.setPollTimeout(50L);
971997
factory.setRecordFilterStrategy(manualFilter());
972998
factory.setAckDiscarded(true);
973999
factory.setRetryTemplate(new RetryTemplate());
@@ -1050,6 +1076,11 @@ public Listener listener() {
10501076
return new Listener();
10511077
}
10521078

1079+
@Bean
1080+
public SeekToLastOnIdleListener seekOnIdle() {
1081+
return new SeekToLastOnIdleListener();
1082+
}
1083+
10531084
@Bean
10541085
public IfaceListener<String> ifaceListener() {
10551086
return new IfaceListenerImpl();
@@ -1698,6 +1729,68 @@ public void registerSeekCallback(ConsumerSeekCallback callback) {
16981729

16991730
}
17001731

1732+
public static class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
1733+
1734+
private final CountDownLatch latch1 = new CountDownLatch(10);
1735+
1736+
private final CountDownLatch latch2 = new CountDownLatch(12);
1737+
1738+
private final CountDownLatch latch3 = new CountDownLatch(13);
1739+
1740+
private final Set<Thread> consumerThreads = ConcurrentHashMap.newKeySet();
1741+
1742+
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle", autoStartup = "false", concurrency = "2",
1743+
clientIdPrefix = "seekOnIdle", containerFactory = "kafkaManualAckListenerContainerFactory")
1744+
public void listen(@SuppressWarnings("unused") String in, Acknowledgment ack) {
1745+
this.latch1.countDown();
1746+
this.latch2.countDown();
1747+
this.latch3.countDown();
1748+
ack.acknowledge();
1749+
}
1750+
1751+
@Override
1752+
public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
1753+
ConsumerSeekCallback callback) {
1754+
1755+
if (this.latch1.getCount() > 0) {
1756+
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
1757+
}
1758+
}
1759+
1760+
public void rewindAllOneRecord() {
1761+
getSeekCallbacks()
1762+
.forEach((tp, callback) ->
1763+
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
1764+
}
1765+
1766+
public void rewindOnePartitionOneRecord(String topic, int partition) {
1767+
getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
1768+
.seekRelative(topic, partition, -1, true);
1769+
}
1770+
1771+
@Override
1772+
public synchronized void onPartitionsAssigned(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
1773+
ConsumerSeekCallback callback) {
1774+
1775+
super.onPartitionsAssigned(assignments, callback);
1776+
if (assignments.size() > 0) {
1777+
this.consumerThreads.add(Thread.currentThread());
1778+
notifyAll();
1779+
}
1780+
}
1781+
1782+
public synchronized void waitForBalancedAssignment() throws InterruptedException {
1783+
int n = 0;
1784+
while (this.consumerThreads.size() < 2) {
1785+
wait(1000);
1786+
if (n++ > 20) {
1787+
throw new IllegalStateException("Balanced distribution did not occur");
1788+
}
1789+
}
1790+
}
1791+
1792+
}
1793+
17011794
interface IfaceListener<T> {
17021795

17031796
void listen(T foo);

0 commit comments

Comments
 (0)