Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,6 @@
package org.opensearch.performanceanalyzer.writer;


import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader_writer_shared.Event;
import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -49,20 +38,31 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader_writer_shared.Event;
import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler;

public class EventLogQueueProcessor {
private static final Logger LOG = LogManager.getLogger(EventLogQueueProcessor.class);

private final ScheduledExecutorService writerExecutor = Executors.newScheduledThreadPool(1);
private final int filesCleanupPeriodicityMillis = PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds
private final int filesCleanupPeriodicityMillis =
PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds
private final EventLogFileHandler eventLogFileHandler;
private final long initialDelayMillis;
private final long purgePeriodicityMillis;
private final PerformanceAnalyzerController controller;
private long lastCleanupTimeBucket;
private long lastTimeBucket;

public EventLogQueueProcessor(
EventLogFileHandler eventLogFileHandler,
long initialDelayMillis,
Expand All @@ -83,7 +83,8 @@ public void scheduleExecutor() {
} catch (Exception ex) {
LOG.error("Unable to cleanup lingering files from previous plugin run.", ex);
}
lastCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis());
lastCleanupTimeBucket =
PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis());

ScheduledFuture<?> futureHandle =
writerExecutor.scheduleAtFixedRate(
Expand Down Expand Up @@ -187,16 +188,25 @@ public void purgeQueueAndPersist() {
}

private void cleanup() {
// Delete Event log files belonging to time bucket older than past filesCleanupPeriod(defaults to 60s)
long currCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis());
if (currCleanupTimeBucket - lastCleanupTimeBucket > filesCleanupPeriodicityMillis) {
// Get list of files(time buckets) for purging, considered range : [lastCleanupTimeBucket, currCleanupTimeBucket)
List<String> filesForCleanup = LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket)
.filter(timeMillis -> timeMillis % MetricsConfiguration.SAMPLING_INTERVAL == 0)
.mapToObj(String::valueOf)
.collect(Collectors.toList());
eventLogFileHandler.deleteFiles(Collections.unmodifiableList(filesForCleanup));
lastCleanupTimeBucket = currCleanupTimeBucket;
if (lastCleanupTimeBucket != 0) {
// Delete Event log files belonging to time bucket older than past
// filesCleanupPeriod(defaults to 60s)
long currCleanupTimeBucket =
PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis());
if (currCleanupTimeBucket - lastCleanupTimeBucket > filesCleanupPeriodicityMillis) {
// Get list of files(time buckets) for purging, considered range :
// [lastCleanupTimeBucket, currCleanupTimeBucket)
List<String> filesForCleanup =
LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket)
.filter(
timeMillis ->
timeMillis % MetricsConfiguration.SAMPLING_INTERVAL
== 0)
Comment on lines +203 to +204
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a little inefficient to test every number. What if we did something like this?

List<String> filesForCleanup = new ArrayList<>();

// ugly expression to handle the case when lastCleanupTimeBucket is divisible by SAMPLING_INTERVAL
// If I did this correctly, then k should satisfy the following: 
//   (k - 1) * SAMPLING_INTERVAL < lastCleanupTimeBucket <= k * SAMPLING_INTERVAL
long k = lastCleanupTimeBucket / SAMPLING_INTERVAL +
        lastCleanupTimebucket % SAMPLING_INTERVAL == 0 ? 0L : 1L;

for (int tick = k * SAMPLING_INTERVAL; tick < currCleanupTimeBucket; tick += SAMPLING_INTERVAL) {
  filesForCleanup.add(String::valueOf(tick));
}

.mapToObj(String::valueOf)
.collect(Collectors.toList());
eventLogFileHandler.deleteFiles(Collections.unmodifiableList(filesForCleanup));
lastCleanupTimeBucket = currCleanupTimeBucket;
}
}
}

Expand Down