Skip to content

Commit 225d27c

Browse files
Tom van den Bergeartembilan
authored andcommitted
GH-511: Committing offset immediately on error
Fixes: #511 **Cherry-pick to 2.0.x & master** # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
1 parent dafea32 commit 225d27c

File tree

2 files changed

+119
-3
lines changed

2 files changed

+119
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
* @author Loic Talhouarne
8989
* @author Vladimir Tsanev
9090
* @author Yang Qiju
91+
* @author Tom van den Berge
9192
*/
9293
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
9394

@@ -968,7 +969,20 @@ else if (!this.isAnyManualAck && !this.autoCommit) {
968969
}
969970
catch (RuntimeException e) {
970971
if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
971-
this.acks.add(record);
972+
if (this.isRecordAck) {
973+
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
974+
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
975+
new OffsetAndMetadata(record.offset() + 1));
976+
if (this.containerProperties.isSyncCommits()) {
977+
this.consumer.commitSync(offsetsToCommit);
978+
}
979+
else {
980+
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
981+
}
982+
}
983+
else if (!this.isAnyManualAck) {
984+
this.acks.add(record);
985+
}
972986
}
973987
if (this.errorHandler == null) {
974988
throw e;

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
* @author Martin Dam
8787
* @author Artem Bilan
8888
* @author Loic Talhouarne
89+
* @author Tom van den Berge
8990
*/
9091
public class KafkaMessageListenerContainerTests {
9192

@@ -121,9 +122,13 @@ public class KafkaMessageListenerContainerTests {
121122

122123
private static String topic17 = "testTopic17";
123124

125+
private static String topic18 = "testTopic18";
126+
127+
124128
@ClassRule
125129
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic3, topic4, topic5,
126-
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16, topic17);
130+
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16, topic17,
131+
topic18);
127132

128133
@Rule
129134
public TestName testName = new TestName();
@@ -1435,6 +1440,101 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
14351440
logger.info("Stop manual ack rebalance");
14361441
}
14371442

1443+
@Test
1444+
public void testRebalanceAfterFailedRecord() throws Exception {
1445+
logger.info("Start rebalance after failed record");
1446+
Map<String, Object> props = KafkaTestUtils.consumerProps("test18", "false", embeddedKafka);
1447+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
1448+
ContainerProperties containerProps = new ContainerProperties(topic18);
1449+
final List<AtomicInteger> counts = new ArrayList<>();
1450+
counts.add(new AtomicInteger());
1451+
counts.add(new AtomicInteger());
1452+
containerProps.setMessageListener(new MessageListener<Integer, String>() {
1453+
1454+
@Override
1455+
public void onMessage(ConsumerRecord<Integer, String> message) {
1456+
// The 1st message per partition fails
1457+
if (counts.get(message.partition()).incrementAndGet() < 2) {
1458+
throw new RuntimeException("Failure wile processing message");
1459+
}
1460+
}
1461+
});
1462+
containerProps.setSyncCommits(true);
1463+
containerProps.setAckMode(AckMode.RECORD);
1464+
final CountDownLatch rebalanceLatch = new CountDownLatch(2);
1465+
containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
1466+
1467+
@Override
1468+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
1469+
}
1470+
1471+
@Override
1472+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
1473+
logger.info("manual ack: assigned " + partitions);
1474+
rebalanceLatch.countDown();
1475+
}
1476+
});
1477+
1478+
CountDownLatch stubbingComplete1 = new CountDownLatch(1);
1479+
KafkaMessageListenerContainer<Integer, String> container1 =
1480+
spyOnContainer(new KafkaMessageListenerContainer<>(cf, containerProps), stubbingComplete1);
1481+
container1.setBeanName("testRebalanceAfterFailedRecord");
1482+
container1.start();
1483+
Consumer<?, ?> containerConsumer = spyOnConsumer(container1);
1484+
final CountDownLatch commitLatch = new CountDownLatch(2);
1485+
willAnswer(invocation -> {
1486+
1487+
@SuppressWarnings({ "unchecked" })
1488+
Map<TopicPartition, OffsetAndMetadata> map = invocation.getArgument(0);
1489+
try {
1490+
return invocation.callRealMethod();
1491+
}
1492+
finally {
1493+
for (Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
1494+
// Decrement when the last (successful) has been committed
1495+
if (entry.getValue().offset() == 2) {
1496+
commitLatch.countDown();
1497+
}
1498+
}
1499+
}
1500+
1501+
}).given(containerConsumer).commitSync(any());
1502+
stubbingComplete1.countDown();
1503+
ContainerTestUtils.waitForAssignment(container1, embeddedKafka.getPartitionsPerTopic());
1504+
1505+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
1506+
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
1507+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
1508+
template.setDefaultTopic(topic18);
1509+
template.sendDefault(0, 0, "foo");
1510+
template.sendDefault(1, 0, "baz");
1511+
template.sendDefault(0, 0, "bar");
1512+
template.sendDefault(1, 0, "qux");
1513+
template.flush();
1514+
1515+
// Wait until both partitions have committed offset 2 (i.e. the last message)
1516+
assertThat(commitLatch.await(30, TimeUnit.SECONDS)).isTrue();
1517+
1518+
// Start a 2nd consumer, triggering a rebalance
1519+
KafkaMessageListenerContainer<Integer, String> container2 =
1520+
new KafkaMessageListenerContainer<>(cf, containerProps);
1521+
container2.setBeanName("testRebalanceAfterFailedRecord2");
1522+
container2.start();
1523+
// Wait until both consumers have finished rebalancing
1524+
assertThat(rebalanceLatch.await(60, TimeUnit.SECONDS)).isTrue();
1525+
1526+
// Stop both consumers
1527+
container1.stop();
1528+
container2.stop();
1529+
Consumer<Integer, String> consumer = cf.createConsumer();
1530+
consumer.assign(Arrays.asList(new TopicPartition(topic18, 0), new TopicPartition(topic18, 1)));
1531+
1532+
// Verify that offset of both partitions is the highest committed offset
1533+
assertThat(consumer.position(new TopicPartition(topic18, 0))).isEqualTo(2);
1534+
assertThat(consumer.position(new TopicPartition(topic18, 1))).isEqualTo(2);
1535+
consumer.close();
1536+
logger.info("Stop rebalance after failed record");
1537+
}
14381538
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
14391539
Consumer<?, ?> consumer = spy(
14401540
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));
@@ -1443,8 +1543,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
14431543
return consumer;
14441544
}
14451545

1446-
private KafkaMessageListenerContainer<Integer, String> spyOnContainer(KafkaMessageListenerContainer<Integer, String> container,
1546+
private KafkaMessageListenerContainer<Integer, String> spyOnContainer(
1547+
KafkaMessageListenerContainer<Integer, String> container,
14471548
final CountDownLatch stubbingComplete) {
1549+
14481550
KafkaMessageListenerContainer<Integer, String> spy = spy(container);
14491551
willAnswer(i -> {
14501552
if (stubbingComplete.getCount() > 0 && Thread.currentThread().getName().endsWith("-C-1")) {

0 commit comments

Comments
 (0)