Skip to content

Commit 762cd5e

Browse files
garyrussellartembilan
authored andcommitted
AckMode.MANUAL_IMMEDIATE - wake Consumer
When processing manual acks on a "foreign" thread, the commits are not made until the consumer wakes from his `poll()`. For `AckMode.MANUAL_IMMEDIATE`, we should wake the consumer so the offset(s) are committed as soon as possible. **cherry-pick to 2.2.x**
1 parent 4e5185c commit 762cd5e

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ public void run() {
756756
pollAndInvoke();
757757
}
758758
catch (@SuppressWarnings(UNUSED) WakeupException e) {
759-
// Ignore, we're stopping
759+
// Ignore, we're stopping or applying immediate foreign acks
760760
}
761761
catch (NoOffsetForPartitionException nofpe) {
762762
this.fatalError = true;
@@ -947,6 +947,9 @@ private void processAck(ConsumerRecord<K, V> record) {
947947
if (!Thread.currentThread().equals(this.consumerThread)) {
948948
try {
949949
this.acks.put(record);
950+
if (this.isManualImmediateAck) {
951+
this.consumer.wakeup();
952+
}
950953
}
951954
catch (InterruptedException e) {
952955
Thread.currentThread().interrupt();

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,15 +621,25 @@ private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exceptio
621621
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
622622
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
623623
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
624+
long sleepFor = ackMode.equals(AckMode.MANUAL_IMMEDIATE) ? 20_000 : 50;
625+
AtomicBoolean first = new AtomicBoolean(true);
624626
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
625-
Thread.sleep(50);
627+
if (!first.getAndSet(false)) {
628+
try {
629+
Thread.sleep(sleepFor);
630+
}
631+
catch (@SuppressWarnings("unused") InterruptedException ex) {
632+
throw new WakeupException();
633+
}
634+
}
626635
return consumerRecords;
627636
});
628637
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
629638
new TopicPartitionInitialOffset("foo", 0) };
630639
ContainerProperties containerProps = new ContainerProperties(topicPartition);
631640
containerProps.setGroupId("grp");
632641
containerProps.setAckMode(ackMode);
642+
containerProps.setMissingTopicsFatal(false);
633643
final CountDownLatch latch = new CountDownLatch(2);
634644
final List<Acknowledgment> acks = new ArrayList<>();
635645
final AtomicReference<Thread> consumerThread = new AtomicReference<>();
@@ -647,6 +657,10 @@ public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment ackno
647657
}
648658

649659
});
660+
willAnswer(inv -> {
661+
consumerThread.get().interrupt();
662+
return null;
663+
}).given(consumer).wakeup();
650664

651665
final CountDownLatch commitLatch = new CountDownLatch(1);
652666
final AtomicReference<Thread> commitThread = new AtomicReference<>();
@@ -663,6 +677,7 @@ public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment ackno
663677
new KafkaMessageListenerContainer<>(cf, containerProps);
664678
container.start();
665679
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
680+
long t1 = System.currentTimeMillis();
666681
acks.get(1).acknowledge();
667682
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
668683
InOrder inOrder = inOrder(messageListener, consumer);
@@ -671,6 +686,7 @@ public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment ackno
671686
inOrder.verify(consumer).commitSync(anyMap(), any());
672687
container.stop();
673688
assertThat(commitThread.get()).isSameAs(consumerThread.get());
689+
assertThat(System.currentTimeMillis() - t1).isLessThan(15_000);
674690
}
675691

676692
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)