Skip to content

Commit 0c1423e

Browse files
artembilangaryrussell
authored andcommitted
INT-4456: Explicit methods for Kotlin lambdas
JIRA: https://jira.spring.io/browse/INT-4456 Since Kotlin compiles lambdas different way than Java, they are not synthetic classes at runtime anymore. Therefore we fallback to the method invocation logic, but since Java 8 interfaces has `default` methods as well the `MessagingMethodInvokerHelper` can't select the target method for invocation * Use explicit `Function.apply()` for non-synthetic implementations * Add `RouterDslTests` Kotlin-based test-case **Cherry-pick to 5.0.x** * Add more explicit methods for invokers
1 parent 000f297 commit 0c1423e

File tree

5 files changed

+171
-29
lines changed

5 files changed

+171
-29
lines changed

build.gradle

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
buildscript {
2+
ext.kotlinVersion = '1.2.40'
23
repositories {
34
maven { url 'https://repo.spring.io/plugins-release' }
45
}
56
dependencies {
67
classpath 'io.spring.gradle:docbook-reference-plugin:0.3.1'
78
classpath 'org.asciidoctor:asciidoctor-gradle-plugin:1.5.3'
9+
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlinVersion"
810
}
911
}
1012

@@ -58,6 +60,7 @@ subprojects { subproject ->
5860
apply plugin: 'idea'
5961
apply plugin: 'jacoco'
6062
apply plugin: 'checkstyle'
63+
apply plugin: 'kotlin'
6164

6265
sourceSets {
6366
test {
@@ -72,11 +75,18 @@ subprojects { subproject ->
7275
targetCompatibility = 1.8
7376
}
7477

78+
compileTestKotlin {
79+
kotlinOptions {
80+
jvmTarget = '1.8'
81+
}
82+
}
83+
7584
ext {
7685
activeMqVersion = '5.15.3'
7786
apacheSshdVersion = '1.7.0'
7887
aspectjVersion = '1.9.0'
7988
assertjVersion = '3.9.1'
89+
assertkVersion = '0.10'
8090
boonVersion = '0.34'
8191
commonsDbcp2Version = '2.2.0'
8292
commonsIoVersion = '2.6'
@@ -161,6 +171,14 @@ subprojects { subproject ->
161171

162172
testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
163173
testRuntime "org.apache.logging.log4j:log4j-jcl:$log4jVersion"
174+
175+
testCompile("com.willowtreeapps.assertk:assertk:$assertkVersion") {
176+
exclude group: 'org.jetbrains.kotlin', module: 'kotlin-reflect'
177+
}
178+
179+
testCompile "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
180+
testRuntime "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion"
181+
164182
}
165183

166184
// enable all compiler warnings; individual projects may customize further

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.springframework.integration.transformer.MessageTransformingHandler;
8787
import org.springframework.integration.transformer.MethodInvokingTransformer;
8888
import org.springframework.integration.transformer.Transformer;
89+
import org.springframework.integration.util.ClassUtils;
8990
import org.springframework.messaging.Message;
9091
import org.springframework.messaging.MessageChannel;
9192
import org.springframework.messaging.MessageHandler;
@@ -633,7 +634,7 @@ public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> generic
633634
Transformer transformer = genericTransformer instanceof Transformer ? (Transformer) genericTransformer :
634635
(isLambda(genericTransformer)
635636
? new MethodInvokingTransformer(new LambdaMessageProcessor(genericTransformer, payloadType))
636-
: new MethodInvokingTransformer(genericTransformer));
637+
: new MethodInvokingTransformer(genericTransformer, ClassUtils.TRANSFORMER_TRANSFORM_METHOD));
637638
return addComponent(transformer)
638639
.handle(new MessageTransformingHandler(transformer), endpointConfigurer);
639640
}
@@ -826,7 +827,7 @@ public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector,
826827
MessageSelector selector = genericSelector instanceof MessageSelector ? (MessageSelector) genericSelector :
827828
(isLambda(genericSelector)
828829
? new MethodInvokingSelector(new LambdaMessageProcessor(genericSelector, payloadType))
829-
: new MethodInvokingSelector(genericSelector));
830+
: new MethodInvokingSelector(genericSelector, ClassUtils.SELECTOR_ACCEPT_METHOD));
830831
return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer);
831832
}
832833

