Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,10 +19,12 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;

import org.springframework.integration.MessageDispatchingException;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageHandlingRunnable;
Expand All @@ -47,25 +49,26 @@
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Artem Bilan
*
* @since 1.0.2
*/
public class UnicastingDispatcher extends AbstractDispatcher {

private final MessageHandler dispatchHandler = message -> doDispatch(message);
private final MessageHandler dispatchHandler = this::doDispatch;

private final Executor executor;

private volatile boolean failover = true;
private boolean failover = true;

private volatile LoadBalancingStrategy loadBalancingStrategy;
private LoadBalancingStrategy loadBalancingStrategy;

private volatile MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;
private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;

public UnicastingDispatcher() {
this.executor = null;
}

public UnicastingDispatcher(Executor executor) {
public UnicastingDispatcher(@Nullable Executor executor) {
this.executor = executor;
}

Expand All @@ -74,7 +77,6 @@ public UnicastingDispatcher(Executor executor) {
* Specify whether this dispatcher should failover when a single
* {@link MessageHandler} throws an Exception. The default value is
* <code>true</code>.
*
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
Expand All @@ -83,10 +85,9 @@ public void setFailover(boolean failover) {

/**
* Provide a {@link LoadBalancingStrategy} for this dispatcher.
*
* @param loadBalancingStrategy The load balancing strategy implementation.
*/
public void setLoadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy) {
public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
this.loadBalancingStrategy = loadBalancingStrategy;
}

Expand Down Expand Up @@ -133,22 +134,30 @@ private boolean doDispatch(Message<?> message) {
return true;
}
boolean success = false;
Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
Iterator<MessageHandler> handlerIterator = getHandlerIterator(message);
if (!handlerIterator.hasNext()) {
throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
}
List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
List<RuntimeException> exceptions = null;
while (!success && handlerIterator.hasNext()) {
MessageHandler handler = handlerIterator.next();
try {
handler.handleMessage(message);
success = true; // we have a winner.
}
catch (Exception e) {
RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", e);
catch (Exception ex) {
RuntimeException runtimeException =
IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this line wrap? It was previously 112.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, my own preferences: I find it is much readable when variable initialization is multi-line. So, I start initialization block fully from a new line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

if (exceptions == null) {
exceptions = new ArrayList<>();
}
exceptions.add(runtimeException);
this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
boolean isLast = !handlerIterator.hasNext();
if (!isLast && this.failover) {
logExceptionBeforeFailOver(ex, handler, message);
}
handleExceptions(exceptions, message, isLast);
}
}
return success;
Expand All @@ -160,10 +169,22 @@ private boolean doDispatch(Message<?> message) {
* it simply returns the Iterator for the existing handler List.
*/
private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
Set<MessageHandler> handlers = getHandlers();
if (this.loadBalancingStrategy != null) {
return this.loadBalancingStrategy.getHandlerIterator(message, this.getHandlers());
return this.loadBalancingStrategy.getHandlerIterator(message, handlers);
}
return handlers.iterator();
}

private void logExceptionBeforeFailOver(Exception ex, MessageHandler handler, Message<?> message) {
if (this.logger.isInfoEnabled()) {
this.logger.info("An exception was thrown by '" + handler + "' while handling '" + message + "': " +
ex.getMessage() + ". Failing over to the next subscriber.");
}
else if (this.logger.isDebugEnabled()) {
this.logger.debug("An exception was thrown by '" + handler + "' while handling '" + message +
"'. Failing over to the next subscriber.", ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug has to be detected first otherwise we'll always take the INFO branch.

}
return this.getHandlers().iterator();
}

/**
Expand All @@ -176,10 +197,10 @@ private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
*/
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
if (isLast || !this.failover) {
if (allExceptions != null && allExceptions.size() == 1) {
if (allExceptions.size() == 1) {
throw allExceptions.get(0);
}
throw new AggregateMessageDeliveryException(message, //NOSONAR - false positive
throw new AggregateMessageDeliveryException(message,
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
package org.springframework.integration.dispatcher;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -33,6 +38,7 @@

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
Expand All @@ -41,6 +47,7 @@
* @author Iwein Fuld
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*/
@RunWith(MockitoJUnitRunner.class)
public class RoundRobinDispatcherTests {
Expand All @@ -63,44 +70,44 @@ public void setupDispatcher() {


@Test
public void dispatchMessageWithSingleHandler() throws Exception {
dispatcher.addHandler(handler);
dispatcher.dispatch(message);
public void dispatchMessageWithSingleHandler() {
this.dispatcher.addHandler(this.handler);
this.dispatcher.dispatch(this.message);
verify(this.handler).handleMessage(this.message);
}

@Test
public void differentHandlerInvokedOnSecondMessage() throws Exception {
dispatcher.addHandler(handler);
dispatcher.addHandler(differentHandler);
dispatcher.dispatch(message);
dispatcher.dispatch(message);
verify(handler).handleMessage(message);
verify(differentHandler).handleMessage(message);
public void differentHandlerInvokedOnSecondMessage() {
this.dispatcher.addHandler(this.handler);
this.dispatcher.addHandler(this.differentHandler);
this.dispatcher.dispatch(this.message);
this.dispatcher.dispatch(this.message);
verify(this.handler).handleMessage(this.message);
verify(this.differentHandler).handleMessage(this.message);
}

@Test
public void multipleCyclesThroughHandlers() throws Exception {
dispatcher.addHandler(handler);
dispatcher.addHandler(differentHandler);
public void multipleCyclesThroughHandlers() {
this.dispatcher.addHandler(this.handler);
this.dispatcher.addHandler(this.differentHandler);
for (int i = 0; i < 7; i++) {
dispatcher.dispatch(message);
this.dispatcher.dispatch(this.message);
}
verify(handler, times(4)).handleMessage(message);
verify(differentHandler, times(3)).handleMessage(message);
verify(this.handler, times(4)).handleMessage(this.message);
verify(this.differentHandler, times(3)).handleMessage(this.message);
}

@Test
public void currentHandlerIndexOverFlow() throws Exception {
dispatcher.addHandler(handler);
dispatcher.addHandler(differentHandler);
DirectFieldAccessor accessor = new DirectFieldAccessor(
new DirectFieldAccessor(dispatcher).getPropertyValue("loadBalancingStrategy"));
((AtomicInteger) accessor.getPropertyValue("currentHandlerIndex")).set(Integer.MAX_VALUE - 5);
public void currentHandlerIndexOverFlow() {
this.dispatcher.addHandler(this.handler);
this.dispatcher.addHandler(this.differentHandler);
TestUtils.getPropertyValue(this.dispatcher, "loadBalancingStrategy.currentHandlerIndex", AtomicInteger.class)
.set(Integer.MAX_VALUE - 5);
for (int i = 0; i < 40; i++) {
dispatcher.dispatch(message);
this.dispatcher.dispatch(this.message);
}
verify(handler, atLeast(18)).handleMessage(message);
verify(differentHandler, atLeast(18)).handleMessage(message);
verify(this.handler, atLeast(18)).handleMessage(this.message);
verify(this.differentHandler, atLeast(18)).handleMessage(this.message);
}

/**
Expand All @@ -109,16 +116,14 @@ public void currentHandlerIndexOverFlow() throws Exception {
*/
@Test
public void testExceptionEnhancement() {
dispatcher.addHandler(handler);
doThrow(new MessagingException("Mock Exception")).
when(handler).handleMessage(message);
try {
dispatcher.dispatch(message);
fail("Expected Exception");
}
catch (MessagingException e) {
assertThat(e.getFailedMessage()).isEqualTo(message);
}
this.dispatcher.addHandler(this.handler);
doThrow(new MessagingException("Mock Exception"))
.when(this.handler)
.handleMessage(this.message);

assertThatExceptionOfType(MessagingException.class)
.isThrownBy(() -> this.dispatcher.dispatch(this.message))
.satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(this.message));
}

/**
Expand All @@ -127,16 +132,37 @@ public void testExceptionEnhancement() {
*/
@Test
public void testNoExceptionEnhancement() {
dispatcher.addHandler(handler);
this.dispatcher.addHandler(this.handler);
Message<String> dontReplaceThisMessage = MessageBuilder.withPayload("x").build();
doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception")).
when(handler).handleMessage(message);
try {
dispatcher.dispatch(message);
fail("Expected Exception");
}
catch (MessagingException e) {
assertThat(e.getFailedMessage()).isEqualTo(dontReplaceThisMessage);
}
doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception"))
.when(this.handler)
.handleMessage(this.message);

assertThatExceptionOfType(MessagingException.class)
.isThrownBy(() -> this.dispatcher.dispatch(this.message))
.satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(dontReplaceThisMessage));
}

@Test
public void testFailOverAndLogging() {
RuntimeException testException = new RuntimeException("intentional");
doThrow(testException)
.when(this.handler)
.handleMessage(this.message);
this.dispatcher.addHandler(this.handler);
this.dispatcher.addHandler(this.differentHandler);

DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(this.dispatcher);
Log log = (Log) spy(directFieldAccessor.getPropertyType("logger"));
given(log.isDebugEnabled()).willReturn(true);
directFieldAccessor.setPropertyValue("logger", log);

this.dispatcher.dispatch(this.message);

verify(this.handler).handleMessage(this.message);
verify(this.differentHandler).handleMessage(this.message);

verify(log).debug(startsWith("An exception was thrown by '"), eq(testException));
}

}