Skip to content

Commit eb4eca2

Browse files
garyrussellartembilan
authored andcommitted
Support local bean reference in @KafkaListener
In the `@KafkaListener` bean post processor, allow property SpEL expressions to access properties and methods on the bean being post-processed. * Polishing - change the token to `__listener`.
1 parent c523861 commit eb4eca2

File tree

4 files changed

+179
-15
lines changed

4 files changed

+179
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,16 @@
174174
*/
175175
String clientIdPrefix() default "";
176176

177+
/**
178+
* A pseudo bean name used in SpEL expressions within this annotation to reference
179+
* the current bean within which this listener is defined. This allows access to
180+
* properties and methods within the enclosing bean.
181+
* Default '__listener'.
182+
* <p>
183+
* Example: {@code topics = "#{__listener.topicList}"}.
184+
* @return the pseudo bean name.
185+
* @since 2.1.2
186+
*/
187+
String beanRef() default "__listener";
188+
177189
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Arrays;
2222
import java.util.Collection;
2323
import java.util.Collections;
24+
import java.util.HashMap;
2425
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Map;
@@ -40,12 +41,14 @@
4041
import org.springframework.beans.factory.BeanInitializationException;
4142
import org.springframework.beans.factory.ListableBeanFactory;
4243
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
44+
import org.springframework.beans.factory.ObjectFactory;
4345
import org.springframework.beans.factory.SmartInitializingSingleton;
4446
import org.springframework.beans.factory.config.BeanExpressionContext;
4547
import org.springframework.beans.factory.config.BeanExpressionResolver;
4648
import org.springframework.beans.factory.config.BeanPostProcessor;
4749
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
4850
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
51+
import org.springframework.beans.factory.config.Scope;
4952
import org.springframework.context.expression.StandardBeanExpressionResolver;
5053
import org.springframework.core.MethodIntrospector;
5154
import org.springframework.core.Ordered;
@@ -128,6 +131,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
128131

129132
private final Log logger = LogFactory.getLog(getClass());
130133

134+
private final ListenerScope listenerScope = new ListenerScope();
135+
131136
private KafkaListenerEndpointRegistry endpointRegistry;
132137

133138
private String containerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;
@@ -192,7 +197,8 @@ public void setBeanFactory(BeanFactory beanFactory) {
192197
this.beanFactory = beanFactory;
193198
if (beanFactory instanceof ConfigurableListableBeanFactory) {
194199
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
195-
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null);
200+
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
201+
this.listenerScope);
196202
}
197203
}
198204

@@ -384,6 +390,10 @@ private Method checkProxy(Method methodArg, Object bean) {
384390

385391
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean,
386392
Object adminTarget, String beanName) {
393+
String beanRef = kafkaListener.beanRef();
394+
if (StringUtils.hasText(beanRef)) {
395+
this.listenerScope.addListener(beanRef, bean);
396+
}
387397
endpoint.setBean(bean);
388398
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
389399
endpoint.setId(getEndpointId(kafkaListener));
@@ -416,6 +426,9 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
416426
}
417427

418428
this.registrar.registerEndpoint(endpoint, factory);
429+
if (StringUtils.hasText(beanRef)) {
430+
this.listenerScope.removeListener(beanRef);
431+
}
419432
}
420433

421434
private String getEndpointId(KafkaListener kafkaListener) {
@@ -723,4 +736,46 @@ protected boolean isEmptyPayload(Object payload) {
723736

724737
}
725738

739+
private static class ListenerScope implements Scope {
740+
741+
private final Map<String, Object> listeners = new HashMap<>();
742+
743+
ListenerScope() {
744+
super();
745+
}
746+
747+
public void addListener(String key, Object bean) {
748+
this.listeners.put(key, bean);
749+
}
750+
751+
public void removeListener(String key) {
752+
this.listeners.remove(key);
753+
}
754+
755+
@Override
756+
public Object get(String name, ObjectFactory<?> objectFactory) {
757+
return this.listeners.get(name);
758+
}
759+
760+
@Override
761+
public Object remove(String name) {
762+
return null;
763+
}
764+
765+
@Override
766+
public void registerDestructionCallback(String name, Runnable callback) {
767+
}
768+
769+
@Override
770+
public Object resolveContextualObject(String key) {
771+
return this.listeners.get(key);
772+
}
773+
774+
@Override
775+
public String getConversationId() {
776+
return null;
777+
}
778+
779+
}
780+
726781
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,24 @@ public class BatchListenerConversionTests {
7070
@Autowired
7171
private KafkaTemplate<Integer, Foo> template;
7272

73-
@SuppressWarnings("unchecked")
7473
@Test
7574
public void testBatchOfPojos() throws Exception {
75+
doTest(this.config.listener1(), "blc1");
76+
doTest(this.config.listener2(), "blc2");
77+
}
78+
79+
private void doTest(Listener listener, String topic) throws InterruptedException {
7680
this.template.send(new GenericMessage<>(
77-
new Foo("bar"), Collections.singletonMap(KafkaHeaders.TOPIC, "blc1")));
81+
new Foo("bar"), Collections.singletonMap(KafkaHeaders.TOPIC, topic)));
7882
this.template.send(new GenericMessage<>(
79-
new Foo("baz"), Collections.singletonMap(KafkaHeaders.TOPIC, "blc1")));
80-
assertThat(config.listener().latch1.await(10, TimeUnit.SECONDS)).isTrue();
81-
assertThat(config.listener().received).isInstanceOf(List.class);
82-
assertThat(((List<?>) config.listener().received).size()).isGreaterThan(0);
83-
assertThat(((List<?>) config.listener().received).get(0)).isInstanceOf(Foo.class);
84-
assertThat(((List<Foo>) config.listener().received).get(0).bar).isEqualTo("bar");
83+
new Foo("baz"), Collections.singletonMap(KafkaHeaders.TOPIC, topic)));
84+
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
85+
assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
86+
assertThat(listener.received.size()).isGreaterThan(0);
87+
assertThat(listener.received.get(0)).isInstanceOf(Foo.class);
88+
assertThat(listener.received.get(0).bar).isEqualTo("bar");
89+
assertThat((listener.receivedTopics).get(0)).isEqualTo(topic);
90+
assertThat((listener.receivedPartitions).get(0)).isEqualTo(0);
8591
}
8692

