Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Commit 2f3df02

Browse files
authored
Changes to collect shard indexing pressure metrics and push to shared location (#278)
* Changes to collect shard indexing pressure metrics and push to shared location * Code changes to remove pushing the cold store objects and removing isActiveShard flag as it becomes irrelevant now * Addressing comment to change NodeRole to IndexingStage as node role already has a predefined meaning * Checking if class is present while adding the collector in scheduled executor so that the collector will never be invoked if class is not present * Addressing comments.
1 parent 9f83059 commit 2f3df02

File tree

4 files changed

+302
-0
lines changed

4 files changed

+302
-0
lines changed

src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector;
2525
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.GCInfoCollector;
2626
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.HeapMetricsCollector;
27+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector;
2728
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceEventMetrics;
2829
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceMetrics;
2930
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector;
@@ -202,6 +203,13 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
202203
performanceAnalyzerController,configOverridesWrapper));
203204
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MasterThrottlingMetricsCollector(
204205
performanceAnalyzerController,configOverridesWrapper));
206+
try {
207+
Class.forName(ShardIndexingPressureMetricsCollector.SHARD_INDEXING_PRESSURE_CLASS_NAME);
208+
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new ShardIndexingPressureMetricsCollector(
209+
performanceAnalyzerController,configOverridesWrapper));
210+
} catch (ClassNotFoundException e) {
211+
LOG.info("Shard IndexingPressure not present in this ES version. Skipping ShardIndexingPressureMetricsCollector");
212+
}
205213
scheduledMetricCollectorsExecutor.start();
206214

