Skip to content

INT-4520 Make IntegrationGraphServer customizable #2608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
Expand All @@ -42,9 +46,9 @@
import org.springframework.integration.router.RecipientListRouterManagement;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.integration.support.management.MappingMessageRouterManagement;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;

/**
* Builds the runtime object model graph.
Expand All @@ -67,11 +71,17 @@ public class IntegrationGraphServer implements ApplicationContextAware, Applicat

private String applicationName;

private Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext; //NOSONAR (sync)
}

protected ApplicationContext getApplicationContext() {
return this.applicationContext;
}

/**
* Set the application name that will appear in the 'contentDescriptor' under
* the 'name' key. If not provided, the property 'spring.application.name' from
Expand All @@ -82,6 +92,24 @@ public void setApplicationName(String applicationName) {
this.applicationName = applicationName; //NOSONAR (sync)
}

/**
* Specify a callback {@link Function} to be called against each {@link NamedComponent}
* to populate additional properties to the target {@link IntegrationNode}.
* @param additionalPropertiesCallback the {@link Function} to use for properties.
* @since 5.1
*/
public void setAdditionalPropertiesCallback(
@Nullable Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback) {
this.additionalPropertiesCallback = additionalPropertiesCallback;
}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().equals(this.applicationContext)) {
buildGraph();
}
}

/**
* Return the cached graph. Although the graph is cached, the data therein (stats
* etc.) are dynamic.
Expand All @@ -99,11 +127,27 @@ public Graph getGraph() {
return this.graph;
}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().equals(this.applicationContext)) {
buildGraph();
}
/**
* Rebuild the graph, re-cache it, and return it. Use this method if the application
* components have changed (added or removed).
* @return the graph.
* @see #getGraph()
*/
public Graph rebuild() {
return buildGraph();
}

/**
* Get beans for the provided type from the application context.
* This method can be extended for some custom logic, e.g. get beans
* from the parent application context as well.
* @param type the type for beans to obtain
* @param <T> the type for beans to obtain
* @return a {@link Map} of bean for the provided type
* @since 5.1
*/
protected <T> Map<String, T> getBeansOfType(Class<T> type) {
return this.applicationContext.getBeansOfType(type, true, false);
}

