Skip to content

Commit 1724aec

Browse files
garyrussellartembilan
authored andcommitted
GH-2313: IdempotentReceiver Discard Channel Name
Resolves: #2313 Allow configuration of the discard channel by name. Refactor the interceptor to extend `IntegrationObjectSupport` and use a channel resolver, if necessary. * Polishing
1 parent e2d919c commit 1724aec

File tree

3 files changed

+60
-35
lines changed

3 files changed

+60
-35
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/advice/AbstractHandleMessageAdvice.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import org.apache.commons.logging.Log;
2323
import org.apache.commons.logging.LogFactory;
2424

25+
import org.springframework.integration.context.IntegrationObjectSupport;
2526
import org.springframework.messaging.Message;
2627
import org.springframework.messaging.MessageHandler;
2728

@@ -30,9 +31,11 @@
3031
* for the {@link MessageHandler#handleMessage(Message)}.
3132
*
3233
* @author Artem Bilan
34+
* @author Gary Russell
35+
*
3336
* @since 4.3.1
3437
*/
35-
public abstract class AbstractHandleMessageAdvice implements HandleMessageAdvice {
38+
public abstract class AbstractHandleMessageAdvice extends IntegrationObjectSupport implements HandleMessageAdvice {
3639

3740
protected final Log logger = LogFactory.getLog(this.getClass());
3841

spring-integration-core/src/main/java/org/springframework/integration/handler/advice/IdempotentReceiverInterceptor.java

Lines changed: 53 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,16 +19,10 @@
1919
import org.aopalliance.intercept.MethodInterceptor;
2020
import org.aopalliance.intercept.MethodInvocation;
2121

22-
import org.springframework.beans.BeansException;
23-
import org.springframework.beans.factory.BeanFactory;
24-
import org.springframework.beans.factory.BeanFactoryAware;
2522
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2623
import org.springframework.integration.MessageRejectedException;
2724
import org.springframework.integration.core.MessageSelector;
2825
import org.springframework.integration.core.MessagingTemplate;
29-
import org.springframework.integration.support.DefaultMessageBuilderFactory;
30-
import org.springframework.integration.support.MessageBuilderFactory;
31-
import org.springframework.integration.support.utils.IntegrationUtils;
3226
import org.springframework.messaging.Message;
3327
import org.springframework.messaging.MessageChannel;
3428
import org.springframework.messaging.MessageHandler;
@@ -46,30 +40,28 @@
4640
* {@code requestMessage} isn't accepted by {@link MessageSelector}.
4741
* <p>
4842
* The {@code idempotent filtering} logic depends on the provided {@link MessageSelector}.
49-
* <p>
43+
* <p>
5044
* This class is designed to be used only for the {@link MessageHandler#handleMessage},
5145
* method.
5246
*
5347
* @author Artem Bilan
48+
* @author Gary Russell
49+
*
5450
* @since 4.1
5551
* @see org.springframework.integration.selector.MetadataStoreSelector
5652
* @see org.springframework.integration.config.IdempotentReceiverAutoProxyCreatorInitializer
5753
*/
58-
public class IdempotentReceiverInterceptor extends AbstractHandleMessageAdvice implements BeanFactoryAware {
54+
public class IdempotentReceiverInterceptor extends AbstractHandleMessageAdvice {
5955

6056
private final MessagingTemplate messagingTemplate = new MessagingTemplate();
6157

6258
private final MessageSelector messageSelector;
6359

64-
private volatile MessageChannel discardChannel;
65-
66-
private volatile boolean throwExceptionOnRejection;
67-
68-
private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();
60+
private MessageChannel discardChannel;
6961

70-
private volatile boolean messageBuilderFactorySet;
62+
private String discardChannelName;
7163

72-
private BeanFactory beanFactory;
64+
private boolean throwExceptionOnRejection;
7365

7466
public IdempotentReceiverInterceptor(MessageSelector messageSelector) {
7567
Assert.notNull(messageSelector, "'messageSelector' must not be null");
@@ -120,28 +112,41 @@ public void setDiscardChannel(MessageChannel discardChannel) {
120112
this.discardChannel = discardChannel;
121113
}
122114

123-
@Override
124-
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
125-
this.beanFactory = beanFactory;
115+
/**
116+
* Specify a channel name where rejected Messages should be sent. If the discard
117+
* channel is null (the default), duplicate Messages will be enriched with
118+
* {@link IntegrationMessageHeaderAccessor#DUPLICATE_MESSAGE} header
119+
* and returned as normal to the {@code invocation.proceed()}. However,
120+
* the 'throwExceptionOnRejection' flag determines whether rejected Messages
121+
* trigger an exception. That value is evaluated regardless of the presence
122+
* of a discard channel.
123+
* <p>
124+
* If there is needed just silently 'drop' rejected messages configure the
125+
* {@link #discardChannel} to the {@code nullChannel}.
126+
* <p>
127+
* Only applies if a {@link #setDiscardChannel(MessageChannel) discardChannel}
128+
* is not provided.
129+
* @param discardChannelName The discard channel name.
130+
* @see #setThrowExceptionOnRejection(boolean)
131+
* @since 5.0.1
132+
*/
133+
public void setDiscardChannelName(String discardChannelName) {
134+
this.discardChannelName = discardChannelName;
126135
}
127136

128-
protected MessageBuilderFactory getMessageBuilderFactory() {
129-
if (!this.messageBuilderFactorySet) {
130-
if (this.beanFactory != null) {
131-
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
132-
}
133-
this.messageBuilderFactorySet = true;
134-
}
135-
return this.messageBuilderFactory;
137+
@Override
138+
public String getComponentType() {
139+
return "idempotent-receiver-interceptor";
136140
}
137141

138142
@Override
139143
protected Object doInvoke(MethodInvocation invocation, Message<?> message) throws Throwable {
140144
boolean accept = this.messageSelector.accept(message);
141145
if (!accept) {
142146
boolean discarded = false;
143-
if (this.discardChannel != null) {
144-
this.messagingTemplate.send(this.discardChannel, message);
147+
MessageChannel theDiscardChannel = obtainDiscardChannel();
148+
if (theDiscardChannel != null) {
149+
this.messagingTemplate.send(theDiscardChannel, message);
145150
discarded = true;
146151
}
147152
if (this.throwExceptionOnRejection) {
@@ -150,8 +155,11 @@ protected Object doInvoke(MethodInvocation invocation, Message<?> message) throw
150155
}
151156

152157
if (!discarded) {
153-
invocation.getArguments()[0] = getMessageBuilderFactory().fromMessage(message)
154-
.setHeader(IntegrationMessageHeaderAccessor.DUPLICATE_MESSAGE, true).build();
158+
invocation.getArguments()[0] =
159+
getMessageBuilderFactory()
160+
.fromMessage(message)
161+
.setHeader(IntegrationMessageHeaderAccessor.DUPLICATE_MESSAGE, true)
162+
.build();
155163
}
156164
else {
157165
return null;
@@ -160,4 +168,18 @@ protected Object doInvoke(MethodInvocation invocation, Message<?> message) throw
160168
return invocation.proceed();
161169
}
162170

171+
private MessageChannel obtainDiscardChannel() {
172+
if (this.discardChannel == null) {
173+
if (this.discardChannelName != null) {
174+
if (getChannelResolver() == null) {
175+
throw new IllegalStateException("No channel resolver available to resolve the discard channel '"
176+
+ this.discardChannelName + "'");
177+
}
178+
this.discardChannel = getChannelResolver()
179+
.resolveDestination(this.discardChannelName);
180+
}
181+
}
182+
return this.discardChannel;
183+
}
184+
163185
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -323,7 +323,7 @@ public PollableChannel idempotentDiscardChannel() {
323323
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
324324
IdempotentReceiverInterceptor idempotentReceiverInterceptor =
325325
new IdempotentReceiverInterceptor(new MetadataStoreSelector(m -> m.getPayload().toString()));
326-
idempotentReceiverInterceptor.setDiscardChannel(idempotentDiscardChannel());
326+
idempotentReceiverInterceptor.setDiscardChannelName("idempotentDiscardChannel");
327327
idempotentReceiverInterceptor.setThrowExceptionOnRejection(true);
328328
return idempotentReceiverInterceptor;
329329
}

0 commit comments

Comments
 (0)