From c2ec8dba0b59d8404d8a2dd8938d52291037cbb8 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 2 Jun 2025 14:57:14 +1000 Subject: [PATCH 1/8] Stub a heap usage field on ClusterInfo --- .../java/org/elasticsearch/cluster/ClusterInfo.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index a2c260e8699e8..3a2051dfc0309 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -57,6 +57,13 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map shardDataSetSizes; final Map dataPath; final Map reservedSpace; + private final Map nodeHeapUsages = Map.of(); + + public record NodeHeapUsage(String nodeId, long usedInBytes, long totalInBytes) { + public double usedPercentage() { + return (usedInBytes * 100.0d) / totalInBytes; + } + } protected ClusterInfo() { this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); @@ -211,6 +218,10 @@ public Map getNodeMostAvailableDiskUsages() { return this.mostAvailableSpaceUsage; } + public Map getNodeHeapUsages() { + return nodeHeapUsages; + } + /** * Returns the shard size for the given shardId or null if that metric is not available. */ From 3a358f2e8ad3c6856702f201c61abaa4020881f2 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 10 Jun 2025 16:12:35 +1000 Subject: [PATCH 2/8] update mock --- .../main/java/org/elasticsearch/cluster/ClusterInfo.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 3a2051dfc0309..471499b2732ac 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -57,10 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map shardDataSetSizes; final Map dataPath; final Map reservedSpace; - private final Map nodeHeapUsages = Map.of(); + private final Map nodeHeapUsages = Map.of(); - public record NodeHeapUsage(String nodeId, long usedInBytes, long totalInBytes) { - public double usedPercentage() { + public record ShardHeapUsage(String nodeId, String nodeName, long usedInBytes, long totalInBytes) { + public double estimatedUsageAsPercentage() { return (usedInBytes * 100.0d) / totalInBytes; } } @@ -218,7 +218,7 @@ public Map getNodeMostAvailableDiskUsages() { return this.mostAvailableSpaceUsage; } - public Map getNodeHeapUsages() { + public Map getNodeHeapUsages() { return nodeHeapUsages; } From f7e88c944bbe579430661b7d1d3014e49754b1e4 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 12 Jun 2025 14:42:16 +1000 Subject: [PATCH 3/8] revert stub changes --- .../java/org/elasticsearch/cluster/ClusterInfo.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 471499b2732ac..a2c260e8699e8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -57,13 +57,6 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map shardDataSetSizes; final Map dataPath; final Map reservedSpace; - private final Map nodeHeapUsages = Map.of(); - - public record ShardHeapUsage(String nodeId, String nodeName, long usedInBytes, long totalInBytes) { - public double estimatedUsageAsPercentage() { - return (usedInBytes * 100.0d) / totalInBytes; - } - } protected ClusterInfo() { this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); @@ -218,10 +211,6 @@ public Map getNodeMostAvailableDiskUsages() { return this.mostAvailableSpaceUsage; } - public Map getNodeHeapUsages() { - return nodeHeapUsages; - } - /** * Returns the shard size for the given shardId or null if that metric is not available. */ From fea375fe5600982aff1851b15ecbecd9e914f28e Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 12 Jun 2025 19:36:50 +1000 Subject: [PATCH 4/8] add setting and update clusterInfoSimulator --- .../index/shard/IndexShardIT.java | 31 ++++++++++++--- .../cluster/ClusterInfoSimulator.java | 4 +- .../cluster/InternalClusterInfoService.java | 38 +++++++++++++++++-- .../common/settings/ClusterSettings.java | 1 + .../test/ESSingleNodeTestCase.java | 4 ++ 5 files changed, 67 insertions(+), 11 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 4967888e021cf..5c705569a0b16 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -264,14 +264,33 @@ public void testExpectedShardSizeIsPresent() throws InterruptedException { public void testHeapUsageEstimateIsPresent() { InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); ClusterInfoServiceUtils.refresh(clusterInfoService); - ClusterState state = getInstanceFromNode(ClusterService.class).state(); Map shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages(); assertNotNull(shardHeapUsages); - assertEquals(state.nodes().size(), shardHeapUsages.size()); - for (DiscoveryNode node : state.nodes()) { - assertTrue(shardHeapUsages.containsKey(node.getId())); - ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId()); - assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes())); + // Not collecting yet because it is disabled + assertTrue(shardHeapUsages.isEmpty()); + + // Enable collection for shard heap usages + updateClusterSettings( + Settings.builder() + .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true) + .build() + ); + try { + ClusterInfoServiceUtils.refresh(clusterInfoService); + ClusterState state = getInstanceFromNode(ClusterService.class).state(); + shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages(); + assertEquals(state.nodes().size(), shardHeapUsages.size()); + for (DiscoveryNode node : state.nodes()) { + assertTrue(shardHeapUsages.containsKey(node.getId())); + ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId()); + assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes())); + } + } finally { + updateClusterSettings( + Settings.builder() + .putNull(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey()) + .build() + ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index 0536322b1d730..ac983672642a7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -33,6 +33,7 @@ public class ClusterInfoSimulator { private final CopyOnFirstWriteMap shardSizes; private final Map shardDataSetSizes; private final Map dataPath; + private final Map shardHeapUsages; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -41,6 +42,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.shardSizes = new CopyOnFirstWriteMap<>(allocation.clusterInfo().shardSizes); this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); + this.shardHeapUsages = allocation.clusterInfo().getShardHeapUsages(); } /** @@ -154,7 +156,7 @@ public ClusterInfo getClusterInfo() { shardDataSetSizes, dataPath, Map.of(), - Map.of() + shardHeapUsages ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 4c8655118dd82..276235b57778c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -83,7 +83,15 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt Property.NodeScope ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED = Setting.boolSetting( + "cluster.routing.allocation.shard_heap.threshold_enabled", + false, + Property.Dynamic, + Property.NodeScope + ); + private volatile boolean diskThresholdEnabled; + private volatile boolean shardHeapThresholdEnabled; private volatile TimeValue updateFrequency; private volatile TimeValue fetchTimeout; @@ -130,12 +138,20 @@ public InternalClusterInfoService( DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setDiskThresholdEnabled ); + clusterSettings.initializeAndWatch( + CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED, + this::setShardHeapThresholdEnabled + ); } private void setDiskThresholdEnabled(boolean diskThresholdEnabled) { this.diskThresholdEnabled = diskThresholdEnabled; } + private void setShardHeapThresholdEnabled(boolean shardHeapThresholdEnabled) { + this.shardHeapThresholdEnabled = shardHeapThresholdEnabled; + } + private void setFetchTimeout(TimeValue fetchTimeout) { this.fetchTimeout = fetchTimeout; } @@ -193,11 +209,25 @@ void execute() { logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty cluster info"); indicesStatsSummary = IndicesStatsSummary.EMPTY; } - try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchNodeStats(); + + if (diskThresholdEnabled || shardHeapThresholdEnabled) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchNodeStats(); + } + } else { + logger.trace("skipping collecting node stats from cluster, notifying listeners with empty node stats"); + leastAvailableSpaceUsages = Map.of(); + mostAvailableSpaceUsages = Map.of(); + maxHeapPerNode = Map.of(); } - try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchNodesHeapUsage(); + + if (shardHeapThresholdEnabled) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchNodesHeapUsage(); + } + } else { + logger.trace("skipping collecting shard heap usage from cluster, notifying listeners with empty shard heap usage"); + shardHeapUsagePerNode = Map.of(); } } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index e9982b476c520..9b197baef406c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, + InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED, SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index a786eaf4aca5f..7ebc5765bda63 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -536,4 +536,8 @@ protected final void deletePipeline(String id) { ) ); } + + protected void updateClusterSettings(Settings settings) { + safeGet(clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).execute()); + } } From d0593c85aa1547a44a80f86a1f752ecc1cca2f9c Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 12 Jun 2025 19:58:19 +1000 Subject: [PATCH 5/8] ratio method --- .../main/java/org/elasticsearch/cluster/ShardHeapUsage.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java b/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java index 3da97ac946f57..cc6a00421a292 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java +++ b/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java @@ -45,6 +45,10 @@ public double estimatedFreeBytesAsPercentage() { } public double estimatedUsageAsPercentage() { - return 100.0 * estimatedUsageBytes / (double) totalBytes; + return 100.0 * estimatedUsageAsRatio(); + } + + public double estimatedUsageAsRatio() { + return estimatedUsageBytes / (double) totalBytes; } } From a3fb6db915f41710236e0c26641bc0859c5a04d4 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 12 Jun 2025 20:30:51 +1000 Subject: [PATCH 6/8] fix compilation --- .../org/elasticsearch/datastreams/LookAHeadTimeTests.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java index 199bc36d833aa..9c9bcaa6d5386 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java @@ -10,7 +10,6 @@ package org.elasticsearch.datastreams; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; @@ -118,12 +117,6 @@ public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() { updateIndexSettings(indexSettings); } - private void updateClusterSettings(Settings settings) { - clusterAdmin().updateSettings( - new ClusterUpdateSettingsRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).persistentSettings(settings) - ).actionGet(); - } - private void updateIndexSettings(Settings settings) { try { createIndex("test"); From f3405790525dfda5bba248713a88220fbea8cac0 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 12 Jun 2025 22:15:47 +1000 Subject: [PATCH 7/8] fix tests --- .../org/elasticsearch/datastreams/LookAHeadTimeTests.java | 8 ++++++++ .../InternalClusterInfoServiceSchedulingTests.java | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java index 9c9bcaa6d5386..20e17720e1efb 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.datastreams; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; @@ -117,6 +118,13 @@ public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() { updateIndexSettings(indexSettings); } + @Override + protected void updateClusterSettings(Settings settings) { + clusterAdmin().updateSettings( + new ClusterUpdateSettingsRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).persistentSettings(settings) + ).actionGet(); + } + private void updateIndexSettings(Settings settings) { try { createIndex("test"); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 67a745e743b04..f7e667fd5aa9b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -53,7 +53,9 @@ public void testScheduling() { final DiscoveryNodes noMaster = DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build(); final DiscoveryNodes localMaster = noMaster.withMasterNodeId(discoveryNode.getId()); - final Settings.Builder settingsBuilder = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()); + final Settings.Builder settingsBuilder = Settings.builder() + .put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()) + .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true); if (randomBoolean()) { settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms"); } From d3dd95aaf8f9f5e1108c33f759babd30c745387d Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 13 Jun 2025 12:24:17 +1000 Subject: [PATCH 8/8] extraction --- .../cluster/InternalClusterInfoService.java | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 276235b57778c..c792ce377ef33 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -201,34 +201,44 @@ void execute() { logger.trace("starting async refresh"); try (var ignoredRefs = fetchRefs) { - if (diskThresholdEnabled) { - try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchIndicesStats(); - } - } else { - logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty cluster info"); - indicesStatsSummary = IndicesStatsSummary.EMPTY; + maybeFetchIndicesStats(diskThresholdEnabled); + maybeFetchNodeStats(diskThresholdEnabled || shardHeapThresholdEnabled); + maybeFetchNodesHeapUsage(shardHeapThresholdEnabled); + } + } + + private void maybeFetchIndicesStats(boolean shouldFetch) { + if (shouldFetch) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchIndicesStats(); } + } else { + logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty indices stats"); + indicesStatsSummary = IndicesStatsSummary.EMPTY; + } + } - if (diskThresholdEnabled || shardHeapThresholdEnabled) { - try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchNodeStats(); - } - } else { - logger.trace("skipping collecting node stats from cluster, notifying listeners with empty node stats"); - leastAvailableSpaceUsages = Map.of(); - mostAvailableSpaceUsages = Map.of(); - maxHeapPerNode = Map.of(); + private void maybeFetchNodeStats(boolean shouldFetch) { + if (shouldFetch) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchNodeStats(); } + } else { + logger.trace("skipping collecting node stats from cluster, notifying listeners with empty node stats"); + leastAvailableSpaceUsages = Map.of(); + mostAvailableSpaceUsages = Map.of(); + maxHeapPerNode = Map.of(); + } + } - if (shardHeapThresholdEnabled) { - try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchNodesHeapUsage(); - } - } else { - logger.trace("skipping collecting shard heap usage from cluster, notifying listeners with empty shard heap usage"); - shardHeapUsagePerNode = Map.of(); + private void maybeFetchNodesHeapUsage(boolean shouldFetch) { + if (shouldFetch) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchNodesHeapUsage(); } + } else { + logger.trace("skipping collecting shard heap usage from cluster, notifying listeners with empty shard heap usage"); + shardHeapUsagePerNode = Map.of(); } }