private synchronized Graph buildGraph() {
Expand Down Expand Up @@ -135,95 +179,129 @@ private synchronized Graph buildGraph() {
}

private Map<String, MessageChannelNode> channels(Collection<IntegrationNode> nodes) {
Map<String, MessageChannel> channels = this.applicationContext
.getBeansOfType(MessageChannel.class, true, false);
Map<String, MessageChannelNode> channelNodes = new HashMap<>();
for (Entry<String, MessageChannel> entry : channels.entrySet()) {
MessageChannel channel = entry.getValue();
MessageChannelNode channelNode = this.nodeFactory.channelNode(entry.getKey(), channel);
String beanName = entry.getKey();
nodes.add(channelNode);
channelNodes.put(beanName, channelNode);
}
return channelNodes;
return getBeansOfType(MessageChannel.class)
.entrySet()
.stream()
.map(e -> {
MessageChannel messageChannel = e.getValue();
MessageChannelNode messageChannelNode = this.nodeFactory.channelNode(e.getKey(), messageChannel);
if (messageChannel instanceof NamedComponent) {
messageChannelNode.addProperties(getAdditionalPropertiesIfAny((NamedComponent) messageChannel));
}
return messageChannelNode;
})
.peek(nodes::add)
.collect(Collectors.toMap(MessageChannelNode::getName, Function.identity()));
}

private void pollingAdapters(Collection<IntegrationNode> nodes, Collection<LinkNode> links,
Map<String, MessageChannelNode> channelNodes) {

Map<String, SourcePollingChannelAdapter> spcas = this.applicationContext
.getBeansOfType(SourcePollingChannelAdapter.class, true, false);
for (Entry<String, SourcePollingChannelAdapter> entry : spcas.entrySet()) {
SourcePollingChannelAdapter adapter = entry.getValue();
MessageSourceNode sourceNode = this.nodeFactory.sourceNode(entry.getKey(), adapter);
nodes.add(sourceNode);
producerLink(links, channelNodes, sourceNode);
}
getBeansOfType(SourcePollingChannelAdapter.class)
.entrySet()
.stream()
.map(e -> {
SourcePollingChannelAdapter sourceAdapter = e.getValue();
MessageSourceNode sourceNode = this.nodeFactory.sourceNode(e.getKey(), sourceAdapter);
sourceNode.addProperties(getAdditionalPropertiesIfAny(sourceAdapter));
return sourceNode;
})
.peek(nodes::add)
.forEach(sourceNode -> producerLink(links, channelNodes, sourceNode));
}

private void gateways(Collection<IntegrationNode> nodes, Collection<LinkNode> links,
Map<String, MessageChannelNode> channelNodes) {

Map<String, MessagingGatewaySupport> gateways = this.applicationContext
.getBeansOfType(MessagingGatewaySupport.class, true, false);
for (Entry<String, MessagingGatewaySupport> entry : gateways.entrySet()) {
MessagingGatewaySupport gateway = entry.getValue();
MessageGatewayNode gatewayNode = this.nodeFactory.gatewayNode(entry.getKey(), gateway);
nodes.add(gatewayNode);
producerLink(links, channelNodes, gatewayNode);
}
Map<String, GatewayProxyFactoryBean> gpfbs = this.applicationContext
.getBeansOfType(GatewayProxyFactoryBean.class, true, false);
getBeansOfType(MessagingGatewaySupport.class)
.entrySet()
.stream()
.map(e -> {
MessagingGatewaySupport gateway = e.getValue();
MessageGatewayNode gatewayNode = this.nodeFactory.gatewayNode(e.getKey(), gateway);
gatewayNode.addProperties(getAdditionalPropertiesIfAny(gateway));
return gatewayNode;
})
.peek(nodes::add)
.forEach(gatewayNode -> producerLink(links, channelNodes, gatewayNode));

Map<String, GatewayProxyFactoryBean> gpfbs = getBeansOfType(GatewayProxyFactoryBean.class);

for (Entry<String, GatewayProxyFactoryBean> entry : gpfbs.entrySet()) {
Map<Method, MessagingGatewaySupport> methodMap = entry.getValue().getGateways();
for (Entry<Method, MessagingGatewaySupport> gwEntry : methodMap.entrySet()) {
MessagingGatewaySupport gateway = gwEntry.getValue();
Method method = gwEntry.getKey();
Class<?>[] parameterTypes = method.getParameterTypes();
String[] parameterTypeNames = new String[parameterTypes.length];
int i = 0;
for (Class<?> type : parameterTypes) {
parameterTypeNames[i++] = type.getName();
}
String signature = method.getName() +
"(" + StringUtils.arrayToCommaDelimitedString(parameterTypeNames) + ")";
MessageGatewayNode gatewayNode = this.nodeFactory.gatewayNode(
entry.getKey().substring(1) + "." + signature, gateway);
nodes.add(gatewayNode);
producerLink(links, channelNodes, gatewayNode);
}
entry.getValue()
.getGateways()
.entrySet()
.stream()
.map(e -> {
MessagingGatewaySupport gateway = e.getValue();
Method method = e.getKey();

String nodeName =
entry.getKey().substring(1) + "." +
method.getName() +
"(" +
Arrays.stream(method.getParameterTypes())
.map(Class::getName)
.collect(Collectors.joining(","))
+ ")";

MessageGatewayNode gatewayNode = this.nodeFactory.gatewayNode(nodeName, gateway);
gatewayNode.addProperties(getAdditionalPropertiesIfAny(gateway));
return gatewayNode;
})
.peek(nodes::add)
.forEach(gatewayNode -> producerLink(links, channelNodes, gatewayNode));
}
}

private void producers(Collection<IntegrationNode> nodes, Collection<LinkNode> links,
Map<String, MessageChannelNode> channelNodes) {

Map<String, MessageProducerSupport> producers = this.applicationContext
.getBeansOfType(MessageProducerSupport.class, true, false);
for (Entry<String, MessageProducerSupport> entry : producers.entrySet()) {
MessageProducerSupport producer = entry.getValue();
MessageProducerNode producerNode = this.nodeFactory.producerNode(entry.getKey(), producer);
nodes.add(producerNode);
producerLink(links, channelNodes, producerNode);
}
getBeansOfType(MessageProducerSupport.class)
.entrySet()
.stream()
.map(e -> {
MessageProducerSupport producer = e.getValue();
MessageProducerNode producerNode = this.nodeFactory.producerNode(e.getKey(), producer);
producerNode.addProperties(getAdditionalPropertiesIfAny(producer));
return producerNode;
})
.peek(nodes::add)
.forEach(producerNode -> producerLink(links, channelNodes, producerNode));
}

private void consumers(Collection<IntegrationNode> nodes, Collection<LinkNode> links,
Map<String, MessageChannelNode> channelNodes) {

Map<String, IntegrationConsumer> consumers = this.applicationContext.getBeansOfType(IntegrationConsumer.class,
true, false);
for (Entry<String, IntegrationConsumer> entry : consumers.entrySet()) {
IntegrationConsumer consumer = entry.getValue();
MessageHandlerNode handlerNode = consumer instanceof PollingConsumer
? this.nodeFactory.polledHandlerNode(entry.getKey(), (PollingConsumer) consumer)
: this.nodeFactory.handlerNode(entry.getKey(), consumer);
nodes.add(handlerNode);
MessageChannelNode channelNode = channelNodes.get(handlerNode.getInput());
if (channelNode != null) {
links.add(new LinkNode(channelNode.getNodeId(), handlerNode.getNodeId(), LinkNode.Type.input));
}
producerLink(links, channelNodes, handlerNode);
getBeansOfType(IntegrationConsumer.class)
.entrySet()
.stream()
.map(e -> {
IntegrationConsumer consumer = e.getValue();
MessageHandlerNode handlerNode =
consumer instanceof PollingConsumer
? this.nodeFactory.polledHandlerNode(e.getKey(), (PollingConsumer) consumer)
: this.nodeFactory.handlerNode(e.getKey(), consumer);
handlerNode.addProperties(getAdditionalPropertiesIfAny(consumer));
return handlerNode;
})
.peek(nodes::add)
.forEach(handlerNode -> {
MessageChannelNode channelNode = channelNodes.get(handlerNode.getInput());
if (channelNode != null) {
links.add(new LinkNode(channelNode.getNodeId(), handlerNode.getNodeId(), LinkNode.Type.input));
}
producerLink(links, channelNodes, handlerNode);
});
}

@Nullable
private Map<String, Object> getAdditionalPropertiesIfAny(NamedComponent namedComponent) {
if (this.additionalPropertiesCallback != null) {
return this.additionalPropertiesCallback.apply(namedComponent);
}
else {
return null;
}
}

Expand Down Expand Up @@ -260,16 +338,6 @@ private void producerLink(Collection<LinkNode> links, Map<String, MessageChannel
}
}

/**
* Rebuild the graph, re-cache it, and return it. Use this method if the application
* components have changed (added or removed).
* @return the graph.
* @see #getGraph()
*/
public Graph rebuild() {
return buildGraph();
}

private static final class NodeFactory {

private final AtomicInteger nodeId = new AtomicInteger();
Expand Down Expand Up @@ -363,15 +431,17 @@ else if (handler instanceof RecipientListRouterManagement) {
private MessageHandlerNode compositeHandler(String name, IntegrationConsumer consumer,
CompositeMessageHandler handler, String output, String errors, boolean polled) {

List<MessageHandler> handlers = handler.getHandlers();
List<CompositeMessageHandlerNode.InnerHandler> innerHandlers = new ArrayList<>();
for (MessageHandler innerHandler : handlers) {
if (innerHandler instanceof NamedComponent) {
NamedComponent named = (NamedComponent) innerHandler;
innerHandlers.add(new CompositeMessageHandlerNode.InnerHandler(named.getComponentName(),
named.getComponentType()));
}
}
List<CompositeMessageHandlerNode.InnerHandler> innerHandlers =
handler.getHandlers()
.stream()
.filter(NamedComponent.class::isInstance)
.map(NamedComponent.class::cast)
.map(named ->
new CompositeMessageHandlerNode.InnerHandler(
named.getComponentName(),
named.getComponentType()))
.collect(Collectors.toList());

String inputChannel = consumer.getInputChannel() != null ? consumer.getInputChannel().toString() : null;
return polled
? new ErrorCapableCompositeMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler,
Expand All @@ -395,12 +465,11 @@ private MessageHandlerNode discardingHandler(String name, IntegrationConsumer co
private MessageHandlerNode routingHandler(String name, IntegrationConsumer consumer, MessageHandler handler,
MappingMessageRouterManagement router, String output, String errors, boolean polled) {

Collection<String> routes = router.getChannelMappings().values();
Collection<String> dynamicChannelNames = router.getDynamicChannelNames();
if (dynamicChannelNames.size() > 0) {
routes = new ArrayList<String>(routes);
routes.addAll(dynamicChannelNames);
}
Collection<String> routes =
Stream.concat(router.getChannelMappings().values().stream(),
router.getDynamicChannelNames().stream())
.collect(Collectors.toList());

String inputChannel = consumer.getInputChannel() != null ? consumer.getInputChannel().toString() : null;
return polled
? new ErrorCapableRoutingNode(this.nodeId.incrementAndGet(), name, handler,
Expand All @@ -413,11 +482,12 @@ private MessageHandlerNode recipientListRoutingHandler(String name, IntegrationC
MessageHandler handler, RecipientListRouterManagement router, String output, String errors,
boolean polled) {

Collection<?> recipients = router.getRecipients();
List<String> routes = new ArrayList<>(recipients.size());
for (Object recipient : recipients) {
routes.add(((Recipient) recipient).getChannel().toString());
}
List<String> routes =
router.getRecipients()
.stream()
.map(recipient -> ((Recipient) recipient).getChannel().toString())
.collect(Collectors.toList());

String inputChannel = consumer.getInputChannel() != null ? consumer.getInputChannel().toString() : null;
return polled
? new ErrorCapableRoutingNode(this.nodeId.incrementAndGet(), name, handler,
Expand Down
Loading