207215
EventLog eventLog = new EventLog();
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;
17+
18+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
19+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController;
20+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
21+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
22+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardIndexingPressureDimension;
23+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardIndexingPressureValue;
24+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
25+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor;
26+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
27+
import com.fasterxml.jackson.annotation.JsonProperty;
28+
import com.fasterxml.jackson.core.JsonProcessingException;
29+
import com.fasterxml.jackson.databind.ObjectMapper;
30+
import org.apache.logging.log4j.LogManager;
31+
import org.apache.logging.log4j.Logger;
32+
import org.elasticsearch.cluster.service.ClusterService;
33+
import org.elasticsearch.index.IndexingPressure;
34+
import org.jooq.tools.json.JSONObject;
35+
import org.jooq.tools.json.JSONParser;
36+
import org.jooq.tools.json.ParseException;
37+
38+
import java.lang.reflect.Field;
39+
import java.util.Map;
40+
41+
public class ShardIndexingPressureMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor {
42+
public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration
43+
.CONFIG_MAP.get(ShardIndexingPressureMetricsCollector.class).samplingInterval;
44+
private static final int KEYS_PATH_LENGTH = 0;
45+
private static final Logger LOG = LogManager.getLogger(ShardIndexingPressureMetricsCollector.class);
46+
private static final ObjectMapper mapper = new ObjectMapper();
47+
private static final JSONParser parser = new JSONParser();
48+
49+
public static final String SHARD_INDEXING_PRESSURE_CLASS_NAME = "org.elasticsearch.index.ShardIndexingPressure";
50+
public static final String CLUSTER_SERVICE_CLASS_NAME = "org.elasticsearch.cluster.service.ClusterService";
51+
public static final String INDEXING_PRESSURE_CLASS_NAME = "org.elasticsearch.index.IndexingPressure";
52+
public static final String SHARD_INDEXING_PRESSURE_STORE_CLASS_NAME = "org.elasticsearch.index.ShardIndexingPressureStore";
53+
public static final String INDEXING_PRESSURE_FIELD_NAME = "indexingPressure";
54+
public static final String SHARD_INDEXING_PRESSURE_FIELD_NAME = "shardIndexingPressure";
55+
public static final String SHARD_INDEXING_PRESSURE_STORE_FIELD_NAME = "shardIndexingPressureStore";
56+
public static final String SHARD_INDEXING_PRESSURE_HOT_STORE_FIELD_NAME = "shardIndexingPressureHotStore";
57+
58+
private final ConfigOverridesWrapper configOverridesWrapper;
59+
private final PerformanceAnalyzerController controller;
60+
private StringBuilder value;
61+
62+
public ShardIndexingPressureMetricsCollector(PerformanceAnalyzerController controller,
63+
ConfigOverridesWrapper configOverridesWrapper) {
64+
super(SAMPLING_TIME_INTERVAL, "ShardIndexingPressureMetricsCollector");
65+
value = new StringBuilder();
66+
this.configOverridesWrapper = configOverridesWrapper;
67+
this.controller = controller;
68+
}
69+
70+
@Override
71+
@SuppressWarnings("unchecked")
72+
public void collectMetrics(long startTime) {
73+
if(!controller.isCollectorEnabled(configOverridesWrapper, getCollectorName())) {
74+
return;
75+
}
76+
77+
try {
78+
ClusterService clusterService = ESResources.INSTANCE.getClusterService();
79+
if (clusterService != null) {
80+
IndexingPressure indexingPressure = (IndexingPressure) getField(CLUSTER_SERVICE_CLASS_NAME, INDEXING_PRESSURE_FIELD_NAME).get(clusterService);
81+
if(indexingPressure != null) {
82+
Object shardIndexingPressure = getField(INDEXING_PRESSURE_CLASS_NAME, SHARD_INDEXING_PRESSURE_FIELD_NAME).get(indexingPressure);
83+
Object shardIndexingPressureStore = getField(SHARD_INDEXING_PRESSURE_CLASS_NAME, SHARD_INDEXING_PRESSURE_STORE_FIELD_NAME).get(shardIndexingPressure);
84+
Map<Long, Object> shardIndexingPressureHotStore =
85+
(Map<Long, Object>) getField(SHARD_INDEXING_PRESSURE_STORE_CLASS_NAME, SHARD_INDEXING_PRESSURE_HOT_STORE_FIELD_NAME)
86+
.get(shardIndexingPressureStore);
87+
88+
value.setLength(0);
89+
shardIndexingPressureHotStore.entrySet().stream().forEach(storeObject -> {
90+
try {
91+
JSONObject tracker = (JSONObject) parser.parse(mapper.writeValueAsString(storeObject.getValue()));
92+
JSONObject shardId = (JSONObject) parser.parse(mapper.writeValueAsString(tracker.get("shardId")));
93+
value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds())
94+
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);
95+
value.append(new ShardIndexingPressureStatus(AllMetrics.IndexingStage.COORDINATING.toString(),
96+
shardId.get("indexName").toString(), shardId.get("id").toString(),
97+
Long.parseLong(tracker.get("coordinatingRejections").toString()),
98+
Long.parseLong(tracker.get("currentCoordinatingBytes").toString()),
99+
Long.parseLong(tracker.get("primaryAndCoordinatingLimits").toString()),
100+
Double.longBitsToDouble(Long.parseLong(tracker.get("coordinatingThroughputMovingAverage").toString())),
101+
Long.parseLong(tracker.get("lastSuccessfulCoordinatingRequestTimestamp").toString())).serialize())
102+
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);
103+
value.append(new ShardIndexingPressureStatus(AllMetrics.IndexingStage.PRIMARY.toString(),
104+
shardId.get("indexName").toString(), shardId.get("id").toString(),
105+
Long.parseLong(tracker.get("primaryRejections").toString()),
106+
Long.parseLong(tracker.get("currentPrimaryBytes").toString()),
107+
Long.parseLong(tracker.get("primaryAndCoordinatingLimits").toString()),
108+
Double.longBitsToDouble(Long.parseLong(tracker.get("primaryThroughputMovingAverage").toString())),
109+
Long.parseLong(tracker.get("lastSuccessfulPrimaryRequestTimestamp").toString())).serialize())
110+
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);
111+
value.append(new ShardIndexingPressureStatus(AllMetrics.IndexingStage.REPLICA.toString(),
112+
shardId.get("indexName").toString(), shardId.get("id").toString(),
113+
Long.parseLong(tracker.get("replicaRejections").toString()),
114+
Long.parseLong(tracker.get("currentReplicaBytes").toString()),
115+
Long.parseLong(tracker.get("replicaLimits").toString()),
116+
Double.longBitsToDouble(Long.parseLong(tracker.get("replicaThroughputMovingAverage").toString())),
117+
Long.parseLong(tracker.get("lastSuccessfulReplicaRequestTimestamp").toString())).serialize())
118+
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);
119+
} catch (JsonProcessingException | ParseException e) {
120+
LOG.debug("Exception raised while parsing string to json object. Skipping IndexingPressureMetricsCollector");
121+
}
122+
});
123+
}
124+
}
125+
} catch (NoSuchFieldException | IllegalAccessException | ClassNotFoundException e) {
126+
LOG.debug("Exception raised. Skipping IndexingPressureMetricsCollector");
127+
}
128+
129+
saveMetricValues(value.toString(), startTime);
130+
}
131+
132+
Field getField(String className, String fieldName) throws NoSuchFieldException, ClassNotFoundException {
133+
Class<?> clusterServiceClass = Class.forName(className);
134+
Field indexingPressureField = clusterServiceClass.getDeclaredField(fieldName);
135+
indexingPressureField.setAccessible(true);
136+
return indexingPressureField;
137+
}
138+
139+
@Override
140+
public String getMetricsPath(long startTime, String... keysPath) {
141+
if (keysPath.length != KEYS_PATH_LENGTH) {
142+
throw new RuntimeException("keys length should be " + KEYS_PATH_LENGTH);
143+
}
144+
145+
return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sShardIndexingPressurePath);
146+
}
147+
148+
static class ShardIndexingPressureStatus extends MetricStatus {
149+
private final String indexingStage;
150+
private final String indexName;
151+
private final String shardId;
152+
private final long rejectionCount;
153+
private final long currentBytes;
154+
private final long currentLimits;
155+
private final double averageWindowThroughput;
156+
private final long lastSuccessfulTimestamp;
157+
158+
public ShardIndexingPressureStatus(String indexingStage, String indexName, String shardId, long rejectionCount, long currentBytes,
159+
long currentLimits, double averageWindowThroughput, long lastSuccessfulTimestamp) {
160+
this.indexingStage = indexingStage;
161+
this.indexName = indexName;
162+
this.shardId = shardId;
163+
this.rejectionCount = rejectionCount;
164+
this.currentBytes = currentBytes;
165+
this.currentLimits = currentLimits;
166+
this.averageWindowThroughput = averageWindowThroughput;
167+
this.lastSuccessfulTimestamp = lastSuccessfulTimestamp;
168+
}
169+
170+
@JsonProperty(ShardIndexingPressureDimension.Constants.INDEXING_STAGE)
171+
public String getIndexingStage() {
172+
return indexingStage;
173+
}
174+
175+
@JsonProperty(ShardIndexingPressureDimension.Constants.INDEX_NAME_VALUE)
176+
public String getIndexName() {
177+
return indexName;
178+
}
179+
180+
@JsonProperty(ShardIndexingPressureDimension.Constants.SHARD_ID_VALUE)
181+
public String getShardId() {
182+
return shardId;
183+
}
184+
185+
@JsonProperty(ShardIndexingPressureValue.Constants.REJECTION_COUNT_VALUE)
186+
public long getRejectionCount() {
187+
return rejectionCount;
188+
}
189+
190+
@JsonProperty(ShardIndexingPressureValue.Constants.CURRENT_BYTES)
191+
public long getCurrentBytes() {
192+
return rejectionCount;
193+
}
194+
195+
@JsonProperty(ShardIndexingPressureValue.Constants.CURRENT_LIMITS)
196+
public long getCurrentLimits() {
197+
return currentLimits;
198+
}
199+
200+
@JsonProperty(ShardIndexingPressureValue.Constants.AVERAGE_WINDOW_THROUGHPUT)
201+
public double getAverageWindowThroughput() {
202+
return averageWindowThroughput;
203+
}
204+
205+
@JsonProperty(ShardIndexingPressureValue.Constants.LAST_SUCCESSFUL_TIMESTAMP)
206+
public long getLastSuccessfulTimestamp() {
207+
return lastSuccessfulTimestamp;
208+
}
209+
}
210+
}

