Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c48d6c1
TIKA-4595: Add dynamic fetcher/emitter management API to PipesClient
nddipiazza Dec 28, 2025
d97e3a6
Fix gRPC saveFetcher to propagate to PipesClient's forked PipesServer
nddipiazza Dec 28, 2025
dab0ac7
Add file-based ConfigStore for shared config between gRPC and PipesSe…
nddipiazza Dec 28, 2025
470b1d7
Handle 'file' ConfigStore type as built-in (not plugin)
nddipiazza Dec 28, 2025
fe9284a
Make ExtensionConfig Serializable for socket communication
nddipiazza Dec 28, 2025
8954c37
Add embedded Ignite server architecture
nddipiazza Dec 28, 2025
00a7ac3
Configure Ignite work directory to /tmp
nddipiazza Dec 28, 2025
059019d
Use /var/cache/tika/ignite-work for Ignite work directory
nddipiazza Dec 28, 2025
7892162
Use plugin classloader for Ignite server startup
nddipiazza Dec 28, 2025
98d9999
Enable peer class loading in Ignite client
nddipiazza Dec 28, 2025
28be7cd
Disable peer class loading to fix classloader conflicts
nddipiazza Dec 28, 2025
b941d73
Move Ignite dependencies directly into tika-grpc
nddipiazza Dec 28, 2025
10b35a2
Disable Ignite Object Input Filter autoconfiguration
nddipiazza Dec 28, 2025
d7d9f10
Add Ignite filter autoconfiguration disable to client too
nddipiazza Dec 28, 2025
52b14c5
Add Ignite as built-in ConfigStore type
nddipiazza Dec 28, 2025
01430c6
Add PipesIterator management API with save, get, and delete operations
nddipiazza Dec 28, 2025
3d9d020
Fix Ignite tests to use temp directory and server mode
nddipiazza Dec 28, 2025
b5e8a50
Fix TikaGrpcServerTest - update assertion for delete operation now th…
nddipiazza Dec 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tika-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
<asarkar-grpc-test.version>2.0.0</asarkar-grpc-test.version>
<awaitility.version>4.3.0</awaitility.version>
<j2objc-annotations.version>3.1</j2objc-annotations.version>
<ignite.version>2.17.0</ignite.version>
<h2.version>1.4.197</h2.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -232,7 +234,6 @@
<groupId>org.apache.tika</groupId>
<artifactId>tika-pipes-ignite</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,22 @@

import org.apache.tika.DeleteFetcherReply;
import org.apache.tika.DeleteFetcherRequest;
import org.apache.tika.DeletePipesIteratorReply;
import org.apache.tika.DeletePipesIteratorRequest;
import org.apache.tika.FetchAndParseReply;
import org.apache.tika.FetchAndParseRequest;
import org.apache.tika.GetFetcherConfigJsonSchemaReply;
import org.apache.tika.GetFetcherConfigJsonSchemaRequest;
import org.apache.tika.GetFetcherReply;
import org.apache.tika.GetFetcherRequest;
import org.apache.tika.GetPipesIteratorReply;
import org.apache.tika.GetPipesIteratorRequest;
import org.apache.tika.ListFetchersReply;
import org.apache.tika.ListFetchersRequest;
import org.apache.tika.SaveFetcherReply;
import org.apache.tika.SaveFetcherRequest;
import org.apache.tika.SavePipesIteratorReply;
import org.apache.tika.SavePipesIteratorRequest;
import org.apache.tika.TikaGrpc;
import org.apache.tika.config.ConfigContainer;
import org.apache.tika.config.loader.TikaJsonConfig;
Expand Down Expand Up @@ -131,11 +137,43 @@ private ConfigStore createConfigStore() throws TikaConfigException {
ExtensionConfig storeConfig = new ExtensionConfig(
configStoreType, configStoreType, configStoreParams);

// If using Ignite, start the embedded server first
if ("ignite".equalsIgnoreCase(configStoreType)) {
startIgniteServer(storeConfig);
}

return ConfigStoreFactory.createConfigStore(
pluginManager,
configStoreType,
storeConfig);
}

