Skip to content

Commit bdd89b6

Browse files
garyrussellartembilan
authored andcommitted
Fix re-pausing consumer after a rebalance
Fixes #1111 `pause()` the consumer in the rebalance listener so that the Consumer will discard any fetched records that would otherwise be returned by the initial poll. **cherry-pick to 2.1.x**
1 parent 7641bb5 commit bdd89b6

File tree

2 files changed

+24
-11
lines changed

2 files changed

+24
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1739,10 +1739,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
17391739
@Override
17401740
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
17411741
if (ListenerConsumer.this.consumerPaused) {
1742-
ListenerConsumer.this.consumerPaused = false;
1742+
ListenerConsumer.this.consumer.pause(partitions);
17431743
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
1744-
+ "the container will pause again before polling, unless the container's "
1745-
+ "'paused' property is reset by a custom rebalance listener");
1744+
+ "consumer paused again, so the initial poll() will never return any records");
17461745
}
17471746
ListenerConsumer.this.assignedPartitions = partitions;
17481747
if (!ListenerConsumer.this.autoCommit) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2054,8 +2054,14 @@ public void testPauseResume() throws Exception {
20542054
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
20552055
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
20562056
AtomicBoolean first = new AtomicBoolean(true);
2057+
AtomicBoolean rebalance = new AtomicBoolean(true);
2058+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
20572059
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
20582060
Thread.sleep(50);
2061+
if (rebalance.getAndSet(false)) {
2062+
rebal.get().onPartitionsRevoked(Collections.emptyList());
2063+
rebal.get().onPartitionsAssigned(records.keySet());
2064+
}
20592065
return first.getAndSet(false) ? consumerRecords : emptyRecords;
20602066
});
20612067
final CountDownLatch commitLatch = new CountDownLatch(2);
@@ -2064,9 +2070,11 @@ public void testPauseResume() throws Exception {
20642070
return null;
20652071
}).given(consumer).commitSync(anyMap(), any());
20662072
given(consumer.assignment()).willReturn(records.keySet());
2067-
final CountDownLatch pauseLatch = new CountDownLatch(2);
2073+
final CountDownLatch pauseLatch1 = new CountDownLatch(2); // consumer, event publisher
2074+
final CountDownLatch pauseLatch2 = new CountDownLatch(2); // consumer, consumer
20682075
willAnswer(i -> {
2069-
pauseLatch.countDown();
2076+
pauseLatch1.countDown();
2077+
pauseLatch2.countDown();
20702078
return null;
20712079
}).given(consumer).pause(records.keySet());
20722080
given(consumer.paused()).willReturn(records.keySet());
@@ -2075,14 +2083,17 @@ public void testPauseResume() throws Exception {
20752083
resumeLatch.countDown();
20762084
return null;
20772085
}).given(consumer).resume(records.keySet());
2078-
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
2079-
new TopicPartitionInitialOffset("foo", 0) };
2080-
ContainerProperties containerProps = new ContainerProperties(topicPartition);
2086+
willAnswer(invoc -> {
2087+
rebal.set(invoc.getArgument(1));
2088+
return null;
2089+
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
2090+
ContainerProperties containerProps = new ContainerProperties("foo");
20812091
containerProps.setGroupId("grp");
20822092
containerProps.setAckMode(AckMode.RECORD);
20832093
containerProps.setClientId("clientId");
20842094
containerProps.setIdleEventInterval(100L);
20852095
containerProps.setMessageListener((MessageListener) r -> { });
2096+
containerProps.setMissingTopicsFatal(false);
20862097
Properties consumerProps = new Properties();
20872098
consumerProps.setProperty(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "42000");
20882099
containerProps.setConsumerProperties(consumerProps);
@@ -2092,7 +2103,7 @@ public void testPauseResume() throws Exception {
20922103
CountDownLatch stopLatch = new CountDownLatch(1);
20932104
container.setApplicationEventPublisher(e -> {
20942105
if (e instanceof ConsumerPausedEvent) {
2095-
pauseLatch.countDown();
2106+
pauseLatch1.countDown();
20962107
}
20972108
else if (e instanceof ConsumerResumedEvent) {
20982109
resumeLatch.countDown();
@@ -2103,16 +2114,19 @@ else if (e instanceof ConsumerStoppedEvent) {
21032114
});
21042115
container.start();
21052116
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
2106-
verify(consumer, times(2)).commitSync(anyMap(), eq(Duration.ofSeconds(41)));
2117+
verify(consumer, times(3)).commitSync(anyMap(), eq(Duration.ofSeconds(41)));
21072118
assertThat(container.isContainerPaused()).isFalse();
21082119
container.pause();
21092120
assertThat(container.isPaused()).isTrue();
2110-
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
2121+
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
21112122
assertThat(container.isContainerPaused()).isTrue();
2123+
rebalance.set(true); // force a re-pause
2124+
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
21122125
container.resume();
21132126
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
21142127
container.stop();
21152128
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
2129+
verify(consumer, times(4)).commitSync(anyMap(), eq(Duration.ofSeconds(41)));
21162130
}
21172131

21182132
@SuppressWarnings({ "unchecked", "rawtypes" })

0 commit comments

Comments
 (0)