Skip to content

Commit 90ac259

Browse files
artembilangaryrussell
authored andcommitted
INT-4520 Make IntegrationGraphServer customizable (#2608)
* INT-4520 Make IntegrationGraphServer customizable JIRA: https://jira.spring.io/browse/INT-4520 * * Polishing according PR comments
1 parent 495dfe6 commit 90ac259

File tree

5 files changed

+267
-111
lines changed

5 files changed

+267
-111
lines changed

spring-integration-core/src/main/java/org/springframework/integration/graph/IntegrationGraphServer.java

Lines changed: 173 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818

1919
import java.lang.reflect.Method;
2020
import java.util.ArrayList;
21+
import java.util.Arrays;
2122
import java.util.Collection;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Map.Entry;
2627
import java.util.concurrent.atomic.AtomicInteger;
28+
import java.util.function.Function;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.Stream;
2731

2832
import org.springframework.beans.BeansException;
2933
import org.springframework.context.ApplicationContext;
@@ -42,9 +46,9 @@
4246
import org.springframework.integration.router.RecipientListRouterManagement;
4347
import org.springframework.integration.support.context.NamedComponent;
4448
import org.springframework.integration.support.management.MappingMessageRouterManagement;
49+
import org.springframework.lang.Nullable;
4550
import org.springframework.messaging.MessageChannel;
4651
import org.springframework.messaging.MessageHandler;
47-
import org.springframework.util.StringUtils;
4852

4953
/**
5054
* Builds the runtime object model graph.
@@ -67,11 +71,17 @@ public class IntegrationGraphServer implements ApplicationContextAware, Applicat
6771

6872
private String applicationName;
6973

74+
private Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback;
75+
7076
@Override
7177
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
7278
this.applicationContext = applicationContext; //NOSONAR (sync)
7379
}
7480

81+
protected ApplicationContext getApplicationContext() {
82+
return this.applicationContext;
83+
}
84+
7585
/**
7686
* Set the application name that will appear in the 'contentDescriptor' under
7787
* the 'name' key. If not provided, the property 'spring.application.name' from
@@ -82,6 +92,24 @@ public void setApplicationName(String applicationName) {
8292
this.applicationName = applicationName; //NOSONAR (sync)
8393
}
8494

95+
/**
96+
* Specify a callback {@link Function} to be called against each {@link NamedComponent}
97+
* to populate additional properties to the target {@link IntegrationNode}.
98+
* @param additionalPropertiesCallback the {@link Function} to use for properties.
99+
* @since 5.1
100+
*/
101+
public void setAdditionalPropertiesCallback(
102+
@Nullable Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback) {
103+
this.additionalPropertiesCallback = additionalPropertiesCallback;
104+
}
105+
106+
@Override
107+
public void onApplicationEvent(ContextRefreshedEvent event) {
108+
if (event.getApplicationContext().equals(this.applicationContext)) {
109+
buildGraph();
110+
}
111+
}
112+
85113
/**
86114
* Return the cached graph. Although the graph is cached, the data therein (stats
87115
* etc.) are dynamic.
@@ -99,11 +127,27 @@ public Graph getGraph() {
99127
return this.graph;
100128
}
101129

102-
@Override
103-
public void onApplicationEvent(ContextRefreshedEvent event) {
104-
if (event.getApplicationContext().equals(this.applicationContext)) {
105-
buildGraph();
106-
}
130+
/**
131+
* Rebuild the graph, re-cache it, and return it. Use this method if the application
132+
* components have changed (added or removed).
133+
* @return the graph.
134+
* @see #getGraph()
135+
*/
136+
public Graph rebuild() {
137+
return buildGraph();
138+
}
139+
140+
/**
141+
* Get beans for the provided type from the application context.
142+
* This method can be extended for some custom logic, e.g. get beans
143+
* from the parent application context as well.
144+
* @param type the type for beans to obtain
145+
* @param <T> the type for beans to obtain
146+
* @return a {@link Map} of bean for the provided type
147+
* @since 5.1
148+
*/
149+
protected <T> Map<String, T> getBeansOfType(Class<T> type) {
150+
return this.applicationContext.getBeansOfType(type, true, false);
107151
}
108152

109153
private synchronized Graph buildGraph() {
@@ -135,95 +179,129 @@ private synchronized Graph buildGraph() {
135179
}
136180

137181
private Map<String, MessageChannelNode> channels(Collection<IntegrationNode> nodes) {
138-
Map<String, MessageChannel> channels = this.applicationContext
139-
.getBeansOfType(MessageChannel.class, true, false);
140-
Map<String, MessageChannelNode> channelNodes = new HashMap<>();
141-
for (Entry<String, MessageChannel> entry : channels.entrySet()) {
142-
MessageChannel channel = entry.getValue();
143-
MessageChannelNode channelNode = this.nodeFactory.channelNode(entry.getKey(), channel);
144-
String beanName = entry.getKey();
145-
nodes.add(channelNode);
146-
channelNodes.put(beanName, channelNode);
147-
}
148-
return channelNodes;
182+
return getBeansOfType(MessageChannel.class)
183+
.entrySet()
184+
.stream()
185+
.map(e -> {
186+
MessageChannel messageChannel = e.getValue();
187+
MessageChannelNode messageChannelNode = this.nodeFactory.channelNode(e.getKey(), messageChannel);
188+
if (messageChannel instanceof NamedComponent) {
189+
messageChannelNode.addProperties(getAdditionalPropertiesIfAny((NamedComponent) messageChannel));
190+
}
191+
return messageChannelNode;
192+
})
193+
.peek(nodes::add)
194+
.collect(Collectors.toMap(MessageChannelNode::getName, Function.identity()));
149195
}
150196

151197
private void pollingAdapters(Collection<IntegrationNode> nodes, Collection<LinkNode> links,
152198
Map<String, MessageChannelNode> channelNodes) {
153199

154-
Map<String, SourcePollingChannelAdapter> spcas = this.applicationContext
155-
.getBeansOfType(SourcePollingChannelAdapter.class, true, false);
156-
for (Entry<String, SourcePollingChannelAdapter> entry : spcas.entrySet()) {
157-
SourcePollingChannelAdapter adapter = entry.getValue();
158-
MessageSourceNode sourceNode = this.nodeFactory.sourceNode(entry.getKey(), adapter);
159-
nodes.add(sourceNode);
160-
producerLink(links, channelNodes, sourceNode);
161-
}
200+
getBeansOfType(SourcePollingChannelAdapter.class)
201+
.entrySet()
202+
.stream()
203+
.map(e -> {
204+
SourcePollingChannelAdapter sourceAdapter = e.getValue();
205+
MessageSourceNode sourceNode = this.nodeFactory.sourceNode(e.getKey(), sourceAdapter);
206+
sourceNode.addProperties(getAdditionalPropertiesIfAny(sourceAdapter));
207+
return sourceNode;
208+
})
209+
.peek(nodes::add)
210+
.forEach(sourceNode -> producerLink(links, channelNodes, sourceNode));
162211
}
163212

164213
private void gateways(Collection<IntegrationNode> nodes, Collection<LinkNode> links,
165214
Map<String, MessageChannelNode> channelNodes) {
166215

167-
Map<String, MessagingGatewaySupport> gateways = this.applicationContext
168-
.getBeansOfType(MessagingGatewaySupport.class, true, false);
169-
for (Entry<String, MessagingGatewaySupport> entry : gateways.entrySet()) {
170-
MessagingGatewaySupport gateway = entry.getValue();
171-
MessageGatewayNode gatewayNode = this.nodeFactory.gatewayNode(entry.getKey(), gateway);
172-
nodes.add(gatewayNode);
173-
producerLink(links, channelNodes, gatewayNode);
174-
}
175-
Map<String, GatewayProxyFactoryBean> gpfbs = this.applicationContext
176-
.getBeansOfType(GatewayProxyFactoryBean.class, true, false);
216+
getBeansOfType(MessagingGatewaySupport.class)
217+
.entrySet()
218+
.stream()
219+
.map(e -> {
220+
MessagingGatewaySupport gateway = e.getValue();
221+
MessageGatewayNode gatewayNode = this.nodeFactory.gatewayNode(e.getKey(), gateway);
222+
gatewayNode.addProperties(getAdditionalPropertiesIfAny(gateway));
223+
return gatewayNode;
224+
})
225+
.peek(nodes::add)
226+
.forEach(gatewayNode -> producerLink(links, channelNodes, gatewayNode));
227+
228+
Map<String, GatewayProxyFactoryBean> gpfbs = getBeansOfType(GatewayProxyFactoryBean.class);
229+
177230
for (Entry<String, GatewayProxyFactoryBean> entry : gpfbs.entrySet()) {
178-
Map<Method, MessagingGatewaySupport> methodMap = entry.getValue().getGateways();
179-
for (Entry<Method, MessagingGatewaySupport> gwEntry : methodMap.entrySet()) {
180-
MessagingGatewaySupport gateway = gwEntry.getValue();
181-
Method method = gwEntry.getKey();
182-
Class<?>[] parameterTypes = method.getParameterTypes();
183-
String[] parameterTypeNames = new String[parameterTypes.length];
184-
int i = 0;
185-
for (Class<?> type : parameterTypes) {
186-
parameterTypeNames[i++] = type.getName();
187-
}
188-
String signature = method.getName() +
189-
"(" + StringUtils.arrayToCommaDelimitedString(parameterTypeNames) + ")";
190-
MessageGatewayNode gatewayNode = this.nodeFactory.gatewayNode(
191-
entry.getKey().substring(1) + "." + signature, gateway);
192-
nodes.add(gatewayNode);
193-
producerLink(links, channelNodes, gatewayNode);
194-
}
231+
entry.getValue()
232+
.getGateways()
233+
.entrySet()
234+
.stream()
235+
.map(e -> {
236+
MessagingGatewaySupport gateway = e.getValue();
237+
Method method = e.getKey();
238+
239+
String nodeName =
240+
entry.getKey().substring(1) + "." +
241+
method.getName() +
242+
"(" +
243+
Arrays.stream(method.getParameterTypes())
244+
.map(Class::getName)
245+
.collect(Collectors.joining(","))
246+
+ ")";
247+
248+
MessageGatewayNode gatewayNode = this.nodeFactory.gatewayNode(nodeName, gateway);
249+
gatewayNode.addProperties(getAdditionalPropertiesIfAny(gateway));
250+
return gatewayNode;
251+
})
252+
.peek(nodes::add)
253+
.forEach(gatewayNode -> producerLink(links, channelNodes, gatewayNode));
195254
}
196255
}
197256

198257
private void producers(Collection<IntegrationNode> nodes, Collection<LinkNode> links,
199258
Map<String, MessageChannelNode> channelNodes) {
200259

201-
Map<String, MessageProducerSupport> producers = this.applicationContext
202-
.getBeansOfType(MessageProducerSupport.class, true, false);
203-
for (Entry<String, MessageProducerSupport> entry : producers.entrySet()) {
204-
MessageProducerSupport producer = entry.getValue();
205-
MessageProducerNode producerNode = this.nodeFactory.producerNode(entry.getKey(), producer);
206-
nodes.add(producerNode);
207-
producerLink(links, channelNodes, producerNode);
208-
}
260+
getBeansOfType(MessageProducerSupport.class)
261+
.entrySet()
262+
.stream()
263+
.map(e -> {
264+
MessageProducerSupport producer = e.getValue();
265+
MessageProducerNode producerNode = this.nodeFactory.producerNode(e.getKey(), producer);
266+
producerNode.addProperties(getAdditionalPropertiesIfAny(producer));
267+
return producerNode;
268+
})
269+
.peek(nodes::add)
270+
.forEach(producerNode -> producerLink(links, channelNodes, producerNode));
209271
}
210272

211273
private void consumers(Collection<IntegrationNode> nodes, Collection<LinkNode> links,
212274
Map<String, MessageChannelNode> channelNodes) {
213275

214-
Map<String, IntegrationConsumer> consumers = this.applicationContext.getBeansOfType(IntegrationConsumer.class,
215-
true, false);
216-
for (Entry<String, IntegrationConsumer> entry : consumers.entrySet()) {
217-
IntegrationConsumer consumer = entry.getValue();
218-
MessageHandlerNode handlerNode = consumer instanceof PollingConsumer
219-
? this.nodeFactory.polledHandlerNode(entry.getKey(), (PollingConsumer) consumer)
220-
: this.nodeFactory.handlerNode(entry.getKey(), consumer);
221-
nodes.add(handlerNode);
222-
MessageChannelNode channelNode = channelNodes.get(handlerNode.getInput());
223-
if (channelNode != null) {
224-
links.add(new LinkNode(channelNode.getNodeId(), handlerNode.getNodeId(), LinkNode.Type.input));
225-
}
226-
producerLink(links, channelNodes, handlerNode);
276+
getBeansOfType(IntegrationConsumer.class)
277+
.entrySet()
278+
.stream()
279+
.map(e -> {
280+
IntegrationConsumer consumer = e.getValue();
281+
MessageHandlerNode handlerNode =
282+
consumer instanceof PollingConsumer
283+
? this.nodeFactory.polledHandlerNode(e.getKey(), (PollingConsumer) consumer)
284+
: this.nodeFactory.handlerNode(e.getKey(), consumer);
285+
handlerNode.addProperties(getAdditionalPropertiesIfAny(consumer));
286+
return handlerNode;
287+
})
288+
.peek(nodes::add)
289+
.forEach(handlerNode -> {
290+
MessageChannelNode channelNode = channelNodes.get(handlerNode.getInput());
291+
if (channelNode != null) {
292+
links.add(new LinkNode(channelNode.getNodeId(), handlerNode.getNodeId(), LinkNode.Type.input));
293+
}
294+
producerLink(links, channelNodes, handlerNode);
295+
});
296+
}
297+
298+
@Nullable
299+
private Map<String, Object> getAdditionalPropertiesIfAny(NamedComponent namedComponent) {
300+
if (this.additionalPropertiesCallback != null) {
301+
return this.additionalPropertiesCallback.apply(namedComponent);
302+
}
303+
else {
304+
return null;
227305
}
228306
}
229307

@@ -260,16 +338,6 @@ private void producerLink(Collection<LinkNode> links, Map<String, MessageChannel
260338
}
261339
}
262340

263-
/**
264-
* Rebuild the graph, re-cache it, and return it. Use this method if the application
265-
* components have changed (added or removed).
266-
* @return the graph.
267-
* @see #getGraph()
268-
*/
269-
public Graph rebuild() {
270-
return buildGraph();
271-
}
272-
273341
private static final class NodeFactory {
274342

275343
private final AtomicInteger nodeId = new AtomicInteger();
@@ -363,15 +431,17 @@ else if (handler instanceof RecipientListRouterManagement) {
363431
private MessageHandlerNode compositeHandler(String name, IntegrationConsumer consumer,
364432
CompositeMessageHandler handler, String output, String errors, boolean polled) {
365433

366-
List<MessageHandler> handlers = handler.getHandlers();
367-
List<CompositeMessageHandlerNode.InnerHandler> innerHandlers = new ArrayList<>();
368-
for (MessageHandler innerHandler : handlers) {
369-
if (innerHandler instanceof NamedComponent) {
370-
NamedComponent named = (NamedComponent) innerHandler;
371-
innerHandlers.add(new CompositeMessageHandlerNode.InnerHandler(named.getComponentName(),
372-
named.getComponentType()));
373-
}
374-
}
434+
List<CompositeMessageHandlerNode.InnerHandler> innerHandlers =
435+
handler.getHandlers()
436+
.stream()
437+
.filter(NamedComponent.class::isInstance)
438+
.map(NamedComponent.class::cast)
439+
.map(named ->
440+
new CompositeMessageHandlerNode.InnerHandler(
441+
named.getComponentName(),
442+
named.getComponentType()))
443+
.collect(Collectors.toList());
444+
375445
String inputChannel = consumer.getInputChannel() != null ? consumer.getInputChannel().toString() : null;
376446
return polled
377447
? new ErrorCapableCompositeMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler,
@@ -395,12 +465,11 @@ private MessageHandlerNode discardingHandler(String name, IntegrationConsumer co
395465
private MessageHandlerNode routingHandler(String name, IntegrationConsumer consumer, MessageHandler handler,
396466
MappingMessageRouterManagement router, String output, String errors, boolean polled) {
397467

398-
Collection<String> routes = router.getChannelMappings().values();
399-
Collection<String> dynamicChannelNames = router.getDynamicChannelNames();
400-
if (dynamicChannelNames.size() > 0) {
401-
routes = new ArrayList<String>(routes);
402-
routes.addAll(dynamicChannelNames);
403-
}
468+
Collection<String> routes =
469+
Stream.concat(router.getChannelMappings().values().stream(),
470+
router.getDynamicChannelNames().stream())
471+
.collect(Collectors.toList());
472+
404473
String inputChannel = consumer.getInputChannel() != null ? consumer.getInputChannel().toString() : null;
405474
return polled
406475
? new ErrorCapableRoutingNode(this.nodeId.incrementAndGet(), name, handler,
@@ -413,11 +482,12 @@ private MessageHandlerNode recipientListRoutingHandler(String name, IntegrationC
413482
MessageHandler handler, RecipientListRouterManagement router, String output, String errors,
414483
boolean polled) {
415484

416-
Collection<?> recipients = router.getRecipients();
417-
List<String> routes = new ArrayList<>(recipients.size());
418-
for (Object recipient : recipients) {
419-
routes.add(((Recipient) recipient).getChannel().toString());
420-
}
485+
List<String> routes =
486+
router.getRecipients()
487+
.stream()
488+
.map(recipient -> ((Recipient) recipient).getChannel().toString())
489+
.collect(Collectors.toList());
490+
421491
String inputChannel = consumer.getInputChannel() != null ? consumer.getInputChannel().toString() : null;
422492
return polled
423493
? new ErrorCapableRoutingNode(this.nodeId.incrementAndGet(), name, handler,

0 commit comments

Comments
 (0)