Skip to content

Commit 786c551

Browse files
garyrussellartembilan
authored andcommitted
GH-1118: Add RecordInterceptor
Resolves #1118 **cherry-pick to 2.2.x**
1 parent a7a4df7 commit 786c551

File tree

9 files changed

+135
-24
lines changed

9 files changed

+135
-24
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.kafka.listener.ContainerProperties;
3737
import org.springframework.kafka.listener.ErrorHandler;
3838
import org.springframework.kafka.listener.GenericErrorHandler;
39+
import org.springframework.kafka.listener.RecordInterceptor;
3940
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
4041
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
4142
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
@@ -98,6 +99,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
9899

99100
private Boolean missingTopicsFatal;
100101

102+
private RecordInterceptor<K, V> recordInterceptor;
103+
101104
/**
102105
* Specify a {@link ConsumerFactory} to use.
103106
* @param consumerFactory The consumer factory.
@@ -280,6 +283,16 @@ public ContainerProperties getContainerProperties() {
280283
return this.containerProperties;
281284
}
282285

286+
/**
287+
* Set an interceptor to be called before calling the listener.
288+
* Does not apply to batch listeners.
289+
* @param recordInterceptor the interceptor.
290+
* @since 2.2.7
291+
*/
292+
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
293+
this.recordInterceptor = recordInterceptor;
294+
}
295+
283296
@Override
284297
public void afterPropertiesSet() {
285298
if (this.errorHandler != null) {
@@ -356,6 +369,7 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
356369
else if (this.autoStartup != null) {
357370
instance.setAutoStartup(this.autoStartup);
358371
}
372+
instance.setRecordInterceptor(this.recordInterceptor);
359373
JavaUtils.INSTANCE
360374
.acceptIfNotNull(this.phase, instance::setPhase)
361375
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
8989

9090
private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT;
9191

92+
private RecordInterceptor<K, V> recordInterceptor;
93+
9294
private volatile boolean running = false;
9395

9496
private volatile boolean paused;
@@ -279,6 +281,20 @@ public void setTopicCheckTimeout(int topicCheckTimeout) {
279281
this.topicCheckTimeout = topicCheckTimeout;
280282
}
281283

284+
protected RecordInterceptor<K, V> getRecordInterceptor() {
285+
return this.recordInterceptor;
286+
}
287+
288+
/**
289+
* Set an interceptor to be called before calling the listener.
290+
* Does not apply to batch listeners.
291+
* @param recordInterceptor the interceptor.
292+
* @since 2.2.7
293+
*/
294+
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
295+
this.recordInterceptor = recordInterceptor;
296+
}
297+
282298
@Override
283299
public void setupMessageListener(Object messageListener) {
284300
this.containerProperties.setMessageListener(messageListener);

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ protected void doStart() {
161161
container.setClientIdSuffix("-" + i);
162162
container.setGenericErrorHandler(getGenericErrorHandler());
163163
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
164+
container.setRecordInterceptor(getRecordInterceptor());
164165
container.setEmergencyStop(() -> {
165166
stop(() -> {
166167
// NOSONAR

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
466466

467467
private final Duration syncCommitTimeout;
468468

469+
private final RecordInterceptor<K, V> recordInterceptor = getRecordInterceptor();
470+
469471
private Map<TopicPartition, OffsetMetadata> definedPartitions;
470472

471473
private volatile Collection<TopicPartition> assignedPartitions;
@@ -1308,26 +1310,35 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record,
13081310
ackCurrent(record, producer);
13091311
}
13101312

1311-
private void doInvokeOnMessage(final ConsumerRecord<K, V> record) {
1312-
switch (this.listenerType) {
1313-
case ACKNOWLEDGING_CONSUMER_AWARE:
1314-
this.listener.onMessage(record,
1315-
this.isAnyManualAck
1316-
? new ConsumerAcknowledgment(record)
1317-
: null, this.consumer);
1318-
break;
1319-
case CONSUMER_AWARE:
1320-
this.listener.onMessage(record, this.consumer);
1321-
break;
1322-
case ACKNOWLEDGING:
1323-
this.listener.onMessage(record,
1324-
this.isAnyManualAck
1325-
? new ConsumerAcknowledgment(record)
1326-
: null);
1327-
break;
1328-
case SIMPLE:
1329-
this.listener.onMessage(record);
1330-
break;
1313+
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
1314+
ConsumerRecord<K, V> record = recordArg;
1315+
if (this.recordInterceptor != null) {
1316+
record = this.recordInterceptor.intercept(record);
1317+
}
1318+
if (record == null) {
1319+
this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + recordArg);
1320+
}
1321+
else {
1322+
switch (this.listenerType) {
1323+
case ACKNOWLEDGING_CONSUMER_AWARE:
1324+
this.listener.onMessage(record,
1325+
this.isAnyManualAck
1326+
? new ConsumerAcknowledgment(record)
1327+
: null, this.consumer);
1328+
break;
1329+
case CONSUMER_AWARE:
1330+
this.listener.onMessage(record, this.consumer);
1331+
break;
1332+
case ACKNOWLEDGING:
1333+
this.listener.onMessage(record,
1334+
this.isAnyManualAck
1335+
? new ConsumerAcknowledgment(record)
1336+
: null);
1337+
break;
1338+
case SIMPLE:
1339+
this.listener.onMessage(record);
1340+
break;
1341+
}
13311342
}
13321343
}
13331344

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
21+
import org.springframework.lang.Nullable;
22+
23+
/**
24+
* An interceptor for {@link ConsumerRecord} invoked by the listener
25+
* container before invoking the listener.
26+
*
27+
* @param <K> the key type.
28+
* @param <V> the value type.
29+
*
30+
* @author Gary Russell
31+
* @since 2.2.7
32+
*
33+
*/
34+
@FunctionalInterface
35+
public interface RecordInterceptor<K, V> {
36+
37+
/**
38+
* Perform some action on the record or return a different one.
39+
* If null is returned the record will be skipped.
40+
* @param record the record.
41+
* @return the record or null.
42+
*/
43+
@Nullable
44+
ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);
45+
46+
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,7 @@ public void testKeyConversion() throws Exception {
746746
this.bytesKeyTemplate.send("annotated36", "foo".getBytes(), "bar");
747747
assertThat(this.listener.keyLatch.await(30, TimeUnit.SECONDS)).isTrue();
748748
assertThat(this.listener.convertedKey).isEqualTo("foo");
749+
assertThat(this.config.intercepted).isTrue();
749750
}
750751

751752
@Test
@@ -761,7 +762,11 @@ public void testProjection() throws InterruptedException {
761762
@EnableTransactionManagement(proxyTargetClass = true)
762763
public static class Config implements KafkaListenerConfigurer {
763764

764-
private final CountDownLatch spyLatch = new CountDownLatch(2);
765+
final CountDownLatch spyLatch = new CountDownLatch(2);
766+
767+
volatile Throwable globalErrorThrowable;
768+
769+
volatile boolean intercepted;
765770

766771
@Bean
767772
public static PropertySourcesPlaceholderConfigurer ppc() {
@@ -784,8 +789,6 @@ public ChainedKafkaTransactionManager<Integer, String> cktm() {
784789
return new ChainedKafkaTransactionManager<>(ktm(), transactionManager());
785790
}
786791

787-
private Throwable globalErrorThrowable;
788-
789792
@Bean
790793
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
791794
kafkaListenerContainerFactory() {
@@ -884,6 +887,10 @@ public KafkaListenerContainerFactory<?> bytesStringListenerContainerFactory() {
884887
ConcurrentKafkaListenerContainerFactory<byte[], String> factory =
885888
new ConcurrentKafkaListenerContainerFactory<>();
886889
factory.setConsumerFactory(bytesStringConsumerFactory());
890+
factory.setRecordInterceptor(record -> {
891+
this.intercepted = true;
892+
return record;
893+
});
887894
return factory;
888895
}
889896

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,13 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
124124
ContainerProperties containerProps = new ContainerProperties(topic1);
125125
containerProps.setLogContainerConfig(true);
126126

127-
final CountDownLatch latch = new CountDownLatch(4);
127+
final CountDownLatch latch = new CountDownLatch(3);
128128
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
129+
final List<String> payloads = new ArrayList<>();
129130
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
130131
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
131132
listenerThreadNames.add(Thread.currentThread().getName());
133+
payloads.add(message.value());
132134
latch.countDown();
133135
});
134136

@@ -144,6 +146,11 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
144146
stopLatch.countDown();
145147
}
146148
});
149+
CountDownLatch intercepted = new CountDownLatch(4);
150+
container.setRecordInterceptor(record -> {
151+
intercepted.countDown();
152+
return record.value().equals("baz") ? null : record;
153+
});
147154
container.start();
148155

149156
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
@@ -158,6 +165,7 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
158165
template.sendDefault(0, "baz");
159166
template.sendDefault(2, "qux");
160167
template.flush();
168+
assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue();
161169
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
162170
for (String threadName : listenerThreadNames) {
163171
assertThat(threadName).contains("-C-");
@@ -173,6 +181,7 @@ protected KafkaConsumer<Integer, String> createKafkaConsumer(String groupId, Str
173181
Set<KafkaMessageListenerContainer<Integer, String>> children = new HashSet<>(containers);
174182
container.stop();
175183
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
184+
assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux");
176185
events.forEach(e -> {
177186
assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
178187
if (e instanceof ContainerStoppedEvent) {

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,10 @@ Two `MessageListenerContainer` implementations are provided:
753753
The `KafkaMessageListenerContainer` receives all message from all topics or partitions on a single thread.
754754
The `ConcurrentMessageListenerContainer` delegates to one or more `KafkaMessageListenerContainer` instances to provide multi-threaded consumption.
755755

756+
Starting with version 2.1.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
757+
If the interceptor returns null, the listener is not called.
758+
The interceptor is not invoked when the listener is a <<batch-listners, batch listener>>.
759+
756760
[[kafka-container]]
757761
====== Using `KafkaMessageListenerContainer`
758762

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ See <<seek-to-current>> for more information.
3434
It is now possible to obtain the consumer's `group.id` property in the listener method.
3535
See <<listener-group-id>> for more information.
3636

37+
The container has a new property `recordInterceptor` allowing records to be inspected or modified before invoking the listener.
38+
See <<message-listener-container>> for more information.
39+
3740
==== ErrorHandler Changes
3841

3942
The `SeekToCurrentErrorHandler` now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.

0 commit comments

Comments
 (0)