Skip to content

Commit 03dc85c

Browse files
joegallo and masseyke committed
Correctly handling download_database_on_pipeline_creation within a pipeline processor within a default or final pipeline (elastic#131236) (elastic#131649)
Co-authored-by: Keith Massey <[email protected]>
1 parent 8de6d66 commit 03dc85c

File tree

3 files changed

+264
-14
lines changed

3 files changed

+264
-14
lines changed

docs/changelog/131236.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131236
2+
summary: Correctly handling `download_database_on_pipeline_creation` within a pipeline
3+
processor within a default or final pipeline
4+
area: Ingest Node
5+
type: bug
6+
issues: []

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 151 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.ClusterState;
2222
import org.elasticsearch.cluster.ClusterStateListener;
2323
import org.elasticsearch.cluster.metadata.IndexAbstraction;
24+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.cluster.service.ClusterService;
2627
import org.elasticsearch.common.settings.Setting;
@@ -44,6 +45,7 @@
4445
import org.elasticsearch.transport.RemoteTransportException;
4546

4647
import java.util.Collections;
48+
import java.util.HashMap;
4749
import java.util.HashSet;
4850
import java.util.List;
4951
import java.util.Map;
@@ -248,11 +250,14 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
248250
return false;
249251
}
250252

251-
return clusterState.getMetadata().indices().values().stream().anyMatch(indexMetadata -> {
253+
for (IndexMetadata indexMetadata : clusterState.getMetadata().indices().values()) {
252254
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
253255
String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
254-
return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);
255-
});
256+
if (checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline)) {
257+
return true;
258+
}
259+
}
260+
return false;
256261
}
257262

258263
/**
@@ -265,12 +270,26 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
265270
@SuppressWarnings("unchecked")
266271
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
267272
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
273+
Map<String, PipelineConfiguration> pipelineConfigById = HashMap.newHashMap(configurations.size());
274+
for (PipelineConfiguration configuration : configurations) {
275+
pipelineConfigById.put(configuration.getId(), configuration);
276+
}
277+
// this map is used to keep track of pipelines that have already been checked
278+
Map<String, Boolean> pipelineHasGeoProcessorById = HashMap.newHashMap(configurations.size());
268279
Set<String> ids = new HashSet<>();
269280
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
270281
for (PipelineConfiguration configuration : configurations) {
271282
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY);
272-
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
273-
ids.add(configuration.getId());
283+
String pipelineName = configuration.getId();
284+
if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) {
285+
if (hasAtLeastOneGeoipProcessor(
286+
processors,
287+
downloadDatabaseOnPipelineCreation,
288+
pipelineConfigById,
289+
pipelineHasGeoProcessorById
290+
)) {
291+
ids.add(pipelineName);
292+
}
274293
}
275294
}
276295
return Collections.unmodifiableSet(ids);
@@ -280,13 +299,27 @@ private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState
280299
* Check if a list of processor contains at least a geoip processor.
281300
* @param processors List of processors.
282301
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
302+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
303+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
304+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
305+
* out (null).
283306
* @return true if a geoip processor is found in the processor list.
284307
*/
285-
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
308+
private static boolean hasAtLeastOneGeoipProcessor(
309+
List<Map<String, Object>> processors,
310+
boolean downloadDatabaseOnPipelineCreation,
311+
Map<String, PipelineConfiguration> pipelineConfigById,
312+
Map<String, Boolean> pipelineHasGeoProcessorById
313+
) {
286314
if (processors != null) {
287315
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
288316
for (Map<String, Object> processor : processors) {
289-
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
317+
if (hasAtLeastOneGeoipProcessor(
318+
processor,
319+
downloadDatabaseOnPipelineCreation,
320+
pipelineConfigById,
321+
pipelineHasGeoProcessorById
322+
)) {
290323
return true;
291324
}
292325
}
@@ -298,10 +331,19 @@ private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> pro
298331
* Check if a processor config is a geoip processor or contains at least a geoip processor.
299332
* @param processor Processor config.
300333
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
334+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
335+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
336+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
337+
* out (null).
301338
* @return true if a geoip processor is found in the processor list.
302339
*/
303340
@SuppressWarnings("unchecked")
304-
private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
341+
private static boolean hasAtLeastOneGeoipProcessor(
342+
Map<String, Object> processor,
343+
boolean downloadDatabaseOnPipelineCreation,
344+
Map<String, PipelineConfiguration> pipelineConfigById,
345+
Map<String, Boolean> pipelineHasGeoProcessorById
346+
) {
305347
if (processor == null) {
306348
return false;
307349
}
@@ -320,27 +362,51 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
320362
}
321363
}
322364

323-
return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
324-
|| isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
365+
return isProcessorWithOnFailureGeoIpProcessor(
366+
processor,
367+
downloadDatabaseOnPipelineCreation,
368+
pipelineConfigById,
369+
pipelineHasGeoProcessorById
370+
)
371+
|| isForeachProcessorWithGeoipProcessor(
372+
processor,
373+
downloadDatabaseOnPipelineCreation,
374+
pipelineConfigById,
375+
pipelineHasGeoProcessorById
376+
)
377+
|| isPipelineProcessorWithGeoIpProcessor(
378+
processor,
379+
downloadDatabaseOnPipelineCreation,
380+
pipelineConfigById,
381+
pipelineHasGeoProcessorById
382+
);
325383
}
326384

