From 1cfa4e7c17fc18208a74d5841309d4091c2110d1 Mon Sep 17 00:00:00 2001 From: Nicholas DiPiazza Date: Sat, 27 Dec 2025 11:58:21 -0600 Subject: [PATCH] TIKA-4594: Make PipesServer share ConfigStore with TikaGrpcServerImpl PipesServer now creates and uses the same type of ConfigStore configured in PipesConfig, allowing runtime fetcher/emitter configurations saved via gRPC endpoints to be available to forked worker processes. This fixes FetcherNotFoundException errors when using saveFetcher followed by fetchAndParse in tika-grpc. --- .../tika/pipes/core/server/PipesServer.java | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java index dd09db768e..431542b45d 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java @@ -70,6 +70,8 @@ import org.apache.tika.pipes.core.EmitStrategyConfig; import org.apache.tika.pipes.core.PipesClient; import org.apache.tika.pipes.core.PipesConfig; +import org.apache.tika.pipes.core.config.ConfigStore; +import org.apache.tika.pipes.core.config.ConfigStoreFactory; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.fetcher.FetcherManager; import org.apache.tika.plugins.TikaPluginManager; @@ -454,10 +456,14 @@ protected void initializeResources() throws TikaException, IOException, SAXExcep TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig(); TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig); - //TODO allowed named configurations in tika config - this.fetcherManager = FetcherManager.load(tikaPluginManager, tikaJsonConfig); - // Always initialize emitters to support runtime overrides via ParseContext - this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig); + // Create ConfigStore using the same configuration as TikaGrpcServerImpl + // This allows fetchers saved via gRPC to be available to PipesServer + ConfigStore configStore = createConfigStore(pipesConfig, tikaPluginManager); + + // Load FetcherManager with ConfigStore to enable runtime modifications + this.fetcherManager = FetcherManager.load(tikaPluginManager, tikaJsonConfig, true, configStore); + // Always initialize emitters to support runtime overrides via ParseContext + this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig, true, configStore); this.autoDetectParser = (AutoDetectParser) tikaLoader.loadAutoDetectParser(); // Get the digester for pre-parse digesting of container documents. // If user configured skipContainerDocumentDigest=false (the default), PipesServer @@ -484,6 +490,18 @@ protected void initializeResources() throws TikaException, IOException, SAXExcep this.rMetaParser = new RecursiveParserWrapper(autoDetectParser); } + private ConfigStore createConfigStore(PipesConfig pipesConfig, TikaPluginManager tikaPluginManager) throws TikaException { + if (pipesConfig.getConfigStore() != null) { + ConfigStoreFactory factory = tikaPluginManager.getPluginManager() + .getExtensions(ConfigStoreFactory.class).stream() + .filter(f -> f.getClass().getName().equals(pipesConfig.getConfigStore().getFactoryClass())) + .findFirst() + .orElseThrow(() -> new TikaException("Could not find ConfigStoreFactory: " + + pipesConfig.getConfigStore().getFactoryClass())); + return factory.build(pipesConfig.getConfigStore().getParams()); + } + return null; + } private void write(PROCESSING_STATUS processingStatus, PipesResult pipesResult) { try {