Skip to content

Commit a05004a

Browse files
garyrussellartembilan
authored andcommitted
INT-4536: Support Kafka Tombstones
JIRA: https://jira.spring.io/browse/INT-4536 Represent `@KafkaNull` as `null` when `@Payload(required = false)`.
1 parent 8b4d1e6 commit a05004a

File tree

4 files changed

+112
-11
lines changed

4 files changed

+112
-11
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.springframework.integration.handler.support.PayloadExpressionArgumentResolver;
5959
import org.springframework.integration.handler.support.PayloadsArgumentResolver;
6060
import org.springframework.integration.support.DefaultMessageBuilderFactory;
61+
import org.springframework.integration.support.NullAwarePayloadArgumentResolver;
6162
import org.springframework.integration.support.SmartLifecycleRoleController;
6263
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
6364
import org.springframework.integration.support.converter.DefaultDatatypeChannelMessageConverter;
@@ -99,6 +100,7 @@ public void setBeanClassLoader(ClassLoader classLoader) {
99100
this.classLoader = classLoader;
100101
}
101102

103+
@Override
102104
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
103105
if (beanFactory instanceof BeanDefinitionRegistry) {
104106
this.beanFactory = beanFactory;
@@ -477,9 +479,13 @@ private void registerListCapableArgumentResolvers() {
477479
}
478480
}
479481

