Skip to content

Commit 70272bb

Browse files
committed
INT-2277: Register ChResolver & ErrHandler beans
JIRA: https://jira.spring.io/browse/INT-2277 To avoid extra objects at runtime and reuse a central configuration, register `BeanFactoryChannelResolver` and `MessagePublishingErrorHandler` bean via `DefaultConfiguringBeanFactoryPostProcessor` * Use those beans whenever it is necessary via `IntegrationContextUtils` factory methods against provided `BeanFactory` * To avoid changes in the non-managed test, those new factories fall back to a new instance if there is no an appropriate bean in the `beanFactory`
1 parent 0b3e2c9 commit 70272bb

19 files changed

+175
-116
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aop/MessagePublishingInterceptor.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import org.springframework.core.ParameterNameDiscoverer;
3232
import org.springframework.expression.Expression;
3333
import org.springframework.expression.spel.support.StandardEvaluationContext;
34+
import org.springframework.integration.context.IntegrationContextUtils;
3435
import org.springframework.integration.core.MessagingTemplate;
3536
import org.springframework.integration.expression.ExpressionEvalMap;
3637
import org.springframework.integration.expression.ExpressionUtils;
@@ -59,29 +60,31 @@ public class MessagePublishingInterceptor implements MethodInterceptor, BeanFact
5960

6061
private final MessagingTemplate messagingTemplate = new MessagingTemplate();
6162

62-
private volatile PublisherMetadataSource metadataSource;
63+
private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
6364

64-
private volatile DestinationResolver<MessageChannel> channelResolver;
65+
private final PublisherMetadataSource metadataSource;
6566

66-
private volatile BeanFactory beanFactory;
67+
private DestinationResolver<MessageChannel> channelResolver;
6768

68-
private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
69+
private BeanFactory beanFactory;
6970

70-
private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();
71+
private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();
7172

72-
private volatile boolean messageBuilderFactorySet;
73+
private boolean messageBuilderFactorySet;
7374

74-
private volatile String defaultChannelName;
75+
private String defaultChannelName;
7576

7677
public MessagePublishingInterceptor(PublisherMetadataSource metadataSource) {
7778
Assert.notNull(metadataSource, "metadataSource must not be null");
7879
this.metadataSource = metadataSource;
7980
}
8081

81-
82+
/**
83+
* @param metadataSource the {@link PublisherMetadataSource} to use.
84+
* @deprecated since 5.2 in favor constructor argument.
85+
*/
86+
@Deprecated
8287
public void setPublisherMetadataSource(PublisherMetadataSource metadataSource) {
83-
Assert.notNull(metadataSource, "metadataSource must not be null");
84-
this.metadataSource = metadataSource;
8588
}
8689

