Skip to content

Commit 2490dd5

Browse files
garyrussellartembilan
authored andcommitted
GH-1234: Fix FailedRecordTracker [2.2.x]
Fixes #1234 Previous implementation assumed the first redelivered record would be the one that failed; this is not always the case. Maintain state for each topic/partition/offset.
1 parent 3c816dc commit 2490dd5

File tree

2 files changed

+41
-28
lines changed

2 files changed

+41
-28
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.temporal.ValueRange;
20+
import java.util.HashMap;
21+
import java.util.Map;
2022
import java.util.function.BiConsumer;
2123

2224
import org.apache.commons.logging.Log;
2325
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
import org.apache.kafka.common.TopicPartition;
2427

2528
import org.springframework.lang.Nullable;
2629

@@ -33,7 +36,7 @@
3336
*/
3437
class FailedRecordTracker {
3538

36-
private final ThreadLocal<FailedRecord> failures = new ThreadLocal<>(); // intentionally not static
39+
private final ThreadLocal<Map<TopicPartition, FailedRecord>> failures = new ThreadLocal<>(); // intentionally not static
3740

3841
private final BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer;
3942

@@ -60,14 +63,25 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
6063
recover(record, exception);
6164
return true;
6265
}
63-
FailedRecord failedRecord = this.failures.get();
64-
if (this.maxFailures > 0 && (failedRecord == null || newFailure(record, failedRecord))) {
65-
this.failures.set(new FailedRecord(record.topic(), record.partition(), record.offset()));
66+
Map<TopicPartition, FailedRecord> map = this.failures.get();
67+
if (map == null) {
68+
this.failures.set(new HashMap<>());
69+
map = this.failures.get();
70+
}
71+
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
72+
FailedRecord failedRecord = map.get(topicPartition);
73+
if (failedRecord == null || failedRecord.getOffset() != record.offset()) {
74+
failedRecord = new FailedRecord(record.offset());
75+
map.put(topicPartition, failedRecord);
6676
return false;
6777
}
6878
else if (this.maxFailures > 0 && failedRecord.incrementAndGet() >= this.maxFailures) {
69-
recover(record, exception);
70-
return true;
79+
recover(record, exception);
80+
map.remove(topicPartition);
81+
if (map.isEmpty()) {
82+
this.failures.remove();
83+
}
84+
return true;
7185
}
7286
else {
7387
return false;
@@ -83,42 +97,22 @@ private void recover(ConsumerRecord<?, ?> record, Exception exception) {
8397
}
8498
}
8599

86-
private boolean newFailure(ConsumerRecord<?, ?> record, FailedRecord failedRecord) {
87-
return !failedRecord.getTopic().equals(record.topic())
88-
|| failedRecord.getPartition() != record.partition()
89-
|| failedRecord.getOffset() != record.offset();
90-
}
91-
92100
void clearThreadState() {
93101
this.failures.remove();
94102
}
95103

96104
private static final class FailedRecord {
97105

98-
private final String topic;
99-
100-
private final int partition;
101-
102106
private final long offset;
103107

104108
private int count;
105109

106-
FailedRecord(String topic, int partition, long offset) {
107-
this.topic = topic;
108-
this.partition = partition;
110+
FailedRecord(long offset) {
109111
this.offset = offset;
110112
this.count = 1;
111113
}
112114

113-
private String getTopic() {
114-
return this.topic;
115-
}
116-
117-
private int getPartition() {
118-
return this.partition;
119-
}
120-
121-
private long getOffset() {
115+
long getOffset() {
122116
return this.offset;
123117
}
124118

spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.Mockito.mock;
2121

22+
import java.util.ArrayList;
23+
import java.util.List;
2224
import java.util.concurrent.atomic.AtomicBoolean;
2325

2426
import org.apache.commons.logging.Log;
@@ -57,4 +59,21 @@ public void testThreeRetries() {
5759
assertThat(recovered.get()).isTrue();
5860
}
5961

62+
@Test
63+
public void testDifferentOrder() {
64+
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
65+
FailedRecordTracker tracker = new FailedRecordTracker((rec, ex) -> {
66+
records.add(rec);
67+
}, 3, mock(Log.class));
68+
ConsumerRecord<?, ?> record1 = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
69+
ConsumerRecord<?, ?> record2 = new ConsumerRecord<>("foo", 1, 0L, "bar", "baz");
70+
assertThat(tracker.skip(record1, new RuntimeException())).isFalse();
71+
assertThat(tracker.skip(record2, new RuntimeException())).isFalse();
72+
assertThat(tracker.skip(record1, new RuntimeException())).isFalse();
73+
assertThat(tracker.skip(record2, new RuntimeException())).isFalse();
74+
assertThat(tracker.skip(record1, new RuntimeException())).isTrue();
75+
assertThat(tracker.skip(record2, new RuntimeException())).isTrue();
76+
assertThat(records).hasSize(2);
77+
}
78+
6079
}

0 commit comments

Comments
 (0)