Skip to content

Commit 735e82e

Browse files
artembilangaryrussell
authored andcommitted
Add support for CacheRequestHandlerAdvice
* Fix `AbstractMessageProcessingTransformer` to react for the `AbstractIntegrationMessageBuilder` invocation result and don't wrap it into the `Message` * Demonstrate functionality in the `CacheRequestHandlerAdviceTests` * Polishing and Docs * Fix JavaDocs warnings Doc polishing.
1 parent 36581bf commit 735e82e

File tree

6 files changed

+488
-13
lines changed

6 files changed

+488
-13
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/HeaderEnricherSpec.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -688,15 +688,15 @@ public <P> HeaderEnricherSpec expirationDateFunction(Function<Message<P>, ?> exp
688688
/**
689689
* Add a {@link IntegrationMessageHeaderAccessor#ROUTING_SLIP} header.
690690
* The possible values are:
691-
* <p><ul>
691+
* <ul>
692692
* <li>A {@link org.springframework.messaging.MessageChannel} instance.
693693
* <li>A {@link org.springframework.messaging.MessageChannel} bean name.
694694
* <li>A {@link org.springframework.integration.routingslip.RoutingSlipRouteStrategy} instance.
695695
* <li>A {@link org.springframework.integration.routingslip.RoutingSlipRouteStrategy} bean name.
696696
* <li>A {@code String} for SpEL expression which has to be evaluated to the
697697
* {@link org.springframework.messaging.MessageChannel} or
698698
* {@link org.springframework.integration.routingslip.RoutingSlipRouteStrategy}.
699-
* </ul><p>
699+
* </ul>
700700
* If the header exists, it will <b>not</b> be overwritten unless {@link #defaultOverwrite(boolean)} is true.
701701
* @param routingSlipPath the header value for {@link IntegrationMessageHeaderAccessor#ROUTING_SLIP}.
702702
* @return the header enricher spec.
@@ -709,15 +709,15 @@ public HeaderEnricherSpec routingSlip(Object... routingSlipPath) {
709709
/**
710710
* Add a {@link IntegrationMessageHeaderAccessor#ROUTING_SLIP} header.
711711
* The possible values are:
712-
* <p><ul>
712+
* <ul>
713713
* <li>A {@link org.springframework.messaging.MessageChannel} instance.
714714
* <li>A {@link org.springframework.messaging.MessageChannel} bean name.
715715
* <li>A {@link org.springframework.integration.routingslip.RoutingSlipRouteStrategy} instance.
716716
* <li>A {@link org.springframework.integration.routingslip.RoutingSlipRouteStrategy} bean name.
717717
* <li>A {@code String} for SpEL expression which has to be evaluated to the
718718
* {@link org.springframework.messaging.MessageChannel} or
719719
* {@link org.springframework.integration.routingslip.RoutingSlipRouteStrategy}.
720-
* </ul><p>
720+
* </ul>
721721
* @param overwrite true to overwrite an existing header.
722722
* @param routingSlipPath the header value for {@link IntegrationMessageHeaderAccessor#ROUTING_SLIP}.
723723
* @return the header enricher spec.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.function.Function;
24+
import java.util.stream.Collectors;
25+
26+
import org.springframework.beans.factory.SmartInitializingSingleton;
27+
import org.springframework.cache.CacheManager;
28+
import org.springframework.cache.interceptor.CacheAspectSupport;
29+
import org.springframework.cache.interceptor.CacheErrorHandler;
30+
import org.springframework.cache.interceptor.CacheEvictOperation;
31+
import org.springframework.cache.interceptor.CacheOperation;
32+
import org.springframework.cache.interceptor.CacheOperationInvoker;
33+
import org.springframework.cache.interceptor.CachePutOperation;
34+
import org.springframework.cache.interceptor.CacheResolver;
35+
import org.springframework.cache.interceptor.CacheableOperation;
36+
import org.springframework.expression.EvaluationContext;
37+
import org.springframework.expression.Expression;
38+
import org.springframework.integration.expression.ExpressionUtils;
39+
import org.springframework.integration.expression.FunctionExpression;
40+
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
41+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
42+
import org.springframework.lang.Nullable;
43+
import org.springframework.messaging.Message;
44+
import org.springframework.util.Assert;
45+
import org.springframework.util.ObjectUtils;
46+
import org.springframework.util.ReflectionUtils;
47+
48+
/**
49+
* The {@link AbstractRequestHandlerAdvice} implementation for caching
50+
* {@link AbstractReplyProducingMessageHandler.RequestHandler#handleRequestMessage(Message)} results.
51+
* Supports all the cache operations - cacheable, put, evict.
52+
* By default only cacheable is applied for the provided {@code cacheNames}.
53+
* The default cache {@code key} is {@code payload} of the request message.
54+
*
55+
* @author Artem Bilan
56+
*
57+
* @since 5.2
58+
*
59+
* @see AbstractReplyProducingMessageHandler.RequestHandler
60+
* @see CacheAspectSupport
61+
* @see CacheOperation
62+
*/
63+
public class CacheRequestHandlerAdvice extends AbstractRequestHandlerAdvice
64+
implements SmartInitializingSingleton {
65+
66+
private static final Method HANDLE_REQUEST_METHOD =
67+
ReflectionUtils.findMethod(AbstractReplyProducingMessageHandler.RequestHandler.class,
68+
"handleRequestMessage", Message.class);
69+
70+
private final IntegrationCacheAspect delegate = new IntegrationCacheAspect();
71+
72+
private final String[] cacheNames;
73+
74+
private final List<CacheOperation> cacheOperations = new ArrayList<>();
75+
76+
private Expression keyExpression = new FunctionExpression<Message<?>>(Message::getPayload);
77+
78+
/**
79+
* Create a {@link CacheRequestHandlerAdvice} instance based on the provided name of caches
80+
* and {@link CacheableOperation} as default one.
81+
* This can be overridden by the {@link #setCacheOperations}.
82+
* @param cacheNames the name of caches to use in the advice.
83+
* @see #setCacheOperations
84+
*/
85+
public CacheRequestHandlerAdvice(String... cacheNames) {
86+
this.cacheNames = cacheNames;
87+
CacheableOperation.Builder builder = new CacheableOperation.Builder();
88+
builder.setName(toString());
89+
this.cacheOperations.add(builder.build());
90+
}
91+
92+
/**
93+
* Configure a set of {@link CacheOperation} which are going to be applied to the
94+
* {@link AbstractReplyProducingMessageHandler.RequestHandler#handleRequestMessage(Message)}
95+
* method via {@link IntegrationCacheAspect}.
96+
* This is similar to the technique provided by the
97+
* {@link org.springframework.cache.annotation.Caching} annotation.
98+
* @param cacheOperations the array of {@link CacheOperation} to use.
99+
* @see org.springframework.cache.annotation.Caching
100+
*/
101+
public void setCacheOperations(CacheOperation... cacheOperations) {
102+
Assert.notEmpty(cacheOperations, "'cacheOperations' must not be empty");
103+
Assert.notNull(cacheOperations, "'cacheOperations' must not be null");
104+
this.cacheOperations.clear();
105+
this.cacheOperations.addAll(Arrays.asList(cacheOperations));
106+
}
107+
108+
/**
109+
* Configure a common {@link CacheManager} if some {@link CacheOperation} comes without it.
110+
* See {@link org.springframework.cache.annotation.CacheConfig} annotation for similar approach.
111+
* @param cacheManager the {@link CacheManager} to use.
112+
* @see org.springframework.cache.annotation.CacheConfig
113+
*/
114+
public void setCacheManager(CacheManager cacheManager) {
115+
this.delegate.setCacheManager(cacheManager);
116+
}
117+
118+
/**
119+
* Configure a common {@link CacheResolver} if some {@link CacheOperation} comes without it.
120+
* See {@link org.springframework.cache.annotation.CacheConfig} for similar approach.
121+
* @param cacheResolver the {@link CacheResolver} to use.
122+
* @see org.springframework.cache.annotation.CacheConfig
123+
*/
124+
public void setCacheResolver(CacheResolver cacheResolver) {
125+
this.delegate.setCacheResolver(cacheResolver);
126+
}
127+
128+
/**
129+
* Set the {@link CacheErrorHandler} instance to use to handle errors
130+
* thrown by the cache provider.
131+
* @param errorHandler the {@link CacheErrorHandler} to use.
132+
* @see CacheAspectSupport#setErrorHandler(CacheErrorHandler)
133+
*/
134+
public void setErrorHandler(CacheErrorHandler errorHandler) {
135+
Assert.notNull(errorHandler, "'errorHandler' must not be null");
136+
this.delegate.setErrorHandler(errorHandler);
137+
}
138+
139+
/**
140+
* Configure an expression in SpEL style to evaluate a cache key at runtime
141+
* against a request message.
142+
* @param keyExpression the expression to use for cache key generation.
143+
*/
144+
public void setKeyExpressionString(String keyExpression) {
145+
Assert.hasText(keyExpression, "'keyExpression' must not be empty");
146+
setKeyExpression(EXPRESSION_PARSER.parseExpression(keyExpression));
147+
}
148+
149+
/**
150+
* Configure a {@link Function} to evaluate a cache key at runtime
151+
* against a request message.
152+
* @param keyFunction the {@link Function} to use for cache key generation.
153+
*/
154+
public void setKeyFunction(Function<Message<?>, ?> keyFunction) {
155+
Assert.notNull(keyFunction, "'keyFunction' must not be null");
156+
setKeyExpression(new FunctionExpression<>(keyFunction));
157+
}
158+
159+
/**
160+
* Configure a SpEL expression to evaluate a cache key at runtime
161+
* against a request message.
162+
* @param keyExpression the expression to use for cache key generation.
163+
*/
164+
public void setKeyExpression(Expression keyExpression) {
165+
Assert.notNull(keyExpression, "'keyExpression' must not be null");
166+
this.keyExpression = keyExpression;
167+
}
168+
169+
170+
@Override
171+
public void afterSingletonsInstantiated() {
172+
this.delegate.afterSingletonsInstantiated();
173+
}
174+
175+
@Override
176+
protected void onInit() {
177+
List<CacheOperation> cacheOperations;
178+
if (!ObjectUtils.isEmpty(this.cacheNames)) {
179+
cacheOperations = this.cacheOperations.stream()
180+
.filter((operation) -> ObjectUtils.isEmpty(operation.getCacheNames()))
181+
.map((operation) -> {
182+
CacheOperation.Builder builder;
183+
if (operation instanceof CacheableOperation) {
184+
CacheableOperation cacheableOperation = (CacheableOperation) operation;
185+
CacheableOperation.Builder cacheableBuilder = new CacheableOperation.Builder();
186+
cacheableBuilder.setSync(cacheableOperation.isSync());
187+
String unless = cacheableOperation.getUnless();
188+
if (unless != null) {
189+
cacheableBuilder.setUnless(unless);
190+
}
191+
builder = cacheableBuilder;
192+
}
193+
else if (operation instanceof CacheEvictOperation) {
194+
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
195+
CacheEvictOperation cacheEvictOperation = (CacheEvictOperation) operation;
196+
cacheEvictBuilder.setBeforeInvocation(cacheEvictOperation.isBeforeInvocation());
197+
cacheEvictBuilder.setCacheWide(cacheEvictOperation.isCacheWide());
198+
builder = cacheEvictBuilder;
199+
}
200+
else {
201+
CachePutOperation cachePutOperation = (CachePutOperation) operation;
202+
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
203+
String unless = cachePutOperation.getUnless();
204+
if (unless != null) {
205+
cachePutBuilder.setUnless(unless);
206+
}
207+
builder = cachePutBuilder;
208+
}
209+
210+
builder.setName(operation.getName());
211+
builder.setCacheManager(operation.getCacheManager());
212+
builder.setCacheNames(this.cacheNames);
213+
builder.setCacheResolver(operation.getCacheResolver());
214+
builder.setCondition(operation.getCondition());
215+
builder.setKey(operation.getKey());
216+
builder.setKeyGenerator(operation.getKeyGenerator());
217+
return builder.build();
218+
})
219+
.collect(Collectors.toList());
220+
}
221+
else {
222+
cacheOperations = this.cacheOperations;
223+
}
224+
225+
this.delegate.setBeanFactory(getBeanFactory());
226+
EvaluationContext evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
227+
this.delegate.setKeyGenerator((target, method, params) ->
228+
this.keyExpression.getValue(evaluationContext, params[0])); // NOSONAR
229+
this.delegate.setCacheOperationSources((method, targetClass) -> cacheOperations);
230+
this.delegate.afterPropertiesSet();
231+
232+
}
233+
234+
@Nullable
235+
@Override
236+
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
237+
CacheOperationInvoker operationInvoker =
238+
() -> {
239+
Object result = callback.execute();
240+
// Drop MessageBuilder optimization in favor of Serializable support in cache implementation.
241+
if (result instanceof AbstractIntegrationMessageBuilder<?>) {
242+
return ((AbstractIntegrationMessageBuilder<?>) result).build();
243+
}
244+
else {
245+
return result;
246+
}
247+
248+
};
249+
250+
return this.delegate.invoke(operationInvoker, target, message);
251+
}
252+
253+
private static class IntegrationCacheAspect extends CacheAspectSupport {
254+
255+
IntegrationCacheAspect() {
256+
}
257+
258+
@Nullable
259+
Object invoke(CacheOperationInvoker invoker, Object target, Message<?> message) {
260+
return super.execute(invoker, target, HANDLE_REQUEST_METHOD, new Object[] { message }); // NOSONAR
261+
}
262+
263+
}
264+
265+
}

