diff --git a/docs/changelog/131236.yaml b/docs/changelog/131236.yaml new file mode 100644 index 0000000000000..13e3190098404 --- /dev/null +++ b/docs/changelog/131236.yaml @@ -0,0 +1,6 @@ +pr: 131236 +summary: Correctly handling `download_database_on_pipeline_creation` within a pipeline + processor within a default or final pipeline +area: Ingest Node +type: bug +issues: [] diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 087a5b4e6296c..6d55e94b0a23d 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,6 +48,7 @@ import org.elasticsearch.transport.RemoteTransportException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -280,11 +282,14 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) { return false; } - return projectMetadata.indices().values().stream().anyMatch(indexMetadata -> { + for (IndexMetadata indexMetadata : projectMetadata.indices().values()) { String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings()); String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings()); - return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline); - }); + if (checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline)) { + return true; + } + } + return false; } /** @@ -297,12 +302,26 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) { @SuppressWarnings("unchecked") private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) { List configurations = IngestService.getPipelines(projectMetadata); + Map pipelineConfigById = HashMap.newHashMap(configurations.size()); + for (PipelineConfiguration configuration : configurations) { + pipelineConfigById.put(configuration.getId(), configuration); + } + // this map is used to keep track of pipelines that have already been checked + Map pipelineHasGeoProcessorById = HashMap.newHashMap(configurations.size()); Set ids = new HashSet<>(); // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (PipelineConfiguration configuration : configurations) { List> processors = (List>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY); - if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) { - ids.add(configuration.getId()); + String pipelineName = configuration.getId(); + if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) { + if (hasAtLeastOneGeoipProcessor( + processors, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + )) { + ids.add(pipelineName); + } } } return Collections.unmodifiableSet(ids); @@ -312,13 +331,27 @@ private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMe * Check if a list of processor contains at least a geoip processor. * @param processors List of processors. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processor list. */ - private static boolean hasAtLeastOneGeoipProcessor(List> processors, boolean downloadDatabaseOnPipelineCreation) { + private static boolean hasAtLeastOneGeoipProcessor( + List> processors, + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigById, + Map pipelineHasGeoProcessorById + ) { if (processors != null) { // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (Map processor : processors) { - if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) { + if (hasAtLeastOneGeoipProcessor( + processor, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + )) { return true; } } @@ -330,10 +363,19 @@ private static boolean hasAtLeastOneGeoipProcessor(List> pro * Check if a processor config is a geoip processor or contains at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") - private static boolean hasAtLeastOneGeoipProcessor(Map processor, boolean downloadDatabaseOnPipelineCreation) { + private static boolean hasAtLeastOneGeoipProcessor( + Map processor, + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigById, + Map pipelineHasGeoProcessorById + ) { if (processor == null) { return false; } @@ -352,27 +394,51 @@ private static boolean hasAtLeastOneGeoipProcessor(Map processor } } - return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation) - || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation); + return isProcessorWithOnFailureGeoIpProcessor( + processor, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ) + || isForeachProcessorWithGeoipProcessor( + processor, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ) + || isPipelineProcessorWithGeoIpProcessor( + processor, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ); } /** * Check if a processor config has an on_failure clause containing at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") private static boolean isProcessorWithOnFailureGeoIpProcessor( Map processor, - boolean downloadDatabaseOnPipelineCreation + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigById, + Map pipelineHasGeoProcessorById ) { // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (Object value : processor.values()) { if (value instanceof Map && hasAtLeastOneGeoipProcessor( ((Map>>) value).get("on_failure"), - downloadDatabaseOnPipelineCreation + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById )) { return true; } @@ -384,13 +450,84 @@ && hasAtLeastOneGeoipProcessor( * Check if a processor is a foreach processor containing at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") - private static boolean isForeachProcessorWithGeoipProcessor(Map processor, boolean downloadDatabaseOnPipelineCreation) { + private static boolean isForeachProcessorWithGeoipProcessor( + Map processor, + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigById, + Map pipelineHasGeoProcessorById + ) { final Map processorConfig = (Map) processor.get("foreach"); return processorConfig != null - && hasAtLeastOneGeoipProcessor((Map) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation); + && hasAtLeastOneGeoipProcessor( + (Map) processorConfig.get("processor"), + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ); + } + + /** + * Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates + * pipelineHasGeoProcessorById with a result for any pipelines it looks at. + * @param processor Processor config. + * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). + * @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor. + */ + @SuppressWarnings("unchecked") + private static boolean isPipelineProcessorWithGeoIpProcessor( + Map processor, + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigById, + Map pipelineHasGeoProcessorById + ) { + final Map processorConfig = (Map) processor.get("pipeline"); + if (processorConfig != null) { + String pipelineName = (String) processorConfig.get("name"); + if (pipelineName != null) { + if (pipelineHasGeoProcessorById.containsKey(pipelineName)) { + if (pipelineHasGeoProcessorById.get(pipelineName) == null) { + /* + * If the value is null here, it indicates that this method has been called recursively with the same pipeline name. + * This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at + * server startup time. Instead, we just bail out as quickly as possible. It is possible that this could lead to a + * geo database not being downloaded for the pipeline, but it doesn't really matter since the pipeline was going to + * fail anyway. + */ + pipelineHasGeoProcessorById.put(pipelineName, false); + } + } else { + List> childProcessors = null; + PipelineConfiguration config = pipelineConfigById.get(pipelineName); + if (config != null) { + childProcessors = (List>) config.getConfig().get(Pipeline.PROCESSORS_KEY); + } + // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors: + pipelineHasGeoProcessorById.put(pipelineName, null); + pipelineHasGeoProcessorById.put( + pipelineName, + hasAtLeastOneGeoipProcessor( + childProcessors, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ) + ); + } + return pipelineHasGeoProcessorById.get(pipelineName); + } + } + return false; } // starts GeoIP downloader task for a single project diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java index eccc29d22277f..09eb7fb65585d 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java @@ -52,6 +52,101 @@ public void testHasAtLeastOneGeoipProcessorWhenDownloadDatabaseOnPipelineCreatio } + /* + * This tests that if a default or final pipeline has a pipeline processor that has a geoip processor that has + * download_database_on_pipeline_creation set to false, then we will correctly acknowledge that the pipeline has a geoip processor so + * that we download it appropriately. + */ + public void testHasAtLeastOneGeoipProcessorInPipelineProcessorWhenDownloadDatabaseOnPipelineCreationIsFalse() throws IOException { + String innerInnerPipelineJson = """ + { + "processors":[""" + getGeoIpProcessor(false) + """ + ] + } + """; + String innerPipelineJson = """ + { + "processors":[{"pipeline": {"name": "innerInnerPipeline"}} + ] + } + """; + String outerPipelineJson = """ + { + "processors":[{"pipeline": {"name": "innerPipeline"}} + ] + } + """; + IngestMetadata ingestMetadata = new IngestMetadata( + Map.of( + "innerInnerPipeline", + new PipelineConfiguration("innerInnerPipeline", new BytesArray(innerInnerPipelineJson), XContentType.JSON), + "innerPipeline", + new PipelineConfiguration("innerPipeline", new BytesArray(innerPipelineJson), XContentType.JSON), + "outerPipeline", + new PipelineConfiguration("outerPipeline", new BytesArray(outerPipelineJson), XContentType.JSON) + ) + ); + // The pipeline is not used in any index, expected to return false. + var projectMetadata = projectMetadataWithIndex(b -> {}, ingestMetadata); + assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + + // The pipeline is set as default pipeline in an index, expected to return true. + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "outerPipeline"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + + // The pipeline is set as final pipeline in an index, expected to return true. + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "outerPipeline"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + } + + public void testHasAtLeastOneGeoipProcessorRecursion() throws IOException { + /* + * The pipeline in this test is invalid -- it has a cycle from outerPipeline -> innerPipeline -> innerInnerPipeline -> + * innerPipeline. Since this method is called at server startup, we want to make sure that we don't get a StackOverFlowError and + * that we don't throw any kind of validation exception (since that would be an unexpected change of behavior). + */ + String innerInnerPipelineJson = """ + { + "processors":[""" + getGeoIpProcessor(false) + """ + , {"pipeline": {"name": "innerPipeline"}} + ] + } + """; + String innerPipelineJson = """ + { + "processors":[{"pipeline": {"name": "innerInnerPipeline"}} + ] + } + """; + String outerPipelineJson = """ + { + "processors":[{"pipeline": {"name": "innerPipeline"}} + ] + } + """; + IngestMetadata ingestMetadata = new IngestMetadata( + Map.of( + "innerInnerPipeline", + new PipelineConfiguration("innerInnerPipeline", new BytesArray(innerInnerPipelineJson), XContentType.JSON), + "innerPipeline", + new PipelineConfiguration("innerPipeline", new BytesArray(innerPipelineJson), XContentType.JSON), + "outerPipeline", + new PipelineConfiguration("outerPipeline", new BytesArray(outerPipelineJson), XContentType.JSON) + ) + ); + // The pipeline is not used in any index, expected to return false. + var projectMetadata = projectMetadataWithIndex(b -> {}, ingestMetadata); + assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + + // The pipeline is set as default pipeline in an index, expected to return true. + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "outerPipeline"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + + // The pipeline is set as final pipeline in an index, expected to return true. + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "outerPipeline"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + } + public void testHasAtLeastOneGeoipProcessor() throws IOException { var projectId = Metadata.DEFAULT_PROJECT_ID; List expectHitsInputs = getPipelinesWithGeoIpProcessors(true);