Skip to content

Commit 9abc873

Browse files
garyrussell authored and artembilan committed
GH-1259: Handle Failed Record Recovery
Resolves #1259 Previously if the recoverer in a `SeekToCurrentErrorHandler` or `DefaultAfterRollbackProcessor` failed to recover a record, the record could be lost; the `FailedRecordTracker` simply logged the exception. Change the `SeekUtils` to detect a failure in the recoverer (actually any failure when determining if the failed record should be recovered) and include the failed record in the seeks. In this way the recovery will be attempted once more on each delivery attempt. **cherry-pick to 2.2.x**
1 parent 0feba30 commit 9abc873

File tree

4 files changed

+28
-15
lines changed

4 files changed

+28
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class FailedRecordTracker {
7676

7777
boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
7878
if (this.noRetries) {
79-
recover(record, exception);
79+
this.recoverer.accept(record, exception);
8080
return true;
8181
}
8282
Map<TopicPartition, FailedRecord> map = this.failures.get();
@@ -101,7 +101,7 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
101101
return false;
102102
}
103103
else {
104-
recover(record, exception);
104+
this.recoverer.accept(record, exception);
105105
map.remove(topicPartition);
106106
if (map.isEmpty()) {
107107
this.failures.remove();
@@ -110,15 +110,6 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
110110
}
111111
}
112112

113-
private void recover(ConsumerRecord<?, ?> record, Exception exception) {
114-
try {
115-
this.recoverer.accept(record, exception);
116-
}
117-
catch (Exception ex) {
118-
this.logger.error(ex, "Recoverer threw exception");
119-
}
120-
}
121-
122113
void clearThreadState() {
123114
this.failures.remove();
124115
}

spring-kafka/src/main/java/org/springframework/kafka/support/SeekUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,14 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
7171
AtomicBoolean skipped = new AtomicBoolean();
7272
records.forEach(record -> {
7373
if (recoverable && first.get()) {
74-
skipped.set(skipper.test(record, exception));
74+
try {
75+
boolean test = skipper.test(record, exception);
76+
skipped.set(test);
77+
}
78+
catch (Exception ex) {
79+
logger.error(ex, "Failed to determine if this record should be recovererd, including in seeks");
80+
skipped.set(false);
81+
}
7582
if (skipped.get()) {
7683
logger.debug(() -> "Skipping seek of: " + record);
7784
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,10 +536,14 @@ public void testMaxFailures() throws Exception {
536536
container.setBeanName("testMaxFailures");
537537
final CountDownLatch recoverLatch = new CountDownLatch(1);
538538
final KafkaTemplate<Object, Object> dlTemplate = spy(new KafkaTemplate<>(pf));
539+
AtomicBoolean recovererShouldFail = new AtomicBoolean(true);
539540
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlTemplate) {
540541

541542
@Override
542543
public void accept(ConsumerRecord<?, ?> record, Exception exception) {
544+
if (recovererShouldFail.getAndSet(false)) {
545+
throw new RuntimeException("test recoverer failure");
546+
}
543547
super.accept(record, exception);
544548
recoverLatch.countDown();
545549
}
@@ -590,8 +594,8 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
590594
assertThat(headers.get("baz")).isEqualTo("qux".getBytes());
591595
pf.destroy();
592596
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
593-
verify(afterRollbackProcessor, times(3)).isProcessInTransaction();
594-
verify(afterRollbackProcessor, times(3)).process(any(), any(), any(), anyBoolean());
597+
verify(afterRollbackProcessor, times(4)).isProcessInTransaction();
598+
verify(afterRollbackProcessor, times(4)).process(any(), any(), any(), anyBoolean());
595599
verify(afterRollbackProcessor).clearThreadState();
596600
verify(dlTemplate).send(any(ProducerRecord.class));
597601
verify(dlTemplate).sendOffsetsToTransaction(
@@ -632,8 +636,11 @@ public void testRollbackProcessorCrash() throws Exception {
632636
KafkaMessageListenerContainer<Integer, String> container =
633637
new KafkaMessageListenerContainer<>(cf, containerProps);
634638
container.setBeanName("testRollbackNoRetries");
639+
AtomicBoolean recovererShouldFail = new AtomicBoolean(true);
635640
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, ex) -> {
636-
throw new RuntimeException("arbp fail");
641+
if (recovererShouldFail.getAndSet(false)) {
642+
throw new RuntimeException("arbp fail");
643+
}
637644
};
638645
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
639646
spy(new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 0L)));

src/reference/asciidoc/kafka.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1813,6 +1813,8 @@ public SeekToCurrentErrorHandler eh() {
18131813

18141814
However, see the note at the beginning of this section; you can avoid using the `RetryTemplate` altogether.
18151815

1816+
IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
1817+
18161818
[[events]]
18171819
===== Listener Consumer Lifecycle Events
18181820

@@ -3588,6 +3590,8 @@ Generally, you should configure the `BackOff` to never return `STOP`.
35883590
However, since this error handler has no mechanism to "recover" after retries are exhausted, if the `BackOffExecution` returns `STOP`, the previous interval will be used for all subsequent delays.
35893591
Again, the maximum delay must be less than the `max.poll.interval.ms` consumer property.
35903592

3593+
IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
3594+
35913595
===== Container Stopping Error Handlers
35923596

35933597
The `ContainerStoppingErrorHandler` (used with record listeners) stops the container if the listener throws an exception.
@@ -3638,6 +3642,8 @@ Starting with version 2.2.5, the `DefaultAfterRollbackProcessor` can be invoked
36383642
Then, if you are using the `DeadLetterPublishingRecoverer` to publish a failed record, the processor will send the recovered record's offset in the original topic/partition to the transaction.
36393643
To enable this feature, set the `commitRecovered` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`.
36403644

3645+
IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
3646+
36413647
[[dead-letters]]
36423648
===== Publishing Dead-letter Records
36433649

@@ -3704,6 +3710,8 @@ public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplat
37043710
The publisher uses the map keys to locate a template that is suitable for the `value()` about to be published.
37053711
A `LinkedHashMap` is recommended so that the keys are examined in order.
37063712

3713+
IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
3714+
37073715
Starting with version 2.3, the recoverer can also be used with Kafka Streams - see <<streams-deser-recovery>> for more information.
37083716

37093717
[[kerberos]]

0 commit comments

Comments (0)