Skip to content

Commit 0702e42

Browse files
nicktindall, ywangd, rjernst
authored
Add heap usage estimate to ClusterInfo (#128723)
Co-authored-by: ywangd <[email protected]> Co-authored-by: rjernst <[email protected]> Relates: ES-11445
1 parent be703a0 commit 0702e42

File tree

31 files changed

+376
-43
lines changed

31 files changed

+376
-43
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.lucene.index.DirectoryReader;
1212
import org.apache.lucene.store.LockObtainFailedException;
13+
import org.apache.lucene.util.SetOnce;
1314
import org.elasticsearch.ExceptionsHelper;
1415
import org.elasticsearch.action.ActionListener;
1516
import org.elasticsearch.action.index.IndexRequest;
@@ -19,6 +20,8 @@
1920
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
2021
import org.elasticsearch.cluster.ClusterState;
2122
import org.elasticsearch.cluster.InternalClusterInfoService;
23+
import org.elasticsearch.cluster.ShardHeapUsage;
24+
import org.elasticsearch.cluster.ShardHeapUsageCollector;
2225
import org.elasticsearch.cluster.metadata.IndexMetadata;
2326
import org.elasticsearch.cluster.node.DiscoveryNode;
2427
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -62,6 +65,7 @@
6265
import org.elasticsearch.indices.IndicesService;
6366
import org.elasticsearch.indices.breaker.CircuitBreakerService;
6467
import org.elasticsearch.indices.recovery.RecoveryState;
68+
import org.elasticsearch.plugins.ClusterPlugin;
6569
import org.elasticsearch.plugins.Plugin;
6670
import org.elasticsearch.search.builder.SearchSourceBuilder;
6771
import org.elasticsearch.test.DummyShardLock;
@@ -82,6 +86,7 @@
8286
import java.util.Collections;
8387
import java.util.List;
8488
import java.util.Locale;
89+
import java.util.Map;
8590
import java.util.Optional;
8691
import java.util.concurrent.BrokenBarrierException;
8792
import java.util.concurrent.CountDownLatch;
@@ -90,6 +95,7 @@
9095
import java.util.concurrent.atomic.AtomicBoolean;
9196
import java.util.concurrent.atomic.AtomicReference;
9297
import java.util.function.Predicate;
98+
import java.util.stream.Collectors;
9399
import java.util.stream.Stream;
94100

95101
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
@@ -111,12 +117,13 @@
111117
import static org.hamcrest.Matchers.equalTo;
112118
import static org.hamcrest.Matchers.greaterThan;
113119
import static org.hamcrest.Matchers.instanceOf;
120+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
114121

115122
public class IndexShardIT extends ESSingleNodeTestCase {
116123

117124
@Override
118125
protected Collection<Class<? extends Plugin>> getPlugins() {
119-
return pluginList(InternalSettingsPlugin.class);
126+
return pluginList(InternalSettingsPlugin.class, BogusShardHeapUsagePlugin.class);
120127
}
121128

122129
public void testLockTryingToDelete() throws Exception {
@@ -254,6 +261,20 @@ public void testExpectedShardSizeIsPresent() throws InterruptedException {
254261
assertThat(dataSetSize.get(), greaterThan(0L));
255262
}
256263

264+
/**
 * Checks that, after refreshing the cluster info service, the resulting ClusterInfo carries
 * exactly one heap usage entry per node in the current cluster state, keyed by node id, and
 * that each entry's estimated free bytes does not exceed its total bytes.
 */
public void testHeapUsageEstimateIsPresent() {
265+
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
266+
// Refresh the service before reading its ClusterInfo below.
ClusterInfoServiceUtils.refresh(clusterInfoService);
267+
ClusterState state = getInstanceFromNode(ClusterService.class).state();
268+
Map<String, ShardHeapUsage> shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
269+
assertNotNull(shardHeapUsages);
270+
// One entry per node: no node missing, and no extra entries.
assertEquals(state.nodes().size(), shardHeapUsages.size());
271+
for (DiscoveryNode node : state.nodes()) {
272+
assertTrue(shardHeapUsages.containsKey(node.getId()));
273+
ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId());
274+
// Sanity bound only; the collector registered by this test class reports random values.
assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes()));
275+
}
276+
}
277+
257278
public void testIndexCanChangeCustomDataPath() throws Exception {
258279
final String index = "test-custom-data-path";
259280
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
@@ -797,4 +818,40 @@ private static void assertAllIndicesRemovedAndDeletionCompleted(Iterable<Indices
797818
assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES);
798819
}
799820
}
821+
822+
/**
 * Test-only {@link ShardHeapUsageCollector} that reports a random non-negative value for every
 * node in the current cluster state. It reaches the cluster state through the
 * {@link BogusShardHeapUsagePlugin} instance passed to its constructor.
 *
 * NOTE(review): the class name repeats "Shard". The class is also referenced by its binary name
 * in the one-line registration file added in the same commit, so any rename must update both.
 */