480-
private static BeanDefinition internalArgumentResolversBuilder(boolean listCapable) {
482+
private BeanDefinition internalArgumentResolversBuilder(boolean listCapable) {
481483
ManagedList<BeanDefinition> resolvers = new ManagedList<>();
482484
resolvers.add(new RootBeanDefinition(PayloadExpressionArgumentResolver.class));
485+
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(NullAwarePayloadArgumentResolver.class);
486+
builder.addConstructorArgReference(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME);
487+
// TODO Validator ?
488+
resolvers.add(builder.getBeanDefinition());
483489
resolvers.add(new RootBeanDefinition(PayloadsArgumentResolver.class));
484490
resolvers.add(new RootBeanDefinition(MapArgumentResolver.class));
485491

spring-integration-core/src/main/java/org/springframework/integration/handler/support/MessagingMethodInvokerHelper.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@
7474
import org.springframework.integration.annotation.UseSpelInvoker;
7575
import org.springframework.integration.context.IntegrationContextUtils;
7676
import org.springframework.integration.support.MutableMessage;
77+
import org.springframework.integration.support.NullAwarePayloadArgumentResolver;
78+
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
7779
import org.springframework.integration.support.json.JsonObjectMapper;
7880
import org.springframework.integration.support.json.JsonObjectMapperProvider;
7981
import org.springframework.integration.util.AbstractExpressionEvaluator;
@@ -538,6 +540,19 @@ private synchronized void initialize() throws Exception {
538540
* that don't run in an application context.
539541
*/
540542
private void configureLocalMessageHandlerFactory() {
543+
MessageConverter messageConverter = null;
544+
if (getBeanFactory() != null &&
545+
getBeanFactory()
546+
.containsBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)) {
547+
messageConverter = getBeanFactory()
548+
.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
549+
MessageConverter.class);
550+
this.messageHandlerMethodFactory.setMessageConverter(messageConverter);
551+
}
552+
else {
553+
messageConverter = new ConfigurableCompositeMessageConverter();
554+
}
555+
NullAwarePayloadArgumentResolver nullResolver = new NullAwarePayloadArgumentResolver(messageConverter);
541556
PayloadExpressionArgumentResolver payloadExpressionArgumentResolver = new PayloadExpressionArgumentResolver();
542557
payloadExpressionArgumentResolver.setBeanFactory(getBeanFactory());
543558

@@ -549,6 +564,7 @@ private void configureLocalMessageHandlerFactory() {
549564

550565
List<HandlerMethodArgumentResolver> customArgumentResolvers = new LinkedList<>();
551566
customArgumentResolvers.add(payloadExpressionArgumentResolver);
567+
customArgumentResolvers.add(nullResolver);
552568
customArgumentResolvers.add(payloadsArgumentResolver);
553569

554570
if (this.canProcessMessageList) {
@@ -560,15 +576,6 @@ private void configureLocalMessageHandlerFactory() {
560576
customArgumentResolvers.add(mapArgumentResolver);
561577

562578
this.messageHandlerMethodFactory.setCustomArgumentResolvers(customArgumentResolvers);
563-
564-
if (getBeanFactory() != null &&
565-
getBeanFactory()
566-
.containsBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)) {
567-
this.messageHandlerMethodFactory
568-
.setMessageConverter(getBeanFactory()
569-
.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
570-
MessageConverter.class));
571-
}
572579
}
573580

574581
@SuppressWarnings("unchecked")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support;
18+
19+
import org.springframework.messaging.converter.MessageConverter;
20+
import org.springframework.messaging.handler.annotation.Payload;
21+
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
22+
import org.springframework.validation.Validator;
23+
24+
/**
25+
* A {@link PayloadArgumentResolver} that treats KafkaNull payloads as null.
26+
* {@link Payload @Paylaod} annotation must have required = false.
27+
*
28+
* @author Gary Russell
29+
* @since 5.1
30+
*
31+
*/
32+
public class NullAwarePayloadArgumentResolver extends PayloadArgumentResolver {
33+
34+
public NullAwarePayloadArgumentResolver(MessageConverter messageConverter) {
35+
super(messageConverter, null, false);
36+
}
37+
38+
public NullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {
39+
super(messageConverter, validator, false);
40+
}
41+
42+
@Override
43+
protected boolean isEmptyPayload(Object payload) {
44+
return super.isEmptyPayload(payload) || "KafkaNull".equals(payload.getClass().getSimpleName());
45+
}
46+
47+
}

spring-integration-core/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2010 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,10 +30,12 @@
3030
import org.springframework.integration.handler.ServiceActivatingHandler;
3131
import org.springframework.messaging.Message;
3232
import org.springframework.messaging.PollableChannel;
33+
import org.springframework.messaging.handler.annotation.Payload;
3334
import org.springframework.messaging.support.GenericMessage;
3435

3536
/**
3637
* @author Mark Fisher
38+
* @author Gary Russell
3739
*/
3840
public class ServiceActivatorMethodResolutionTests {
3941

@@ -231,6 +233,17 @@ public String bar(String request) {
231233
assertNotEquals("FOO", outputChannel.receive(10).getPayload());
232234
}
233235

236+
@Test
237+
public void nullOk() {
238+
NullOkTestBean testBean = new NullOkTestBean();
239+
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(testBean);
240+
QueueChannel outputChannel = new QueueChannel();
241+
serviceActivator.setOutputChannel(outputChannel);
242+
serviceActivator.handleMessage(new GenericMessage<>(new KafkaNull()));
243+
Message<?> result = outputChannel.receive(0);
244+
assertEquals("gotNull", result.getPayload());
245+
}
246+
234247

235248
@SuppressWarnings("unused")
236249
private static class SingleAnnotationTestBean {
@@ -305,4 +318,32 @@ public String lowerCase(String s) {
305318

306319
}
307320

321+
322+
@SuppressWarnings("unused")
323+
private static class NullOkTestBean {
324+
325+
NullOkTestBean() {
326+
super();
327+
}
328+
329+
@ServiceActivator
330+
public String nullOK(@Payload(required = false) String s) {
331+
if (s == null) {
332+
return "gotNull";
333+
}
334+
else {
335+
return s;
336+
}
337+
}
338+
339+
}
340+
341+
private static class KafkaNull {
342+
343+
KafkaNull() {
344+
super();
345+
}
346+
347+
}
348+
308349
}

0 commit comments

Comments
 (0)