Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/modules/ROOT/pages/reference/jvm.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ another. The reported value underestimates the actual total number of steals whe
* `executor.running` (`Gauge`): An estimate of the number of worker threads that are not blocked but are waiting to join tasks or for other managed synchronization threads.
* `executor.parallelism` (`Gauge`): The targeted parallelism level of this pool.
* `executor.pool.size` (`Gauge`): The current number of threads in the pool.
* `executor.delayed` (`Gauge`): An estimate of the number of delayed (including periodic) tasks scheduled but not yet ready for execution. Available on Java 25+.

To use the following `ExecutorService` instances, `--add-opens java.base/java.util.concurrent=ALL-UNNAMED` is required:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* @author Jon Schneider
* @author Clint Checketts
* @author Johnny Lim
* @author Daeho Kwon
*/
public class ExecutorServiceMetrics implements MeterBinder {

Expand All @@ -71,6 +72,8 @@ public class ExecutorServiceMetrics implements MeterBinder {

private static final @Nullable MethodHandle METHOD_HANDLE_THREAD_COUNT_FROM_THREAD_PER_TASK_EXECUTOR = getMethodHandleForThreadCountFromThreadPerTaskExecutor();

private static final @Nullable MethodHandle METHOD_HANDLE_FJP_GET_DELAYED_TASK_COUNT = getMethodHandleForDelayedTaskCount();

private static boolean allowIllegalReflectiveAccess = true;

private static final InternalLogger log = InternalLoggerFactory.getInstance(ExecutorServiceMetrics.class);
Expand Down Expand Up @@ -459,6 +462,17 @@ private void monitor(MeterRegistry registry, ForkJoinPool fj) {
.baseUnit(BaseUnits.THREADS)
.register(registry));
registeredMeterIds.addAll(meters.stream().map(Meter::getId).collect(toSet()));

// getDelayedTaskCount() was added in Java 25; only register when available
if (METHOD_HANDLE_FJP_GET_DELAYED_TASK_COUNT != null) {
Meter delayedGauge = Gauge
.builder(metricPrefix + "executor.delayed", fj, ExecutorServiceMetrics::getDelayedTaskCount)
.tags(tags)
.description("An estimate of the number of delayed tasks scheduled but not yet ready for execution")
.baseUnit(BaseUnits.TASKS)
.register(registry);
registeredMeterIds.add(delayedGauge.getId());
}
}

private void monitorThreadPerTaskExecutor(MeterRegistry registry, ExecutorService executorService) {
Expand Down Expand Up @@ -500,6 +514,25 @@ private static long getThreadCountFromThreadPerTaskExecutor(ExecutorService exec
}
}

private static long getDelayedTaskCount(ForkJoinPool pool) {
try {
return (long) Objects.requireNonNull(METHOD_HANDLE_FJP_GET_DELAYED_TASK_COUNT).invoke(pool);
}
catch (Throwable e) {
throw new RuntimeException(e);
}
}

private static @Nullable MethodHandle getMethodHandleForDelayedTaskCount() {
try {
Method method = ForkJoinPool.class.getMethod("getDelayedTaskCount");
return MethodHandles.lookup().unreflect(method);
}
catch (Throwable e) {
return null;
}
}

/**
* Disable illegal reflective accesses.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.micrometer.core.instrument.binder.jvm;

import io.micrometer.core.Issue;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Tag;
Expand All @@ -26,6 +27,7 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledForJreRange;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
Expand All @@ -43,6 +45,7 @@
* @author Jon Schneider
* @author Johnny Lim
* @author Sebastian Lövdahl
* @author Daeho Kwon
*/
class ExecutorServiceMetricsTest {

Expand Down Expand Up @@ -358,6 +361,39 @@ void queuedSubmissionsAreIncludedInExecutorQueuedMetric() {
pool.shutdown();
}

@EnabledForJreRange(min = JRE.JAVA_25)
@DisplayName("ForkJoinPool executor.delayed gauge reflects delayed tasks on Java 25+")
@Test
void forkJoinPoolDelayedTaskCountMetric() {
ForkJoinPool pool = new ForkJoinPool(1);
ExecutorServiceMetrics.monitor(registry, pool, "myFjp", userTags);

Gauge gauge = registry.get("executor.delayed").tags(userTags).tag("name", "myFjp").gauge();
assertThat(gauge.value()).isEqualTo(0.0);

ScheduledFuture<?> future = ((ScheduledExecutorService) pool).schedule(() -> {
}, 10, TimeUnit.SECONDS);
try {
await().untilAsserted(() -> assertThat(gauge.value()).isEqualTo(1.0));
}
finally {
future.cancel(true);
pool.shutdown();
}

await().untilAsserted(() -> assertThat(gauge.value()).isEqualTo(0.0));
}

@DisabledForJreRange(min = JRE.JAVA_25)
@DisplayName("ForkJoinPool executor.delayed gauge is not registered on Java versions before 25")
@Test
void forkJoinPoolDelayedGaugeNotRegisteredBeforeJava25() {
ForkJoinPool pool = new ForkJoinPool(1);
ExecutorServiceMetrics.monitor(registry, pool, "myFjp", userTags);
assertThatThrownBy(() -> registry.get("executor.delayed").gauge()).isInstanceOf(MeterNotFoundException.class);
pool.shutdown();
}

@SuppressWarnings("unchecked")
private <T extends Executor> T monitorExecutorService(String executorName, String metricPrefix, T exec) {
if (metricPrefix == null) {
Expand Down
Loading