Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,21 @@
import org.opensearch.cluster.service.SourcePrioritizedRunnable;
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.metrics.AllMetrics.MasterMetricDimensions;
import org.opensearch.performanceanalyzer.metrics.AllMetrics.MasterMetricValues;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.metrics.ThreadIDUtil;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;

@SuppressWarnings("unchecked")
public class MasterServiceEventMetrics extends PerformanceAnalyzerMetricsCollector
implements MetricsProcessor {
public static final int SAMPLING_TIME_INTERVAL =
MetricsConfiguration.CONFIG_MAP.get(MasterServiceEventMetrics.class).samplingInterval;
private static final Logger LOG = LogManager.getLogger(MasterServiceEventMetrics.class);
private static final String MASTER_NODE_NOT_UP_METRIC = "MasterNodeNotUp";
private static final int KEYS_PATH_LENGTH = 3;
private StringBuilder value;
private static final int TPEXECUTOR_ADD_PENDING_PARAM_COUNT = 3;
Expand Down Expand Up @@ -169,7 +170,8 @@ public void collectMetrics(long startTime) {
}
LOG.debug(() -> "Successfully collected Master Event Metrics.");
} catch (Exception ex) {
StatsCollector.instance().logException(StatExceptionCode.MASTER_METRICS_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.MASTER_METRICS_ERROR, "", 1);
LOG.debug(
"Exception in Collecting Master Metrics: {} for startTime {} with ExceptionCode: {}",
() -> ex.toString(),
Expand Down Expand Up @@ -251,7 +253,8 @@ Queue<Runnable> getMasterServiceCurrentQueue() throws Exception {
getPrioritizedTPExecutorCurrentField()
.get(prioritizedOpenSearchThreadPoolExecutor);
} else {
StatsCollector.instance().logMetric(MASTER_NODE_NOT_UP_METRIC);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.MASTER_NODE_NOT_UP, "", 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.metrics.AllMetrics.ShardStatsValue;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import org.opensearch.performanceanalyzer.util.Utils;

/**
Expand Down Expand Up @@ -199,7 +201,8 @@ public void collectMetrics(long startTime) {
() -> ex.toString(),
() -> startTime,
() -> StatExceptionCode.NODESTATS_COLLECTION_ERROR.toString());
StatsCollector.instance().logException(StatExceptionCode.NODESTATS_COLLECTION_ERROR);
PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.NODESTATS_COLLECTION_ERROR, "", 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.metrics.AllMetrics.ShardStatsValue;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import org.opensearch.performanceanalyzer.util.Utils;

/**
Expand Down Expand Up @@ -266,7 +268,8 @@ public void collectMetrics(long startTime) {
() -> ex.toString(),
() -> startTime,
() -> StatExceptionCode.NODESTATS_COLLECTION_ERROR.toString());
StatsCollector.instance().logException(StatExceptionCode.NODESTATS_COLLECTION_ERROR);
PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.NODESTATS_COLLECTION_ERROR, "", 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerPlugin;
import org.opensearch.performanceanalyzer.collectors.ScheduledMetricCollectorsExecutor;
import org.opensearch.performanceanalyzer.collectors.StatExceptionCode;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;

public class PerformanceAnalyzerController {
private static final String PERFORMANCE_ANALYZER_ENABLED_CONF =
Expand Down Expand Up @@ -292,8 +292,8 @@ private void saveStateToConf(boolean featureEnabled, String fileName) {
try {
Path destDir = Paths.get(getDataDirectory());
if (!Files.exists(destDir)) {
StatsCollector.instance()
.logException(StatExceptionCode.CONFIG_DIR_NOT_FOUND);
PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.CONFIG_DIR_NOT_FOUND, "", 1);
Files.createDirectory(destDir);
}
Files.write(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.collectors.StatExceptionCode;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;

/**
* Class that handles updating cluster settings, and notifying the listeners when cluster settings
Expand Down Expand Up @@ -245,8 +245,8 @@ private void callIntSettingListeners(final Setting<Integer> setting, int setting
}
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}
}

Expand All @@ -267,8 +267,8 @@ private void callStringSettingListeners(final Setting<String> setting, String se
}
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}
}
/** Class that handles response to GET /_cluster/settings */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.performanceanalyzer.collectors.StatExceptionCode;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.metrics.AllMetrics.CommonDimension;
import org.opensearch.performanceanalyzer.metrics.AllMetrics.CommonMetric;
import org.opensearch.performanceanalyzer.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.metrics.ThreadIDUtil;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.search.internal.SearchContext;

public class PerformanceAnalyzerSearchListener
Expand Down Expand Up @@ -68,8 +68,8 @@ public void onPreQueryPhase(SearchContext searchContext) {
getSearchListener().preQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}
}

Expand All @@ -79,8 +79,8 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
getSearchListener().queryPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}
}