public static class BogusShardShardHeapUsageCollector implements ShardHeapUsageCollector {
823+
824+
// Plugin instance used to obtain the ClusterService when a collection is requested.
private final BogusShardHeapUsagePlugin plugin;
825+
826+
/**
 * @param plugin the plugin whose ClusterService is queried on each collection
 */
public BogusShardShardHeapUsageCollector(BogusShardHeapUsagePlugin plugin) {
827+
this.plugin = plugin;
828+
}
829+
830+
/**
 * Builds an unmodifiable map from node id to a random non-negative long, with one entry per
 * node in the current cluster state, and hands it to the listener.
 *
 * @param listener receives the node id to value map
 */
@Override
831+
public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener) {
832+
// NOTE(review): presumably completeWith routes an exception thrown by the supplier to the
// listener's failure path - confirm against ActionListener.completeWith.
ActionListener.completeWith(
833+
listener,
834+
() -> plugin.getClusterService()
835+
.state()
836+
.nodes()
837+
.stream()
838+
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> randomNonNegativeLong()))
839+
);
840+
}
841+
}
842+
843+
/**
 * Test-only plugin that captures the {@link ClusterService} handed to
 * {@link #createComponents} and exposes it through {@link #getClusterService()}.
 * It contributes no components of its own.
 */
public static class BogusShardHeapUsagePlugin extends Plugin implements ClusterPlugin {
844+
845+
// Holder for the ClusterService; written in createComponents, read in getClusterService.
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
846+
847+
/**
 * Stores the ClusterService from the supplied services and returns an empty component list.
 *
 * @param services the plugin services, from which only the ClusterService is read
 * @return an empty list
 */
@Override
848+
public Collection<?> createComponents(PluginServices services) {
849+
clusterService.set(services.clusterService());
850+
return List.of();
851+
}
852+
853+
/**
 * @return the captured ClusterService. NOTE(review): presumably null if createComponents has
 *         not run yet - confirm against SetOnce.get().
 */
public ClusterService getClusterService() {
854+
return clusterService.get();
855+
}
856+
}
800857
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#
2+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
# or more contributor license agreements. Licensed under the "Elastic License
4+
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
# Public License v 1"; you may not use this file except in compliance with, at
6+
# your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
# License v3.0 only", or the "Server Side Public License, v 1".
8+
#
9+
10+
org.elasticsearch.index.shard.IndexShardIT$BogusShardShardHeapUsageCollector

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ static TransportVersion def(int id) {
293293
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_093_0_00);
294294
public static final TransportVersion ML_INFERENCE_ELASTIC_RERANK = def(9_094_0_00);
295295
public static final TransportVersion SEARCH_LOAD_PER_INDEX_STATS = def(9_095_0_00);
296+
public static final TransportVersion HEAP_USAGE_IN_CLUSTER_INFO = def(9_096_0_00);
296297

