Skip to content

Commit 5c96d3f

Browse files
garyrussellartembilan
authored andcommitted
GH-1231: KafkaTestUtils Enhancements
Resolves #1231 - `getOneRecord()` - `getCurrentOffset()`
1 parent 1be7ee4 commit 5c96d3f

File tree

2 files changed

+89
-1
lines changed

2 files changed

+89
-1
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

2121
import java.time.Duration;
22+
import java.util.Collections;
2223
import java.util.HashMap;
2324
import java.util.Iterator;
2425
import java.util.Map;
2526
import java.util.Properties;
2627
import java.util.stream.Collectors;
2728

2829
import org.apache.commons.logging.LogFactory;
30+
import org.apache.kafka.clients.admin.AdminClient;
31+
import org.apache.kafka.clients.admin.AdminClientConfig;
2932
import org.apache.kafka.clients.consumer.Consumer;
3033
import org.apache.kafka.clients.consumer.ConsumerConfig;
3134
import org.apache.kafka.clients.consumer.ConsumerRecord;
3235
import org.apache.kafka.clients.consumer.ConsumerRecords;
36+
import org.apache.kafka.clients.consumer.KafkaConsumer;
37+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3338
import org.apache.kafka.clients.producer.ProducerConfig;
3439
import org.apache.kafka.common.TopicPartition;
3540
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -40,6 +45,7 @@
4045
import org.springframework.beans.DirectFieldAccessor;
4146
import org.springframework.core.log.LogAccessor;
4247
import org.springframework.kafka.test.EmbeddedKafkaBroker;
48+
import org.springframework.lang.Nullable;
4349
import org.springframework.util.Assert;
4450

4551
/**
@@ -159,6 +165,63 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
159165
return received.records(topic).iterator().next();
160166
}
161167

168+
/**
169+
* Get a single record for the group from the topic/partition. Optionally, seeking to the current last record.
170+
* @param brokerAddresses the broker address(es).
171+
* @param group the group.
172+
* @param topic the topic.
173+
* @param partition the partition.
174+
* @param seekToLast true to fetch an existing last record, if present.
175+
* @param timeout the timeout.
176+
* @return the record or null if no record received.
177+
* @since 2.3
178+
*/
179+
@Nullable
180+
@SuppressWarnings({ "rawtypes", "unchecked" })
181+
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
182+
boolean seekToLast, boolean commit, long timeout) {
183+
184+
Map<String, Object> consumerConfig = consumerProps(brokerAddresses, group, "false");
185+
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
186+
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
187+
try (KafkaConsumer consumer = new KafkaConsumer(consumerConfig)) {
188+
TopicPartition topicPart = new TopicPartition(topic, partition);
189+
consumer.assign(Collections.singletonList(topicPart));
190+
if (seekToLast) {
191+
consumer.seekToEnd(Collections.singletonList(topicPart));
192+
if (consumer.position(topicPart) > 0) {
193+
consumer.seek(topicPart, consumer.position(topicPart) - 1);
194+
}
195+
}
196+
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(timeout));
197+
ConsumerRecord<?, ?> record = records.count() == 1 ? records.iterator().next() : null;
198+
if (record != null && commit) {
199+
consumer.commitSync();
200+
}
201+
return record;
202+
}
203+
}
204+
205+
/**
206+
* Get the current offset and metadata for the provided group/topic/partition.
207+
* @param brokerAddresses the broker address(es).
208+
* @param group the group.
209+
* @param topic the topic.
210+
* @param partition the partition.
211+
* @return the offset and metadata.
212+
* @throws Exception if an exception occurs.
213+
* @since 2.3
214+
*/
215+
public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)
216+
throws Exception {
217+
218+
try (AdminClient client = AdminClient
219+
.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddresses))) {
220+
return client.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get()
221+
.get(new TopicPartition(topic, partition));
222+
}
223+
}
224+
162225
/**
163226
* Poll the consumer for records.
164227
* @param consumer the consumer.

spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.kafka.test.utils;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
1921
import java.util.Map;
2022

2123
import org.apache.kafka.clients.consumer.ConsumerConfig;
24+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2225
import org.apache.kafka.clients.consumer.KafkaConsumer;
2326
import org.apache.kafka.clients.producer.KafkaProducer;
2427
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -32,7 +35,7 @@
3235
* @since 2.2.7
3336
*
3437
*/
35-
@EmbeddedKafka(topics = { "singleTopic1", "singleTopic2" })
38+
@EmbeddedKafka(topics = { "singleTopic1", "singleTopic2", "singleTopic3" })
3639
public class KafkaTestUtilsTests {
3740

3841
@Test
@@ -51,4 +54,26 @@ void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
5154
consumer.close();
5255
}
5356

57+
@Test
58+
public void testGetOneRecord(EmbeddedKafkaBroker broker) throws Exception {
59+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
60+
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
61+
producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo"));
62+
producer.close();
63+
ConsumerRecord<?, ?> oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne",
64+
"singleTopic3", 0, false, true, 10_000L);
65+
assertThat(oneRecord.value()).isEqualTo("foo");
66+
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
67+
.isNotNull()
68+
.extracting(omd -> omd.offset())
69+
.isEqualTo(1L);
70+
oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne",
71+
"singleTopic3", 0, true, true, 10_000L);
72+
assertThat(oneRecord.value()).isEqualTo("foo");
73+
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
74+
.isNotNull()
75+
.extracting(omd -> omd.offset())
76+
.isEqualTo(1L);
77+
}
78+
5479
}

0 commit comments

Comments
 (0)