Skip to content

Commit 1cfa4e7

Browse files
committed
TIKA-4594: Make PipesServer share ConfigStore with TikaGrpcServerImpl
PipesServer now creates and uses the same type of ConfigStore configured in PipesConfig, allowing runtime fetcher/emitter configurations saved via gRPC endpoints to be available to forked worker processes. This fixes FetcherNotFoundException errors when using saveFetcher followed by fetchAndParse in tika-grpc.
1 parent 70da6e0 commit 1cfa4e7

File tree

1 file changed

+22
-4
lines changed
  • tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server

1 file changed

+22
-4
lines changed

tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070
import org.apache.tika.pipes.core.EmitStrategyConfig;
7171
import org.apache.tika.pipes.core.PipesClient;
7272
import org.apache.tika.pipes.core.PipesConfig;
73+
import org.apache.tika.pipes.core.config.ConfigStore;
74+
import org.apache.tika.pipes.core.config.ConfigStoreFactory;
7375
import org.apache.tika.pipes.core.emitter.EmitterManager;
7476
import org.apache.tika.pipes.core.fetcher.FetcherManager;
7577
import org.apache.tika.plugins.TikaPluginManager;
@@ -454,10 +456,14 @@ protected void initializeResources() throws TikaException, IOException, SAXExcep
454456
TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
455457
TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig);
456458

457-
//TODO allowed named configurations in tika config
458-
this.fetcherManager = FetcherManager.load(tikaPluginManager, tikaJsonConfig);
459-
// Always initialize emitters to support runtime overrides via ParseContext
460-
this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig);
459+
// Create ConfigStore using the same configuration as TikaGrpcServerImpl
460+
// This allows fetchers saved via gRPC to be available to PipesServer
461+
ConfigStore configStore = createConfigStore(pipesConfig, tikaPluginManager);
462+
463+
// Load FetcherManager with ConfigStore to enable runtime modifications
464+
this.fetcherManager = FetcherManager.load(tikaPluginManager, tikaJsonConfig, true, configStore);
465+
// Always initialize emitters to support runtime overrides via ParseContext
466+
this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig, true, configStore);
461467
this.autoDetectParser = (AutoDetectParser) tikaLoader.loadAutoDetectParser();
462468
// Get the digester for pre-parse digesting of container documents.
463469
// If user configured skipContainerDocumentDigest=false (the default), PipesServer
@@ -484,6 +490,18 @@ protected void initializeResources() throws TikaException, IOException, SAXExcep
484490
this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
485491
}
486492

493+
private ConfigStore createConfigStore(PipesConfig pipesConfig, TikaPluginManager tikaPluginManager) throws TikaException {
494+
if (pipesConfig.getConfigStore() != null) {
495+
ConfigStoreFactory factory = tikaPluginManager.getPluginManager()
496+
.getExtensions(ConfigStoreFactory.class).stream()
497+
.filter(f -> f.getClass().getName().equals(pipesConfig.getConfigStore().getFactoryClass()))
498+
.findFirst()
499+
.orElseThrow(() -> new TikaException("Could not find ConfigStoreFactory: " +
500+
pipesConfig.getConfigStore().getFactoryClass()));
501+
return factory.build(pipesConfig.getConfigStore().getParams());
502+
}
503+
return null;
504+
}
487505

488506
private void write(PROCESSING_STATUS processingStatus, PipesResult pipesResult) {
489507
try {

0 commit comments

Comments
 (0)