Skip to content

Commit f03356c

Browse files
artembilangaryrussell
authored andcommitted
INT-4265: Add Java DSL .routeByError() EIP-Method
JIRA: https://jira.spring.io/browse/INT-4265 * Add `.routeByError()` EIP-method to the `IntegrationFlowDefinition` based on the `ErrorMessageExceptionTypeRouter` * Add missed `dynamicChannelLimit()` option to the `RouterSpec` * Rework `RouterSpec#RouterMappingProvider` into the `ContextRefreshedEvent` phase initialization to be sure that dependent `MappingMessageRouterManagement` is fully initialized before applying mapping conversions * Rename `routeByError()` to `routeByException()` to more reflect reality of the `ErrorMessageExceptionTypeRouter` and don't mislead about Java's `Error` * Polish `ErrorMessageExceptionTypeRouter` JavaDocs a bit and some Java 8 code style * Add `ApplicationContext` assertion into the `RouterMappingProvider.onApplicationEvent()` do not trigger delegate initialization if `ContextRefreshedEvent` is from the different app context
1 parent ca23176 commit f03356c

File tree

4 files changed

+152
-41
lines changed

4 files changed

+152
-41
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
7070
import org.springframework.integration.handler.ServiceActivatingHandler;
7171
import org.springframework.integration.router.AbstractMessageRouter;
72+
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
7273
import org.springframework.integration.router.ExpressionEvaluatingRouter;
7374
import org.springframework.integration.router.MethodInvokingRouter;
7475
import org.springframework.integration.router.RecipientListRouter;
@@ -2009,12 +2010,12 @@ private <R extends AbstractMessageRouter, S extends AbstractRouterSpec<S, R>> B
20092010
}
20102011

