Commit 34b60d6

TIKA-4595: Dynamic Fetcher/Emitter Management with ConfigStore Support (#2489)
* TIKA-4595: Add dynamic fetcher/emitter management API to PipesClient

  - Added SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER commands
  - Added SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER commands
  - Implemented PipesClient public API methods for runtime configuration
  - Implemented PipesServer command handlers
  - Added deleteComponent() and getComponentConfig() to AbstractComponentManager
  - Added wrapper methods to FetcherManager and EmitterManager
  - Added remove() method to ConfigStore interface and implementations
  - All tests passing

* Fix gRPC saveFetcher to propagate to PipesClient's forked PipesServer

  - saveFetcher now calls both fetcherManager.saveFetcher() and pipesClient.saveFetcher()
  - This ensures fetchers are available in the forked PipesServer process
  - Implemented deleteFetcher to call both managers as well
  - Fixes FetcherNotFoundException when using dynamic fetchers via gRPC

  The issue was that fetchers saved via gRPC were only stored in the gRPC server's FetcherManager, but when pipesClient.process() forks a new PipesServer process, that process has its own FetcherManager and doesn't have access to the dynamically created fetchers. Now both are updated.

* Add file-based ConfigStore for shared config between gRPC and PipesServer

  - Created FileBasedConfigStore that persists to JSON file
  - Created FileBasedConfigStoreFactory with @extension annotation
  - Updated PipesServer.initializeResources() to create and use ConfigStore
  - Both gRPC server and forked PipesServer can now share fetcher configs via file

  This enables dynamic fetcher management across JVM processes:

  1. gRPC saves fetcher → writes to config file
  2. PipesServer starts → reads from same file
  3. Both JVMs share the same fetcher configuration

* Handle 'file' ConfigStore type as built-in (not plugin)

  - Added direct handling for 'file' type in ConfigStoreFactory.createConfigStore()
  - File-based store is in core, not a plugin, so needs special handling
  - Avoids ClassNotFoundException when trying to load 'file' as a class name
  - Also added remove() method to IgniteConfigStore for interface compliance

* Make ExtensionConfig Serializable for socket communication

  ExtensionConfig is sent over sockets between PipesClient and PipesServer, so it needs to implement Serializable. Records can implement Serializable and all fields (String, String, String) are already serializable.

  Fixes NotSerializableException when calling saveFetcher via gRPC.

* Add embedded Ignite server architecture

  - Created IgniteStoreServer class that runs as embedded server
  - TikaGrpcServer starts Ignite server on startup (if ignite ConfigStore configured)
  - IgniteConfigStore now acts as client-only (clientMode=true)
  - No external Ignite dependency needed in Docker/Kubernetes
  - Server runs in background daemon thread within tika-grpc process
  - Clients (gRPC + forked PipesServer) connect to embedded server

  Architecture:

    ┌─────────────────────────────────┐
    │      tika-grpc Process          │
    │  ┌──────────────────────────┐   │
    │  │  IgniteStoreServer       │   │
    │  │  (server mode, daemon)   │   │
    │  └────────▲─────────────────┘   │
    │           │                     │
    │  ┌────────┴─────────────────┐   │
    │  │  IgniteConfigStore       │   │
    │  │  (client mode)           │   │
    │  └──────────────────────────┘   │
    └─────────────────────────────────┘
                ▲
                │ (client connection)
                │
       ┌────────┴─────────────────┐
       │  PipesServer (forked)    │
       │  IgniteConfigStore       │
       │  (client mode)           │
       └──────────────────────────┘

* Configure Ignite work directory to /tmp

  - Set workDirectory to /tmp/ignite-work in IgniteStoreServer
  - Set workDirectory to /tmp/ignite-work in IgniteConfigStore
  - Avoids 'Work directory does not exist and cannot be created: /work' error
  - Uses system property ignite.work.dir if set, defaults to /tmp/ignite-work
  - Ensures Ignite can write to work directory in Docker containers

* Use /var/cache/tika/ignite-work for Ignite work directory

  - Changed from /tmp/ignite-work to /var/cache/tika/ignite-work
  - Aligns with Tika's standard cache location
  - /var/cache/tika is already used for plugins and other Tika cache data

* Use plugin classloader for Ignite server startup

  - Find Ignite plugin's classloader from plugin manager
  - Load IgniteStoreServer and CacheMode using plugin classloader
  - Fixes NoClassDefFoundError for H2 classes
  - Ensures all Ignite dependencies (including H2) are available
  - Plugin classloader has all dependencies from lib/ directory

* Enable peer class loading in Ignite client

  - Set setPeerClassLoadingEnabled(true) in IgniteConfigStore
  - Must match server configuration
  - Fixes: Remote node has peer class loading enabled flag different from local
  - Both server and client now have peerClassLoading=true

* Disable peer class loading to fix classloader conflicts

  - Set setPeerClassLoadingEnabled(false) on both server and client
  - Fixes ClassCastException due to class loaded by different classloaders
  - Server uses plugin classloader, client uses app classloader
  - Peer class loading causes the same class to be in both, creating conflicts
  - We don't need peer class loading for our use case

* Move Ignite dependencies directly into tika-grpc

  - Made tika-pipes-ignite a required (non-optional) dependency of tika-grpc
  - Added ignite.version and h2.version properties
  - Removed reflection-based classloader lookup
  - Direct instantiation of IgniteStoreServer
  - Avoids all PF4J plugin classloader issues
  - Ignite classes now on main classpath

* Disable Ignite Object Input Filter autoconfiguration

  - Set IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION=false
  - Fixes: Failed to autoconfigure Ignite Object Input Filter
  - Ignite was conflicting with existing serialization filter
  - Apply in both IgniteStoreServer and IgniteConfigStore

* Add Ignite filter autoconfiguration disable to client too

* Add Ignite as built-in ConfigStore type

  - Handle 'ignite' type directly in ConfigStoreFactory
  - Load IgniteConfigStoreFactory via reflection
  - Works in forked PipesServer without plugin system
  - Matches pattern used for 'file' type
  - Fixes: ClassNotFoundException: ignite in forked process

* Add PipesIterator management API with save, get, and delete operations

* Fix Ignite tests to use temp directory and server mode

* Fix TikaGrpcServerTest - update assertion for delete operation now that it's implemented
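
The file-based sharing described in the commit message (one JVM writes a fetcher config to a file, another JVM reads it from the same file) can be illustrated with a minimal sketch. This is not Tika's FileBasedConfigStore: the class name `SketchFileConfigStore` and its methods are hypothetical, and it persists with `java.util.Properties` instead of JSON so that it needs no third-party libraries. Two instances pointed at one file stand in for the gRPC server and the forked PipesServer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/**
 * Hypothetical stand-in for a file-backed config store. Not Tika's
 * FileBasedConfigStore: that class persists JSON, whereas this sketch uses
 * java.util.Properties so it needs no third-party libraries.
 */
final class SketchFileConfigStore {
    private final Path file;

    SketchFileConfigStore(Path file) {
        this.file = file;
    }

    /** Re-reads the file on every call so writes made through another instance are seen. */
    private Properties load() throws IOException {
        Properties props = new Properties();
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            }
        }
        return props;
    }

    /** Rewrites the whole file with the given entries. */
    private void store(Properties props) throws IOException {
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "sketch config store");
        }
    }

    synchronized void put(String id, String configJson) throws IOException {
        Properties props = load();
        props.setProperty(id, configJson);
        store(props);
    }

    synchronized String get(String id) throws IOException {
        return load().getProperty(id);
    }

    synchronized void remove(String id) throws IOException {
        Properties props = load();
        props.remove(id);
        store(props);
    }

    public static void main(String[] args) throws IOException {
        Path shared = Files.createTempFile("sketch-config-store", ".properties");
        // Two instances stand in for the gRPC server JVM and the forked PipesServer JVM.
        SketchFileConfigStore grpcSide = new SketchFileConfigStore(shared);
        SketchFileConfigStore pipesSide = new SketchFileConfigStore(shared);

        grpcSide.put("my-fetcher", "{\"basePath\":\"/data\"}");
        System.out.println(pipesSide.get("my-fetcher"));

        grpcSide.remove("my-fetcher");
        System.out.println(pipesSide.get("my-fetcher"));
        Files.deleteIfExists(shared);
    }
}
```

The `synchronized` keyword here only serializes access inside one JVM. A store that is really shared between processes would also need some form of file locking or atomic replace, which this sketch leaves out.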
1 parent 771e649 commit 34b60d6

19 files changed, +1575 -20 lines changed

tika-grpc/pom.xml

Lines changed: 2 additions & 1 deletion
@@ -38,6 +38,8 @@
     <asarkar-grpc-test.version>2.0.0</asarkar-grpc-test.version>
     <awaitility.version>4.3.0</awaitility.version>
     <j2objc-annotations.version>3.1</j2objc-annotations.version>
+    <ignite.version>2.17.0</ignite.version>
+    <h2.version>1.4.197</h2.version>
   </properties>
 
   <dependencyManagement>
@@ -232,7 +234,6 @@
       <groupId>org.apache.tika</groupId>
       <artifactId>tika-pipes-ignite</artifactId>
       <version>${project.version}</version>
-      <optional>true</optional>
     </dependency>
     <dependency>
       <groupId>org.apache.tika</groupId>

tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java

Lines changed: 151 additions & 2 deletions
@@ -39,16 +39,22 @@
 
 import org.apache.tika.DeleteFetcherReply;
 import org.apache.tika.DeleteFetcherRequest;
+import org.apache.tika.DeletePipesIteratorReply;
+import org.apache.tika.DeletePipesIteratorRequest;
 import org.apache.tika.FetchAndParseReply;
 import org.apache.tika.FetchAndParseRequest;
 import org.apache.tika.GetFetcherConfigJsonSchemaReply;
 import org.apache.tika.GetFetcherConfigJsonSchemaRequest;
 import org.apache.tika.GetFetcherReply;
 import org.apache.tika.GetFetcherRequest;
+import org.apache.tika.GetPipesIteratorReply;
+import org.apache.tika.GetPipesIteratorRequest;
 import org.apache.tika.ListFetchersReply;
 import org.apache.tika.ListFetchersRequest;
 import org.apache.tika.SaveFetcherReply;
 import org.apache.tika.SaveFetcherRequest;
+import org.apache.tika.SavePipesIteratorReply;
+import org.apache.tika.SavePipesIteratorRequest;
 import org.apache.tika.TikaGrpc;
 import org.apache.tika.config.ConfigContainer;
 import org.apache.tika.config.loader.TikaJsonConfig;
@@ -131,11 +137,43 @@ private ConfigStore createConfigStore() throws TikaConfigException {
         ExtensionConfig storeConfig = new ExtensionConfig(
                 configStoreType, configStoreType, configStoreParams);
 
+        // If using Ignite, start the embedded server first
+        if ("ignite".equalsIgnoreCase(configStoreType)) {
+            startIgniteServer(storeConfig);
+        }
+
         return ConfigStoreFactory.createConfigStore(
                 pluginManager,
                 configStoreType,
                 storeConfig);
     }
+
+    private void startIgniteServer(ExtensionConfig config) {
+        try {
+            LOG.info("Starting embedded Ignite server for ConfigStore");
+
+            // Parse config to get Ignite settings
+            com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
+            com.fasterxml.jackson.databind.JsonNode params = mapper.readTree(config.json());
+
+            String cacheName = params.has("cacheName") ? params.get("cacheName").asText() : "tika-config-store";
+            String cacheMode = params.has("cacheMode") ? params.get("cacheMode").asText() : "REPLICATED";
+            String instanceName = params.has("igniteInstanceName") ? params.get("igniteInstanceName").asText() : "TikaIgniteServer";
+
+            // Direct instantiation - no reflection needed
+            org.apache.ignite.cache.CacheMode mode = org.apache.ignite.cache.CacheMode.valueOf(cacheMode);
+            org.apache.tika.pipes.ignite.server.IgniteStoreServer server =
+                    new org.apache.tika.pipes.ignite.server.IgniteStoreServer(cacheName, mode, instanceName);
+
+            server.startAsync();
+
+            LOG.info("Embedded Ignite server started successfully");
+
+        } catch (Exception e) {
+            LOG.error("Failed to start embedded Ignite server", e);
+            throw new RuntimeException("Failed to start Ignite server", e);
+        }
+    }
 
     @Override
     public void fetchAndParseServerSideStreaming(FetchAndParseRequest request,
@@ -225,7 +263,12 @@ public void saveFetcher(SaveFetcherRequest request,
         try {
             String factoryName = findFactoryNameForClass(request.getFetcherClass());
             ExtensionConfig config = new ExtensionConfig(request.getFetcherId(), factoryName, request.getFetcherConfigJson());
+
+            // Save to gRPC server's fetcher manager (for schema queries, etc.)
             fetcherManager.saveFetcher(config);
+
+            // Also save to PipesClient so it propagates to the forked PipesServer
+            pipesClient.saveFetcher(config);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -331,7 +374,113 @@ public void getFetcherConfigJsonSchema(GetFetcherConfigJsonSchemaRequest request
     }
 
     private boolean deleteFetcher(String id) {
-        LOG.warn("Deleting fetchers is not supported in the current implementation");
-        return false;
+        try {
+            // Delete from gRPC server's fetcher manager
+            fetcherManager.deleteFetcher(id);
+
+            // Also delete from PipesClient so it propagates to the forked PipesServer
+            pipesClient.deleteFetcher(id);
+
+            LOG.info("Successfully deleted fetcher: {}", id);
+            return true;
+        } catch (Exception e) {
+            LOG.error("Failed to delete fetcher: {}", id, e);
+            return false;
+        }
+    }
+
+    // ========== PipesIterator RPC Methods ==========
+
+    @Override
+    public void savePipesIterator(SavePipesIteratorRequest request,
+                                  StreamObserver<SavePipesIteratorReply> responseObserver) {
+        try {
+            String iteratorId = request.getIteratorId();
+            String iteratorClass = request.getIteratorClass();
+            String iteratorConfigJson = request.getIteratorConfigJson();
+
+            LOG.info("Saving pipes iterator: id={}, class={}", iteratorId, iteratorClass);
+
+            ExtensionConfig config = new ExtensionConfig(iteratorId, iteratorClass, iteratorConfigJson);
+
+            // Save via PipesClient so it propagates to the forked PipesServer
+            pipesClient.savePipesIterator(config);
+
+            SavePipesIteratorReply reply = SavePipesIteratorReply.newBuilder()
+                    .setMessage("Pipes iterator saved successfully")
+                    .build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+
+            LOG.info("Successfully saved pipes iterator: {}", iteratorId);
+
+        } catch (Exception e) {
+            LOG.error("Failed to save pipes iterator", e);
+            responseObserver.onError(io.grpc.Status.INTERNAL
+                    .withDescription("Failed to save pipes iterator: " + e.getMessage())
+                    .withCause(e)
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void getPipesIterator(GetPipesIteratorRequest request,
+                                 StreamObserver<GetPipesIteratorReply> responseObserver) {
+        try {
+            String iteratorId = request.getIteratorId();
+            LOG.info("Getting pipes iterator: {}", iteratorId);
+
+            ExtensionConfig config = pipesClient.getPipesIteratorConfig(iteratorId);
+
+            if (config == null) {
+                responseObserver.onError(io.grpc.Status.NOT_FOUND
+                        .withDescription("Pipes iterator not found: " + iteratorId)
+                        .asRuntimeException());
+                return;
+            }
+
+            GetPipesIteratorReply reply = GetPipesIteratorReply.newBuilder()
+                    .setIteratorId(config.id())
+                    .setIteratorClass(config.name())
+                    .setIteratorConfigJson(config.json())
+                    .build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+
+            LOG.info("Successfully retrieved pipes iterator: {}", iteratorId);
+
+        } catch (Exception e) {
+            LOG.error("Failed to get pipes iterator", e);
+            responseObserver.onError(io.grpc.Status.INTERNAL
+                    .withDescription("Failed to get pipes iterator: " + e.getMessage())
+                    .withCause(e)
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void deletePipesIterator(DeletePipesIteratorRequest request,
+                                    StreamObserver<DeletePipesIteratorReply> responseObserver) {
+        try {
+            String iteratorId = request.getIteratorId();
+            LOG.info("Deleting pipes iterator: {}", iteratorId);
+
+            pipesClient.deletePipesIterator(iteratorId);
+
+            DeletePipesIteratorReply reply = DeletePipesIteratorReply.newBuilder()
+                    .setMessage("Pipes iterator deleted successfully")
+                    .build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+
+            LOG.info("Successfully deleted pipes iterator: {}", iteratorId);
+
+        } catch (Exception e) {
+            LOG.error("Failed to delete pipes iterator", e);
+            responseObserver.onError(io.grpc.Status.INTERNAL
+                    .withDescription("Failed to delete pipes iterator: " + e.getMessage())
+                    .withCause(e)
+                    .asRuntimeException());
+        }
     }
 }
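
The `ExtensionConfig` passed to `pipesClient.saveFetcher(config)` in this file is, per the commit message, sent over a socket to the forked PipesServer, which is why the record has to implement `Serializable`. The sketch below shows a record surviving a Java serialization round trip. `SketchConfig` is a hypothetical stand-in that only mirrors the three String components (`id`, `name`, `json`) read in the diff, and a byte array stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializableRecordSketch {

    /** Hypothetical stand-in mirroring the three String components of ExtensionConfig. */
    record SketchConfig(String id, String name, String json) implements Serializable {
    }

    /** Serializes and deserializes through a byte array, which stands in for the socket. */
    static SketchConfig roundTrip(SketchConfig config) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(config);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchConfig) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchConfig sent =
                new SketchConfig("my-fetcher", "file-system-fetcher", "{\"basePath\":\"/data\"}");
        SketchConfig received = roundTrip(sent);
        // Records compare by component values, so the copy equals the original.
        System.out.println(sent.equals(received));
    }
}
```

Without `implements Serializable` on the record, `writeObject` would throw `NotSerializableException`, which matches the failure the commit message reports for `saveFetcher` via gRPC.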

tika-grpc/src/main/proto/tika.proto

Lines changed: 53 additions & 0 deletions
@@ -59,6 +59,19 @@ service Tika {
     Get the Fetcher Config schema for a given fetcher class.
    */
   rpc GetFetcherConfigJsonSchema(GetFetcherConfigJsonSchemaRequest) returns (GetFetcherConfigJsonSchemaReply) {}
+
+  /*
+    Save a pipes iterator to the iterator store.
+   */
+  rpc SavePipesIterator(SavePipesIteratorRequest) returns (SavePipesIteratorReply) {}
+  /*
+    Get a pipes iterator's data from the iterator store.
+   */
+  rpc GetPipesIterator(GetPipesIteratorRequest) returns (GetPipesIteratorReply) {}
+  /*
+    Delete a pipes iterator from the iterator store.
+   */
+  rpc DeletePipesIterator(DeletePipesIteratorRequest) returns (DeletePipesIteratorReply) {}
 }
 
 message SaveFetcherRequest {
@@ -143,3 +156,43 @@ message GetFetcherConfigJsonSchemaReply {
   // The json schema that describes the fetcher config in string format.
   string fetcher_config_json_schema = 1;
 }
+
+// ========== PipesIterator Messages ==========
+
+message SavePipesIteratorRequest {
+  // A unique identifier for each pipes iterator. If this already exists, operation will overwrite existing.
+  string iterator_id = 1;
+  // The full java class name of the pipes iterator class.
+  string iterator_class = 2;
+  // JSON string of the pipes iterator config object.
+  string iterator_config_json = 3;
+}
+
+message SavePipesIteratorReply {
+  // Status message
+  string message = 1;
+}
+
+message GetPipesIteratorRequest {
+  // The pipes iterator ID to retrieve
+  string iterator_id = 1;
+}
+
+message GetPipesIteratorReply {
+  // The pipes iterator ID
+  string iterator_id = 1;
+  // The full java class name of the pipes iterator
+  string iterator_class = 2;
+  // JSON string of the pipes iterator config object
+  string iterator_config_json = 3;
+}
+
+message DeletePipesIteratorRequest {
+  // The pipes iterator ID to delete
+  string iterator_id = 1;
+}
+
+message DeletePipesIteratorReply {
+  // Status message
+  string message = 1;
+}

tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java

Lines changed: 3 additions & 3 deletions
@@ -184,15 +184,15 @@ public void testFetcherCrud(Resources resources) throws Exception {
             assertEquals(FileSystemFetcher.class.getName(), getFetcherReply.getFetcherClass());
         }
 
-        // delete fetchers - note: delete is not currently supported
+        // delete fetchers
         for (int i = 0; i < NUM_FETCHERS_TO_CREATE; ++i) {
             String fetcherId = createFetcherId(i);
             DeleteFetcherReply deleteFetcherReply = blockingStub.deleteFetcher(DeleteFetcherRequest
                     .newBuilder()
                     .setFetcherId(fetcherId)
                     .build());
-            // Delete is not supported, so this will return false
-            Assertions.assertFalse(deleteFetcherReply.getSuccess());
+            // Delete is now supported and should succeed
+            Assertions.assertTrue(deleteFetcherReply.getSuccess());
         }
     }

tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java

Lines changed: 41 additions & 0 deletions
@@ -309,6 +309,47 @@ public synchronized void saveComponent(ExtensionConfig config) throws TikaConfig
         configStore.put(componentId, config);
     }
 
+    /**
+     * Deletes a component configuration by ID.
+     * Clears the cached instance and removes the configuration.
+     *
+     * @param componentId the component ID to delete
+     * @throws TikaConfigException if runtime modifications are not allowed or component not found
+     */
+    public synchronized void deleteComponent(String componentId) throws TikaConfigException {
+        if (!allowRuntimeModifications) {
+            throw new TikaConfigException(
+                    "Runtime modifications are not allowed. " + getClass().getSimpleName() +
+                    " must be loaded with allowRuntimeModifications=true to use delete" +
+                    getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "()");
+        }
+
+        if (componentId == null) {
+            throw new IllegalArgumentException("Component ID cannot be null");
+        }
+
+        if (!configStore.containsKey(componentId)) {
+            throw new TikaConfigException(
+                    getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) +
+                    getComponentName().substring(1) + " with ID '" + componentId + "' not found");
+        }
+
+        // Clear cache and remove config
+        componentCache.remove(componentId);
+        configStore.remove(componentId);
+        LOG.debug("Deleted {} config: id={}", getComponentName(), componentId);
+    }
+
+    /**
+     * Gets the configuration for a specific component by ID.
+     *
+     * @param componentId the component ID
+     * @return the component configuration, or null if not found
+     */
+    public ExtensionConfig getComponentConfig(String componentId) {
+        return configStore.get(componentId);
+    }
+
     /**
      * Returns the set of supported component IDs.
      */
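
The delete path added to AbstractComponentManager (reject when runtime modifications are off, reject unknown IDs, then drop both the cached instance and the stored config) can be exercised in isolation with a simplified sketch. `ComponentManagerSketch` is hypothetical: configs are plain strings, a `StringBuilder` stands in for a built component, unchecked exceptions replace `TikaConfigException`, and clearing the cache on save is a choice of this sketch and is not shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified manager; not Tika's AbstractComponentManager.
 * Configs are plain strings and "components" are built lazily from them.
 */
final class ComponentManagerSketch {
    private final Map<String, String> configStore = new HashMap<>();
    private final Map<String, Object> componentCache = new HashMap<>();
    private final boolean allowRuntimeModifications;

    ComponentManagerSketch(boolean allowRuntimeModifications) {
        this.allowRuntimeModifications = allowRuntimeModifications;
    }

    synchronized void save(String id, String configJson) {
        requireModifiable();
        // Sketch choice: drop any instance that was built from the old config.
        componentCache.remove(id);
        configStore.put(id, configJson);
    }

    /** Builds the stand-in component on first use and caches it. */
    synchronized Object getComponent(String id) {
        String config = configStore.get(id);
        if (config == null) {
            throw new IllegalArgumentException("No config with ID '" + id + "'");
        }
        return componentCache.computeIfAbsent(id, key -> new StringBuilder(config));
    }

    /** Mirrors the order of checks in the diff: modifiable, non-null, present. */
    synchronized void delete(String id) {
        requireModifiable();
        if (id == null) {
            throw new IllegalArgumentException("Component ID cannot be null");
        }
        if (!configStore.containsKey(id)) {
            throw new IllegalStateException("Component with ID '" + id + "' not found");
        }
        componentCache.remove(id);
        configStore.remove(id);
    }

    synchronized String getConfig(String id) {
        return configStore.get(id);
    }

    synchronized boolean isCached(String id) {
        return componentCache.containsKey(id);
    }

    private void requireModifiable() {
        if (!allowRuntimeModifications) {
            throw new IllegalStateException("Runtime modifications are not allowed");
        }
    }

    public static void main(String[] args) {
        ComponentManagerSketch manager = new ComponentManagerSketch(true);
        manager.save("my-fetcher", "{\"basePath\":\"/data\"}");
        manager.getComponent("my-fetcher");
        System.out.println("cached before delete: " + manager.isCached("my-fetcher"));
        manager.delete("my-fetcher");
        System.out.println("cached after delete: " + manager.isCached("my-fetcher"));
        System.out.println("config after delete: " + manager.getConfig("my-fetcher"));
    }
}
```

Removing the cache entry together with the config matters because a cached instance would otherwise keep serving requests for an ID that no longer has a stored configuration.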