spring-integration-core/src/main/java/org/springframework/integration/transformer/AbstractMessageProcessingTransformer.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.beans.factory.BeanFactoryAware;
2323
import org.springframework.context.Lifecycle;
2424
import org.springframework.integration.handler.MessageProcessor;
25+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
2526
import org.springframework.integration.support.DefaultMessageBuilderFactory;
2627
import org.springframework.integration.support.MessageBuilderFactory;
2728
import org.springframework.integration.support.utils.IntegrationUtils;
@@ -118,10 +119,18 @@ public final Message<?> transform(Message<?> message) {
118119
return (Message<?>) result;
119120
}
120121

122+
AbstractIntegrationMessageBuilder<?> messageBuilder;
123+
124+
if (result instanceof AbstractIntegrationMessageBuilder<?>) {
125+
messageBuilder = (AbstractIntegrationMessageBuilder<?>) result;
126+
}
127+
else {
128+
messageBuilder = getMessageBuilderFactory().withPayload(result);
129+
}
130+
121131
MessageHeaders requestHeaders = message.getHeaders();
122132

123-
return getMessageBuilderFactory()
124-
.withPayload(result)
133+
return messageBuilder
125134
.filterAndCopyHeadersIfAbsent(requestHeaders,
126135
this.selectiveHeaderPropagation ? this.notPropagatedHeaders : null)
127136
.build();

0 commit comments

Comments
 (0)