private void startIgniteServer(ExtensionConfig config) {
try {
LOG.info("Starting embedded Ignite server for ConfigStore");

// Parse config to get Ignite settings
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
com.fasterxml.jackson.databind.JsonNode params = mapper.readTree(config.json());

String cacheName = params.has("cacheName") ? params.get("cacheName").asText() : "tika-config-store";
String cacheMode = params.has("cacheMode") ? params.get("cacheMode").asText() : "REPLICATED";
String instanceName = params.has("igniteInstanceName") ? params.get("igniteInstanceName").asText() : "TikaIgniteServer";

// Direct instantiation - no reflection needed
org.apache.ignite.cache.CacheMode mode = org.apache.ignite.cache.CacheMode.valueOf(cacheMode);
org.apache.tika.pipes.ignite.server.IgniteStoreServer server =
new org.apache.tika.pipes.ignite.server.IgniteStoreServer(cacheName, mode, instanceName);

server.startAsync();

LOG.info("Embedded Ignite server started successfully");

} catch (Exception e) {
LOG.error("Failed to start embedded Ignite server", e);
throw new RuntimeException("Failed to start Ignite server", e);
}
}

@Override
public void fetchAndParseServerSideStreaming(FetchAndParseRequest request,
Expand Down Expand Up @@ -225,7 +263,12 @@ public void saveFetcher(SaveFetcherRequest request,
try {
String factoryName = findFactoryNameForClass(request.getFetcherClass());
ExtensionConfig config = new ExtensionConfig(request.getFetcherId(), factoryName, request.getFetcherConfigJson());

// Save to gRPC server's fetcher manager (for schema queries, etc.)
fetcherManager.saveFetcher(config);

// Also save to PipesClient so it propagates to the forked PipesServer
pipesClient.saveFetcher(config);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -331,7 +374,113 @@ public void getFetcherConfigJsonSchema(GetFetcherConfigJsonSchemaRequest request
}

private boolean deleteFetcher(String id) {
LOG.warn("Deleting fetchers is not supported in the current implementation");
return false;
try {
// Delete from gRPC server's fetcher manager
fetcherManager.deleteFetcher(id);

// Also delete from PipesClient so it propagates to the forked PipesServer
pipesClient.deleteFetcher(id);

LOG.info("Successfully deleted fetcher: {}", id);
return true;
} catch (Exception e) {
LOG.error("Failed to delete fetcher: {}", id, e);
return false;
}
}

// ========== PipesIterator RPC Methods ==========

@Override
public void savePipesIterator(SavePipesIteratorRequest request,
StreamObserver<SavePipesIteratorReply> responseObserver) {
try {
String iteratorId = request.getIteratorId();
String iteratorClass = request.getIteratorClass();
String iteratorConfigJson = request.getIteratorConfigJson();

LOG.info("Saving pipes iterator: id={}, class={}", iteratorId, iteratorClass);

ExtensionConfig config = new ExtensionConfig(iteratorId, iteratorClass, iteratorConfigJson);

// Save via PipesClient so it propagates to the forked PipesServer
pipesClient.savePipesIterator(config);

SavePipesIteratorReply reply = SavePipesIteratorReply.newBuilder()
.setMessage("Pipes iterator saved successfully")
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();

LOG.info("Successfully saved pipes iterator: {}", iteratorId);

} catch (Exception e) {
LOG.error("Failed to save pipes iterator", e);
responseObserver.onError(io.grpc.Status.INTERNAL
.withDescription("Failed to save pipes iterator: " + e.getMessage())
.withCause(e)
.asRuntimeException());
}
}

@Override
public void getPipesIterator(GetPipesIteratorRequest request,
StreamObserver<GetPipesIteratorReply> responseObserver) {
try {
String iteratorId = request.getIteratorId();
LOG.info("Getting pipes iterator: {}", iteratorId);

ExtensionConfig config = pipesClient.getPipesIteratorConfig(iteratorId);

if (config == null) {
responseObserver.onError(io.grpc.Status.NOT_FOUND
.withDescription("Pipes iterator not found: " + iteratorId)
.asRuntimeException());
return;
}

GetPipesIteratorReply reply = GetPipesIteratorReply.newBuilder()
.setIteratorId(config.id())
.setIteratorClass(config.name())
.setIteratorConfigJson(config.json())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();

LOG.info("Successfully retrieved pipes iterator: {}", iteratorId);

} catch (Exception e) {
LOG.error("Failed to get pipes iterator", e);
responseObserver.onError(io.grpc.Status.INTERNAL
.withDescription("Failed to get pipes iterator: " + e.getMessage())
.withCause(e)
.asRuntimeException());
}
}

@Override
public void deletePipesIterator(DeletePipesIteratorRequest request,
StreamObserver<DeletePipesIteratorReply> responseObserver) {
try {
String iteratorId = request.getIteratorId();
LOG.info("Deleting pipes iterator: {}", iteratorId);

pipesClient.deletePipesIterator(iteratorId);

DeletePipesIteratorReply reply = DeletePipesIteratorReply.newBuilder()
.setMessage("Pipes iterator deleted successfully")
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();

LOG.info("Successfully deleted pipes iterator: {}", iteratorId);

} catch (Exception e) {
LOG.error("Failed to delete pipes iterator", e);
responseObserver.onError(io.grpc.Status.INTERNAL
.withDescription("Failed to delete pipes iterator: " + e.getMessage())
.withCause(e)
.asRuntimeException());
}
}
}
53 changes: 53 additions & 0 deletions tika-grpc/src/main/proto/tika.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ service Tika {
Get the Fetcher Config schema for a given fetcher class.
*/
rpc GetFetcherConfigJsonSchema(GetFetcherConfigJsonSchemaRequest) returns (GetFetcherConfigJsonSchemaReply) {}

/*
Save a pipes iterator to the iterator store.
*/
rpc SavePipesIterator(SavePipesIteratorRequest) returns (SavePipesIteratorReply) {}
/*
Get a pipes iterator's data from the iterator store.
*/
rpc GetPipesIterator(GetPipesIteratorRequest) returns (GetPipesIteratorReply) {}
/*
Delete a pipes iterator from the iterator store.
*/
rpc DeletePipesIterator(DeletePipesIteratorRequest) returns (DeletePipesIteratorReply) {}
}

message SaveFetcherRequest {
Expand Down Expand Up @@ -143,3 +156,43 @@ message GetFetcherConfigJsonSchemaReply {
// The json schema that describes the fetcher config in string format.
string fetcher_config_json_schema = 1;
}

// ========== PipesIterator Messages ==========

message SavePipesIteratorRequest {
// A unique identifier for each pipes iterator. If this already exists, operation will overwrite existing.
string iterator_id = 1;
// The full java class name of the pipes iterator class.
string iterator_class = 2;
// JSON string of the pipes iterator config object.
string iterator_config_json = 3;
}

message SavePipesIteratorReply {
// Status message
string message = 1;
}

message GetPipesIteratorRequest {
// The pipes iterator ID to retrieve
string iterator_id = 1;
}

message GetPipesIteratorReply {
// The pipes iterator ID
string iterator_id = 1;
// The full java class name of the pipes iterator
string iterator_class = 2;
// JSON string of the pipes iterator config object
string iterator_config_json = 3;
}

message DeletePipesIteratorRequest {
// The pipes iterator ID to delete
string iterator_id = 1;
}

message DeletePipesIteratorReply {
// Status message
string message = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,15 @@ public void testFetcherCrud(Resources resources) throws Exception {
assertEquals(FileSystemFetcher.class.getName(), getFetcherReply.getFetcherClass());
}

// delete fetchers - note: delete is not currently supported
// delete fetchers
for (int i = 0; i < NUM_FETCHERS_TO_CREATE; ++i) {
String fetcherId = createFetcherId(i);
DeleteFetcherReply deleteFetcherReply = blockingStub.deleteFetcher(DeleteFetcherRequest
.newBuilder()
.setFetcherId(fetcherId)
.build());
// Delete is not supported, so this will return false
Assertions.assertFalse(deleteFetcherReply.getSuccess());
// Delete is now supported and should succeed
Assertions.assertTrue(deleteFetcherReply.getSuccess());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,47 @@ public synchronized void saveComponent(ExtensionConfig config) throws TikaConfig
configStore.put(componentId, config);
}

/**
* Deletes a component configuration by ID.
* Clears the cached instance and removes the configuration.
*
* @param componentId the component ID to delete
* @throws TikaConfigException if runtime modifications are not allowed or component not found
*/
public synchronized void deleteComponent(String componentId) throws TikaConfigException {
if (!allowRuntimeModifications) {
throw new TikaConfigException(
"Runtime modifications are not allowed. " + getClass().getSimpleName() +
" must be loaded with allowRuntimeModifications=true to use delete" +
getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "()");
}

if (componentId == null) {
throw new IllegalArgumentException("Component ID cannot be null");
}

if (!configStore.containsKey(componentId)) {
throw new TikaConfigException(
getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) +
getComponentName().substring(1) + " with ID '" + componentId + "' not found");
}

// Clear cache and remove config
componentCache.remove(componentId);
configStore.remove(componentId);
LOG.debug("Deleted {} config: id={}", getComponentName(), componentId);
}

/**
* Gets the configuration for a specific component by ID.
*
* @param componentId the component ID
* @return the component configuration, or null if not found
*/
public ExtensionConfig getComponentConfig(String componentId) {
return configStore.get(componentId);
}

/**
* Returns the set of supported component IDs.
*/
Expand Down
Loading
Loading