Skip to content

Commit 3657d05

Browse files
artembilangaryrussell
authored andcommitted
Defer Messaging annotations process (#2769)
* Defer Messaging annotations process The `AbstractMethodAnnotationPostProcessor` and its implementations have a `beanFactory.getBean()` call for the `@Bean` methods with Messaging annotations. This is done, actually, from the `MessagingAnnotationPostProcessor.postProcessAfterInitialization()` which might be still too early in some scenarios, like Spring Cloud Feign with its child application contexts being initialized from the `FeignClientFactoryBean`, causing a `BeanCurrentlyInCreationException` See https://stackoverflow.com/questions/54887963/beancurrentlyincreationexception-when-using-spring-integration-with-spring-cloud * Implement a `SmartInitializingSingleton` for the `MessagingAnnotationPostProcessor` and gather `Runnable` wrappers for newly introduced `postProcessMethodAndRegisterEndpointIfAny()` to be called later in the `afterSingletonsInstantiated()` when context is still in the initialization phase. All runtime-registered beans are going to be processed normally from the regular `postProcessAfterInitialization()` **Cherry-pick to 5.1.x** * * Fix unused imports in the `MessagingAnnotationsWithBeanAnnotationTests` * * Fix `IntegrationEndpointsInitializer` in the testing framework to handle all the possible `AbstractEndpoint` beans registration. See its JavaDocs for more info * Fix `AbstractCorrelatingMessageHandlerParser` and `AbstractConsumerEndpointParser` to use bean names for `outputChannel` and `discardChannel` instead of bean references. Since `MessagingAnnotationPostProcessor` now registers endpoints and beans for channels much later, than parsers, we can't rely on bean references any more there. * Fixes for failing tests which expected `outputChannel/discardChannel` bean references, when it is already just their names for late binding. * Apply some code style polishing for the affected classes. * Add `@Nullable` for `MessageSelector` parameter in the `QueueChannel.purge()`
1 parent 4365eae commit 3657d05

File tree

27 files changed

+339
-247
lines changed

27 files changed

+339
-247
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -175,7 +175,7 @@ public List<Message<?>> clear() {
175175
}
176176

177177
@Override
178-
public List<Message<?>> purge(MessageSelector selector) {
178+
public List<Message<?>> purge(@Nullable MessageSelector selector) {
179179
if (selector == null) {
180180
return this.clear();
181181
}

spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannelOperations.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,38 +19,41 @@
1919
import java.util.List;
2020

2121
import org.springframework.integration.core.MessageSelector;
22+
import org.springframework.lang.Nullable;
2223
import org.springframework.messaging.Message;
2324

2425
/**
2526
* Operations available on a channel that has queuing semantics.
2627
*
2728
* @author Gary Russell
29+
* @author Artem Bilan
30+
*
2831
* @since 3.0
2932
*
3033
*/
3134
public interface QueueChannelOperations {
3235

3336
/**
3437
* Remove all {@link Message Messages} from this channel.
35-
*
3638
* @return The messages that were removed.
3739
*/
3840
List<Message<?>> clear();
3941

4042
/**
4143
* Remove any {@link Message Messages} that are not accepted by the provided selector.
42-
*
4344
* @param selector The message selector.
4445
* @return The list of messages that were purged.
4546
*/
46-
List<Message<?>> purge(MessageSelector selector);
47+
List<Message<?>> purge(@Nullable MessageSelector selector);
4748

4849
/**
50+
* Obtain the current number of queued {@link Message Messages} in this channel.
4951
* @return The current number of queued {@link Message Messages} in this channel.
5052
*/
5153
int getQueueSize();
5254

5355
/**
56+
* Obtain the remaining capacity of this channel.
5457
* @return The remaining capacity of this channel.
5558
*/
5659
int getRemainingCapacity();

spring-integration-core/src/main/java/org/springframework/integration/config/AbstractSimpleMessageHandlerFactoryBean.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -70,6 +70,8 @@ public abstract class AbstractSimpleMessageHandlerFactoryBean<H extends MessageH
7070

7171
private MessageChannel outputChannel;
7272

73+
private String outputChannelName;
74+
7375
private Integer order;
7476

7577
private List<Advice> adviceChain;
@@ -119,6 +121,15 @@ public void setOutputChannel(MessageChannel outputChannel) {
119121
this.outputChannel = outputChannel;
120122
}
121123

124+
/**
125+
* Set the handler's output channel name.
126+
* @param outputChannelName the output channel bean name to set.
127+
* @since 5.1.4
128+
*/
129+
public void setOutputChannelName(String outputChannelName) {
130+
this.outputChannelName = outputChannelName;
131+
}
132+
122133
/**
123134
* Set the order in which the handler will be subscribed to its channel
124135
* (when subscribable).
@@ -197,9 +208,7 @@ protected final H createHandlerInternal() {
197208
((ApplicationEventPublisherAware) this.handler)
198209
.setApplicationEventPublisher(this.applicationEventPublisher);
199210
}
200-
if (this.handler instanceof MessageProducer && this.outputChannel != null) {
201-
((MessageProducer) this.handler).setOutputChannel(this.outputChannel);
202-
}
211+
configureOutputChannelIfAny();
203212
Object actualHandler = extractTarget(this.handler);
204213
if (actualHandler == null) {
205214
actualHandler = this.handler;
@@ -247,6 +256,18 @@ else if (this.logger.isDebugEnabled()) {
247256
return this.handler;
248257
}
249258

259+
private void configureOutputChannelIfAny() {
260+
if (this.handler instanceof MessageProducer) {
261+
MessageProducer messageProducer = (MessageProducer) this.handler;
262+
if (this.outputChannel != null) {
263+
messageProducer.setOutputChannel(this.outputChannel);
264+
}
265+
else if (this.outputChannelName != null) {
266+
messageProducer.setOutputChannelName(this.outputChannelName);
267+
}
268+
}
269+
}
270+
250271
protected abstract H createHandler();
251272

252273
@Override

spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java

Lines changed: 87 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.lang.annotation.Annotation;
2020
import java.lang.reflect.Method;
21+
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.HashMap;
2324
import java.util.HashSet;
@@ -36,6 +37,7 @@
3637
import org.springframework.beans.factory.BeanFactory;
3738
import org.springframework.beans.factory.BeanFactoryAware;
3839
import org.springframework.beans.factory.InitializingBean;
40+
import org.springframework.beans.factory.SmartInitializingSingleton;
3941
import org.springframework.beans.factory.config.BeanPostProcessor;
4042
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
4143
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
@@ -71,7 +73,8 @@
7173
* @author Gary Russell
7274
* @author Rick Hogge
7375
*/
74-
public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, InitializingBean {
76+
public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, InitializingBean,
77+
SmartInitializingSingleton {
7578

7679
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR
7780

@@ -81,6 +84,10 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean
8184

8285
private final Set<Class<?>> noAnnotationsCache = Collections.newSetFromMap(new ConcurrentHashMap<>(256));
8386

87+
private final List<Runnable> methodsToPostProcessAfterContextInitialization = new ArrayList<>();
88+
89+
private volatile boolean initialized;
90+
8491
@Override
8592
public void setBeanFactory(BeanFactory beanFactory) {
8693
Assert.isAssignable(ConfigurableListableBeanFactory.class, beanFactory.getClass(),
@@ -122,9 +129,17 @@ protected Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> set
122129

123130
public <A extends Annotation> void addMessagingAnnotationPostProcessor(Class<A> annotation,
124131
MethodAnnotationPostProcessor<A> postProcessor) {
132+
125133
this.postProcessors.put(annotation, postProcessor);
126134
}
127135

136+
@Override
137+
public void afterSingletonsInstantiated() {
138+
this.initialized = true;
139+
this.methodsToPostProcessAfterContextInitialization.forEach(Runnable::run);
140+
this.methodsToPostProcessAfterContextInitialization.clear();
141+
}
142+
128143
@Override
129144
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
130145
return bean;
@@ -141,38 +156,43 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
141156
return bean;
142157
}
143158

144-
ReflectionUtils.doWithMethods(beanClass, method -> {
145-
Map<Class<? extends Annotation>, List<Annotation>> annotationChains = new HashMap<>();
146-
for (Class<? extends Annotation> annotationType :
147-
this.postProcessors.keySet()) {
148-
if (AnnotatedElementUtils.isAnnotated(method, annotationType.getName())) {
149-
List<Annotation> annotationChain = getAnnotationChain(method, annotationType);
150-
if (annotationChain.size() > 0) {
151-
annotationChains.put(annotationType, annotationChain);
152-
}
153-
}
154-
}
155-
if (StringUtils.hasText(MessagingAnnotationUtils.endpointIdValue(method))
156-
&& annotationChains.keySet().size() > 1) {
157-
throw new IllegalStateException("@EndpointId on " + method.toGenericString()
158-
+ " can only have one EIP annotation, found: " + annotationChains.keySet().size());
159-
}
160-
for (Entry<Class<? extends Annotation>, List<Annotation>> entry : annotationChains.entrySet()) {
161-
Class<? extends Annotation> annotationType = entry.getKey();
162-
List<Annotation> annotations = entry.getValue();
163-
processAnnotationTypeOnMethod(bean, beanName, method, annotationType, annotations);
164-
}
159+
ReflectionUtils.doWithMethods(beanClass,
160+
method -> doWithMethod(method, bean, beanName, beanClass),
161+
ReflectionUtils.USER_DECLARED_METHODS);
162+
163+
return bean;
164+
}
165165

166-
if (annotationChains.size() == 0) {
167-
this.noAnnotationsCache.add(beanClass);
166+
private void doWithMethod(Method method, Object bean, String beanName, Class<?> beanClass) {
167+
Map<Class<? extends Annotation>, List<Annotation>> annotationChains = new HashMap<>();
168+
for (Class<? extends Annotation> annotationType :
169+
this.postProcessors.keySet()) {
170+
if (AnnotatedElementUtils.isAnnotated(method, annotationType.getName())) {
171+
List<Annotation> annotationChain = getAnnotationChain(method, annotationType);
172+
if (annotationChain.size() > 0) {
173+
annotationChains.put(annotationType, annotationChain);
174+
}
168175
}
169-
}, ReflectionUtils.USER_DECLARED_METHODS);
176+
}
177+
if (StringUtils.hasText(MessagingAnnotationUtils.endpointIdValue(method))
178+
&& annotationChains.keySet().size() > 1) {
179+
throw new IllegalStateException("@EndpointId on " + method.toGenericString()
180+
+ " can only have one EIP annotation, found: " + annotationChains.keySet().size());
181+
}
182+
for (Entry<Class<? extends Annotation>, List<Annotation>> entry : annotationChains.entrySet()) {
183+
Class<? extends Annotation> annotationType = entry.getKey();
184+
List<Annotation> annotations = entry.getValue();
185+
processAnnotationTypeOnMethod(bean, beanName, method, annotationType, annotations);
186+
}
170187

171-
return bean;
188+
if (annotationChains.size() == 0) {
189+
this.noAnnotationsCache.add(beanClass);
190+
}
172191
}
173192

174193
protected void processAnnotationTypeOnMethod(Object bean, String beanName, Method method,
175194
Class<? extends Annotation> annotationType, List<Annotation> annotations) {
195+
176196
MethodAnnotationPostProcessor<?> postProcessor =
177197
MessagingAnnotationPostProcessor.this.postProcessors.get(annotationType);
178198
if (postProcessor != null && postProcessor.shouldCreateEndpoint(method, annotations)) {
@@ -187,36 +207,53 @@ protected void processAnnotationTypeOnMethod(Object bean, String beanName, Metho
187207
+ "and its method: '" + method + "'", e);
188208
}
189209
}
190-
Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
191-
if (result != null && result instanceof AbstractEndpoint) {
192-
AbstractEndpoint endpoint = (AbstractEndpoint) result;
193-
String autoStartup = MessagingAnnotationUtils.resolveAttribute(annotations, "autoStartup",
194-
String.class);
210+
211+
if (this.initialized) {
212+
postProcessMethodAndRegisterEndpointIfAny(bean, beanName, method, annotationType, annotations,
213+
postProcessor, targetMethod);
214+
}
215+
else {
216+
Method methodToPostProcess = targetMethod;
217+
this.methodsToPostProcessAfterContextInitialization.add(() ->
218+
postProcessMethodAndRegisterEndpointIfAny(bean, beanName, method, annotationType, annotations,
219+
postProcessor, methodToPostProcess));
220+
}
221+
}
222+
}
223+
224+
private void postProcessMethodAndRegisterEndpointIfAny(Object bean, String beanName, Method method,
225+
Class<? extends Annotation> annotationType, List<Annotation> annotations,
226+
MethodAnnotationPostProcessor<?> postProcessor, Method targetMethod) {
227+
228+
Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
229+
if (result instanceof AbstractEndpoint) {
230+
AbstractEndpoint endpoint = (AbstractEndpoint) result;
231+
String autoStartup = MessagingAnnotationUtils.resolveAttribute(annotations, "autoStartup",
232+
String.class);
233+
if (StringUtils.hasText(autoStartup)) {
234+
autoStartup = getBeanFactory().resolveEmbeddedValue(autoStartup);
195235
if (StringUtils.hasText(autoStartup)) {
196-
autoStartup = getBeanFactory().resolveEmbeddedValue(autoStartup);
197-
if (StringUtils.hasText(autoStartup)) {
198-
endpoint.setAutoStartup(Boolean.parseBoolean(autoStartup));
199-
}
236+
endpoint.setAutoStartup(Boolean.parseBoolean(autoStartup));
200237
}
238+
}
201239

202-
String phase = MessagingAnnotationUtils.resolveAttribute(annotations, "phase", String.class);
240+
String phase = MessagingAnnotationUtils.resolveAttribute(annotations, "phase", String.class);
241+
if (StringUtils.hasText(phase)) {
242+
phase = getBeanFactory().resolveEmbeddedValue(phase);
203243
if (StringUtils.hasText(phase)) {
204-
phase = getBeanFactory().resolveEmbeddedValue(phase);
205-
if (StringUtils.hasText(phase)) {
206-
endpoint.setPhase(Integer.parseInt(phase));
207-
}
208-
}
209-
210-
Role role = AnnotationUtils.findAnnotation(method, Role.class);
211-
if (role != null) {
212-
endpoint.setRole(role.value());
244+
endpoint.setPhase(Integer.parseInt(phase));
213245
}
246+
}
214247

215-
String endpointBeanName = generateBeanName(beanName, method, annotationType);
216-
endpoint.setBeanName(endpointBeanName);
217-
getBeanFactory().registerSingleton(endpointBeanName, endpoint);
218-
getBeanFactory().initializeBean(endpoint, endpointBeanName);
248+
Role role = AnnotationUtils.findAnnotation(method, Role.class);
249+
if (role != null) {
250+
endpoint.setRole(role.value());
219251
}
252+
253+
String endpointBeanName = generateBeanName(beanName, method, annotationType);
254+
endpoint.setBeanName(endpointBeanName);
255+
getBeanFactory().registerSingleton(endpointBeanName, endpoint);
256+
getBeanFactory().initializeBean(endpoint, endpointBeanName);
220257
}
221258
}
222259

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -84,8 +84,9 @@ protected String getInputChannelAttributeName() {
8484

8585
@Override
8686
protected final AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) {
87-
BeanDefinitionBuilder handlerBuilder = this.parseHandler(element, parserContext);
88-
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(handlerBuilder, element, "output-channel");
87+
BeanDefinitionBuilder handlerBuilder = parseHandler(element, parserContext);
88+
IntegrationNamespaceUtils.setValueIfAttributeDefined(handlerBuilder, element, "output-channel",
89+
"outputChannelName");
8990
IntegrationNamespaceUtils.setValueIfAttributeDefined(handlerBuilder, element, "order");
9091

9192
Element txElement = DomUtils.getChildElementByTagName(element, "transactional");

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,8 +78,9 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad
7878

7979
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, MESSAGE_STORE_ATTRIBUTE);
8080
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "scheduler", "taskScheduler");
81-
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE);
8281
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "lock-registry");
82+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE,
83+
"discardChannelName");
8384
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE);
8485
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_EXPIRY_ATTRIBUTE);
8586
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "empty-group-min-timeout",

0 commit comments

Comments
 (0)