327385
/**
328386
* Check if a processor config has an on_failure clause containing at least a geoip processor.
329387
* @param processor Processor config.
330388
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
389+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
390+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
391+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
392+
* out (null).
331393
* @return true if a geoip processor is found in the processor list.
332394
*/
333395
@SuppressWarnings("unchecked")
334396
private static boolean isProcessorWithOnFailureGeoIpProcessor(
335397
Map<String, Object> processor,
336-
boolean downloadDatabaseOnPipelineCreation
398+
boolean downloadDatabaseOnPipelineCreation,
399+
Map<String, PipelineConfiguration> pipelineConfigById,
400+
Map<String, Boolean> pipelineHasGeoProcessorById
337401
) {
338402
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
339403
for (Object value : processor.values()) {
340404
if (value instanceof Map
341405
&& hasAtLeastOneGeoipProcessor(
342406
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
343-
downloadDatabaseOnPipelineCreation
407+
downloadDatabaseOnPipelineCreation,
408+
pipelineConfigById,
409+
pipelineHasGeoProcessorById
344410
)) {
345411
return true;
346412
}
@@ -352,13 +418,84 @@ && hasAtLeastOneGeoipProcessor(
352418
* Check if a processor is a foreach processor containing at least a geoip processor.
353419
* @param processor Processor config.
354420
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
421+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
422+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
423+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
424+
* out (null).
355425
* @return true if a geoip processor is found in the processor list.
356426
*/
357427
@SuppressWarnings("unchecked")
358-
private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
428+
private static boolean isForeachProcessorWithGeoipProcessor(
429+
Map<String, Object> processor,
430+
boolean downloadDatabaseOnPipelineCreation,
431+
Map<String, PipelineConfiguration> pipelineConfigById,
432+
Map<String, Boolean> pipelineHasGeoProcessorById
433+
) {
359434
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("foreach");
360435
return processorConfig != null
361-
&& hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
436+
&& hasAtLeastOneGeoipProcessor(
437+
(Map<String, Object>) processorConfig.get("processor"),
438+
downloadDatabaseOnPipelineCreation,
439+
pipelineConfigById,
440+
pipelineHasGeoProcessorById
441+
);
442+
}
443+
444+
/**
445+
* Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates
446+
* pipelineHasGeoProcessorById with a result for any pipelines it looks at.
447+
* @param processor Processor config.
448+
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
449+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
450+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
451+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
452+
* out (null).
453+
* @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor.
454+
*/
455+
@SuppressWarnings("unchecked")
456+
private static boolean isPipelineProcessorWithGeoIpProcessor(
457+
Map<String, Object> processor,
458+
boolean downloadDatabaseOnPipelineCreation,
459+
Map<String, PipelineConfiguration> pipelineConfigById,
460+
Map<String, Boolean> pipelineHasGeoProcessorById
461+
) {
462+
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("pipeline");
463+
if (processorConfig != null) {
464+
String pipelineName = (String) processorConfig.get("name");
465+
if (pipelineName != null) {
466+
if (pipelineHasGeoProcessorById.containsKey(pipelineName)) {
467+
if (pipelineHasGeoProcessorById.get(pipelineName) == null) {
468+
/*
469+
* If the value is null here, it indicates that this method has been called recursively with the same pipeline name.
470+
* This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at
471+
* server startup time. Instead, we just bail out as quickly as possible. It is possible that this could lead to a
472+
* geo database not being downloaded for the pipeline, but it doesn't really matter since the pipeline was going to
473+
* fail anyway.
474+
*/
475+
pipelineHasGeoProcessorById.put(pipelineName, false);
476+
}
477+
} else {
478+
List<Map<String, Object>> childProcessors = null;
479+
PipelineConfiguration config = pipelineConfigById.get(pipelineName);
480+
if (config != null) {
481+
childProcessors = (List<Map<String, Object>>) config.getConfig().get(Pipeline.PROCESSORS_KEY);
482+
}
483+
// We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors:
484+
pipelineHasGeoProcessorById.put(pipelineName, null);
485+
pipelineHasGeoProcessorById.put(
486+
pipelineName,
487+
hasAtLeastOneGeoipProcessor(
488+
childProcessors,
489+
downloadDatabaseOnPipelineCreation,
490+
pipelineConfigById,
491+
pipelineHasGeoProcessorById
492+
)
493+
);
494+
}
495+
return pipelineHasGeoProcessorById.get(pipelineName);
496+
}
497+
}
498+
return false;
362499
}
363500

364501
@UpdateForV9 // use MINUS_ONE once that means no timeout

0 commit comments

Comments
 (0)