@@ -1022,12 +1023,12 @@ public <P> B handle(Class<P> payloadType, GenericHandler<P> handler) {
10221023
*/
10231024
public <P> B handle(Class<P> payloadType, GenericHandler<P> handler,
10241025
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
1025-
ServiceActivatingHandler serviceActivatingHandler = null;
1026+
ServiceActivatingHandler serviceActivatingHandler;
10261027
if (isLambda(handler)) {
10271028
serviceActivatingHandler = new ServiceActivatingHandler(new LambdaMessageProcessor(handler, payloadType));
10281029
}
10291030
else {
1030-
serviceActivatingHandler = new ServiceActivatingHandler(handler, "handle");
1031+
serviceActivatingHandler = new ServiceActivatingHandler(handler, ClassUtils.HANDLER_HANDLE_METHOD);
10311032
}
10321033
return this.handle(serviceActivatingHandler, endpointConfigurer);
10331034
}
@@ -1533,7 +1534,7 @@ public <P> B split(Class<P> payloadType, Function<P, ?> splitter,
15331534
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
15341535
MethodInvokingSplitter split = isLambda(splitter)
15351536
? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType))
1536-
: new MethodInvokingSplitter(splitter);
1537+
: new MethodInvokingSplitter(splitter, ClassUtils.FUNCTION_APPLY_METHOD);
15371538
return this.split(split, endpointConfigurer);
15381539
}
15391540

@@ -1933,7 +1934,7 @@ public <P, T> B route(Class<P> payloadType, Function<P, T> router,
19331934
Consumer<RouterSpec<T, MethodInvokingRouter>> routerConfigurer) {
19341935
MethodInvokingRouter methodInvokingRouter = isLambda(router)
19351936
? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType))
1936-
: new MethodInvokingRouter(router);
1937+
: new MethodInvokingRouter(router, ClassUtils.FUNCTION_APPLY_METHOD);
19371938
return route(new RouterSpec<>(methodInvokingRouter), routerConfigurer);
19381939
}
19391940

spring-integration-core/src/main/java/org/springframework/integration/dsl/RecipientListRouterSpec.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import org.springframework.integration.core.MessageSelector;
2222
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
2323
import org.springframework.integration.filter.MethodInvokingSelector;
24-
import org.springframework.integration.handler.LambdaMessageProcessor;
2524
import org.springframework.integration.router.RecipientListRouter;
25+
import org.springframework.integration.util.ClassUtils;
2626
import org.springframework.messaging.MessageChannel;
2727
import org.springframework.util.StringUtils;
2828

