Skip to content

Commit a808a56

Browse files
garyrussellartembilan
authored andcommitted
GH-1142: Add CompositeRecordInterceptor (#1143)
* GH-1142: Add CompositeRecordInterceptor Resolves #1142 * Polishing * * Add assertions * * Fix checkstyle and varargs warning
1 parent d85040a commit a808a56

File tree

3 files changed

+64
-1
lines changed

3 files changed

+64
-1
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.ArrayList;
20+
import java.util.Arrays;
21+
import java.util.Collection;
22+
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* A {@link RecordInterceptor} that delegates to one or more {@link RecordInterceptor} in
29+
* order.
30+
*
31+
* @param <K> the key type.
32+
* @param <V> the value type.
33+
*
34+
* @author Artem Bilan
35+
* @author Gary Russell
36+
* @since 2.3
37+
*
38+
*/
39+
public class CompositeRecordInterceptor<K, V> implements RecordInterceptor<K, V> {
40+
41+
private final Collection<RecordInterceptor<K, V>> delegates = new ArrayList<>();
42+
43+
@SafeVarargs
44+
@SuppressWarnings("varargs")
45+
public CompositeRecordInterceptor(RecordInterceptor<K, V>... delegates) {
46+
Assert.notNull(delegates, "'delegates' cannot be null");
47+
Assert.noNullElements(delegates, "'delegates' cannot have null entries");
48+
this.delegates.addAll(Arrays.asList(delegates));
49+
}
50+
51+
@Override
52+
public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
53+
ConsumerRecord<K, V> recordToIntercept = record;
54+
for (RecordInterceptor<K, V> delegate : this.delegates) {
55+
recordToIntercept = delegate.intercept(recordToIntercept);
56+
}
57+
return recordToIntercept;
58+
}
59+
60+
}

src/reference/asciidoc/kafka.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,10 +772,12 @@ Two `MessageListenerContainer` implementations are provided:
772772
The `KafkaMessageListenerContainer` receives all message from all topics or partitions on a single thread.
773773
The `ConcurrentMessageListenerContainer` delegates to one or more `KafkaMessageListenerContainer` instances to provide multi-threaded consumption.
774774

775-
Starting with version 2.1.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
775+
Starting with version 2.2.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
776776
If the interceptor returns null, the listener is not called.
777777
The interceptor is not invoked when the listener is a <<batch-listners, batch listener>>.
778778

779+
Starting with version 2.3, the `CompositeRecordInterceptor` can be used to invoke multiple interceptors.
780+
779781
[[kafka-container]]
780782
====== Using `KafkaMessageListenerContainer`
781783

src/reference/asciidoc/whats-new.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ It is now possible to obtain the consumer's `group.id` property in the listener
3939
See <<listener-group-id>> for more information.
4040

4141
The container has a new property `recordInterceptor` allowing records to be inspected or modified before invoking the listener.
42+
A `CompositeRecordInterceptor` is also provided in case you need to invoke multiple interceptors.
4243
See <<message-listener-container>> for more information.
4344

4445
==== ErrorHandler Changes

0 commit comments

Comments
 (0)