297298
/*
298299
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
5757
final Map<ShardId, Long> shardDataSetSizes;
5858
final Map<NodeAndShard, String> dataPath;
5959
final Map<NodeAndPath, ReservedSpace> reservedSpace;
60+
final Map<String, ShardHeapUsage> shardHeapUsages;
6061

6162
protected ClusterInfo() {
62-
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
63+
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
6364
}
6465

6566
/**
@@ -71,6 +72,7 @@ protected ClusterInfo() {
7172
* @param shardDataSetSizes a shard id to data set size in bytes mapping per shard
7273
* @param dataPath the shard routing to datapath mapping
7374
* @param reservedSpace reserved space per shard broken down by node and data path
75+
* @param shardHeapUsages shard heap usage broken down by node
7476
* @see #shardIdentifierFromRouting
7577
*/
7678
public ClusterInfo(
@@ -79,14 +81,16 @@ public ClusterInfo(
7981
Map<String, Long> shardSizes,
8082
Map<ShardId, Long> shardDataSetSizes,
8183
Map<NodeAndShard, String> dataPath,
82-
Map<NodeAndPath, ReservedSpace> reservedSpace
84+
Map<NodeAndPath, ReservedSpace> reservedSpace,
85+
Map<String, ShardHeapUsage> shardHeapUsages
8386
) {
8487
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
8588
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
8689
this.shardSizes = Map.copyOf(shardSizes);
8790
this.shardDataSetSizes = Map.copyOf(shardDataSetSizes);
8891
this.dataPath = Map.copyOf(dataPath);
8992
this.reservedSpace = Map.copyOf(reservedSpace);
93+
this.shardHeapUsages = Map.copyOf(shardHeapUsages);
9094
}
9195

9296
public ClusterInfo(StreamInput in) throws IOException {
@@ -98,6 +102,11 @@ public ClusterInfo(StreamInput in) throws IOException {
98102
? in.readImmutableMap(NodeAndShard::new, StreamInput::readString)
99103
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
100104
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
105+
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
106+
this.shardHeapUsages = in.readImmutableMap(ShardHeapUsage::new);
107+
} else {
108+
this.shardHeapUsages = Map.of();
109+
}
101110
}
102111

103112
@Override
@@ -112,6 +121,9 @@ public void writeTo(StreamOutput out) throws IOException {
112121
out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString);
113122
}
114123
out.writeMap(this.reservedSpace);
124+
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
125+
out.writeMap(this.shardHeapUsages, StreamOutput::writeWriteable);
126+
}
115127
}
116128

117129
/**
@@ -192,9 +204,22 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
192204
return builder.endObject(); // NodeAndPath
193205
}),
194206
endArray() // end "reserved_sizes"
207+
// NOTE: We don't serialize shardHeapUsages at this stage, to avoid
208+
// committing to API payloads until the feature is settled
195209
);
196210
}
197211

212+
/**
213+
* Returns a node id to estimated heap usage mapping for all nodes that we have such data for.
214+
* Note that these estimates should be considered minimums. They may be used to determine whether
215+
* there IS NOT capacity to do something, but not to determine that there IS capacity to do something.
216+
* Also note that the map may not be complete: it may contain none, or only a subset, of the nodes in
217+
* the cluster at any time. It may also contain entries for nodes that have since left the cluster.
* The constructors visible in this class populate the map via Map.copyOf, readImmutableMap or Map.of,
* so callers should treat the returned map as read-only.
*
* @return the node id to {@link ShardHeapUsage} mapping; may be empty
218+
*/
219+
public Map<String, ShardHeapUsage> getShardHeapUsages() {
220+
return shardHeapUsages;
221+
}
222+
198223
/**
199224
* Returns a node id to disk usage mapping for the path that has the least available space on the node.
200225
* Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public ClusterInfo getClusterInfo() {
153153
shardSizes.toImmutableMap(),
154154
shardDataSetSizes,
155155
dataPath,
156+
Map.of(),
156157
Map.of()
157158
);
158159
}

0 commit comments

Comments
 (0)