diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java index a022b27eac..e4178d5f9a 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java @@ -35,6 +35,9 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import static java.util.Collections.emptyMap; +import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; + /** *

* An {@link org.springframework.batch.item.ItemReader} implementation for Apache Kafka. @@ -48,6 +51,7 @@ * * @author Mathieu Ouellet * @author Mahmoud Ben Hassine + * @author Christian Allard * @since 4.2 */ public class KafkaItemReader extends AbstractItemStreamItemReader { @@ -144,11 +148,8 @@ public boolean isSaveState() { public void open(ExecutionContext executionContext) { this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties); this.partitionOffsets = new HashMap<>(); - for (TopicPartition topicPartition : this.topicPartitions) { - this.partitionOffsets.put(topicPartition, 0L); - } if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) { - Map offsets = (Map) executionContext.get(TOPIC_PARTITION_OFFSETS); + Map offsets = defaultIfNull((Map) executionContext.get(TOPIC_PARTITION_OFFSETS), emptyMap()); for (Map.Entry entry : offsets.entrySet()) { this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1); } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java index 75e2563530..98c94cbe3c 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java @@ -51,6 +51,7 @@ /** * @author Mathieu Ouellet * @author Mahmoud Ben Hassine + * @author Christian Allard */ public class KafkaItemReaderTests { @@ -66,8 +67,10 @@ public static void setUpTopics() { embeddedKafka.getEmbeddedKafka().addTopics( new NewTopic("topic1", 1, (short) 1), new NewTopic("topic2", 2, (short) 1), - new NewTopic("topic3", 1, (short) 1), - new NewTopic("topic4", 2, (short) 1) + new NewTopic("topic3a", 1, (short) 1), + new NewTopic("topic3b", 1, (short) 1), + new NewTopic("topic4a", 2, (short) 1), + new NewTopic("topic4b", 2, (short) 1) ); } @@ -85,6 +88,7 @@ public void setUp() { StringDeserializer.class.getName()); this.consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + this.consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); } @Test @@ -238,7 +242,7 @@ public void testReadFromMultiplePartitions() { @Test public void testReadFromSinglePartitionAfterRestart() { - this.template.setDefaultTopic("topic3"); + this.template.setDefaultTopic("topic3a"); this.template.sendDefault("val0"); this.template.sendDefault("val1"); this.template.sendDefault("val2"); @@ -247,15 +251,15 @@ public void testReadFromSinglePartitionAfterRestart() { ExecutionContext executionContext = new ExecutionContext(); Map offsets = new HashMap<>(); - offsets.put(new TopicPartition("topic3", 0), 1L); + offsets.put(new TopicPartition("topic3a", 0), 1L); executionContext.put("topic.partition.offsets", offsets); - // topic3-0: val0, val1, val2, val3, val4 + // topic3a-0: val0, val1, val2, val3, val4 // ^ // | // last committed offset = 1 (should restart from offset = 2) - this.reader = new KafkaItemReader<>(this.consumerProperties, "topic3", 0); + this.reader = new KafkaItemReader<>(this.consumerProperties, "topic3a", 0); this.reader.setPollTimeout(Duration.ofSeconds(1)); this.reader.open(executionContext); @@ -270,17 +274,59 @@ public void testReadFromSinglePartitionAfterRestart() { this.reader.close(); } + @Test + public void testReadFromSinglePartitionAfterRestartWithoutSaveState() { + this.template.setDefaultTopic("topic3b"); + this.template.sendDefault("val0"); + this.template.sendDefault("val1"); + + this.reader = new KafkaItemReader<>(this.consumerProperties, "topic3b", 0); + this.reader.setPollTimeout(Duration.ofSeconds(1)); + this.reader.open(new ExecutionContext()); + + List items = new ArrayList<>(); + items.add(this.reader.read()); + items.add(this.reader.read()); + assertThat(items, containsInAnyOrder("val0", "val1")); + assertNull(this.reader.read()); + + this.reader.close(); + + this.template.sendDefault("val2"); + this.template.sendDefault("val3"); + this.template.sendDefault("val4"); + + // topic3b-0: val0, val1, val2, val3, val4 + // ^ + // | + // last committed offset = 1 (should restart from offset = 2) + + this.reader = new KafkaItemReader<>(this.consumerProperties, "topic3b", 0); + this.reader.setPollTimeout(Duration.ofSeconds(1)); + this.reader.open(new ExecutionContext()); + + items = new ArrayList<>(); + items.add(this.reader.read()); + items.add(this.reader.read()); + items.add(this.reader.read()); + assertThat(items, containsInAnyOrder("val2", "val3", "val4")); + String item = this.reader.read(); + assertNull(item); + + this.reader.close(); + } + @Test public void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, InterruptedException { List futures = new ArrayList<>(); - futures.add(this.template.send("topic4", 0, null, "val0")); - futures.add(this.template.send("topic4", 0, null, "val2")); - futures.add(this.template.send("topic4", 0, null, "val4")); - futures.add(this.template.send("topic4", 0, null, "val6")); - futures.add(this.template.send("topic4", 1, null, "val1")); - futures.add(this.template.send("topic4", 1, null, "val3")); - futures.add(this.template.send("topic4", 1, null, "val5")); - futures.add(this.template.send("topic4", 1, null, "val7")); + futures.add(this.template.send("topic4a", 0, null, "val0")); + futures.add(this.template.send("topic4a", 0, null, "val2")); + futures.add(this.template.send("topic4a", 0, null, "val4")); + futures.add(this.template.send("topic4a", 0, null, "val6")); + futures.add(this.template.send("topic4a", 1, null, "val1")); + futures.add(this.template.send("topic4a", 1, null, "val3")); + futures.add(this.template.send("topic4a", 1, null, "val5")); + futures.add(this.template.send("topic4a", 1, null, "val7")); for (ListenableFuture future : futures) { future.get(); @@ -288,20 +334,20 @@ public void testReadFromMultiplePartitionsAfterRestart() throws ExecutionExcepti ExecutionContext executionContext = new ExecutionContext(); Map offsets = new HashMap<>(); - offsets.put(new TopicPartition("topic4", 0), 1L); - offsets.put(new TopicPartition("topic4", 1), 2L); + offsets.put(new TopicPartition("topic4a", 0), 1L); + offsets.put(new TopicPartition("topic4a", 1), 2L); executionContext.put("topic.partition.offsets", offsets); - // topic4-0: val0, val2, val4, val6 + // topic4a-0: val0, val2, val4, val6 // ^ // | // last committed offset = 1 (should restart from offset = 2) - // topic4-1: val1, val3, val5, val7 + // topic4a-1: val1, val3, val5, val7 // ^ // | // last committed offset = 2 (should restart from offset = 3) - this.reader = new KafkaItemReader<>(this.consumerProperties, "topic4", 0, 1); + this.reader = new KafkaItemReader<>(this.consumerProperties, "topic4a", 0, 1); this.reader.setPollTimeout(Duration.ofSeconds(1)); this.reader.open(executionContext); @@ -316,4 +362,64 @@ public void testReadFromMultiplePartitionsAfterRestart() throws ExecutionExcepti this.reader.close(); } + @Test + public void testReadFromMultiplePartitionsAfterRestartWithoutSaveState() throws ExecutionException, InterruptedException { + List futures = new ArrayList<>(); + futures.add(this.template.send("topic4b", 0, null, "val0")); + futures.add(this.template.send("topic4b", 0, null, "val2")); + futures.add(this.template.send("topic4b", 1, null, "val1")); + futures.add(this.template.send("topic4b", 1, null, "val3")); + futures.add(this.template.send("topic4b", 1, null, "val5")); + + for (ListenableFuture future : futures) { + future.get(); + } + + this.reader = new KafkaItemReader<>(this.consumerProperties, "topic4b", 0, 1); + this.reader.setPollTimeout(Duration.ofSeconds(1)); + this.reader.open(new ExecutionContext()); + + List items = new ArrayList<>(); + items.add(this.reader.read()); + items.add(this.reader.read()); + items.add(this.reader.read()); + items.add(this.reader.read()); + items.add(this.reader.read()); + assertThat(items, containsInAnyOrder("val0", "val1", "val2", "val3", "val5")); + assertNull(this.reader.read()); + + this.reader.close(); + futures = new ArrayList<>(); + futures.add(this.template.send("topic4b", 0, null, "val4")); + futures.add(this.template.send("topic4b", 0, null, "val6")); + futures.add(this.template.send("topic4b", 1, null, "val7")); + + for (ListenableFuture future : futures) { + future.get(); + } + + // topic4b-0: val0, val2, val4, val6 + // ^ + // | + // last committed offset = 1 (should restart from offset = 2) + // topic4b-1: val1, val3, val5, val7 + // ^ + // | + // last committed offset = 2 (should restart from offset = 3) + + this.reader = new KafkaItemReader<>(this.consumerProperties, "topic4b", 0, 1); + this.reader.setPollTimeout(Duration.ofSeconds(1)); + this.reader.open(new ExecutionContext()); + + items = new ArrayList<>(); + items.add(this.reader.read()); + items.add(this.reader.read()); + items.add(this.reader.read()); + assertThat(items, containsInAnyOrder("val4", "val6", "val7")); + String item = this.reader.read(); + assertNull(item); + + this.reader.close(); + } + }