Skip to content

Commit 3006e58

Browse files
artembilangaryrussell
authored andcommitted
INT-3459: Log exceptions in case of failOver (#2790)
* INT-3459: Log exceptions in case of failOver JIRA: https://jira.spring.io/browse/INT-3459 When `UnicastingDispatcher` is configured with `failOver` (true by default), it loses exceptions it caught with previous handler when the next one processes message properly * Add INFO logging for exceptions which are caught before going to fail over to the next handler * * Address PR comments * * More PR comments * Add a note about this logging into the `channel.adoc`
1 parent efea8eb commit 3006e58

File tree

3 files changed

+115
-66
lines changed

3 files changed

+115
-66
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,10 +19,12 @@
1919
import java.util.ArrayList;
2020
import java.util.Iterator;
2121
import java.util.List;
22+
import java.util.Set;
2223
import java.util.concurrent.Executor;
2324

2425
import org.springframework.integration.MessageDispatchingException;
2526
import org.springframework.integration.support.utils.IntegrationUtils;
27+
import org.springframework.lang.Nullable;
2628
import org.springframework.messaging.Message;
2729
import org.springframework.messaging.MessageHandler;
2830
import org.springframework.messaging.support.MessageHandlingRunnable;
@@ -47,25 +49,26 @@
4749
* @author Gary Russell
4850
* @author Oleg Zhurakousky
4951
* @author Artem Bilan
52+
*
5053
* @since 1.0.2
5154
*/
5255
public class UnicastingDispatcher extends AbstractDispatcher {
5356

54-
private final MessageHandler dispatchHandler = message -> doDispatch(message);
57+
private final MessageHandler dispatchHandler = this::doDispatch;
5558

5659
private final Executor executor;
5760

58-
private volatile boolean failover = true;
61+
private boolean failover = true;
5962

60-
private volatile LoadBalancingStrategy loadBalancingStrategy;
63+
private LoadBalancingStrategy loadBalancingStrategy;
6164

62-
private volatile MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;
65+
private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;
6366

6467
public UnicastingDispatcher() {
6568
this.executor = null;
6669
}
6770

68-
public UnicastingDispatcher(Executor executor) {
71+
public UnicastingDispatcher(@Nullable Executor executor) {
6972
this.executor = executor;
7073
}
7174

@@ -74,7 +77,6 @@ public UnicastingDispatcher(Executor executor) {
7477
* Specify whether this dispatcher should failover when a single
7578
* {@link MessageHandler} throws an Exception. The default value is
7679
* <code>true</code>.
77-
*
7880
* @param failover The failover boolean.
7981
*/
8082
public void setFailover(boolean failover) {
@@ -83,10 +85,9 @@ public void setFailover(boolean failover) {
8385

8486
/**
8587
* Provide a {@link LoadBalancingStrategy} for this dispatcher.
86-
*
8788
* @param loadBalancingStrategy The load balancing strategy implementation.
8889
*/
89-
public void setLoadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy) {
90+
public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
9091
this.loadBalancingStrategy = loadBalancingStrategy;
9192
}
9293

@@ -133,22 +134,30 @@ private boolean doDispatch(Message<?> message) {
133134
return true;
134135
}
135136
boolean success = false;
136-
Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
137+
Iterator<MessageHandler> handlerIterator = getHandlerIterator(message);
137138
if (!handlerIterator.hasNext()) {
138139
throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
139140
}
140-
List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
141+
List<RuntimeException> exceptions = null;
141142
while (!success && handlerIterator.hasNext()) {
142143
MessageHandler handler = handlerIterator.next();
143144
try {
144145
handler.handleMessage(message);
145146
success = true; // we have a winner.
146147
}
147-
catch (Exception e) {
148-
RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
149-
() -> "Dispatcher failed to deliver Message", e);
148+
catch (Exception ex) {
149+
RuntimeException runtimeException =
150+
IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
151+
() -> "Dispatcher failed to deliver Message", ex);
152+
if (exceptions == null) {
153+
exceptions = new ArrayList<>();
154+
}
150155
exceptions.add(runtimeException);
151-
this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
156+
boolean isLast = !handlerIterator.hasNext();
157+
if (!isLast && this.failover) {
158+
logExceptionBeforeFailOver(ex, handler, message);
159+
}
160+
handleExceptions(exceptions, message, isLast);
152161
}
153162
}
154163
return success;
@@ -160,10 +169,22 @@ private boolean doDispatch(Message<?> message) {
160169
* it simply returns the Iterator for the existing handler List.
161170
*/
162171
private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
172+
Set<MessageHandler> handlers = getHandlers();
163173
if (this.loadBalancingStrategy != null) {
164-
return this.loadBalancingStrategy.getHandlerIterator(message, this.getHandlers());
174+
return this.loadBalancingStrategy.getHandlerIterator(message, handlers);
175+
}
176+
return handlers.iterator();
177+
}
178+
179+
private void logExceptionBeforeFailOver(Exception ex, MessageHandler handler, Message<?> message) {
180+
if (this.logger.isDebugEnabled()) {
181+
this.logger.debug("An exception was thrown by '" + handler + "' while handling '" + message +
182+
"'. Failing over to the next subscriber.", ex);
183+
}
184+
else if (this.logger.isInfoEnabled()) {
185+
this.logger.info("An exception was thrown by '" + handler + "' while handling '" + message + "': " +
186+
ex.getMessage() + ". Failing over to the next subscriber.");
165187
}
166-
return this.getHandlers().iterator();
167188
}
168189

169190
/**
@@ -176,10 +197,10 @@ private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
176197
*/
177198
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
178199
if (isLast || !this.failover) {
179-
if (allExceptions != null && allExceptions.size() == 1) {
200+
if (allExceptions.size() == 1) {
180201
throw allExceptions.get(0);
181202
}
182-
throw new AggregateMessageDeliveryException(message, //NOSONAR - false positive
203+
throw new AggregateMessageDeliveryException(message,
183204
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
184205
}
185206
}

spring-integration-core/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java

Lines changed: 72 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@
1717
package org.springframework.integration.dispatcher;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.assertj.core.api.Assertions.fail;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.mockito.ArgumentMatchers.eq;
22+
import static org.mockito.ArgumentMatchers.startsWith;
23+
import static org.mockito.BDDMockito.given;
2124
import static org.mockito.Mockito.atLeast;
2225
import static org.mockito.Mockito.doThrow;
26+
import static org.mockito.Mockito.spy;
2327
import static org.mockito.Mockito.times;
2428
import static org.mockito.Mockito.verify;
2529

2630
import java.util.concurrent.atomic.AtomicInteger;
2731

32+
import org.apache.commons.logging.Log;
2833
import org.junit.Before;
2934
import org.junit.Test;
3035
import org.junit.runner.RunWith;
@@ -33,6 +38,7 @@
3338

3439
import org.springframework.beans.DirectFieldAccessor;
3540
import org.springframework.integration.support.MessageBuilder;
41+
import org.springframework.integration.test.util.TestUtils;
3642
import org.springframework.messaging.Message;
3743
import org.springframework.messaging.MessageHandler;
3844
import org.springframework.messaging.MessagingException;
@@ -41,6 +47,7 @@
4147
* @author Iwein Fuld
4248
* @author Mark Fisher
4349
* @author Gary Russell
50+
* @author Artem Bilan
4451
*/
4552
@RunWith(MockitoJUnitRunner.class)
4653
public class RoundRobinDispatcherTests {
@@ -63,44 +70,44 @@ public void setupDispatcher() {
6370

6471

6572
@Test
66-
public void dispatchMessageWithSingleHandler() throws Exception {
67-
dispatcher.addHandler(handler);
68-
dispatcher.dispatch(message);
73+
public void dispatchMessageWithSingleHandler() {
74+
this.dispatcher.addHandler(this.handler);
75+
this.dispatcher.dispatch(this.message);
76+
verify(this.handler).handleMessage(this.message);
6977
}
7078

7179
@Test
72-
public void differentHandlerInvokedOnSecondMessage() throws Exception {
73-
dispatcher.addHandler(handler);
74-
dispatcher.addHandler(differentHandler);
75-
dispatcher.dispatch(message);
76-
dispatcher.dispatch(message);
77-
verify(handler).handleMessage(message);
78-
verify(differentHandler).handleMessage(message);
80+
public void differentHandlerInvokedOnSecondMessage() {
81+
this.dispatcher.addHandler(this.handler);
82+
this.dispatcher.addHandler(this.differentHandler);
83+
this.dispatcher.dispatch(this.message);
84+
this.dispatcher.dispatch(this.message);
85+
verify(this.handler).handleMessage(this.message);
86+
verify(this.differentHandler).handleMessage(this.message);
7987
}
8088

8189
@Test
82-
public void multipleCyclesThroughHandlers() throws Exception {
83-
dispatcher.addHandler(handler);
84-
dispatcher.addHandler(differentHandler);
90+
public void multipleCyclesThroughHandlers() {
91+
this.dispatcher.addHandler(this.handler);
92+
this.dispatcher.addHandler(this.differentHandler);
8593
for (int i = 0; i < 7; i++) {
86-
dispatcher.dispatch(message);
94+
this.dispatcher.dispatch(this.message);
8795
}
88-
verify(handler, times(4)).handleMessage(message);
89-
verify(differentHandler, times(3)).handleMessage(message);
96+
verify(this.handler, times(4)).handleMessage(this.message);
97+
verify(this.differentHandler, times(3)).handleMessage(this.message);
9098
}
9199

92100
@Test
93-
public void currentHandlerIndexOverFlow() throws Exception {
94-
dispatcher.addHandler(handler);
95-
dispatcher.addHandler(differentHandler);
96-
DirectFieldAccessor accessor = new DirectFieldAccessor(
97-
new DirectFieldAccessor(dispatcher).getPropertyValue("loadBalancingStrategy"));
98-
((AtomicInteger) accessor.getPropertyValue("currentHandlerIndex")).set(Integer.MAX_VALUE - 5);
101+
public void currentHandlerIndexOverFlow() {
102+
this.dispatcher.addHandler(this.handler);
103+
this.dispatcher.addHandler(this.differentHandler);
104+
TestUtils.getPropertyValue(this.dispatcher, "loadBalancingStrategy.currentHandlerIndex", AtomicInteger.class)
105+
.set(Integer.MAX_VALUE - 5);
99106
for (int i = 0; i < 40; i++) {
100-
dispatcher.dispatch(message);
107+
this.dispatcher.dispatch(this.message);
101108
}
102-
verify(handler, atLeast(18)).handleMessage(message);
103-
verify(differentHandler, atLeast(18)).handleMessage(message);
109+
verify(this.handler, atLeast(18)).handleMessage(this.message);
110+
verify(this.differentHandler, atLeast(18)).handleMessage(this.message);
104111
}
105112

106113
/**
@@ -109,16 +116,14 @@ public void currentHandlerIndexOverFlow() throws Exception {
109116
*/
110117
@Test
111118
public void testExceptionEnhancement() {
112-
dispatcher.addHandler(handler);
113-
doThrow(new MessagingException("Mock Exception")).
114-
when(handler).handleMessage(message);
115-
try {
116-
dispatcher.dispatch(message);
117-
fail("Expected Exception");
118-
}
119-
catch (MessagingException e) {
120-
assertThat(e.getFailedMessage()).isEqualTo(message);
121-
}
119+
this.dispatcher.addHandler(this.handler);
120+
doThrow(new MessagingException("Mock Exception"))
121+
.when(this.handler)
122+
.handleMessage(this.message);
123+
124+
assertThatExceptionOfType(MessagingException.class)
125+
.isThrownBy(() -> this.dispatcher.dispatch(this.message))
126+
.satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(this.message));
122127
}
123128

124129
/**
@@ -127,16 +132,37 @@ public void testExceptionEnhancement() {
127132
*/
128133
@Test
129134
public void testNoExceptionEnhancement() {
130-
dispatcher.addHandler(handler);
135+
this.dispatcher.addHandler(this.handler);
131136
Message<String> dontReplaceThisMessage = MessageBuilder.withPayload("x").build();
132-
doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception")).
133-
when(handler).handleMessage(message);
134-
try {
135-
dispatcher.dispatch(message);
136-
fail("Expected Exception");
137-
}
138-
catch (MessagingException e) {
139-
assertThat(e.getFailedMessage()).isEqualTo(dontReplaceThisMessage);
140-
}
137+
doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception"))
138+
.when(this.handler)
139+
.handleMessage(this.message);
140+
141+
assertThatExceptionOfType(MessagingException.class)
142+
.isThrownBy(() -> this.dispatcher.dispatch(this.message))
143+
.satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(dontReplaceThisMessage));
141144
}
145+
146+
@Test
147+
public void testFailOverAndLogging() {
148+
RuntimeException testException = new RuntimeException("intentional");
149+
doThrow(testException)
150+
.when(this.handler)
151+
.handleMessage(this.message);
152+
this.dispatcher.addHandler(this.handler);
153+
this.dispatcher.addHandler(this.differentHandler);
154+
155+
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(this.dispatcher);
156+
Log log = (Log) spy(directFieldAccessor.getPropertyType("logger"));
157+
given(log.isDebugEnabled()).willReturn(true);
158+
directFieldAccessor.setPropertyValue("logger", log);
159+
160+
this.dispatcher.dispatch(this.message);
161+
162+
verify(this.handler).handleMessage(this.message);
163+
verify(this.differentHandler).handleMessage(this.message);
164+
165+
verify(log).debug(startsWith("An exception was thrown by '"), eq(testException));
166+
}
167+
142168
}

src/reference/asciidoc/channel.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ However, since version 3.0, you can provide your own implementation of the `Load
175175
Note that the `load-balancer` and `load-balancer-ref` attributes are mutually exclusive.
176176

177177
The load-balancing also works in conjunction with a boolean `failover` property.
178-
If the "`failover`" value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions.
178+
If the `failover` value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions.
179179
The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers subscribed.
180180

181181
If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order sequence every time an error occurs, no load-balancing strategy should be provided.
@@ -187,6 +187,8 @@ When using the namespace support, the `order` attribute on any endpoint determin
187187
NOTE: Keep in mind that load-balancing and `failover` apply only when a channel has more than one subscribed message handler.
188188
When using the namespace support, this means that more than one endpoint shares the same channel reference defined in the `input-channel` attribute.
189189

190+
Starting with version 5.2, when `failover` is true, a failure of the current handler together with the failed message is logged under `debug` or `info` if configured respectively.
191+
190192
[[executor-channel]]
191193
===== `ExecutorChannel`
192194

0 commit comments

Comments
 (0)