Skip to content

Short-circuit methods for lambdas from annotation #2823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
Expand All @@ -40,6 +42,7 @@
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.context.annotation.Bean;
import org.springframework.core.GenericTypeResolver;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.Order;
Expand All @@ -61,10 +64,13 @@
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.LambdaMessageProcessor;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper;
import org.springframework.integration.handler.advice.HandleMessageAdvice;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.util.ClassUtils;
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
Expand All @@ -77,7 +83,6 @@
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -440,7 +445,7 @@ protected String generateHandlerBeanName(String originalBeanName, Method method)
String name = MessagingAnnotationUtils.endpointIdValue(method);
if (!StringUtils.hasText(name)) {
String baseName = originalBeanName + "." + method.getName() + "."
+ ClassUtils.getShortNameAsProperty(this.annotationType);
+ org.springframework.util.ClassUtils.getShortNameAsProperty(this.annotationType);
name = baseName;
int count = 1;
while (this.beanFactory.containsBean(name)) {
Expand Down Expand Up @@ -510,9 +515,26 @@ protected void checkMessageHandlerAttributes(String handlerBeanName, List<Annota
}
}

protected boolean resolveAttributeToBoolean(String attribute) {
return Boolean.parseBoolean(this.beanFactory.resolveEmbeddedValue(attribute));
}

@Nullable
protected MessageProcessor<?> buildLambdaMessageProcessorForBeanMethod(Method method, Object target) {
if ((target instanceof Function || target instanceof Consumer) && ClassUtils.isLambda(target.getClass())
|| ClassUtils.isKotlinFaction1(target.getClass())) {

ResolvableType methodReturnType = ResolvableType.forMethodReturnType(method);
Class<?> expectedPayloadType = methodReturnType.getGeneric(0).toClass();
return new LambdaMessageProcessor(target, expectedPayloadType);
}
else {
return null;
}
}

/**
* Subclasses must implement this method to create the MessageHandler.
*
* @param bean The bean.
* @param method The method.
* @param annotations The messaging annotation (or meta-annotation hierarchy) on the method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public AggregatorAnnotationPostProcessor(ConfigurableListableBeanFactory beanFac
@Override
protected MessageHandler createHandler(Object bean, Method method, List<Annotation> annotations) {
MethodInvokingMessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(bean, method);
processor.setBeanFactory(this.beanFactory);

MethodInvokingReleaseStrategy releaseStrategy = null;
Method releaseStrategyMethod = MessagingAnnotationUtils.findAnnotatedMethod(bean, ReleaseStrategy.class);
Expand All @@ -60,29 +59,30 @@ protected MessageHandler createHandler(Object bean, Method method, List<Annotati
}

MethodInvokingCorrelationStrategy correlationStrategy = null;
Method correlationStrategyMethod = MessagingAnnotationUtils.findAnnotatedMethod(bean, CorrelationStrategy.class);
Method correlationStrategyMethod =
MessagingAnnotationUtils.findAnnotatedMethod(bean, CorrelationStrategy.class);
if (correlationStrategyMethod != null) {
correlationStrategy = new MethodInvokingCorrelationStrategy(bean, correlationStrategyMethod);
}

AggregatingMessageHandler handler = new AggregatingMessageHandler(processor, new SimpleMessageStore(),
correlationStrategy, releaseStrategy);

String discardChannelName = MessagingAnnotationUtils.resolveAttribute(annotations, "discardChannel", String.class);
String discardChannelName =
MessagingAnnotationUtils.resolveAttribute(annotations, "discardChannel", String.class);
if (StringUtils.hasText(discardChannelName)) {
handler.setDiscardChannelName(discardChannelName);
}
String outputChannelName = MessagingAnnotationUtils.resolveAttribute(annotations, "outputChannel", String.class);
String outputChannelName =
MessagingAnnotationUtils.resolveAttribute(annotations, "outputChannel", String.class);
if (StringUtils.hasText(outputChannelName)) {
handler.setOutputChannelName(outputChannelName);
}
String sendPartialResultsOnExpiry = MessagingAnnotationUtils.resolveAttribute(annotations,
"sendPartialResultsOnExpiry", String.class);
if (sendPartialResultsOnExpiry != null) {
handler.setSendPartialResultOnExpiry(
Boolean.parseBoolean(this.beanFactory.resolveEmbeddedValue(sendPartialResultsOnExpiry)));
handler.setSendPartialResultOnExpiry(resolveAttributeToBoolean(sendPartialResultsOnExpiry));
}
handler.setBeanFactory(this.beanFactory);
return handler;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class FilterAnnotationPostProcessor extends AbstractMethodAnnotationPostP

public FilterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) {
super(beanFactory);
this.messageHandlerAttributes.addAll(Arrays.<String>asList("discardChannel", "throwExceptionOnRejection",
this.messageHandlerAttributes.addAll(Arrays.asList("discardChannel", "throwExceptionOnRejection",
"adviceChain", "discardWithinAdvice"));
}

Expand All @@ -54,11 +54,11 @@ public FilterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory
protected MessageHandler createHandler(Object bean, Method method, List<Annotation> annotations) {
MessageSelector selector;
if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method);
Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method);
if (target instanceof MessageSelector) {
selector = (MessageSelector) target;
}
else if (this.extractTypeIfPossible(target, MessageFilter.class) != null) {
else if (extractTypeIfPossible(target, MessageFilter.class) != null) {
checkMessageHandlerAttributes(resolveTargetBeanName(method), annotations);
return (MessageHandler) target;
}
Expand All @@ -74,31 +74,25 @@ else if (this.extractTypeIfPossible(target, MessageFilter.class) != null) {

MessageFilter filter = new MessageFilter(selector);

String discardWithinAdvice = MessagingAnnotationUtils.resolveAttribute(annotations, "discardWithinAdvice",
String.class);
String discardWithinAdvice =
MessagingAnnotationUtils.resolveAttribute(annotations, "discardWithinAdvice", String.class);
if (StringUtils.hasText(discardWithinAdvice)) {
discardWithinAdvice = this.beanFactory.resolveEmbeddedValue(discardWithinAdvice);
if (StringUtils.hasText(discardWithinAdvice)) {
filter.setDiscardWithinAdvice(Boolean.parseBoolean(discardWithinAdvice));
}
filter.setDiscardWithinAdvice(resolveAttributeToBoolean(discardWithinAdvice));
}


String throwExceptionOnRejection = MessagingAnnotationUtils.resolveAttribute(annotations,
"throwExceptionOnRejection", String.class);
String throwExceptionOnRejection =
MessagingAnnotationUtils.resolveAttribute(annotations, "throwExceptionOnRejection", String.class);
if (StringUtils.hasText(throwExceptionOnRejection)) {
String throwExceptionOnRejectionValue = this.beanFactory.resolveEmbeddedValue(throwExceptionOnRejection);
if (StringUtils.hasText(throwExceptionOnRejectionValue)) {
filter.setThrowExceptionOnRejection(Boolean.parseBoolean(throwExceptionOnRejectionValue));
}
filter.setThrowExceptionOnRejection(resolveAttributeToBoolean(throwExceptionOnRejection));
}

String discardChannelName = MessagingAnnotationUtils.resolveAttribute(annotations, "discardChannel", String.class);
String discardChannelName =
MessagingAnnotationUtils.resolveAttribute(annotations, "discardChannel", String.class);
if (StringUtils.hasText(discardChannelName)) {
filter.setDiscardChannelName(discardChannelName);
}

this.setOutputChannelIfPresent(annotations, filter);
setOutputChannelIfPresent(annotations, filter);
return filter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MethodInvokingMessageSource;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.util.ClassUtils;
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

/**
* Post-processor for Methods annotated with {@link InboundChannelAdapter @InboundChannelAdapter}.
Expand All @@ -49,20 +48,6 @@
public class InboundChannelAdapterAnnotationPostProcessor extends
AbstractMethodAnnotationPostProcessor<InboundChannelAdapter> {

private static final Class<?> kotlinFunction0Class;

static {
Class<?> kotlinClass = null;
try {
kotlinClass = ClassUtils.forName("kotlin.jvm.functions.Function0", ClassUtils.getDefaultClassLoader());
}
catch (ClassNotFoundException e) {
//Ignore: assume no Kotlin in classpath
}
finally {
kotlinFunction0Class = kotlinClass;
}
}

public InboundChannelAdapterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) {
super(beanFactory);
Expand All @@ -75,8 +60,8 @@ protected String getInputChannelAttribute() {

@Override
public Object postProcess(Object bean, String beanName, Method method, List<Annotation> annotations) {
String channelName = MessagingAnnotationUtils
.resolveAttribute(annotations, AnnotationUtils.VALUE, String.class);
String channelName =
MessagingAnnotationUtils.resolveAttribute(annotations, AnnotationUtils.VALUE, String.class);
Assert.hasText(channelName, "The channel ('value' attribute of @InboundChannelAdapter) can't be empty.");

MessageSource<?> messageSource = null;
Expand Down Expand Up @@ -105,31 +90,33 @@ private MessageSource<?> createMessageSource(Object beanArg, String beanName, Me
Object bean = beanArg;
Method method = methodArg;
if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method);
Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method);
Class<?> targetClass = target.getClass();
Assert.isTrue(MessageSource.class.isAssignableFrom(targetClass) ||
Supplier.class.isAssignableFrom(targetClass) ||
(kotlinFunction0Class == null || kotlinFunction0Class.isAssignableFrom(targetClass)),
"The '" + this.annotationType + "' on @Bean method " + "level is allowed only for: "
+ MessageSource.class.getName() + " or " + Supplier.class.getName()
+ (kotlinFunction0Class != null ? " or " + kotlinFunction0Class.getName() : "") + " beans");
ClassUtils.isKotlinFaction0(targetClass),
() -> "The '" + this.annotationType + "' on @Bean method " + "level is allowed only for: " +
MessageSource.class.getName() + " or " + Supplier.class.getName() +
(ClassUtils.KOTLIN_FUNCTION_0_CLASS != null
? " or " + ClassUtils.KOTLIN_FUNCTION_0_CLASS.getName()
: "") + " beans");
if (target instanceof MessageSource<?>) {
messageSource = (MessageSource<?>) target;
}
else if (target instanceof Supplier<?>) {
method = ReflectionUtils.findMethod(Supplier.class, "get");
method = ClassUtils.SUPPLIER_GET_METHOD;
bean = target;
}
else if (kotlinFunction0Class != null) {
method = ReflectionUtils.findMethod(kotlinFunction0Class, "invoke");
else if (ClassUtils.KOTLIN_FUNCTION_0_INVOKE_METHOD != null) {
method = ClassUtils.KOTLIN_FUNCTION_0_INVOKE_METHOD;
bean = target;
}
}
if (messageSource == null) {
MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource();
methodInvokingMessageSource.setObject(bean);
methodInvokingMessageSource.setMethod(method);
String messageSourceBeanName = this.generateHandlerBeanName(beanName, method);
String messageSourceBeanName = generateHandlerBeanName(beanName, method);
this.beanFactory.registerSingleton(messageSourceBeanName, methodInvokingMessageSource);
messageSource = (MessageSource<?>) this.beanFactory
.initializeBean(methodInvokingMessageSource, messageSourceBeanName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,21 @@ public class RouterAnnotationPostProcessor extends AbstractMethodAnnotationPostP

public RouterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) {
super(beanFactory);
this.messageHandlerAttributes.addAll(Arrays.<String>asList("defaultOutputChannel", "applySequence",
this.messageHandlerAttributes.addAll(Arrays.asList("defaultOutputChannel", "applySequence",
"ignoreSendFailures", "resolutionRequired", "channelMappings", "prefix", "suffix"));
}

@Override
protected MessageHandler createHandler(Object bean, Method method, List<Annotation> annotations) {
AbstractMessageRouter router;
if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method);
router = this.extractTypeIfPossible(target, AbstractMessageRouter.class);
Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method);
router = extractTypeIfPossible(target, AbstractMessageRouter.class);
if (router == null) {
if (target instanceof MessageHandler) {
Assert.isTrue(this.routerAttributesProvided(annotations), "'defaultOutputChannel', 'applySequence', " +
"'ignoreSendFailures', 'resolutionRequired', 'channelMappings', 'prefix' and 'suffix' " +
Assert.isTrue(routerAttributesProvided(annotations),
"'defaultOutputChannel', 'applySequence', 'ignoreSendFailures', 'resolutionRequired', " +
"'channelMappings', 'prefix' and 'suffix' " +
"can be applied to 'AbstractMessageRouter' implementations, but target handler is: " +
target.getClass());
return (MessageHandler) target;
Expand All @@ -84,26 +85,23 @@ protected MessageHandler createHandler(Object bean, Method method, List<Annotati

String applySequence = MessagingAnnotationUtils.resolveAttribute(annotations, "applySequence", String.class);
if (StringUtils.hasText(applySequence)) {
router.setApplySequence(Boolean.parseBoolean(this.beanFactory.resolveEmbeddedValue(applySequence)));
router.setApplySequence(resolveAttributeToBoolean(applySequence));
}

String ignoreSendFailures = MessagingAnnotationUtils.resolveAttribute(annotations, "ignoreSendFailures",
String.class);
if (StringUtils.hasText(ignoreSendFailures)) {
router.setIgnoreSendFailures(Boolean.parseBoolean(this.beanFactory.resolveEmbeddedValue(ignoreSendFailures)));
router.setIgnoreSendFailures(resolveAttributeToBoolean(ignoreSendFailures));
}

if (this.routerAttributesProvided(annotations)) {
if (routerAttributesProvided(annotations)) {

MethodInvokingRouter methodInvokingRouter = (MethodInvokingRouter) router;

String resolutionRequired = MessagingAnnotationUtils.resolveAttribute(annotations, "resolutionRequired",
String.class);
if (StringUtils.hasText(resolutionRequired)) {
String resolutionRequiredValue = this.beanFactory.resolveEmbeddedValue(resolutionRequired);
if (StringUtils.hasText(resolutionRequiredValue)) {
methodInvokingRouter.setResolutionRequired(Boolean.parseBoolean(resolutionRequiredValue));
}
methodInvokingRouter.setResolutionRequired(resolveAttributeToBoolean(resolutionRequired));
}

String prefix = MessagingAnnotationUtils.resolveAttribute(annotations, "prefix", String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.util.MessagingAnnotationUtils;
Expand Down Expand Up @@ -63,7 +64,13 @@ protected MessageHandler createHandler(Object bean, Method method, List<Annotati
return new ReplyProducingMessageHandlerWrapper((MessageHandler) target);
}
else {
serviceActivator = new ServiceActivatingHandler(target);
MessageProcessor<?> messageProcessor = buildLambdaMessageProcessorForBeanMethod(method, target);
if (messageProcessor != null) {
serviceActivator = new ServiceActivatingHandler(messageProcessor);
}
else {
serviceActivator = new ServiceActivatingHandler(target);
}
}
}
else {
Expand All @@ -77,15 +84,15 @@ protected MessageHandler createHandler(Object bean, Method method, List<Annotati

String requiresReply = MessagingAnnotationUtils.resolveAttribute(annotations, "requiresReply", String.class);
if (StringUtils.hasText(requiresReply)) {
serviceActivator.setRequiresReply(Boolean.parseBoolean(this.beanFactory.resolveEmbeddedValue(requiresReply)));
serviceActivator.setRequiresReply(resolveAttributeToBoolean(requiresReply));
}

String isAsync = MessagingAnnotationUtils.resolveAttribute(annotations, "async", String.class);
if (StringUtils.hasText(isAsync)) {
serviceActivator.setAsync(Boolean.parseBoolean(this.beanFactory.resolveEmbeddedValue(isAsync)));
serviceActivator.setAsync(resolveAttributeToBoolean(isAsync));
}

this.setOutputChannelIfPresent(annotations, serviceActivator);
setOutputChannelIfPresent(annotations, serviceActivator);
return serviceActivator;
}

Expand Down
Loading