8793
@Configuration
@@ -134,24 +140,55 @@ public Map<String, Object> producerConfigs() {
134140
}
135141

136142
@Bean
137-
public Listener listener() {
138-
return new Listener();
143+
public Listener listener1() {
144+
return new Listener("blc1");
145+
}
146+
147+
@Bean
148+
public Listener listener2() {
149+
return new Listener("blc2");
139150
}
140151

141152
}
142153

143154
public static class Listener {
144155

156+
private final String topic;
157+
145158
private final CountDownLatch latch1 = new CountDownLatch(1);
146159

147-
private Object received;
160+
private final CountDownLatch latch2 = new CountDownLatch(1);
161+
162+
private List<Foo> received;
163+
164+
private List<String> receivedTopics;
148165

149-
@KafkaListener(topics = "blc1")
150-
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
151-
this.received = foos;
166+
private List<Integer> receivedPartitions;
167+
168+
public Listener(String topic) {
169+
this.topic = topic;
170+
}
171+
172+
@KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.topic}.group")
173+
public void listen1(List<Foo> foos, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
174+
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions) {
175+
if (this.received == null) {
176+
this.received = foos;
177+
}
178+
this.receivedTopics = topics;
179+
this.receivedPartitions = partitions;
152180
this.latch1.countDown();
153181
}
154182

183+
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group2")
184+
public void listen2(List<Foo> foos) {
185+
this.latch2.countDown();
186+
}
187+
188+
public String getTopic() {
189+
return this.topic;
190+
}
191+
155192
}
156193

157194
public static class Foo {

src/reference/asciidoc/kafka.adoc

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,66 @@ public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ac
756756
Starting with _version 2.0_, the `id` attribute (if present) is used as the Kafka `group.id` property, overriding the configured property in the consumer factory, if present.
757757
You can also set `groupId` explicitly, or set `idIsGroup` to false, to restore the previous behavior of using the consumer factory `group.id`.
758758

759+
You can use property placeholders or SpEL expressions within annotation properties, for example...
760+
761+
[source, java]
762+
----
763+
@KafkaListener(topics = "${some.property}")
764+
765+
@KafkaListener(topics = "#{someBean.someProperty}",
766+
groupId = "#{someBean.someProperty}.group")
767+
----
768+
769+
Starting with _version 2.1.2_, the SpEL expressions support a special token `__kafkaListener__` which is a pseudo bean name which represents the current bean instance within which this annotation exists.
770+
771+
For example, given...
772+
773+
[source, java]
774+
----
775+
@Bean
776+
public Listener listener1() {
777+
return new Listener("topic1");
778+
}
779+
780+
@Bean
781+
public Listener listener2() {
782+
return new Listener("topic2");
783+
}
784+
----
785+
786+
...we can use...
787+
788+
[source, java]
789+
----
790+
public class Listener {
791+
792+
private final String topic;
793+
794+
public Listener(String topic) {
795+
this.topic = topic;
796+
}
797+
798+
@KafkaListener(topics = "#{__listener.topic}",
799+
groupId = "#{__listener.topic}.group")
800+
public void listen(...) {
801+
...
802+
}
803+
804+
public String getTopic() {
805+
return this.topic;
806+
}
807+
808+
}
809+
----
810+
811+
If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token using the `beanRef` attribute...
812+
813+
[source, java]
814+
----
815+
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
816+
groupId = "#{__x.topic}.group")
817+
----
818+
759819
===== Container Thread Naming
760820

761821
Listener containers currently use two task executors, one to invoke the consumer and another which will be used to invoke the listener, when the kafka consumer property `enable.auto.commit` is `false`.

0 commit comments

Comments
 (0)