20112012
/**
2012-
* Populate the {@link RecipientListRouter} options from {@link RecipientListRouterSpec}.
2013+
* Populate the {@link RecipientListRouter} with options from the {@link RecipientListRouterSpec}.
20132014
* Typically used with a Java 8 Lambda expression:
20142015
* <pre class="code">
20152016
* {@code
20162017
* .routeToRecipients(r -> r
2017-
*.recipient("bar-channel", m ->
2018+
* .recipient("bar-channel", m ->
20182019
* m.getHeaders().containsKey("recipient") && (boolean) m.getHeaders().get("recipient"))
20192020
* .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
20202021
* f -> f.transform(String.class, p -> p.toUpperCase())
@@ -2028,6 +2029,27 @@ public B routeToRecipients(Consumer<RecipientListRouterSpec> routerConfigurer) {
20282029
return route(new RecipientListRouterSpec(), routerConfigurer);
20292030
}
20302031

2032+
/**
2033+
* Populate the {@link ErrorMessageExceptionTypeRouter} with options from the {@link RouterSpec}.
2034+
* Typically used with a Java 8 Lambda expression:
2035+
* <pre class="code">
2036+
* {@code
2037+
* .routeByException(r -> r
2038+
* .channelMapping(IllegalArgumentException.class, "illegalArgumentChannel")
2039+
* .subFlowMapping(MessageHandlingException.class, sf ->
2040+
* sf.handle(...))
2041+
* )
2042+
* }
2043+
* </pre>
2044+
* @param routerConfigurer the {@link Consumer} to provide {@link ErrorMessageExceptionTypeRouter} options.
2045+
* @return the current {@link IntegrationFlowDefinition}.
2046+
* @see ErrorMessageExceptionTypeRouter
2047+
*/
2048+
public B routeByException(
2049+
Consumer<RouterSpec<Class<? extends Throwable>, ErrorMessageExceptionTypeRouter>> routerConfigurer) {
2050+
return route(new RouterSpec<>(new ErrorMessageExceptionTypeRouter()), routerConfigurer);
2051+
}
2052+
20312053
/**
20322054
* Populate the provided {@link AbstractMessageRouter} implementation to the
20332055
* current integration flow position.
@@ -2561,6 +2583,7 @@ public <T> Publisher<Message<T>> toReactivePublisher() {
25612583
return new PublisherIntegrationFlow<>(this.integrationComponents, publisher);
25622584
}
25632585

2586+
@SuppressWarnings("unchecked")
25642587
private <S extends ConsumerEndpointSpec<S, ? extends MessageHandler>> B register(S endpointSpec,
25652588
Consumer<S> endpointConfigurer) {
25662589
if (endpointConfigurer != null) {

spring-integration-core/src/main/java/org/springframework/integration/dsl/RouterSpec.java

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import java.util.Collection;
2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

24+
import org.springframework.context.ApplicationListener;
25+
import org.springframework.context.event.ContextRefreshedEvent;
2326
import org.springframework.core.convert.ConversionService;
2427
import org.springframework.core.convert.support.DefaultConversionService;
2528
import org.springframework.integration.channel.DirectChannel;
@@ -68,6 +71,20 @@ public RouterSpec<K, R> resolutionRequired(boolean resolutionRequired) {
6871
return _this();
6972
}
7073

74+
/**
75+
* Set a limit for how many dynamic channels are retained (for reporting purposes).
76+
* When the limit is exceeded, the oldest channel is discarded.
77+
* <p><b>NOTE: this does not affect routing, just the reporting which dynamically
78+
* resolved channels have been routed to.</b> Default {@code 100}.
79+
* @param dynamicChannelLimit the limit.
80+
* @return the router spec.
81+
* @see AbstractMappingMessageRouter#setDynamicChannelLimit(int)
82+
*/
83+
public RouterSpec<K, R> dynamicChannelLimit(int dynamicChannelLimit) {
84+
this.handler.setDynamicChannelLimit(dynamicChannelLimit);
85+
return _this();
86+
}
87+
7188
/**
7289
* Cannot be invoked if {@link #subFlowMapping(Object, IntegrationFlow)} is used.
7390
* @param prefix the prefix.
@@ -163,7 +180,10 @@ public Collection<Object> getComponentsToRegister() {
163180
return super.getComponentsToRegister();
164181
}
165182

166-
private static class RouterMappingProvider extends IntegrationObjectSupport {
183+
private static class RouterMappingProvider extends IntegrationObjectSupport
184+
implements ApplicationListener<ContextRefreshedEvent> {
185+
186+
private final AtomicBoolean initialized = new AtomicBoolean();
167187

168188
private final MappingMessageRouterManagement router;
169189

@@ -178,28 +198,31 @@ void addMapping(Object key, NamedComponent channel) {
178198
}
179199

180200
@Override
181-
protected void onInit() throws Exception {
182-
ConversionService conversionService = getConversionService();
183-
if (conversionService == null) {
184-
conversionService = DefaultConversionService.getSharedInstance();
185-
}
186-
for (Map.Entry<Object, NamedComponent> entry : this.mapping.entrySet()) {
187-
Object key = entry.getKey();
188-
String channelKey;
189-
if (key instanceof String) {
190-
channelKey = (String) key;
191-
}
192-
else if (key instanceof Class) {
193-
channelKey = ((Class<?>) key).getName();
201+
public void onApplicationEvent(ContextRefreshedEvent event) {
202+
if (event.getApplicationContext() == getApplicationContext() && !this.initialized.getAndSet(true)) {
203+
ConversionService conversionService = getConversionService();
204+
if (conversionService == null) {
205+
conversionService = DefaultConversionService.getSharedInstance();
194206
}
195-
else if (conversionService.canConvert(key.getClass(), String.class)) {
196-
channelKey = conversionService.convert(key, String.class);
207+
for (Map.Entry<Object, NamedComponent> entry : this.mapping.entrySet()) {
208+
Object key = entry.getKey();
209+
String channelKey;
210+
if (key instanceof String) {
211+
channelKey = (String) key;
212+
}
213+
else if (key instanceof Class) {
214+
channelKey = ((Class<?>) key).getName();
215+
}
216+
else if (conversionService.canConvert(key.getClass(), String.class)) {
217+
channelKey = conversionService.convert(key, String.class);
218+
}
219+
else {
220+
throw new MessagingException("Unsupported channel mapping type for router ["
221+
+ key.getClass() + "]");
222+
}
223+
224+
this.router.setChannelMapping(channelKey, entry.getValue().getComponentName());
197225
}
198-
else {
199-
throw new MessagingException("unsupported channel mapping type for router [" + key.getClass() + "]");
200-
}
201-
202-
this.router.setChannelMapping(channelKey, entry.getValue().getComponentName());
203226
}
204227
}
205228

spring-integration-core/src/main/java/org/springframework/integration/router/ErrorMessageExceptionTypeRouter.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,8 +31,9 @@
3131

3232
/**
3333
* A Message Router that resolves the target {@link MessageChannel} for
34-
* messages whose payload is an Exception. The channel resolution is based upon
35-
* the most specific cause of the error for which a channel-mapping exists.
34+
* messages whose payload is a {@link Throwable}.
35+
* The channel resolution is based upon the most specific cause
36+
* of the error for which a channel-mapping exists.
3637
* <p>
3738
* The channel-mapping can be specified for the super classes to avoid mapping duplication
3839
* for the particular exception implementation.
@@ -43,7 +44,7 @@
4344
*/
4445
public class ErrorMessageExceptionTypeRouter extends AbstractMappingMessageRouter {
4546

46-
private volatile Map<String, Class<?>> classNameMappings = new ConcurrentHashMap<String, Class<?>>();
47+
private volatile Map<String, Class<?>> classNameMappings = new ConcurrentHashMap<>();
4748

4849
private volatile boolean initialized;
4950

@@ -57,7 +58,7 @@ public void setChannelMappings(Map<String, String> channelMappings) {
5758
}
5859

5960
private void populateClassNameMapping(Set<String> classNames) {
60-
Map<String, Class<?>> newClassNameMappings = new ConcurrentHashMap<String, Class<?>>();
61+
Map<String, Class<?>> newClassNameMappings = new ConcurrentHashMap<>();
6162
for (String className : classNames) {
6263
newClassNameMappings.put(className, resolveClassFromName(className));
6364
}
@@ -77,7 +78,7 @@ private Class<?> resolveClassFromName(String className) {
7778
@ManagedOperation
7879
public void setChannelMapping(String key, String channelName) {
7980
super.setChannelMapping(key, channelName);
80-
Map<String, Class<?>> newClassNameMappings = new ConcurrentHashMap<String, Class<?>>(this.classNameMappings);
81+
Map<String, Class<?>> newClassNameMappings = new ConcurrentHashMap<>(this.classNameMappings);
8182
newClassNameMappings.put(key, resolveClassFromName(key));
8283
this.classNameMappings = newClassNameMappings;
8384
}
@@ -86,7 +87,7 @@ public void setChannelMapping(String key, String channelName) {
8687
@ManagedOperation
8788
public void removeChannelMapping(String key) {
8889
super.removeChannelMapping(key);
89-
Map<String, Class<?>> newClassNameMappings = new ConcurrentHashMap<String, Class<?>>(this.classNameMappings);
90+
Map<String, Class<?>> newClassNameMappings = new ConcurrentHashMap<>(this.classNameMappings);
9091
newClassNameMappings.remove(key);
9192
this.classNameMappings = newClassNameMappings;
9293
}
@@ -122,7 +123,7 @@ protected List<Object> getChannelKeys(Message<?> message) {
122123
cause = cause.getCause();
123124
}
124125
}
125-
return Collections.<Object>singletonList(mostSpecificCause);
126+
return Collections.singletonList(mostSpecificCause);
126127
}
127128

128129
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,23 +50,23 @@
5050
import org.springframework.messaging.Message;
5151
import org.springframework.messaging.MessageChannel;
5252
import org.springframework.messaging.MessageDeliveryException;
53+
import org.springframework.messaging.MessageHandlingException;
5354
import org.springframework.messaging.MessagingException;
5455
import org.springframework.messaging.PollableChannel;
5556
import org.springframework.messaging.core.DestinationResolutionException;
5657
import org.springframework.messaging.handler.annotation.Header;
58+
import org.springframework.messaging.support.ErrorMessage;
5759
import org.springframework.messaging.support.GenericMessage;
5860
import org.springframework.test.annotation.DirtiesContext;
59-
import org.springframework.test.context.ContextConfiguration;
60-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
61+
import org.springframework.test.context.junit4.SpringRunner;
6162

6263
/**
6364
* @author Artem Bilan
6465
* @author Gary Russell
6566
*
6667
* @since 5.0
6768
*/
68-
@ContextConfiguration
69-
@RunWith(SpringJUnit4ClassRunner.class)
69+
@RunWith(SpringRunner.class)
7070
@DirtiesContext
7171
public class RouterTests {
7272

@@ -90,7 +90,7 @@ public class RouterTests {
9090
public void testRouter() {
9191
this.beanFactory.containsBean("routeFlow.subFlow#0.channel#0");
9292

93-
int[] payloads = new int[] {1, 2, 3, 4, 5, 6};
93+
int[] payloads = new int[] { 1, 2, 3, 4, 5, 6 };
9494

9595
for (int payload : payloads) {
9696
this.routerInput.send(new GenericMessage<>(payload));
@@ -125,7 +125,7 @@ public void testRouterWithTwoSubflows() {
125125
@SuppressWarnings("unchecked")
126126
List<Integer> results = (List<Integer>) payload;
127127

128-
assertArrayEquals(new Integer[] {3, 4, 9, 8, 15, 12}, results.toArray(new Integer[results.size()]));
128+
assertArrayEquals(new Integer[] { 3, 4, 9, 8, 15, 12 }, results.toArray(new Integer[results.size()]));
129129
}
130130

131131
@Autowired
@@ -498,6 +498,39 @@ public void testScatterGather() {
498498
assertThat(((List<?>) payload).size(), greaterThanOrEqualTo(1));
499499
}
500500

501+
502+
@Autowired
503+
@Qualifier("exceptionTypeRouteFlow.input")
504+
private MessageChannel exceptionTypeRouteFlowInput;
505+
506+
@Autowired
507+
private PollableChannel illegalArgumentChannel;
508+
509+
@Autowired
510+
private PollableChannel runtimeExceptionChannel;
511+
512+
@Autowired
513+
private PollableChannel messageHandlingExceptionChannel;
514+
515+
@Autowired
516+
private PollableChannel exceptionRouterDefaultChannel;
517+
518+
@Test
519+
public void testExceptionTypeRouteFlow() {
520+
Message<?> failedMessage = new GenericMessage<>("foo");
521+
IllegalArgumentException rootCause = new IllegalArgumentException("bad argument");
522+
RuntimeException middleCause = new RuntimeException(rootCause);
523+
MessageHandlingException error = new MessageHandlingException(failedMessage, "failed", middleCause);
524+
ErrorMessage message = new ErrorMessage(error);
525+
526+
this.exceptionTypeRouteFlowInput.send(message);
527+
528+
assertNotNull(this.illegalArgumentChannel.receive(1000));
529+
assertNull(this.exceptionRouterDefaultChannel.receive(0));
530+
assertNull(this.runtimeExceptionChannel.receive(0));
531+
assertNull(this.messageHandlingExceptionChannel.receive(0));
532+
}
533+
501534
@Configuration
502535
@EnableIntegration
503536
@EnableMessageHistory({ "recipientListOrder*", "recipient1*", "recipient2*" })
@@ -525,8 +558,8 @@ public IntegrationFlow routeSubflowToReplyChannelFlow() {
525558
return f -> f
526559
.<Boolean>route("true", m -> m
527560
.subFlowMapping(true, sf -> sf
528-
.<String>handle((p, h) -> p.toUpperCase())
529-
)
561+
.<String>handle((p, h) -> p.toUpperCase())
562+
)
530563
);
531564
}
532565

@@ -616,7 +649,7 @@ public IntegrationFlow routeMethodInvocationFlow3() {
616649
@Bean
617650
public IntegrationFlow routeMultiMethodInvocationFlow() {
618651
return IntegrationFlows.from("routerMultiInput")
619-
.route(String.class, p -> p.equals("foo") || p.equals("bar") ? new String[] {"foo", "bar"} : null,
652+
.route(String.class, p -> p.equals("foo") || p.equals("bar") ? new String[] { "foo", "bar" } : null,
620653
s -> s.suffix("-channel"))
621654
.get();
622655
}
@@ -639,6 +672,37 @@ public IntegrationFlow payloadTypeRouteFlow() {
639672
.channelMapping(Integer.class, "integersChannel"));
640673
}
641674

675+
@Bean
676+
public IntegrationFlow exceptionTypeRouteFlow() {
677+
return f -> f
678+
.routeByException(r -> r
679+
.channelMapping(IllegalArgumentException.class, "illegalArgumentChannel")
680+
.channelMapping(RuntimeException.class, "runtimeExceptionChannel")
681+
.subFlowMapping(MessageHandlingException.class, sf ->
682+
sf.channel("messageHandlingExceptionChannel"))
683+
.defaultOutputChannel("exceptionRouterDefaultChannel"));
684+
}
685+
686+
@Bean
687+
public PollableChannel exceptionRouterDefaultChannel() {
688+
return new QueueChannel();
689+
}
690+
691+
@Bean
692+
public PollableChannel illegalArgumentChannel() {
693+
return new QueueChannel();
694+
}
695+
696+
@Bean
697+
public PollableChannel runtimeExceptionChannel() {
698+
return new QueueChannel();
699+
}
700+
701+
@Bean
702+
public PollableChannel messageHandlingExceptionChannel() {
703+
return new QueueChannel();
704+
}
705+
642706
@Bean
643707
public IntegrationFlow routerAsNonLastFlow() {
644708
return f -> f

0 commit comments

Comments
 (0)