Skip to content

Commit 5e877a8

Browse files
garyrussellartembilan
authored andcommitted
GH-566: Ack Concurrency Issue
Fixes #566 If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks. Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack is called on a foreign thread - the `Consumer` is not thread-safe. - Revert `offsets` to simple `HashMap`s - Only reference `offsets` on the consumer thread - enqueue foreign acks into the `acks` queue (even "immediate" acks) **Cherry pick to 2.0.x (and 1.3.x, fixing the lambda in the test and the diamond operators).** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
1 parent bbef267 commit 5e877a8

File tree

2 files changed

+93
-6
lines changed

2 files changed

+93
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.kafka.common.errors.WakeupException;
4949

5050
import org.springframework.core.task.SimpleAsyncTaskExecutor;
51+
import org.springframework.kafka.KafkaException;
5152
import org.springframework.kafka.core.ConsumerFactory;
5253
import org.springframework.kafka.core.KafkaResourceHolder;
5354
import org.springframework.kafka.core.ProducerFactoryUtils;
@@ -293,7 +294,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
293294

294295
private final Consumer<K, V> consumer;
295296

296-
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
297+
private final Map<String, Map<Integer, Long>> offsets = new HashMap<String, Map<Integer, Long>>();
297298

298299
private final MessageListener<K, V> listener;
299300

@@ -348,6 +349,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
348349

349350
private volatile Collection<TopicPartition> assignedPartitions;
350351

352+
private volatile Thread consumerThread;
353+
351354
private int count;
352355

353356
private long last = System.currentTimeMillis();
@@ -600,6 +603,7 @@ public boolean isLongLived() {
600603

601604
@Override
602605
public void run() {
606+
this.consumerThread = Thread.currentThread();
603607
if (this.theListener instanceof ConsumerSeekAware) {
604608
((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
605609
}
@@ -709,16 +713,27 @@ record = this.acks.poll();
709713
}
710714

711715
private void processAck(ConsumerRecord<K, V> record) {
712-
if (this.isManualImmediateAck) {
716+
if (!Thread.currentThread().equals(this.consumerThread)) {
713717
try {
714-
ackImmediate(record);
718+
this.acks.put(record);
715719
}
716-
catch (WakeupException e) {
717-
// ignore - not polling
720+
catch (InterruptedException e) {
721+
Thread.currentThread().interrupt();
722+
throw new KafkaException("Interrupted while storing ack", e);
718723
}
719724
}
720725
else {
721-
addOffset(record);
726+
if (this.isManualImmediateAck) {
727+
try {
728+
ackImmediate(record);
729+
}
730+
catch (WakeupException e) {
731+
// ignore - not polling
732+
}
733+
}
734+
else {
735+
addOffset(record);
736+
}
722737
}
723738
}
724739

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.mockito.Matchers.any;
2323
import static org.mockito.Matchers.anyLong;
2424
import static org.mockito.Matchers.anyString;
25+
import static org.mockito.Mockito.eq;
2526
import static org.mockito.Mockito.inOrder;
2627
import static org.mockito.Mockito.mock;
2728
import static org.mockito.Mockito.spy;
@@ -308,6 +309,77 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
308309
container.stop();
309310
}
310311

312+
@Test
313+
public void testRecordAckMockForeignThread() throws Exception {
314+
testRecordAckMockForeignThreadGuts(AckMode.MANUAL);
315+
}
316+
317+
@Test
318+
public void testRecordAckMockForeignThreadImmediate() throws Exception {
319+
testRecordAckMockForeignThreadGuts(AckMode.MANUAL_IMMEDIATE);
320+
}
321+
322+
@SuppressWarnings("unchecked")
323+
private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exception {
324+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
325+
Consumer<Integer, String> consumer = mock(Consumer.class);
326+
given(cf.createConsumer(anyString(), eq("clientId"))).willReturn(consumer);
327+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
328+
records.put(new TopicPartition("foo", 0), Arrays.asList(
329+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
330+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
331+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
332+
given(consumer.poll(anyLong())).willAnswer(i -> {
333+
Thread.sleep(50);
334+
return consumerRecords;
335+
});
336+
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
337+
new TopicPartitionInitialOffset("foo", 0) };
338+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
339+
containerProps.setAckMode(ackMode);
340+
final CountDownLatch latch = new CountDownLatch(2);
341+
final List<Acknowledgment> acks = new ArrayList<>();
342+
final AtomicReference<Thread> consumerThread = new AtomicReference<>();
343+
AcknowledgingMessageListener<Integer, String> messageListener = spy(
344+
new AcknowledgingMessageListener<Integer, String>() {
345+
346+
@Override
347+
public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment acknowledgment) {
348+
acks.add(acknowledgment);
349+
consumerThread.set(Thread.currentThread());
350+
latch.countDown();
351+
if (latch.getCount() == 0) {
352+
records.clear();
353+
}
354+
}
355+
356+
});
357+
358+
final CountDownLatch commitLatch = new CountDownLatch(1);
359+
final AtomicReference<Thread> commitThread = new AtomicReference<>();
360+
willAnswer(i -> {
361+
commitThread.set(Thread.currentThread());
362+
commitLatch.countDown();
363+
return null;
364+
}
365+
).given(consumer).commitSync(any(Map.class));
366+
367+
containerProps.setMessageListener(messageListener);
368+
KafkaMessageListenerContainer<Integer, String> container =
369+
new KafkaMessageListenerContainer<>(cf, containerProps);
370+
container.setClientIdSuffix("clientId");
371+
container.start();
372+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
373+
acks.get(1).acknowledge();
374+
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
375+
InOrder inOrder = inOrder(messageListener, consumer);
376+
inOrder.verify(consumer).poll(1000);
377+
inOrder.verify(messageListener, times(2)).onMessage(any(ConsumerRecord.class), any(Acknowledgment.class));
378+
inOrder.verify(consumer).commitSync(any(Map.class));
379+
container.stop();
380+
assertThat(commitThread.get()).isSameAs(consumerThread.get());
381+
}
382+
311383
@SuppressWarnings("unchecked")
312384
@Test
313385
public void testBrokerDownEvent() throws Exception {

0 commit comments

Comments
 (0)