Skip to content

Commit 792b8fe

Browse files
garyrussellartembilan
authored andcommitted
INT-4416: Rework Micrometer Metrics
JIRA: https://jira.spring.io/browse/INT-4416 Metrics should be under a common name, discriminated with tags.
1 parent f26c3a9 commit 792b8fe

File tree

10 files changed

+256
-108
lines changed

10 files changed

+256
-108
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ subprojects { subproject ->
120120
jythonVersion = '2.5.3'
121121
kryoShadedVersion = '3.0.3'
122122
log4jVersion = '2.10.0'
123-
micrometerVersion = '1.0.0'
123+
micrometerVersion = '1.0.1'
124124
mockitoVersion = '2.11.0'
125125
mysqlVersion = '6.0.6'
126126
pahoMqttClientVersion = '1.2.0'

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@
4848
import org.springframework.util.Assert;
4949
import org.springframework.util.StringUtils;
5050

51+
import io.micrometer.core.instrument.MeterRegistry;
52+
import io.micrometer.core.instrument.Timer;
53+
import io.micrometer.core.instrument.Timer.Sample;
54+
5155
/**
5256
* Base class for {@link MessageChannel} implementations providing common
5357
* properties such as the channel name. Also provides the common functionality
@@ -86,6 +90,8 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport
8690

8791
private volatile AbstractMessageChannelMetrics channelMetrics = new DefaultMessageChannelMetrics();
8892

93+
private MeterRegistry meterRegistry;
94+
8995
public AbstractMessageChannel() {
9096
this.interceptors = new ChannelInterceptorList(logger);
9197
}
@@ -100,6 +106,15 @@ public void setShouldTrack(boolean shouldTrack) {
100106
this.shouldTrack = shouldTrack;
101107
}
102108

109+
@Override
110+
public void registerMeterRegistry(MeterRegistry registry) {
111+
this.meterRegistry = registry;
112+
}
113+
114+
protected MeterRegistry getMeterRegistry() {
115+
return this.meterRegistry;
116+
}
117+
103118
@Override
104119
public void setCountsEnabled(boolean countsEnabled) {
105120
this.countsEnabled = countsEnabled;
@@ -417,6 +432,10 @@ public boolean send(Message<?> message, long timeout) {
417432
boolean countsEnabled = this.countsEnabled;
418433
ChannelInterceptorList interceptors = this.interceptors;
419434
AbstractMessageChannelMetrics channelMetrics = this.channelMetrics;
435+
Sample sample = null;
436+
if (this.meterRegistry != null) {
437+
sample = Timer.start(this.meterRegistry);
438+
}
420439
try {
421440
if (this.datatypes.length > 0) {
422441
message = this.convertPayloadIfNecessary(message);
@@ -433,16 +452,22 @@ public boolean send(Message<?> message, long timeout) {
433452
}
434453
}
435454
if (countsEnabled) {
436-
if (channelMetrics.getTimer() != null) {
437-
final Message<?> messageToSend = message;
438-
sent = channelMetrics.getTimer().recordCallable(() -> doSend(messageToSend, timeout));
455+
metrics = channelMetrics.beforeSend();
456+
if (this.meterRegistry != null) {
457+
sample = Timer.start(this.meterRegistry);
439458
}
440-
else {
441-
metrics = channelMetrics.beforeSend();
442-
sent = doSend(message, timeout);
443-
channelMetrics.afterSend(metrics, sent);
444-
metricsProcessed = true;
459+
sent = doSend(message, timeout);
460+
if (sample != null) {
461+
sample.stop(Timer.builder(SEND_TIMER_NAME)
462+
.tag("type", "channel")
463+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
464+
.tag("result", sent ? "success" : "failure")
465+
.tag("exception", "none")
466+
.description("Subflow process time")
467+
.register(this.meterRegistry));
445468
}
469+
channelMetrics.afterSend(metrics, sent);
470+
metricsProcessed = true;
446471
}
447472
else {
448473
sent = doSend(message, timeout);
@@ -459,12 +484,16 @@ public boolean send(Message<?> message, long timeout) {
459484
}
460485
catch (Exception e) {
461486
if (countsEnabled && !metricsProcessed) {
462-
if (channelMetrics.getErrorCounter() != null) {
463-
channelMetrics.getErrorCounter().increment();
464-
}
465-
else {
466-
channelMetrics.afterSend(metrics, false);
487+
if (sample != null) {
488+
sample.stop(Timer.builder(SEND_TIMER_NAME)
489+
.tag("type", "channel")
490+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
491+
.tag("result", "failure")
492+
.tag("exception", e.getClass().getSimpleName())
493+
.description("Subflow process time")
494+
.register(this.meterRegistry));
467495
}
496+
channelMetrics.afterSend(metrics, false);
468497
}
469498
if (interceptorStack != null) {
470499
interceptors.afterSendCompletion(message, this, sent, e, interceptorStack);

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.springframework.messaging.support.ExecutorChannelInterceptor;
2828
import org.springframework.util.CollectionUtils;
2929

30+
import io.micrometer.core.instrument.Counter;
31+
3032
/**
3133
* Base class for all pollable channels.
3234
*
@@ -104,6 +106,15 @@ public Message<?> receive(long timeout) {
104106
}
105107
Message<?> message = this.doReceive(timeout);
106108
if (countsEnabled && message != null) {
109+
if (getMeterRegistry() != null) {
110+
Counter.builder(RECEIVE_COUNTER_NAME)
111+
.tag("name", getComponentName())
112+
.tag("type", "channel")
113+
.tag("result", "success")
114+
.tag("exception", "none")
115+
.description("Messages received")
116+
.register(getMeterRegistry()).increment();
117+
}
107118
getMetrics().afterReceive();
108119
counted = true;
109120
}
@@ -121,6 +132,15 @@ else if (logger.isTraceEnabled()) {
121132
}
122133
catch (RuntimeException e) {
123134
if (countsEnabled && !counted) {
135+
if (getMeterRegistry() != null) {
136+
Counter.builder(RECEIVE_COUNTER_NAME)
137+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
138+
.tag("type", "channel")
139+
.tag("result", "failure")
140+
.tag("exception", e.getClass().getSimpleName())
141+
.description("Messages received")
142+
.register(getMeterRegistry()).increment();
143+
}
124144
getMetrics().afterError();
125145
}
126146
if (!CollectionUtils.isEmpty(interceptorStack)) {

spring-integration-core/src/main/java/org/springframework/integration/channel/NullChannel.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.channel;
1818

19+
import java.util.concurrent.TimeUnit;
20+
1921
import org.apache.commons.logging.Log;
2022
import org.apache.commons.logging.LogFactory;
2123

@@ -32,6 +34,9 @@
3234
import org.springframework.util.Assert;
3335
import org.springframework.util.StringUtils;
3436

37+
import io.micrometer.core.instrument.MeterRegistry;
38+
import io.micrometer.core.instrument.Timer;
39+
3540
/**
3641
* A channel implementation that essentially behaves like "/dev/null".
3742
* All receive() calls will return <em>null</em>, and all send() calls
@@ -59,6 +64,8 @@ public class NullChannel implements PollableChannel, MessageChannelMetrics,
5964

6065
private String beanName;
6166

67+
private MeterRegistry meterRegistry;
68+
6269
@Override
6370
public void setBeanName(String beanName) {
6471
this.beanName = beanName;
@@ -86,6 +93,11 @@ public String getComponentType() {
8693
return "channel";
8794
}
8895

96+
@Override
97+
public void registerMeterRegistry(MeterRegistry registry) {
98+
this.meterRegistry = registry;
99+
}
100+
89101
@Override
90102
public void configureMetrics(AbstractMessageChannelMetrics metrics) {
91103
Assert.notNull(metrics, "'metrics' must not be null");
@@ -215,6 +227,15 @@ public boolean send(Message<?> message) {
215227
this.logger.debug("message sent to null channel: " + message);
216228
}
217229
if (this.countsEnabled) {
230+
if (this.meterRegistry != null) {
231+
Timer.builder(SEND_TIMER_NAME)
232+
.tag("type", "channel")
233+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
234+
.tag("result", "success")
235+
.tag("exception", "none")
236+
.description("Subflow process time")
237+
.register(this.meterRegistry).record(0, TimeUnit.MILLISECONDS);
238+
}
218239
this.channelMetrics.afterSend(this.channelMetrics.beforeSend(), true);
219240
}
220241
return true;

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractMessageSource.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.util.CollectionUtils;
3535

3636
import io.micrometer.core.instrument.Counter;
37+
import io.micrometer.core.instrument.MeterRegistry;
3738

3839
/**
3940
* @author Mark Fisher
@@ -65,11 +66,18 @@ public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluat
6566

6667
private volatile boolean loggingEnabled = true;
6768

69+
private MeterRegistry meterRegistry;
70+
6871
public void setHeaderExpressions(Map<String, Expression> headerExpressions) {
6972
this.headerExpressions = (headerExpressions != null)
7073
? headerExpressions : Collections.emptyMap();
7174
}
7275

76+
@Override
77+
public void registerMeterRegistry(MeterRegistry registry) {
78+
this.meterRegistry = registry;
79+
}
80+
7381
@Override
7482
public void setBeanName(String name) {
7583
this.beanName = name;
@@ -191,12 +199,16 @@ else if (result != null) {
191199
.build();
192200
}
193201
if (this.countsEnabled && message != null) {
194-
if (this.counter != null) {
195-
this.counter.increment();
196-
}
197-
else {
198-
this.messageCount.incrementAndGet();
202+
if (this.meterRegistry != null) {
203+
Counter.builder(RECEIVE_COUNTER_NAME)
204+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
205+
.tag("type", "source")
206+
.tag("result", "success")
207+
.tag("exception", "none")
208+
.description("Messages received")
209+
.register(this.meterRegistry).increment();
199210
}
211+
this.messageCount.incrementAndGet();
200212
}
201213
return message;
202214
}

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
import org.springframework.messaging.MessagingException;
3737
import org.springframework.util.Assert;
3838

39+
import io.micrometer.core.instrument.MeterRegistry;
40+
import io.micrometer.core.instrument.Timer;
41+
import io.micrometer.core.instrument.Timer.Sample;
3942
import reactor.core.CoreSubscriber;
4043

4144
/**
@@ -71,6 +74,8 @@ public abstract class AbstractMessageHandler extends IntegrationObjectSupport im
7174

7275
private volatile boolean loggingEnabled = true;
7376

77+
private MeterRegistry meterRegistry;
78+
7479
@Override
7580
public boolean isLoggingEnabled() {
7681
return this.loggingEnabled;
@@ -82,6 +87,11 @@ public void setLoggingEnabled(boolean loggingEnabled) {
8287
this.managementOverrides.loggingConfigured = true;
8388
}
8489

90+
@Override
91+
public void registerMeterRegistry(MeterRegistry meterRegistry) {
92+
this.meterRegistry = meterRegistry;
93+
}
94+
8595
@Override
8696
public void setOrder(int order) {
8797
this.order = order;
@@ -131,36 +141,44 @@ public void handleMessage(Message<?> message) {
131141
MetricsContext start = null;
132142
boolean countsEnabled = this.countsEnabled;
133143
AbstractMessageHandlerMetrics handlerMetrics = this.handlerMetrics;
144+
Sample sample = null;
145+
if (countsEnabled && this.meterRegistry != null) {
146+
sample = Timer.start(this.meterRegistry);
147+
}
134148
try {
135149
if (this.shouldTrack) {
136150
message = MessageHistory.write(message, this, this.getMessageBuilderFactory());
137151
}
138152
if (countsEnabled) {
139-
if (handlerMetrics.getTimer() != null) {
140-
final Message<?> messageToSend = message;
141-
handlerMetrics.getTimer().recordCallable(() -> {
142-
handleMessageInternal(messageToSend);
143-
return null;
144-
});
145-
}
146-
else {
147-
start = handlerMetrics.beforeHandle();
148-
handleMessageInternal(message);
149-
handlerMetrics.afterHandle(start, true);
153+
start = handlerMetrics.beforeHandle();
154+
handleMessageInternal(message);
155+
if (this.meterRegistry != null) {
156+
sample.stop(Timer.builder(SEND_TIMER_NAME)
157+
.tag("type", "handler")
158+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
159+
.tag("result", "success")
160+
.tag("exception", "none")
161+
.description("Subflow process time")
162+
.register(this.meterRegistry));
150163
}
164+
handlerMetrics.afterHandle(start, true);
151165
}
152166
else {
153167
handleMessageInternal(message);
154168
}
155169
}
156170
catch (Exception e) {
171+
if (sample != null) {
172+
sample.stop(Timer.builder(SEND_TIMER_NAME)
173+
.tag("type", "handler")
174+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
175+
.tag("result", "failure")
176+
.tag("exception", e.getClass().getSimpleName())
177+
.description("Subflow process time")
178+
.register(this.meterRegistry));
179+
}
157180
if (countsEnabled) {
158-
if (handlerMetrics.getErrorCounter() != null) {
159-
handlerMetrics.getErrorCounter().increment();
160-
}
161-
else {
162-
handlerMetrics.afterHandle(start, false);
163-
}
181+
handlerMetrics.afterHandle(start, false);
164182
}
165183
if (e instanceof MessagingException) {
166184
throw (MessagingException) e;

spring-integration-core/src/main/java/org/springframework/integration/support/management/IntegrationManagement.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.springframework.jmx.export.annotation.ManagedAttribute;
2020
import org.springframework.jmx.export.annotation.ManagedOperation;
2121

22+
import io.micrometer.core.instrument.MeterRegistry;
23+
2224
/**
2325
* Base interface for Integration managed components.
2426
*
@@ -28,6 +30,12 @@
2830
*/
2931
public interface IntegrationManagement {
3032

33+
String METER_PREFIX = "spring.integration.";
34+
35+
String SEND_TIMER_NAME = METER_PREFIX + "send";
36+
37+
String RECEIVE_COUNTER_NAME = METER_PREFIX + "receive";
38+
3139
@ManagedAttribute(description = "Use to disable debug logging during normal message flow")
3240
void setLoggingEnabled(boolean enabled);
3341

@@ -50,6 +58,15 @@ public interface IntegrationManagement {
5058
*/
5159
ManagementOverrides getOverrides();
5260

61+
/**
62+
* Inject a micrometer {@link MeterRegistry}
63+
* @param registry the registry.
64+
* @since 5.0.3
65+
*/
66+
default void registerMeterRegistry(MeterRegistry registry) {
67+
// no op
68+
}
69+
5370
/**
5471
* Toggles to inform the management configurer to not set these properties since
5572
* the user has manually configured them in a bean definition. If true, the

0 commit comments

Comments
 (0)