@@ -96,23 +96,20 @@ public RecipientListRouterSpec recipientMessageSelector(String channelName, Mess
9696
* @return the router spec.
9797
*/
9898
public <P> RecipientListRouterSpec recipient(String channelName, GenericSelector<P> selector) {
99+
MessageSelector messageSelector = wrapToMessageSelectorIfNecessary(selector);
100+
this.handler.addRecipient(channelName, messageSelector);
101+
return _this();
102+
}
103+
104+
private <P> MessageSelector wrapToMessageSelectorIfNecessary(GenericSelector<P> selector) {
99105
MessageSelector messageSelector;
100106
if (selector instanceof MessageSelector) {
101107
messageSelector = (MessageSelector) selector;
102108
}
103109
else {
104-
messageSelector =
105-
isLambda(selector)
106-
? new MethodInvokingSelector(new LambdaMessageProcessor(selector, null))
107-
: new MethodInvokingSelector(selector);
110+
messageSelector = new MethodInvokingSelector(selector, ClassUtils.SELECTOR_ACCEPT_METHOD);
108111
}
109-
this.handler.addRecipient(channelName, messageSelector);
110-
return _this();
111-
}
112-
113-
private static boolean isLambda(Object o) {
114-
Class<?> aClass = o.getClass();
115-
return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass();
112+
return messageSelector;
116113
}
117114

118115
/**
@@ -170,16 +167,7 @@ public RecipientListRouterSpec recipientMessageSelector(MessageChannel channel,
170167
* @return the router spec.
171168
*/
172169
public <P> RecipientListRouterSpec recipient(MessageChannel channel, GenericSelector<P> selector) {
173-
MessageSelector messageSelector;
174-
if (selector instanceof MessageSelector) {
175-
messageSelector = (MessageSelector) selector;
176-
}
177-
else {
178-
messageSelector =
179-
isLambda(selector)
180-
? new MethodInvokingSelector(new LambdaMessageProcessor(selector, null))
181-
: new MethodInvokingSelector(selector);
182-
}
170+
MessageSelector messageSelector = wrapToMessageSelectorIfNecessary(selector);
183171
this.handler.addRecipient(channel, messageSelector);
184172
return _this();
185173
}

spring-integration-core/src/main/java/org/springframework/integration/util/ClassUtils.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,16 +16,63 @@
1616

1717
package org.springframework.integration.util;
1818

19+
import java.lang.reflect.Method;
1920
import java.util.HashMap;
2021
import java.util.Map;
2122
import java.util.Set;
23+
import java.util.function.Function;
24+
25+
import org.springframework.integration.core.GenericSelector;
26+
import org.springframework.integration.transformer.GenericTransformer;
27+
import org.springframework.util.ReflectionUtils;
2228

2329
/**
2430
* @author Mark Fisher
31+
* @author Artem Bilan
32+
*
2533
* @since 2.0
2634
*/
2735
public abstract class ClassUtils {
2836

37+
/**
38+
* The {@link Function#apply(Object)} method object.
39+
*/
40+
public static final Method FUNCTION_APPLY_METHOD =
41+
ReflectionUtils.findMethod(Function.class, "apply", (Class<?>[]) null);
42+
43+
/**
44+
* The {@link GenericSelector#accept(Object)} method object.
45+
*/
46+
public static final Method SELECTOR_ACCEPT_METHOD =
47+
ReflectionUtils.findMethod(GenericSelector.class, "accept", (Class<?>[]) null);
48+
49+
/**
50+
* The {@link GenericSelector#accept(Object)} method object.
51+
*/
52+
public static final Method TRANSFORMER_TRANSFORM_METHOD =
53+
ReflectionUtils.findMethod(GenericTransformer.class, "transform", (Class<?>[]) null);
54+
55+
/**
56+
* The {@code org.springframework.integration.handler.GenericHandler#handle(Object, Map)} method object.
57+
*/
58+
public static final Method HANDLER_HANDLE_METHOD;
59+
60+
static {
61+
Class<?> genericHandlerClass = null;
62+
try {
63+
genericHandlerClass =
64+
org.springframework.util.ClassUtils.forName(
65+
"org.springframework.integration.handler.GenericHandler",
66+
org.springframework.util.ClassUtils.getDefaultClassLoader());
67+
}
68+
catch (ClassNotFoundException e) {
69+
ReflectionUtils.rethrowRuntimeException(e);
70+
}
71+
72+
HANDLER_HANDLE_METHOD = ReflectionUtils.findMethod(genericHandlerClass, "handle", (Class<?>[]) null);
73+
}
74+
75+
2976
/**
3077
* Map with primitive wrapper type as key and corresponding primitive
3178
* type as value, for example: Integer.class -> int.class.
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl.routers
18+
19+
import assertk.assert
20+
import assertk.assertions.isEqualTo
21+
import assertk.assertions.isInstanceOf
22+
import assertk.assertions.isNotNull
23+
import org.junit.jupiter.api.Test
24+
import org.springframework.beans.factory.annotation.Autowired
25+
import org.springframework.beans.factory.annotation.Qualifier
26+
import org.springframework.context.annotation.Bean
27+
import org.springframework.context.annotation.Configuration
28+
import org.springframework.integration.config.EnableIntegration
29+
import org.springframework.integration.dsl.IntegrationFlow
30+
import org.springframework.messaging.MessageChannel
31+
import org.springframework.messaging.PollableChannel
32+
import org.springframework.messaging.support.GenericMessage
33+
import org.springframework.test.annotation.DirtiesContext
34+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
35+
36+
/**
37+
* @author Artem Bilan
38+
*
39+
* @since 5.0.5
40+
*/
41+
@SpringJUnitConfig
42+
@DirtiesContext
43+
class RouterDslTests {
44+
45+
@Autowired
46+
@Qualifier("routerTwoSubFlows.input")
47+
private lateinit var routerTwoSubFlowsInput: MessageChannel
48+
49+
@Autowired
50+
@Qualifier("routerTwoSubFlowsOutput")
51+
private lateinit var routerTwoSubFlowsOutput: PollableChannel
52+
53+
@Test
54+
fun `route to two subflows using lambda`() {
55+
56+
this.routerTwoSubFlowsInput.send(GenericMessage<Any>(listOf(1, 2, 3, 4, 5, 6)))
57+
val receive = this.routerTwoSubFlowsOutput.receive(10000)
58+
59+
val payload = receive?.payload
60+
61+
assert(payload).isNotNull {
62+
it.isInstanceOf(List::class.java)
63+
}
64+
65+
assert(payload).isEqualTo(listOf(3, 4, 9, 8, 15, 12))
66+
}
67+
68+
69+
@Configuration
70+
@EnableIntegration
71+
open class Config {
72+
73+
@Bean
74+
open fun routerTwoSubFlows() =
75+
IntegrationFlow { f ->
76+
f.split()
77+
.route<Int, Boolean>({ p -> p % 2 == 0 },
78+
{ m ->
79+
m.subFlowMapping(true, { sf -> sf.handle<Int> { p, _ -> p * 2 } })
80+
.subFlowMapping(false, { sf -> sf.handle<Int> { p, _ -> p * 3 } })
81+
})
82+
.aggregate()
83+
.channel { c -> c.queue("routerTwoSubFlowsOutput") }
84+
}
85+
86+
}
87+
88+
}

0 commit comments

Comments
 (0)