diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index 0d2479eb936..b7928f0d3a0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; import org.aopalliance.aop.Advice; import org.apache.commons.logging.Log; @@ -40,6 +42,7 @@ import org.springframework.beans.factory.support.BeanDefinitionValidationException; import org.springframework.context.annotation.Bean; import org.springframework.core.GenericTypeResolver; +import org.springframework.core.ResolvableType; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.core.annotation.Order; @@ -61,10 +64,13 @@ import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.handler.AbstractMessageProducingHandler; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.handler.LambdaMessageProcessor; +import org.springframework.integration.handler.MessageProcessor; import org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper; import org.springframework.integration.handler.advice.HandleMessageAdvice; import org.springframework.integration.router.AbstractMessageRouter; import org.springframework.integration.scheduling.PollerMetadata; +import org.springframework.integration.util.ClassUtils; import org.springframework.integration.util.MessagingAnnotationUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.MessageChannel; @@ -77,7 +83,6 @@ import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; @@ -440,7 +445,7 @@ protected String generateHandlerBeanName(String originalBeanName, Method method) String name = MessagingAnnotationUtils.endpointIdValue(method); if (!StringUtils.hasText(name)) { String baseName = originalBeanName + "." + method.getName() + "." - + ClassUtils.getShortNameAsProperty(this.annotationType); + + org.springframework.util.ClassUtils.getShortNameAsProperty(this.annotationType); name = baseName; int count = 1; while (this.beanFactory.containsBean(name)) { @@ -510,9 +515,26 @@ protected void checkMessageHandlerAttributes(String handlerBeanName, List buildLambdaMessageProcessorForBeanMethod(Method method, Object target) { + if ((target instanceof Function || target instanceof Consumer) && ClassUtils.isLambda(target.getClass()) + || ClassUtils.isKotlinFaction1(target.getClass())) { + + ResolvableType methodReturnType = ResolvableType.forMethodReturnType(method); + Class expectedPayloadType = methodReturnType.getGeneric(0).toClass(); + return new LambdaMessageProcessor(target, expectedPayloadType); + } + else { + return null; + } + } + /** * Subclasses must implement this method to create the MessageHandler. - * * @param bean The bean. * @param method The method. * @param annotations The messaging annotation (or meta-annotation hierarchy) on the method. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java index 16046c8c1db..d5c0f0b50db 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java @@ -51,7 +51,6 @@ public AggregatorAnnotationPostProcessor(ConfigurableListableBeanFactory beanFac @Override protected MessageHandler createHandler(Object bean, Method method, List annotations) { MethodInvokingMessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(bean, method); - processor.setBeanFactory(this.beanFactory); MethodInvokingReleaseStrategy releaseStrategy = null; Method releaseStrategyMethod = MessagingAnnotationUtils.findAnnotatedMethod(bean, ReleaseStrategy.class); @@ -60,7 +59,8 @@ protected MessageHandler createHandler(Object bean, Method method, ListasList("discardChannel", "throwExceptionOnRejection", + this.messageHandlerAttributes.addAll(Arrays.asList("discardChannel", "throwExceptionOnRejection", "adviceChain", "discardWithinAdvice")); } @@ -54,11 +54,11 @@ public FilterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory protected MessageHandler createHandler(Object bean, Method method, List annotations) { MessageSelector selector; if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) { - Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method); + Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method); if (target instanceof MessageSelector) { selector = (MessageSelector) target; } - else if (this.extractTypeIfPossible(target, MessageFilter.class) != null) { + else if (extractTypeIfPossible(target, MessageFilter.class) != null) { checkMessageHandlerAttributes(resolveTargetBeanName(method), annotations); return (MessageHandler) target; } @@ -74,31 +74,25 @@ else if (this.extractTypeIfPossible(target, MessageFilter.class) != null) { MessageFilter filter = new MessageFilter(selector); - String discardWithinAdvice = MessagingAnnotationUtils.resolveAttribute(annotations, "discardWithinAdvice", - String.class); + String discardWithinAdvice = + MessagingAnnotationUtils.resolveAttribute(annotations, "discardWithinAdvice", String.class); if (StringUtils.hasText(discardWithinAdvice)) { - discardWithinAdvice = this.beanFactory.resolveEmbeddedValue(discardWithinAdvice); - if (StringUtils.hasText(discardWithinAdvice)) { - filter.setDiscardWithinAdvice(Boolean.parseBoolean(discardWithinAdvice)); - } + filter.setDiscardWithinAdvice(resolveAttributeToBoolean(discardWithinAdvice)); } - - String throwExceptionOnRejection = MessagingAnnotationUtils.resolveAttribute(annotations, - "throwExceptionOnRejection", String.class); + String throwExceptionOnRejection = + MessagingAnnotationUtils.resolveAttribute(annotations, "throwExceptionOnRejection", String.class); if (StringUtils.hasText(throwExceptionOnRejection)) { - String throwExceptionOnRejectionValue = this.beanFactory.resolveEmbeddedValue(throwExceptionOnRejection); - if (StringUtils.hasText(throwExceptionOnRejectionValue)) { - filter.setThrowExceptionOnRejection(Boolean.parseBoolean(throwExceptionOnRejectionValue)); - } + filter.setThrowExceptionOnRejection(resolveAttributeToBoolean(throwExceptionOnRejection)); } - String discardChannelName = MessagingAnnotationUtils.resolveAttribute(annotations, "discardChannel", String.class); + String discardChannelName = + MessagingAnnotationUtils.resolveAttribute(annotations, "discardChannel", String.class); if (StringUtils.hasText(discardChannelName)) { filter.setDiscardChannelName(discardChannelName); } - this.setOutputChannelIfPresent(annotations, filter); + setOutputChannelIfPresent(annotations, filter); return filter; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java index e91f0eb8c80..ddabc5f8742 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java @@ -31,11 +31,10 @@ import org.springframework.integration.core.MessageSource; import org.springframework.integration.endpoint.MethodInvokingMessageSource; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; +import org.springframework.integration.util.ClassUtils; import org.springframework.integration.util.MessagingAnnotationUtils; import org.springframework.messaging.MessageHandler; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; -import org.springframework.util.ReflectionUtils; /** * Post-processor for Methods annotated with {@link InboundChannelAdapter @InboundChannelAdapter}. @@ -49,20 +48,6 @@ public class InboundChannelAdapterAnnotationPostProcessor extends AbstractMethodAnnotationPostProcessor { - private static final Class kotlinFunction0Class; - - static { - Class kotlinClass = null; - try { - kotlinClass = ClassUtils.forName("kotlin.jvm.functions.Function0", ClassUtils.getDefaultClassLoader()); - } - catch (ClassNotFoundException e) { - //Ignore: assume no Kotlin in classpath - } - finally { - kotlinFunction0Class = kotlinClass; - } - } public InboundChannelAdapterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) { super(beanFactory); @@ -75,8 +60,8 @@ protected String getInputChannelAttribute() { @Override public Object postProcess(Object bean, String beanName, Method method, List annotations) { - String channelName = MessagingAnnotationUtils - .resolveAttribute(annotations, AnnotationUtils.VALUE, String.class); + String channelName = + MessagingAnnotationUtils.resolveAttribute(annotations, AnnotationUtils.VALUE, String.class); Assert.hasText(channelName, "The channel ('value' attribute of @InboundChannelAdapter) can't be empty."); MessageSource messageSource = null; @@ -105,23 +90,25 @@ private MessageSource createMessageSource(Object beanArg, String beanName, Me Object bean = beanArg; Method method = methodArg; if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) { - Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method); + Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method); Class targetClass = target.getClass(); Assert.isTrue(MessageSource.class.isAssignableFrom(targetClass) || Supplier.class.isAssignableFrom(targetClass) || - (kotlinFunction0Class == null || kotlinFunction0Class.isAssignableFrom(targetClass)), - "The '" + this.annotationType + "' on @Bean method " + "level is allowed only for: " - + MessageSource.class.getName() + " or " + Supplier.class.getName() - + (kotlinFunction0Class != null ? " or " + kotlinFunction0Class.getName() : "") + " beans"); + ClassUtils.isKotlinFaction0(targetClass), + () -> "The '" + this.annotationType + "' on @Bean method " + "level is allowed only for: " + + MessageSource.class.getName() + " or " + Supplier.class.getName() + + (ClassUtils.KOTLIN_FUNCTION_0_CLASS != null + ? " or " + ClassUtils.KOTLIN_FUNCTION_0_CLASS.getName() + : "") + " beans"); if (target instanceof MessageSource) { messageSource = (MessageSource) target; } else if (target instanceof Supplier) { - method = ReflectionUtils.findMethod(Supplier.class, "get"); + method = ClassUtils.SUPPLIER_GET_METHOD; bean = target; } - else if (kotlinFunction0Class != null) { - method = ReflectionUtils.findMethod(kotlinFunction0Class, "invoke"); + else if (ClassUtils.KOTLIN_FUNCTION_0_INVOKE_METHOD != null) { + method = ClassUtils.KOTLIN_FUNCTION_0_INVOKE_METHOD; bean = target; } } @@ -129,7 +116,7 @@ else if (kotlinFunction0Class != null) { MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource(); methodInvokingMessageSource.setObject(bean); methodInvokingMessageSource.setMethod(method); - String messageSourceBeanName = this.generateHandlerBeanName(beanName, method); + String messageSourceBeanName = generateHandlerBeanName(beanName, method); this.beanFactory.registerSingleton(messageSourceBeanName, methodInvokingMessageSource); messageSource = (MessageSource) this.beanFactory .initializeBean(methodInvokingMessageSource, messageSourceBeanName); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessor.java index 879d57e8c6c..8b83d45fa48 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessor.java @@ -46,7 +46,7 @@ public class RouterAnnotationPostProcessor extends AbstractMethodAnnotationPostP public RouterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) { super(beanFactory); - this.messageHandlerAttributes.addAll(Arrays.asList("defaultOutputChannel", "applySequence", + this.messageHandlerAttributes.addAll(Arrays.asList("defaultOutputChannel", "applySequence", "ignoreSendFailures", "resolutionRequired", "channelMappings", "prefix", "suffix")); } @@ -54,12 +54,13 @@ public RouterAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory protected MessageHandler createHandler(Object bean, Method method, List annotations) { AbstractMessageRouter router; if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) { - Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method); - router = this.extractTypeIfPossible(target, AbstractMessageRouter.class); + Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method); + router = extractTypeIfPossible(target, AbstractMessageRouter.class); if (router == null) { if (target instanceof MessageHandler) { - Assert.isTrue(this.routerAttributesProvided(annotations), "'defaultOutputChannel', 'applySequence', " + - "'ignoreSendFailures', 'resolutionRequired', 'channelMappings', 'prefix' and 'suffix' " + + Assert.isTrue(routerAttributesProvided(annotations), + "'defaultOutputChannel', 'applySequence', 'ignoreSendFailures', 'resolutionRequired', " + + "'channelMappings', 'prefix' and 'suffix' " + "can be applied to 'AbstractMessageRouter' implementations, but target handler is: " + target.getClass()); return (MessageHandler) target; @@ -84,26 +85,23 @@ protected MessageHandler createHandler(Object bean, Method method, List messageProcessor = buildLambdaMessageProcessorForBeanMethod(method, target); + if (messageProcessor != null) { + serviceActivator = new ServiceActivatingHandler(messageProcessor); + } + else { + serviceActivator = new ServiceActivatingHandler(target); + } } } else { @@ -77,15 +84,15 @@ protected MessageHandler createHandler(Object bean, Method method, ListasList("outputChannel", "applySequence", "adviceChain")); + this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "applySequence", "adviceChain")); } @Override @@ -52,7 +52,7 @@ protected MessageHandler createHandler(Object bean, Method method, List { +public class TransformerAnnotationPostProcessor + extends AbstractMethodAnnotationPostProcessor { public TransformerAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) { super(beanFactory); - this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "adviceChain")); + this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "adviceChain")); } @Override protected MessageHandler createHandler(Object bean, Method method, List annotations) { - org.springframework.integration.transformer.Transformer transformer; + Transformer transformer; if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) { - Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method); - transformer = this.extractTypeIfPossible(target, - org.springframework.integration.transformer.Transformer.class); + Object target = resolveTargetBeanFromMethodWithBeanAnnotation(method); + transformer = extractTypeIfPossible(target, Transformer.class); if (transformer == null) { - if (this.extractTypeIfPossible(target, AbstractReplyProducingMessageHandler.class) != null) { + if (extractTypeIfPossible(target, AbstractReplyProducingMessageHandler.class) != null) { checkMessageHandlerAttributes(resolveTargetBeanName(method), annotations); return (MessageHandler) target; } - transformer = new MethodInvokingTransformer(target); + MessageProcessor messageProcessor = buildLambdaMessageProcessorForBeanMethod(method, target); + if (messageProcessor != null) { + transformer = new MethodInvokingTransformer(messageProcessor); + } + else { + transformer = new MethodInvokingTransformer(target); + } } } else { @@ -64,7 +72,7 @@ protected MessageHandler createHandler(Object bean, Method method, List B transform(Class

payloadType, GenericTransformer generic Consumer> endpointConfigurer) { Assert.notNull(genericTransformer, "'genericTransformer' must not be null"); Transformer transformer = genericTransformer instanceof Transformer ? (Transformer) genericTransformer : - (isLambda(genericTransformer) + (ClassUtils.isLambda(genericTransformer.getClass()) ? new MethodInvokingTransformer(new LambdaMessageProcessor(genericTransformer, payloadType)) : new MethodInvokingTransformer(genericTransformer, ClassUtils.TRANSFORMER_TRANSFORM_METHOD)); return addComponent(transformer) @@ -887,7 +887,7 @@ public

B filter(Class

payloadType, GenericSelector

genericSelector, Consumer endpointConfigurer) { Assert.notNull(genericSelector, "'genericSelector' must not be null"); MessageSelector selector = genericSelector instanceof MessageSelector ? (MessageSelector) genericSelector : - (isLambda(genericSelector) + (ClassUtils.isLambda(genericSelector.getClass()) ? new MethodInvokingSelector(new LambdaMessageProcessor(genericSelector, payloadType)) : new MethodInvokingSelector(genericSelector, ClassUtils.SELECTOR_ACCEPT_METHOD)); return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer); @@ -1092,7 +1092,7 @@ public

B handle(Class

payloadType, GenericHandler

handler) { public

B handle(Class

payloadType, GenericHandler

handler, Consumer> endpointConfigurer) { ServiceActivatingHandler serviceActivatingHandler; - if (isLambda(handler)) { + if (ClassUtils.isLambda(handler.getClass())) { serviceActivatingHandler = new ServiceActivatingHandler(new LambdaMessageProcessor(handler, payloadType)); } else { @@ -1607,9 +1607,10 @@ public

B split(Function splitter, public

B split(Class

payloadType, Function splitter, Consumer> endpointConfigurer) { - MethodInvokingSplitter split = isLambda(splitter) - ? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType)) - : new MethodInvokingSplitter(splitter, ClassUtils.FUNCTION_APPLY_METHOD); + MethodInvokingSplitter split = + ClassUtils.isLambda(splitter.getClass()) + ? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType)) + : new MethodInvokingSplitter(splitter, ClassUtils.FUNCTION_APPLY_METHOD); return split(split, endpointConfigurer); } @@ -2014,14 +2015,16 @@ public B route(Function router, Consumer B route(Class

payloadType, Function router, Consumer> routerConfigurer) { - MethodInvokingRouter methodInvokingRouter = isLambda(router) - ? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType)) - : new MethodInvokingRouter(router, ClassUtils.FUNCTION_APPLY_METHOD); + MethodInvokingRouter methodInvokingRouter = + ClassUtils.isLambda(router.getClass()) + ? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType)) + : new MethodInvokingRouter(router, ClassUtils.FUNCTION_APPLY_METHOD); return route(new RouterSpec<>(methodInvokingRouter), routerConfigurer); } /** - * Populate the {@link MethodInvokingRouter} for the {@link org.springframework.integration.handler.MessageProcessor} + * Populate the {@link MethodInvokingRouter} for the + * {@link org.springframework.integration.handler.MessageProcessor} * from the provided {@link MessageProcessorSpec} with default options. *

 	 * {@code
@@ -2036,7 +2039,8 @@ public B route(MessageProcessorSpec messageProcessorSpec) {
 	}
 
 	/**
-	 * Populate the {@link MethodInvokingRouter} for the {@link org.springframework.integration.handler.MessageProcessor}
+	 * Populate the {@link MethodInvokingRouter} for the
+	 * {@link org.springframework.integration.handler.MessageProcessor}
 	 * from the provided {@link MessageProcessorSpec} with default options.
 	 * 
 	 * {@code
@@ -3117,10 +3121,11 @@ protected final B _this() {
 	protected StandardIntegrationFlow get() {
 		if (this.integrationFlow == null) {
 			if (this.currentMessageChannel instanceof FixedSubscriberChannelPrototype) {
-				throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel
-						+ ") is a prototype for 'FixedSubscriberChannel' which can't be created without 'MessageHandler' "
-						+ "constructor argument. That means that '.fixedSubscriberChannel()' can't be the last "
-						+ "EIP-method in the 'IntegrationFlow' definition.");
+				throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel +
+						") is a prototype for 'FixedSubscriberChannel' which can't be created without " +
+						"a 'MessageHandler' constructor argument. " +
+						"That means that '.fixedSubscriberChannel()' can't be the last " +
+						"EIP-method in the 'IntegrationFlow' definition.");
 			}
 
 			if (this.integrationComponents.size() == 1) {
@@ -3151,11 +3156,6 @@ else if (this.currentMessageChannel != null) {
 		return this.integrationFlow;
 	}
 
-	private static boolean isLambda(Object o) {
-		Class aClass = o.getClass();
-		return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass();
-	}
-
 	private static Object extractProxyTarget(Object target) {
 		if (!(target instanceof Advised)) {
 			return target;
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/ExpressionCommandMessageProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/ExpressionCommandMessageProcessor.java
index b9ccb02a7e8..449b833bff8 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/handler/ExpressionCommandMessageProcessor.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/ExpressionCommandMessageProcessor.java
@@ -48,7 +48,11 @@
  */
 public class ExpressionCommandMessageProcessor extends AbstractMessageProcessor {
 
+	@Nullable
+	private final MethodFilter methodFilter;
+
 	public ExpressionCommandMessageProcessor() {
+		this.methodFilter = null;
 	}
 
 	public ExpressionCommandMessageProcessor(@Nullable MethodFilter methodFilter) {
@@ -56,18 +60,19 @@ public ExpressionCommandMessageProcessor(@Nullable MethodFilter methodFilter) {
 	}
 
 	public ExpressionCommandMessageProcessor(@Nullable MethodFilter methodFilter, @Nullable BeanFactory beanFactory) {
+		this.methodFilter = methodFilter;
 		if (beanFactory != null) {
 			setBeanFactory(beanFactory);
 		}
-		if (methodFilter != null) {
-			MethodResolver methodResolver = new ExpressionCommandMethodResolver(methodFilter);
-			getEvaluationContext().setMethodResolvers(Collections.singletonList(methodResolver));
-		}
 	}
 
 	@Override
 	public final void setBeanFactory(BeanFactory beanFactory) {
 		super.setBeanFactory(beanFactory);
+		if (this.methodFilter != null) {
+			MethodResolver methodResolver = new ExpressionCommandMethodResolver(this.methodFilter);
+			getEvaluationContext().setMethodResolvers(Collections.singletonList(methodResolver));
+		}
 	}
 
 	/**
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java
index fca4f69f028..220b3805c31 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java
@@ -20,7 +20,7 @@
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +28,7 @@
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.BeanFactory;
 import org.springframework.beans.factory.BeanFactoryAware;
+import org.springframework.core.MethodIntrospector;
 import org.springframework.integration.context.IntegrationContextUtils;
 import org.springframework.integration.support.utils.IntegrationUtils;
 import org.springframework.messaging.Message;
@@ -63,26 +64,21 @@ public class LambdaMessageProcessor implements MessageProcessor, BeanFac
 	public LambdaMessageProcessor(Object target, Class payloadType) {
 		Assert.notNull(target, "'target' must not be null");
 		this.target = target;
-		final AtomicReference methodValue = new AtomicReference<>();
-		ReflectionUtils.doWithMethods(target.getClass(),
-				methodValue::set,
-				methodCandidate -> {
-					boolean isCandidate = !methodCandidate.isBridge()
-							&& !methodCandidate.isDefault()
-							&& methodCandidate.getDeclaringClass() != Object.class
-							&& Modifier.isPublic(methodCandidate.getModifiers())
-							&& !Modifier.isStatic(methodCandidate.getModifiers());
-					if (isCandidate) {
-						Assert.isNull(methodValue.get(), "LambdaMessageProcessor is applicable for inline or lambda " +
-								"classes with single method - functional interface implementations.");
-					}
-					return isCandidate;
-				});
 
-		Assert.notNull(methodValue.get(), "LambdaMessageProcessor is applicable for inline or lambda " +
-				"classes with single method - functional interface implementations.");
+		Set methods =
+				MethodIntrospector.selectMethods(target.getClass(),
+						(ReflectionUtils.MethodFilter) methodCandidate ->
+								methodCandidate.getDeclaringClass() != Object.class &&
+										!methodCandidate.getDeclaringClass().getName()
+												.equals("kotlin.jvm.internal.Lambda") &&
+										!methodCandidate.isDefault() &&
+										!Modifier.isStatic(methodCandidate.getModifiers()));
+
+		Assert.state(methods.size() == 1,
+				"LambdaMessageProcessor is applicable for inline or lambda " +
+						"classes with single method - functional interface implementations.");
 
-		this.method = methodValue.get();
+		this.method = methods.iterator().next();
 		this.method.setAccessible(true);
 		this.parameterTypes = this.method.getParameterTypes();
 		this.payloadType = payloadType;
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java
index 403d8db1619..8ac0609ab12 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java
@@ -21,7 +21,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
+import org.springframework.lang.Nullable;
 import org.springframework.util.ReflectionUtils;
 
 /**
@@ -32,12 +34,25 @@
  */
 public abstract class ClassUtils {
 
+	/**
+	 * Map with primitive wrapper type as key and corresponding primitive
+	 * type as value, for example: Integer.class -> int.class.
+	 */
+	private static final Map, Class> PRIMITIVE_WRAPPER_TYPE_MAP = new HashMap<>(8);
+
+
 	/**
 	 * The {@link Function#apply(Object)} method object.
 	 */
 	public static final Method FUNCTION_APPLY_METHOD =
 			ReflectionUtils.findMethod(Function.class, "apply", (Class[]) null);
 
+	/**
+	 * The {@link Supplier#get()} method object.
+	 */
+	public static final Method SUPPLIER_GET_METHOD =
+			ReflectionUtils.findMethod(Supplier.class, "get", (Class[]) null);
+
 	/**
 	 * The {@code org.springframework.integration.core.GenericSelector#accept(Object)} method object.
 	 */
@@ -53,7 +68,31 @@ public abstract class ClassUtils {
 	 */
 	public static final Method HANDLER_HANDLE_METHOD;
 
+	/**
+	 * The {@code kotlin.jvm.functions.Function0} class object.
+	 */
+	public static final Class KOTLIN_FUNCTION_0_CLASS;
+
+	/**
+	 * The {@code kotlin.jvm.functions.Function0#invoke} method object.
+	 */
+	public static final Method KOTLIN_FUNCTION_0_INVOKE_METHOD;
+
+	/**
+	 * The {@code kotlin.jvm.functions.Function1} class object.
+	 */
+	public static final Class KOTLIN_FUNCTION_1_CLASS;
+
 	static {
+		PRIMITIVE_WRAPPER_TYPE_MAP.put(Boolean.class, boolean.class);
+		PRIMITIVE_WRAPPER_TYPE_MAP.put(Byte.class, byte.class);
+		PRIMITIVE_WRAPPER_TYPE_MAP.put(Character.class, char.class);
+		PRIMITIVE_WRAPPER_TYPE_MAP.put(Double.class, double.class);
+		PRIMITIVE_WRAPPER_TYPE_MAP.put(Float.class, float.class);
+		PRIMITIVE_WRAPPER_TYPE_MAP.put(Integer.class, int.class);
+		PRIMITIVE_WRAPPER_TYPE_MAP.put(Long.class, long.class);
+		PRIMITIVE_WRAPPER_TYPE_MAP.put(Short.class, short.class);
+
 		Class genericSelectorClass = null;
 		try {
 			genericSelectorClass =
@@ -93,25 +132,35 @@ public abstract class ClassUtils {
 		}
 
 		HANDLER_HANDLE_METHOD = ReflectionUtils.findMethod(genericHandlerClass, "handle", (Class[]) null);
-	}
-
 
-	/**
-	 * Map with primitive wrapper type as key and corresponding primitive
-	 * type as value, for example: Integer.class -> int.class.
-	 */
-	private static final Map, Class> primitiveWrapperTypeMap = new HashMap<>(8);
+		Class kotlinClass = null;
+		Method kotlinMethod = null;
+		try {
+			kotlinClass = org.springframework.util.ClassUtils.forName("kotlin.jvm.functions.Function0",
+					org.springframework.util.ClassUtils.getDefaultClassLoader());
 
+			kotlinMethod = ReflectionUtils.findMethod(kotlinClass, "invoke", (Class[]) null);
+		}
+		catch (ClassNotFoundException e) {
+			//Ignore: assume no Kotlin in classpath
+		}
+		finally {
+			KOTLIN_FUNCTION_0_CLASS = kotlinClass;
+			KOTLIN_FUNCTION_0_INVOKE_METHOD = kotlinMethod;
+		}
 
-	static {
-		primitiveWrapperTypeMap.put(Boolean.class, boolean.class);
-		primitiveWrapperTypeMap.put(Byte.class, byte.class);
-		primitiveWrapperTypeMap.put(Character.class, char.class);
-		primitiveWrapperTypeMap.put(Double.class, double.class);
-		primitiveWrapperTypeMap.put(Float.class, float.class);
-		primitiveWrapperTypeMap.put(Integer.class, int.class);
-		primitiveWrapperTypeMap.put(Long.class, long.class);
-		primitiveWrapperTypeMap.put(Short.class, short.class);
+		kotlinClass = null;
+		kotlinMethod = null;
+		try {
+			kotlinClass = org.springframework.util.ClassUtils.forName("kotlin.jvm.functions.Function1",
+					org.springframework.util.ClassUtils.getDefaultClassLoader());
+		}
+		catch (ClassNotFoundException e) {
+			//Ignore: assume no Kotlin in classpath
+		}
+		finally {
+			KOTLIN_FUNCTION_1_CLASS = kotlinClass;
+		}
 	}
 
 	public static Class findClosestMatch(Class type, Set> candidates, boolean failOnTie) {
@@ -163,8 +212,39 @@ else if (org.springframework.util.ClassUtils.isAssignable(candidate, superClass)
 	 * @param clazz the wrapper class to check
 	 * @return the corresponding primitive if the clazz is a wrapper, otherwise null
 	 */
+	@Nullable
 	public static Class resolvePrimitiveType(Class clazz) {
-		return primitiveWrapperTypeMap.get(clazz);
+		return PRIMITIVE_WRAPPER_TYPE_MAP.get(clazz);
+	}
+
+	/**
+	 * Check if class is Java lambda.
+	 * @param aClass the {@link Class} to check.
+	 * @return true if class is a Java lambda.
+	 * @since 5.2
+	 */
+	public static boolean isLambda(Class aClass) {
+		return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass();
+	}
+
+	/**
+	 * Check if class is {@code kotlin.jvm.functions.Function0}.
+	 * @param aClass the {@link Class} to check.
+	 * @return true if class is a {@code kotlin.jvm.functions.Function0} implementation.
+	 * @since 5.2
+	 */
+	public static boolean isKotlinFaction0(Class aClass) {
+		return KOTLIN_FUNCTION_0_CLASS != null && KOTLIN_FUNCTION_0_CLASS.isAssignableFrom(aClass);
+	}
+
+	/**
+	 * Check if class is {@code kotlin.jvm.functions.Function1}.
+	 * @param aClass the {@link Class} to check.
+	 * @return true if class is a {@code kotlin.jvm.functions.Function1} implementation.
+	 * @since 5.2
+	 */
+	public static boolean isKotlinFaction1(Class aClass) {
+		return KOTLIN_FUNCTION_1_CLASS != null && KOTLIN_FUNCTION_1_CLASS.isAssignableFrom(aClass);
 	}
 
 }
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
index f789f080209..64005297b08 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
@@ -349,8 +349,7 @@ public MessageHandler service() {
 		@Filter(inputChannel = "skippedChannel5")
 		@Profile("foo")
 		public MessageHandler skippedMessageHandler() {
-			return m -> {
-			};
+			return m -> { };
 		}
 
 		@Bean
@@ -384,14 +383,7 @@ public Function functionAsService() {
 		@Bean
 		@ServiceActivator(inputChannel = "functionMessageServiceChannel")
 		public Function, String> messageFunctionAsService() {
-			return new Function, String>() { // Has to be interface for proper type inferring
-
-				@Override
-				public String apply(Message m) {
-					return m.getPayload().toLowerCase();
-				}
-
-			};
+			return (message) -> message.getPayload().toLowerCase();
 		}
 
 		@Bean
@@ -408,14 +400,7 @@ public Consumer consumerAsService() {
 		@Bean
 		@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
 		public Consumer> messageConsumerAsService() {
-			return new Consumer>() { // Has to be interface for proper type inferring
-
-				@Override
-				public void accept(Message e) {
-					collector().add(e);
-				}
-
-			};
+			return collector()::add;
 		}
 
 	}
diff --git a/spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt b/spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt
index a8f6f8a15e6..1acd7b83877 100644
--- a/spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt
+++ b/spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt
@@ -16,7 +16,6 @@
 
 package org.springframework.integration.function
 
-import assertk.all
 import assertk.assertThat
 import assertk.assertions.isEqualTo
 import assertk.assertions.isNotNull
@@ -24,8 +23,10 @@ import assertk.assertions.isTrue
 import assertk.assertions.size
 import org.junit.jupiter.api.Test
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Qualifier
 import org.springframework.context.annotation.Bean
 import org.springframework.context.annotation.Configuration
+import org.springframework.integration.annotation.EndpointId
 import org.springframework.integration.annotation.InboundChannelAdapter
 import org.springframework.integration.annotation.Poller
 import org.springframework.integration.annotation.ServiceActivator
@@ -37,6 +38,7 @@ import org.springframework.integration.dsl.IntegrationFlows
 import org.springframework.integration.endpoint.SourcePollingChannelAdapter
 import org.springframework.messaging.Message
 import org.springframework.messaging.MessageChannel
+import org.springframework.messaging.PollableChannel
 import org.springframework.messaging.SubscribableChannel
 import org.springframework.messaging.support.GenericMessage
 import org.springframework.messaging.support.MessageBuilder
@@ -44,9 +46,7 @@ import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
 import java.util.*
 import java.util.concurrent.CountDownLatch
-import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit
-import java.util.function.Supplier
 
 /**
  * @author Artem Bilan
@@ -70,8 +70,12 @@ class FunctionsTests {
 	private lateinit var counterChannel: SubscribableChannel
 
 	@Autowired
+	@Qualifier("kotlinSupplierChannelAdapter")
 	private lateinit var kotlinSupplierInboundChannelAdapter: SourcePollingChannelAdapter
 
+	@Autowired
+	private lateinit var fromSupplierQueue: PollableChannel
+
 	@Test
 	fun `invoke function via transformer`() {
 		val replyChannel = QueueChannel()
@@ -111,6 +115,11 @@ class FunctionsTests {
 		assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue()
 	}
 
+	@Test
+	fun `verify supplier flow`() {
+		assertThat(this.fromSupplierQueue.receive(10_000)).isNotNull()
+	}
+
 	@Configuration
 	@EnableIntegration
 	class Config {
@@ -136,10 +145,23 @@ class FunctionsTests {
 		@Bean
 		@InboundChannelAdapter(value = "counterChannel", autoStartup = "false",
 				poller = [Poller(fixedRate = "10", maxMessagesPerPoll = "1")])
+		@EndpointId("kotlinSupplierChannelAdapter")
 		fun kotlinSupplier(): () -> String {
 			return { "baz" }
 		}
 
+		@Bean
+		fun flowFromSupplier() =
+				IntegrationFlows.from({ "bar" },
+						{ e ->
+							e.poller { p ->
+								p.fixedDelay(10)
+										.maxMessagesPerPoll(1)
+							}
+
+						})
+						.channel { c -> c.queue("fromSupplierQueue") }
+						.get()
 	}
 
 }