
Commit c9f24b0

garyrussell authored and artembilan committed
GH-599: Fix initial seek
Fixes #599. Previously, initial seeks using `TopicPartitionInitialOffset`s only worked with a provided `offset`; the `SeekPosition` field was ignored during the initial seek and only used for subsequent seek operations. `initPartitionsIfNeeded()` now processes both styles of initial offset.
1 parent 12c1ad2 commit c9f24b0
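
For illustration, a minimal sketch of configuring both styles of initial offset after this change, modeled on the testInitialSeek() test added below. The class name InitialSeekConfig, the consumerFactory parameter, and the no-op listener are placeholders for the example; the topic "foo" follows the test.

// Sketch only: assumes an existing ConsumerFactory<Integer, String> is supplied by the caller.
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;

public class InitialSeekConfig {

	public KafkaMessageListenerContainer<Integer, String> container(ConsumerFactory<Integer, String> consumerFactory) {
		ContainerProperties containerProps = new ContainerProperties(
				new TopicPartitionInitialOffset("foo", 0, SeekPosition.BEGINNING), // seek to beginning on start
				new TopicPartitionInitialOffset("foo", 1, SeekPosition.END),       // seek to end on start
				new TopicPartitionInitialOffset("foo", 2, 0L));                    // explicit initial offset
		containerProps.setMessageListener((MessageListener<Integer, String>) record -> { });
		return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
	}

}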

2 files changed: 76 additions & 5 deletions

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 33 additions & 5 deletions
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -420,7 +421,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 			this.definedPartitions = new HashMap<>(topicPartitions.size());
 			for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
 				this.definedPartitions.put(topicPartition.topicPartition(),
-						new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
+						new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
+								topicPartition.getPosition()));
 			}
 			consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
 		}
@@ -647,7 +649,12 @@ public void run() {
 			this.count = 0;
 			this.last = System.currentTimeMillis();
 			if (isRunning() && this.definedPartitions != null) {
-				initPartitionsIfNeeded();
+				try {
+					initPartitionsIfNeeded();
+				}
+				catch (Exception e) {
+					this.logger.error("Failed to set initial offsets", e);
+				}
 			}
 			long lastReceive = System.currentTimeMillis();
 			long lastAlertAt = lastReceive;
@@ -1186,9 +1193,27 @@ private void initPartitionsIfNeeded() {
 			/*
 			 * Note: initial position setting is only supported with explicit topic assignment.
 			 * When using auto assignment (subscribe), the ConsumerRebalanceListener is not
-			 * called until we poll() the consumer.
+			 * called until we poll() the consumer. Users can use a ConsumerAwareRebalanceListener
+			 * or a ConsumerSeekAware listener in that case.
 			 */
-			for (Entry<TopicPartition, OffsetMetadata> entry : this.definedPartitions.entrySet()) {
+			Map<TopicPartition, OffsetMetadata> partitions = new HashMap<>(this.definedPartitions);
+			Set<TopicPartition> beginnings = partitions.entrySet().stream()
+					.filter(e -> SeekPosition.BEGINNING.equals(e.getValue().seekPosition))
+					.map(e -> e.getKey())
+					.collect(Collectors.toSet());
+			beginnings.forEach(k -> partitions.remove(k));
+			Set<TopicPartition> ends = partitions.entrySet().stream()
+					.filter(e -> SeekPosition.END.equals(e.getValue().seekPosition))
+					.map(e -> e.getKey())
+					.collect(Collectors.toSet());
+			ends.forEach(k -> partitions.remove(k));
+			if (beginnings.size() > 0) {
+				this.consumer.seekToBeginning(beginnings);
+			}
+			if (ends.size() > 0) {
+				this.consumer.seekToEnd(ends);
+			}
+			for (Entry<TopicPartition, OffsetMetadata> entry : partitions.entrySet()) {
				TopicPartition topicPartition = entry.getKey();
				OffsetMetadata metadata = entry.getValue();
				Long offset = metadata.offset;
@@ -1378,9 +1403,12 @@ private static final class OffsetMetadata {
 
 		private final boolean relativeToCurrent;
 
-		OffsetMetadata(Long offset, boolean relativeToCurrent) {
+		private final SeekPosition seekPosition;
+
+		OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
 			this.offset = offset;
 			this.relativeToCurrent = relativeToCurrent;
+			this.seekPosition = seekPosition;
 		}
 
 	}
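
The updated comment in initPartitionsIfNeeded() notes that initial positions cannot be set this way with auto assignment (subscribe), and points at a ConsumerAwareRebalanceListener or a ConsumerSeekAware listener instead. A hedged sketch of the ConsumerSeekAware route follows; it is not part of this commit, and the class name SeekingListener and the 0L placeholder offset are assumptions for the example.

// Illustrative only: with subscribe()-style auto assignment, initial positions can be set
// from the listener itself once partitions are assigned.
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;

public class SeekingListener implements MessageListener<Integer, String>, ConsumerSeekAware {

	@Override
	public void registerSeekCallback(ConsumerSeekCallback callback) {
		// the callback could be stored here for arbitrary seeks at runtime
	}

	@Override
	public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
		// runs on the consumer thread after assignment; a safe place to set initial positions
		assignments.keySet().forEach(tp -> callback.seek(tp.topic(), tp.partition(), 0L)); // 0L is a placeholder offset
	}

	@Override
	public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
		// no seeks while the container is idle
	}

	@Override
	public void onMessage(ConsumerRecord<Integer, String> record) {
		// process the record
	}

}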

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 43 additions & 0 deletions
@@ -35,6 +35,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -78,6 +79,7 @@
 import org.springframework.kafka.listener.config.ContainerProperties;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.TopicPartitionInitialOffset;
+import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -1675,6 +1677,47 @@ public void testPauseResume() throws Exception {
 		container.stop();
 	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testInitialSeek() throws Exception {
+		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
+		Consumer<Integer, String> consumer = mock(Consumer.class);
+		given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
+		ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
+		final CountDownLatch latch = new CountDownLatch(1);
+		given(consumer.poll(anyLong())).willAnswer(i -> {
+			latch.countDown();
+			Thread.sleep(50);
+			return emptyRecords;
+		});
+		TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
+				new TopicPartitionInitialOffset("foo", 0, SeekPosition.BEGINNING),
+				new TopicPartitionInitialOffset("foo", 1, SeekPosition.END),
+				new TopicPartitionInitialOffset("foo", 2, 0L),
+				new TopicPartitionInitialOffset("foo", 3, Long.MAX_VALUE),
+				new TopicPartitionInitialOffset("foo", 4, SeekPosition.BEGINNING),
+				new TopicPartitionInitialOffset("foo", 5, SeekPosition.END),
+		};
+		ContainerProperties containerProps = new ContainerProperties(topicPartition);
+		containerProps.setAckMode(AckMode.RECORD);
+		containerProps.setClientId("clientId");
+		containerProps.setMessageListener((MessageListener) r -> { });
+		KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+		container.start();
+		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+		ArgumentCaptor<Collection<TopicPartition>> captor = ArgumentCaptor.forClass(List.class);
+		verify(consumer).seekToBeginning(captor.capture());
+		assertThat(captor.getValue()
+				.equals(new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 4)))));
+		verify(consumer).seekToEnd(captor.capture());
+		assertThat(captor.getValue()
+				.equals(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5)))));
+		verify(consumer).seek(new TopicPartition("foo", 2), 0L);
+		verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
+		container.stop();
+	}
+
 	private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
 		Consumer<?, ?> consumer = spy(
 				KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));