src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
1919
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector;
2020
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector;
21+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector;
2122
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector;
2223
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector;
2324
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
@@ -57,6 +58,7 @@ public static void configureMetrics() {
5758
MetricsConfiguration.CONFIG_MAP.put(FaultDetectionMetricsCollector.class, cdefault);
5859
MetricsConfiguration.CONFIG_MAP.put(ShardStateCollector.class, cdefault);
5960
MetricsConfiguration.CONFIG_MAP.put(MasterThrottlingMetricsCollector.class, cdefault);
61+
MetricsConfiguration.CONFIG_MAP.put(ShardIndexingPressureMetricsCollector.class, cdefault);
6062
}
6163

6264
// These methods are utility functions for the Node Stat Metrics Collectors. These methods are used by both the all
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;
17+
18+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.CustomMetricsLocationTestBase;
19+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
20+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController;
21+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
22+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
23+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event;
24+
import com.amazon.opendistro.elasticsearch.performanceanalyzer.util.TestUtil;
25+
import org.elasticsearch.cluster.service.ClusterService;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.mockito.Mock;
29+
import org.mockito.Mockito;
30+
import org.powermock.core.classloader.annotations.PrepareForTest;
31+
32+
import java.util.List;
33+
34+
import static org.junit.Assert.assertEquals;
35+
import static org.junit.Assert.assertTrue;
36+
import static org.mockito.MockitoAnnotations.initMocks;
37+
38+
@PrepareForTest(Class.class)
39+
public class ShardIndexingPressureMetricsCollectorTests extends CustomMetricsLocationTestBase {
40+
41+
private ShardIndexingPressureMetricsCollector shardIndexingPressureMetricsCollector;
42+
43+
@Mock
44+
private ClusterService mockClusterService;
45+
46+
@Mock
47+
PerformanceAnalyzerController mockController;
48+
49+
@Mock
50+
ConfigOverridesWrapper mockConfigOverrides;
51+
52+
@Before
53+
public void init() {
54+
initMocks(this);
55+
ESResources.INSTANCE.setClusterService(mockClusterService);
56+
System.setProperty("performanceanalyzer.metrics.log.enabled", "False");
57+
MetricsConfiguration.CONFIG_MAP.put(ShardIndexingPressureMetricsCollector.class, MetricsConfiguration.cdefault);
58+
shardIndexingPressureMetricsCollector = new ShardIndexingPressureMetricsCollector(mockController, mockConfigOverrides);
59+
60+
//clean metricQueue before running every test
61+
TestUtil.readEvents();
62+
}
63+
64+
@Test
65+
public void testShardIndexingPressureMetrics() {
66+
long startTimeInMills = 1153721339;
67+
Mockito.when(mockController.isCollectorEnabled(mockConfigOverrides, "ShardIndexingPressureMetricsCollector"))
68+
.thenReturn(true);
69+
shardIndexingPressureMetricsCollector.saveMetricValues("shard_indexing_pressure_metrics", startTimeInMills);
70+
71+
List<Event> metrics = TestUtil.readEvents();
72+
assertEquals(1, metrics.size());
73+
assertEquals("shard_indexing_pressure_metrics", metrics.get(0).value);
74+
75+
try {
76+
shardIndexingPressureMetricsCollector.saveMetricValues("shard_indexing_pressure_metrics", startTimeInMills, "123");
77+
assertTrue("Negative scenario test: Should have been a RuntimeException", true);
78+
} catch (RuntimeException ex) {
79+
//- expecting exception...1 values passed; 0 expected
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)