Expand All @@ -90,8 +90,8 @@ public void onFailedQueryPhase(SearchContext searchContext) {
getSearchListener().failedQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}
}

Expand All @@ -101,8 +101,8 @@ public void onPreFetchPhase(SearchContext searchContext) {
getSearchListener().preFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}
}

Expand All @@ -112,8 +112,8 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
getSearchListener().fetchPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}
}

Expand All @@ -123,8 +123,8 @@ public void onFailedFetchPhase(SearchContext searchContext) {
getSearchListener().failedFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.opensearch.performanceanalyzer.collectors.StatExceptionCode;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequest;
Expand Down Expand Up @@ -114,8 +114,8 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel
LOG.error(ex);
logOnce = true;
}
StatsCollector.instance()
.logException(StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR, "", 1);
}

return performanceanalyzerChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,11 @@ public void run() {
private static final int REQUEST_REMOTE_ERRORS = Math.abs(RANDOM.nextInt() % MAX_COUNT);
private static final int READER_PARSER_ERRORS = Math.abs(RANDOM.nextInt() % MAX_COUNT);
private static final int READER_RESTART_PROCESSINGS = Math.abs(RANDOM.nextInt() % MAX_COUNT);
private static final int OTHERS = Math.abs(RANDOM.nextInt() % MAX_COUNT);
private static final int TOTAL_ERRORS =
MASTER_METRICS_ERRORS
+ REQUEST_REMOTE_ERRORS
+ READER_PARSER_ERRORS
+ READER_RESTART_PROCESSINGS
+ OTHERS;
+ READER_RESTART_PROCESSINGS;
private static final AtomicInteger DEFAULT_VAL = new AtomicInteger(0);
private static final int EXEC_COUNT = 20;

Expand All @@ -97,10 +95,6 @@ public void testStats() throws Exception {
exceptionCodeList.add(StatExceptionCode.READER_RESTART_PROCESSING);
}

for (int i = 0; i < OTHERS; i++) {
exceptionCodeList.add(null);
}

Collections.shuffle(exceptionCodeList);
StatsCollector sc = StatsCollector.instance();
int iterateSize = exceptionCodeList.size() / EXEC_COUNT;
Expand Down Expand Up @@ -143,11 +137,6 @@ private static void assertExpected(StatsCollector sc) {
StatExceptionCode.READER_RESTART_PROCESSING.toString(), DEFAULT_VAL)
.get(),
READER_RESTART_PROCESSINGS);
assertEquals(
sc.getCounters()
.getOrDefault(StatExceptionCode.OTHER.toString(), DEFAULT_VAL)
.get(),
OTHERS);
assertEquals(
sc.getCounters()
.getOrDefault(StatExceptionCode.TOTAL_ERROR.toString(), DEFAULT_VAL)
Expand Down Expand Up @@ -195,8 +184,6 @@ private static void iterate(

if (exceptionCode != null) {
sc.logException(exceptionCode);
} else {
sc.logException();
}
count++;
}
Expand Down