8790
/**
@@ -100,6 +103,9 @@ public void setChannelResolver(DestinationResolver<MessageChannel> channelResolv
100103
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
101104
this.beanFactory = beanFactory;
102105
this.messagingTemplate.setBeanFactory(beanFactory);
106+
if (this.channelResolver == null) {
107+
this.channelResolver = IntegrationContextUtils.getChannelResolver(beanFactory);
108+
}
103109
}
104110

105111
protected MessageBuilderFactory getMessageBuilderFactory() {
@@ -191,7 +197,6 @@ private void publishMessage(Method method, StandardEvaluationContext context) {
191197
}
192198

193199
private Map<String, Object> evaluateHeaders(Method method, StandardEvaluationContext context) {
194-
195200
Map<String, Expression> headerExpressionMap = this.metadataSource.getExpressionsForHeaders(method);
196201
if (headerExpressionMap != null) {
197202
return ExpressionEvalMap.from(headerExpressionMap)

spring-integration-core/src/main/java/org/springframework/integration/aop/PublisherAnnotationAdvisor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,7 +36,6 @@
3636
import org.springframework.beans.factory.BeanFactoryAware;
3737
import org.springframework.core.annotation.AnnotationUtils;
3838
import org.springframework.integration.annotation.Publisher;
39-
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
4039
import org.springframework.util.Assert;
4140

4241
/**
@@ -60,7 +59,8 @@ public class PublisherAnnotationAdvisor extends AbstractPointcutAdvisor implemen
6059
@SuppressWarnings("unchecked")
6160
public PublisherAnnotationAdvisor(Class<? extends Annotation>... publisherAnnotationTypes) {
6261
this.publisherAnnotationTypes = new HashSet<>(Arrays.asList(publisherAnnotationTypes));
63-
PublisherMetadataSource metadataSource = new MethodAnnotationPublisherMetadataSource(this.publisherAnnotationTypes);
62+
PublisherMetadataSource metadataSource =
63+
new MethodAnnotationPublisherMetadataSource(this.publisherAnnotationTypes);
6464
this.interceptor = new MessagePublishingInterceptor(metadataSource);
6565
}
6666

@@ -81,7 +81,6 @@ public void setDefaultChannelName(String defaultChannelName) {
8181

8282
@Override
8383
public void setBeanFactory(BeanFactory beanFactory) {
84-
this.interceptor.setChannelResolver(new BeanFactoryChannelResolver(beanFactory));
8584
this.interceptor.setBeanFactory(beanFactory);
8685
}
8786

@@ -168,6 +167,7 @@ public ClassFilter getClassFilter() {
168167
public MethodMatcher getMethodMatcher() {
169168
return this.methodMatcher;
170169
}
170+
171171
}
172172

173173

@@ -197,6 +197,7 @@ public boolean matches(Method method, Class targetClass) {
197197
return (specificMethod != method &&
198198
(AnnotationUtils.getAnnotation(specificMethod, this.annotationType) != null));
199199
}
200+
200201
}
201202

202203
}

spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,11 +18,11 @@
1818

1919
import java.util.concurrent.Executor;
2020

21+
import org.springframework.integration.context.IntegrationContextUtils;
2122
import org.springframework.integration.context.IntegrationProperties;
2223
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
2324
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2425
import org.springframework.integration.dispatcher.UnicastingDispatcher;
25-
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
2626
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
2727
import org.springframework.util.Assert;
2828
import org.springframework.util.ErrorHandler;
@@ -103,8 +103,7 @@ public final void onInit() {
103103
+ "bean is fully initialized by the framework. Do not subscribe in a @Bean definition");
104104
super.onInit();
105105
if (!(this.executor instanceof ErrorHandlingTaskExecutor)) {
106-
ErrorHandler errorHandler = new MessagePublishingErrorHandler(
107-
new BeanFactoryChannelResolver(this.getBeanFactory()));
106+
ErrorHandler errorHandler = IntegrationContextUtils.getErrorHandler(getBeanFactory());
108107
this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler);
109108
}
110109
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(this.executor);

spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public MessagePublishingErrorHandler(DestinationResolver<MessageChannel> channel
6464
}
6565

6666

67-
public void setDefaultErrorChannel(MessageChannel defaultErrorChannel) {
67+
public void setDefaultErrorChannel(@Nullable MessageChannel defaultErrorChannel) {
6868
setChannel(defaultErrorChannel);
6969
}
7070

spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,9 +18,10 @@
1818

1919
import java.util.concurrent.Executor;
2020

21+
import org.springframework.beans.factory.BeanFactory;
22+
import org.springframework.integration.context.IntegrationContextUtils;
2123
import org.springframework.integration.context.IntegrationProperties;
2224
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
23-
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
2425
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
2526
import org.springframework.lang.Nullable;
2627
import org.springframework.util.Assert;
@@ -130,21 +131,23 @@ public void setMinSubscribers(int minSubscribers) {
130131
@Override
131132
public final void onInit() {
132133
super.onInit();
134+
BeanFactory beanFactory = this.getBeanFactory();
135+
BroadcastingDispatcher dispatcherToUse = getDispatcher();
133136
if (this.executor != null) {
134-
Assert.state(getDispatcher().getHandlerCount() == 0,
137+
Assert.state(dispatcherToUse.getHandlerCount() == 0,
135138
"When providing an Executor, you cannot subscribe() until the channel "
136139
+ "bean is fully initialized by the framework. Do not subscribe in a @Bean definition");
137140
if (!(this.executor instanceof ErrorHandlingTaskExecutor)) {
138141
if (this.errorHandler == null) {
139-
this.errorHandler = new MessagePublishingErrorHandler(
140-
new BeanFactoryChannelResolver(this.getBeanFactory()));
142+
this.errorHandler = IntegrationContextUtils.getErrorHandler(beanFactory);
141143
}
142144
this.executor = new ErrorHandlingTaskExecutor(this.executor, this.errorHandler);
143145
}
144-
this.dispatcher = new BroadcastingDispatcher(this.executor);
145-
getDispatcher().setIgnoreFailures(this.ignoreFailures);
146-
getDispatcher().setApplySequence(this.applySequence);
147-
getDispatcher().setMinSubscribers(this.minSubscribers);
146+
dispatcherToUse = new BroadcastingDispatcher(this.executor);
147+
dispatcherToUse.setIgnoreFailures(this.ignoreFailures);
148+
dispatcherToUse.setApplySequence(this.applySequence);
149+
dispatcherToUse.setMinSubscribers(this.minSubscribers);
150+
this.dispatcher = dispatcherToUse;
148151
}
149152
else if (this.errorHandler != null) {
150153
if (this.logger.isWarnEnabled()) {
@@ -157,11 +160,11 @@ else if (this.errorHandler != null) {
157160
if (this.maxSubscribers == null) {
158161
Integer maxSubscribers =
159162
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class);
160-
this.setMaxSubscribers(maxSubscribers);
163+
setMaxSubscribers(maxSubscribers);
161164
}
162-
getDispatcher().setBeanFactory(this.getBeanFactory());
165+
dispatcherToUse.setBeanFactory(beanFactory);
163166

164-
getDispatcher().setMessageHandlingTaskDecorator(task -> {
167+
dispatcherToUse.setMessageHandlingTaskDecorator(task -> {
165168
if (PublishSubscribeChannel.this.executorInterceptorsSize > 0) {
166169
return new MessageHandlingTask(task);
167170
}

spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.springframework.beans.factory.BeanFactory;
2424
import org.springframework.beans.factory.BeanFactoryAware;
2525
import org.springframework.context.Lifecycle;
26+
import org.springframework.integration.context.IntegrationContextUtils;
2627
import org.springframework.integration.core.MessageSelector;
27-
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
2828
import org.springframework.jmx.export.annotation.ManagedAttribute;
2929
import org.springframework.jmx.export.annotation.ManagedOperation;
3030
import org.springframework.jmx.export.annotation.ManagedResource;
@@ -179,14 +179,12 @@ public boolean shouldIntercept(String beanName, InterceptableChannel channel) {
179179
}
180180

181181
private MessageChannel getChannel() {
182-
if (this.channelName != null) {
183-
synchronized (this) {
184-
if (this.channelName != null) {
185-
this.channel = new BeanFactoryChannelResolver(this.beanFactory)
186-
.resolveDestination(this.channelName);
187-
this.channelName = null;
188-
}
189-
}
182+
String channelNameToUse = this.channelName;
183+
if (channelNameToUse != null) {
184+
this.channel =
185+
IntegrationContextUtils.getChannelResolver(this.beanFactory)
186+
.resolveDestination(channelNameToUse);
187+
this.channelName = null;
190188
}
191189
return this.channel;
192190
}

spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,6 +63,7 @@
6363
import org.springframework.integration.support.DefaultMessageBuilderFactory;
6464
import org.springframework.integration.support.NullAwarePayloadArgumentResolver;
6565
import org.springframework.integration.support.SmartLifecycleRoleController;
66+
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
6667
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
6768
import org.springframework.integration.support.converter.DefaultDatatypeChannelMessageConverter;
6869
import org.springframework.integration.support.json.JacksonPresent;
@@ -109,6 +110,8 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
109110
this.beanFactory = beanFactory;
110111
this.registry = (BeanDefinitionRegistry) beanFactory;
111112

113+
registerBeanFactoryChannelResolver();
114+
registerMessagePublishingErrorHandler();
112115
registerNullChannel();
113116
registerErrorChannel();
114117
registerIntegrationEvaluationContext();
@@ -144,6 +147,21 @@ public void afterSingletonsInstantiated() {
144147
}
145148
}
146149

150+
private void registerBeanFactoryChannelResolver() {
151+
if (!this.beanFactory.containsBeanDefinition(IntegrationContextUtils.CHANNEL_RESOLVER_BEAN_NAME)) {
152+
this.registry.registerBeanDefinition(IntegrationContextUtils.CHANNEL_RESOLVER_BEAN_NAME,
153+
new RootBeanDefinition(BeanFactoryChannelResolver.class));
154+
}
155+
}
156+
157+
private void registerMessagePublishingErrorHandler() {
158+
if (!this.beanFactory.containsBeanDefinition(
159+
IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME)) {
160+
this.registry.registerBeanDefinition(IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME,
161+
new RootBeanDefinition(MessagePublishingErrorHandler.class));
162+
}
163+
}
164+
147165
/**
148166
* Register a null channel in the application context.
149167
* The bean name is defined by the constant {@link IntegrationContextUtils#NULL_CHANNEL_BEAN_NAME}.
@@ -281,7 +299,8 @@ private void registerTaskScheduler() {
281299
.getExpressionFor(IntegrationProperties.TASK_SCHEDULER_POOL_SIZE))
282300
.addPropertyValue("threadNamePrefix", "task-scheduler-")
283301
.addPropertyValue("rejectedExecutionHandler", new CallerRunsPolicy())
284-
.addPropertyValue("errorHandler", new RootBeanDefinition(MessagePublishingErrorHandler.class))
302+
.addPropertyReference("errorHandler",
303+
IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME)
285304
.getBeanDefinition();
286305

287306
this.registry.registerBeanDefinition(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, scheduler);

spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.integration.channel.DirectChannel;
5252
import org.springframework.integration.channel.MessagePublishingErrorHandler;
5353
import org.springframework.integration.config.IntegrationConfigUtils;
54+
import org.springframework.integration.context.IntegrationContextUtils;
5455
import org.springframework.integration.context.Orderable;
5556
import org.springframework.integration.endpoint.AbstractEndpoint;
5657
import org.springframework.integration.endpoint.AbstractPollingEndpoint;
@@ -64,7 +65,6 @@
6465
import org.springframework.integration.handler.advice.HandleMessageAdvice;
6566
import org.springframework.integration.router.AbstractMessageRouter;
6667
import org.springframework.integration.scheduling.PollerMetadata;
67-
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
6868
import org.springframework.integration.util.MessagingAnnotationUtils;
6969
import org.springframework.lang.Nullable;
7070
import org.springframework.messaging.MessageChannel;
@@ -120,9 +120,10 @@ public AbstractMethodAnnotationPostProcessor(ConfigurableListableBeanFactory bea
120120
this.conversionService = this.beanFactory.getConversionService() != null
121121
? this.beanFactory.getConversionService()
122122
: DefaultConversionService.getSharedInstance();
123-
this.channelResolver = new BeanFactoryChannelResolver(beanFactory);
124-
this.annotationType = (Class<T>) GenericTypeResolver.resolveTypeArgument(this.getClass(),
125-
MethodAnnotationPostProcessor.class);
123+
this.channelResolver = IntegrationContextUtils.getChannelResolver(beanFactory);
124+
this.annotationType =
125+
(Class<T>) GenericTypeResolver.resolveTypeArgument(this.getClass(),
126+
MethodAnnotationPostProcessor.class);
126127
Disposables disposablesBean = null;
127128
try {
128129
disposablesBean = beanFactory.getBean(Disposables.class);

0 commit comments

Comments
 (0)