diff --git a/src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java index 139c5889..ea17f6da 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java @@ -27,17 +27,6 @@ package org.opensearch.performanceanalyzer.writer; -import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp; -import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; -import org.opensearch.performanceanalyzer.config.PluginSettings; -import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction; -import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration; -import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; -import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; -import org.opensearch.performanceanalyzer.reader_writer_shared.Event; -import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler; -import org.apache.logging.log4j.Logger; - import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -49,20 +38,31 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.LongStream; - import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.config.PluginSettings; +import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction; +import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; +import org.opensearch.performanceanalyzer.reader_writer_shared.Event; +import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler; public class EventLogQueueProcessor { private static final Logger LOG = LogManager.getLogger(EventLogQueueProcessor.class); private final ScheduledExecutorService writerExecutor = Executors.newScheduledThreadPool(1); - private final int filesCleanupPeriodicityMillis = PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds + private final int filesCleanupPeriodicityMillis = + PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds private final EventLogFileHandler eventLogFileHandler; private final long initialDelayMillis; private final long purgePeriodicityMillis; private final PerformanceAnalyzerController controller; private long lastCleanupTimeBucket; private long lastTimeBucket; + public EventLogQueueProcessor( EventLogFileHandler eventLogFileHandler, long initialDelayMillis, @@ -83,7 +83,8 @@ public void scheduleExecutor() { } catch (Exception ex) { LOG.error("Unable to cleanup lingering files from previous plugin run.", ex); } - lastCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis()); + lastCleanupTimeBucket = + PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis()); ScheduledFuture futureHandle = writerExecutor.scheduleAtFixedRate( @@ -187,16 +188,25 @@ public void purgeQueueAndPersist() { } private void cleanup() { - // Delete Event log files belonging to time bucket older than past filesCleanupPeriod(defaults to 60s) - long currCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis()); - if (currCleanupTimeBucket - lastCleanupTimeBucket > filesCleanupPeriodicityMillis) { - // Get list of files(time buckets) for purging, considered range : [lastCleanupTimeBucket, currCleanupTimeBucket) - List filesForCleanup = LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket) - .filter(timeMillis -> timeMillis % MetricsConfiguration.SAMPLING_INTERVAL == 0) - .mapToObj(String::valueOf) - .collect(Collectors.toList()); - eventLogFileHandler.deleteFiles(Collections.unmodifiableList(filesForCleanup)); - lastCleanupTimeBucket = currCleanupTimeBucket; + if (lastCleanupTimeBucket != 0) { + // Delete Event log files belonging to time bucket older than past + // filesCleanupPeriod(defaults to 60s) + long currCleanupTimeBucket = + PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis()); + if (currCleanupTimeBucket - lastCleanupTimeBucket > filesCleanupPeriodicityMillis) { + // Get list of files(time buckets) for purging, considered range : + // [lastCleanupTimeBucket, currCleanupTimeBucket) + List filesForCleanup = + LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket) + .filter( + timeMillis -> + timeMillis % MetricsConfiguration.SAMPLING_INTERVAL + == 0) + .mapToObj(String::valueOf) + .collect(Collectors.toList()); + eventLogFileHandler.deleteFiles(Collections.unmodifiableList(filesForCleanup)); + lastCleanupTimeBucket = currCleanupTimeBucket; + } } }