Skip to content

Commit 15a393b

Browse files
committed
Add ability to start reading from a custom offset in KafkaItemReader
Issue #737
1 parent 5fa821f commit 15a393b

File tree

2 files changed

+109
-5
lines changed

2 files changed

+109
-5
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,30 @@ public boolean isSaveState() {
140140
return this.saveState;
141141
}
142142

143+
/**
144+
* Setter for partition offsets. This mapping tells the reader the offset to start
145+
* reading from in each partition. This is optional, defaults to starting from
146+
* offset 0 in each partition. Passing an empty map makes the reader start
147+
* from the offset stored in Kafka for the consumer group ID.
148+
*
149+
* <p><strong>In case of a restart, offsets stored in the execution context
150+
* will take precedence.</strong></p>
151+
*
152+
* @param partitionOffsets mapping of starting offset in each partition
153+
*/
154+
public void setPartitionOffsets(Map<TopicPartition, Long> partitionOffsets) {
155+
Assert.notNull(partitionOffsets, "partitionOffsets must not be null");
156+
this.partitionOffsets = partitionOffsets;
157+
}
158+
143159
@Override
144160
public void open(ExecutionContext executionContext) {
145161
this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
146-
this.partitionOffsets = new HashMap<>();
147-
for (TopicPartition topicPartition : this.topicPartitions) {
148-
this.partitionOffsets.put(topicPartition, 0L);
162+
if (this.partitionOffsets == null) {
163+
this.partitionOffsets = new HashMap<>();
164+
for (TopicPartition topicPartition : this.topicPartitions) {
165+
this.partitionOffsets.put(topicPartition, 0L);
166+
}
149167
}
150168
if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
151169
Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626

2727
import org.apache.kafka.clients.admin.NewTopic;
2828
import org.apache.kafka.clients.consumer.ConsumerConfig;
29+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2930
import org.apache.kafka.common.TopicPartition;
3031
import org.apache.kafka.common.serialization.StringDeserializer;
3132
import org.junit.Before;
@@ -67,7 +68,9 @@ public static void setUpTopics() {
6768
new NewTopic("topic1", 1, (short) 1),
6869
new NewTopic("topic2", 2, (short) 1),
6970
new NewTopic("topic3", 1, (short) 1),
70-
new NewTopic("topic4", 2, (short) 1)
71+
new NewTopic("topic4", 2, (short) 1),
72+
new NewTopic("topic5", 1, (short) 1),
73+
new NewTopic("topic6", 1, (short) 1)
7174
);
7275
}
7376

@@ -212,6 +215,89 @@ public void testReadFromSinglePartition() {
212215
this.reader.close();
213216
}
214217

218+
@Test
219+
public void testReadFromSinglePartitionFromCustomOffset() {
220+
this.template.setDefaultTopic("topic5");
221+
this.template.sendDefault("val0"); // <-- offset 0
222+
this.template.sendDefault("val1"); // <-- offset 1
223+
this.template.sendDefault("val2"); // <-- offset 2
224+
this.template.sendDefault("val3"); // <-- offset 3
225+
226+
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic5", 0);
227+
228+
// specify which offset to start from
229+
Map<TopicPartition, Long> partitionOffsets = new HashMap<>();
230+
partitionOffsets.put(new TopicPartition("topic5", 0), 2L);
231+
this.reader.setPartitionOffsets(partitionOffsets);
232+
233+
this.reader.setPollTimeout(Duration.ofSeconds(1));
234+
this.reader.open(new ExecutionContext());
235+
236+
String item = this.reader.read();
237+
assertThat(item, is("val2"));
238+
239+
item = this.reader.read();
240+
assertThat(item, is("val3"));
241+
242+
item = this.reader.read();
243+
assertNull(item);
244+
245+
this.reader.close();
246+
}
247+
248+
@Test
249+
public void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Exception {
250+
// first run: read a topic from the beginning
251+
252+
this.template.setDefaultTopic("topic6");
253+
this.template.sendDefault("val0"); // <-- offset 0
254+
this.template.sendDefault("val1"); // <-- offset 1
255+
256+
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0);
257+
this.reader.setPollTimeout(Duration.ofSeconds(1));
258+
this.reader.open(new ExecutionContext());
259+
260+
String item = this.reader.read();
261+
assertThat(item, is("val0"));
262+
263+
item = this.reader.read();
264+
assertThat(item, is("val1"));
265+
266+
item = this.reader.read();
267+
assertNull(item);
268+
269+
this.reader.close();
270+
271+
// The offset stored in Kafka should be equal to 2 at this point
272+
OffsetAndMetadata currentOffset = KafkaTestUtils.getCurrentOffset(
273+
embeddedKafka.getEmbeddedKafka().getBrokersAsString(),
274+
"1", "topic6",
275+
0);
276+
assertEquals(2, currentOffset.offset());
277+
278+
// second run (with same consumer group ID): new messages arrived since the last run.
279+
280+
this.template.sendDefault("val2"); // <-- offset 2
281+
this.template.sendDefault("val3"); // <-- offset 3
282+
283+
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0);
284+
// Passing an empty map means the reader should start from the offset stored in Kafka (offset 2 in this case)
285+
this.reader.setPartitionOffsets(new HashMap<>());
286+
this.reader.setPollTimeout(Duration.ofSeconds(1));
287+
this.reader.open(new ExecutionContext());
288+
289+
item = this.reader.read();
290+
assertThat(item, is("val2"));
291+
292+
item = this.reader.read();
293+
assertThat(item, is("val3"));
294+
295+
item = this.reader.read();
296+
assertNull(item);
297+
298+
this.reader.close();
299+
}
300+
215301
@Test
216302
public void testReadFromMultiplePartitions() {
217303
this.template.setDefaultTopic("topic2");

0 commit comments

Comments
 (0)