Skip to content

Commit 4f8fa13

Browse files
garyrussell authored and artembilan committed
GH-1234: Fix FailedRecordTracker
Fixes #1234. The previous implementation assumed the first redelivered record would be the one that failed; this is not always the case. Maintain state for each topic/partition/offset.
1 parent 13ae385 commit 4f8fa13

File tree

2 files changed

+50
-33
lines changed

2 files changed

+50
-33
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 31 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.HashMap;
20+
import java.util.Map;
1921
import java.util.function.BiConsumer;
2022

2123
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.apache.kafka.common.TopicPartition;
2225

2326
import org.springframework.core.log.LogAccessor;
2427
import org.springframework.lang.Nullable;
@@ -35,7 +38,7 @@
3538
*/
3639
class FailedRecordTracker {
3740

38-
private final ThreadLocal<FailedRecord> failures = new ThreadLocal<>(); // intentionally not static
41+
private final ThreadLocal<Map<TopicPartition, FailedRecord>> failures = new ThreadLocal<>(); // intentionally not static
3942

4043
private final BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer;
4144

@@ -50,12 +53,18 @@ class FailedRecordTracker {
5053

5154
Assert.notNull(backOff, "'backOff' cannot be null");
5255
if (recoverer == null) {
53-
FailedRecord failedRecord = this.failures.get();
54-
this.recoverer = (rec, thr) -> logger.error(thr, "Backoff "
55-
+ (failedRecord == null
56-
? "none"
57-
: failedRecord.getBackOffExecution())
58-
+ " exhausted for " + rec);
56+
this.recoverer = (rec, thr) -> {
57+
Map<TopicPartition, FailedRecord> map = this.failures.get();
58+
FailedRecord failedRecord = null;
59+
if (map != null) {
60+
failedRecord = map.get(new TopicPartition(rec.topic(), rec.partition()));
61+
}
62+
logger.error(thr, "Backoff "
63+
+ (failedRecord == null
64+
? "none"
65+
: failedRecord.getBackOffExecution())
66+
+ " exhausted for " + rec);
67+
};
5968
}
6069
else {
6170
this.recoverer = recoverer;
@@ -70,10 +79,16 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
7079
recover(record, exception);
7180
return true;
7281
}
73-
FailedRecord failedRecord = this.failures.get();
74-
if (failedRecord == null || newFailure(record, failedRecord)) {
75-
failedRecord = new FailedRecord(record.topic(), record.partition(), record.offset(), this.backOff.start());
76-
this.failures.set(failedRecord);
82+
Map<TopicPartition, FailedRecord> map = this.failures.get();
83+
if (map == null) {
84+
this.failures.set(new HashMap<>());
85+
map = this.failures.get();
86+
}
87+
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
88+
FailedRecord failedRecord = map.get(topicPartition);
89+
if (failedRecord == null || failedRecord.getOffset() != record.offset()) {
90+
failedRecord = new FailedRecord(record.offset(), this.backOff.start());
91+
map.put(topicPartition, failedRecord);
7792
}
7893
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
7994
if (nextBackOff != BackOffExecution.STOP) {
@@ -87,7 +102,10 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
87102
}
88103
else {
89104
recover(record, exception);
90-
this.failures.remove();
105+
map.remove(topicPartition);
106+
if (map.isEmpty()) {
107+
this.failures.remove();
108+
}
91109
return true;
92110
}
93111
}
@@ -101,12 +119,6 @@ private void recover(ConsumerRecord<?, ?> record, Exception exception) {
101119
}
102120
}
103121

104-
private boolean newFailure(ConsumerRecord<?, ?> record, FailedRecord failedRecord) {
105-
return !failedRecord.getTopic().equals(record.topic())
106-
|| failedRecord.getPartition() != record.partition()
107-
|| failedRecord.getOffset() != record.offset();
108-
}
109-
110122
void clearThreadState() {
111123
this.failures.remove();
112124
}
@@ -117,29 +129,15 @@ void clearThreadState() {
117129

118130
private static final class FailedRecord {
119131

120-
private final String topic;
121-
122-
private final int partition;
123-
124132
private final long offset;
125133

126134
private final BackOffExecution backOffExecution;
127135

128-
FailedRecord(String topic, int partition, long offset, BackOffExecution backOffExecution) {
129-
this.topic = topic;
130-
this.partition = partition;
136+
FailedRecord(long offset, BackOffExecution backOffExecution) {
131137
this.offset = offset;
132138
this.backOffExecution = backOffExecution;
133139
}
134140

135-
String getTopic() {
136-
return this.topic;
137-
}
138-
139-
int getPartition() {
140-
return this.partition;
141-
}
142-
143141
long getOffset() {
144142
return this.offset;
145143
}

spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,8 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.Mockito.mock;
2121

22+
import java.util.ArrayList;
23+
import java.util.List;
2224
import java.util.concurrent.atomic.AtomicBoolean;
2325

2426
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -74,4 +76,21 @@ record = new ConsumerRecord<>("bar", 1, 1L, "bar", "baz");
7476
assertThat(tracker.skip(record, new RuntimeException())).isTrue();
7577
}
7678

79+
@Test
80+
public void testDifferentOrder() {
81+
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
82+
FailedRecordTracker tracker = new FailedRecordTracker((rec, ex) -> {
83+
records.add(rec);
84+
}, new FixedBackOff(0L, 2L), mock(LogAccessor.class));
85+
ConsumerRecord<?, ?> record1 = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
86+
ConsumerRecord<?, ?> record2 = new ConsumerRecord<>("foo", 1, 0L, "bar", "baz");
87+
assertThat(tracker.skip(record1, new RuntimeException())).isFalse();
88+
assertThat(tracker.skip(record2, new RuntimeException())).isFalse();
89+
assertThat(tracker.skip(record1, new RuntimeException())).isFalse();
90+
assertThat(tracker.skip(record2, new RuntimeException())).isFalse();
91+
assertThat(tracker.skip(record1, new RuntimeException())).isTrue();
92+
assertThat(tracker.skip(record2, new RuntimeException())).isTrue();
93+
assertThat(records).hasSize(2);
94+
}
95+
7796
}

0 commit comments

Comments (0)