Skip to content

Conversation

@nddipiazza
Copy link
Contributor

@nddipiazza nddipiazza commented Dec 28, 2025

Overview

This PR implements dynamic fetcher, emitter, and pipe iterator management for Apache Tika Pipes through a new ConfigStore abstraction, enabling runtime configuration changes without restarts.

Key Features

1. Dynamic Configuration Management API

  • New Methods in PipesClient:
    • saveFetcher(FetcherConfig) - Save fetcher at runtime
    • deleteFetcher(String fetcherId) - Remove fetcher
    • updateFetcher(String fetcherId, byte[] config) - Update existing fetcher
    • saveEmitter(EmitterConfig) - Save emitter at runtime
    • deleteEmitter(String emitterId) - Remove emitter
    • updateEmitter(String emitterId, byte[] config) - Update existing emitter
    • savePipeIterator(PipeIteratorConfig) - Save pipe iterator at runtime
    • deletePipeIterator(String iteratorId) - Remove pipe iterator
    • updatePipeIterator(String iteratorId, byte[] config) - Update existing pipe iterator

2. ConfigStore Abstraction

Built-in Implementations:

  • InMemoryConfigStore - Fast, ephemeral (default)
  • FileBasedConfigStore - JSON persistence, cross-JVM sharing (new)
  • IgniteConfigStore - Distributed cache for multi-instance (enhanced)

Storage Support:

  • Fetchers
  • Emitters
  • Pipe Iterators

3. Cross-JVM Configuration Sharing

  • PipesClient → gRPC Server → forked PipesServer
  • All components saved via gRPC are available to all workers
  • Configuration survives process restarts (file/Ignite modes)

4. gRPC API Integration

  • SaveFetcher/DeleteFetcher/UpdateFetcher RPC endpoints
  • SaveEmitter/DeleteEmitter/UpdateEmitter RPC endpoints
  • SavePipeIterator/DeletePipeIterator/UpdatePipeIterator RPC endpoints
  • Fully integrated with TikaGrpcServer

Implementation Details

FileBasedConfigStore

  • Thread-safe JSON file persistence
  • Location: configurable via path parameter
  • Atomic writes with temp file + rename
  • Automatic initialization from tika-config.json
  • Stores all component types in single JSON structure

IgniteConfigStore Enhancements

  • Embedded server architecture (no external Ignite needed)
  • Client-only mode for workers
  • Configurable cache mode (REPLICATED/PARTITIONED)
  • JVM arguments for Java module access
  • Separate caches per component type

Configuration Examples

File-Based:

{
  "pipes": {
    "configStoreType": "file",
    "configStoreParams": "{\"path\": \"/tmp/tika-config-store.json\"}"
  }
}

Ignite:

{
  "pipes": {
    "configStoreType": "ignite",
    "configStoreParams": "{\"cacheName\": \"tika-config\", \"cacheMode\": \"REPLICATED\"}",
    "forkedJvmArgs": [
      "--add-opens=java.base/java.nio=ALL-UNNAMED",
      "--add-opens=java.base/java.util=ALL-UNNAMED",
      "--add-opens=java.base/java.lang=ALL-UNNAMED",
      "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
      "-DIGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION=false"
    ]
  }
}

Testing

E2E Tests Added

  • FileSystemFetcherTest - File-based ConfigStore with dynamic fetcher management ✅
  • IgniteConfigStoreTest - Ignite ConfigStore with embedded server ✅
  • Document limit feature: -Dcorpa.numdocs=N

Both tests verify:

  • Dynamic component creation via gRPC
  • Cross-JVM config propagation
  • Successful document processing
  • Proper cleanup

Backward Compatibility

Fully backward compatible

  • Default behavior unchanged (InMemoryConfigStore)
  • Existing configs work without modification
  • Optional feature - enabled via configStoreType

Related Issues

Fixes: TIKA-4595

Migration Guide

For users wanting dynamic component management:

  1. File-based (recommended for single-instance):

    • Add configStoreType: file to pipes config
    • Components persist across restarts
  2. Ignite (for multi-instance/distributed):

    • Add configStoreType: ignite
    • Add required JVM arguments
    • Ideal for Kubernetes/multi-pod deployments

Files Changed

  • Core: PipesClient, PipesServer, ConfigStore abstraction
  • gRPC: TikaGrpcServerImpl with new RPCs for all component types
  • Stores: FileBasedConfigStore, IgniteConfigStore enhancements
  • Plugins: ExtensionConfig made Serializable
  • Managers: FetcherManager, EmitterManager, PipeIteratorManager enhanced
  • Tests: E2E tests with document limit feature

Performance Impact

  • Minimal overhead (lazy initialization)
  • File-based: ~1-2ms per save operation
  • Ignite: sub-millisecond after warm-up
  • No impact on existing in-memory mode

- Added SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER commands
- Added SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER commands
- Implemented PipesClient public API methods for runtime configuration
- Implemented PipesServer command handlers
- Added deleteComponent() and getComponentConfig() to AbstractComponentManager
- Added wrapper methods to FetcherManager and EmitterManager
- Added remove() method to ConfigStore interface and implementations
- All tests passing
- saveFetcher now calls both fetcherManager.saveFetcher() and pipesClient.saveFetcher()
- This ensures fetchers are available in the forked PipesServer process
- Implemented deleteFetcher to call both managers as well
- Fixes FetcherNotFoundException when using dynamic fetchers via gRPC

The issue was that fetchers saved via gRPC were only stored in the gRPC
server's FetcherManager, but when pipesClient.process() forks a new
PipesServer process, that process has its own FetcherManager and doesn't
have access to the dynamically created fetchers. Now both are updated.
…rver

- Created FileBasedConfigStore that persists to JSON file
- Created FileBasedConfigStoreFactory with @extension annotation
- Updated PipesServer.initializeResources() to create and use ConfigStore
- Both gRPC server and forked PipesServer can now share fetcher configs via file

This enables dynamic fetcher management across JVM processes:
1. gRPC saves fetcher → writes to config file
2. PipesServer starts → reads from same file
3. Both JVMs share the same fetcher configuration
- Added direct handling for 'file' type in ConfigStoreFactory.createConfigStore()
- File-based store is in core, not a plugin, so needs special handling
- Avoids ClassNotFoundException when trying to load 'file' as a class name
- Also added remove() method to IgniteConfigStore for interface compliance
ExtensionConfig is sent over sockets between PipesClient and PipesServer,
so it needs to implement Serializable. Records can implement Serializable
and all fields (String, String, String) are already serializable.

Fixes NotSerializableException when calling saveFetcher via gRPC.
- Created IgniteStoreServer class that runs as embedded server
- TikaGrpcServer starts Ignite server on startup (if ignite ConfigStore configured)
- IgniteConfigStore now acts as client-only (clientMode=true)
- No external Ignite dependency needed in Docker/Kubernetes
- Server runs in background daemon thread within tika-grpc process
- Clients (gRPC + forked PipesServer) connect to embedded server

Architecture:
  ┌─────────────────────────────────┐
  │      tika-grpc Process         │
  │  ┌──────────────────────────┐  │
  │  │  IgniteStoreServer       │  │
  │  │  (server mode, daemon)   │  │
  │  └────────▲─────────────────┘  │
  │           │                     │
  │  ┌────────┴─────────────────┐  │
  │  │ IgniteConfigStore        │  │
  │  │ (client mode)            │  │
  │  └──────────────────────────┘  │
  └─────────────────────────────────┘
           ▲
           │ (client connection)
           │
  ┌────────┴─────────────────┐
  │  PipesServer (forked)    │
  │  IgniteConfigStore       │
  │  (client mode)           │
  └──────────────────────────┘
- Set workDirectory to /tmp/ignite-work in IgniteStoreServer
- Set workDirectory to /tmp/ignite-work in IgniteConfigStore
- Avoids 'Work directory does not exist and cannot be created: /work' error
- Uses system property ignite.work.dir if set, defaults to /tmp/ignite-work
- Ensures Ignite can write to work directory in Docker containers
- Changed from /tmp/ignite-work to /var/cache/tika/ignite-work
- Aligns with Tika's standard cache location
- /var/cache/tika is already used for plugins and other Tika cache data
- Find Ignite plugin's classloader from plugin manager
- Load IgniteStoreServer and CacheMode using plugin classloader
- Fixes NoClassDefFoundError for H2 classes
- Ensures all Ignite dependencies (including H2) are available
- Plugin classloader has all dependencies from lib/ directory
- Set setPeerClassLoadingEnabled(true) in IgniteConfigStore
- Must match server configuration
- Fixes: Remote node has peer class loading enabled flag different from local
- Both server and client now have peerClassLoading=true
- Set setPeerClassLoadingEnabled(false) on both server and client
- Fixes ClassCastException due to class loaded by different classloaders
- Server uses plugin classloader, client uses app classloader
- Peer class loading causes the same class to be in both, creating conflicts
- We don't need peer class loading for our use case
- Made tika-pipes-ignite a required (non-optional) dependency of tika-grpc
- Added ignite.version and h2.version properties
- Removed reflection-based classloader lookup
- Direct instantiation of IgniteStoreServer
- Avoids all PF4J plugin classloader issues
- Ignite classes now on main classpath
- Set IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION=false
- Fixes: Failed to autoconfigure Ignite Object Input Filter
- Ignite was conflicting with existing serialization filter
- Apply in both IgniteStoreServer and IgniteConfigStore
- Handle 'ignite' type directly in ConfigStoreFactory
- Load IgniteConfigStoreFactory via reflection
- Works in forked PipesServer without plugin system
- Matches pattern used for 'file' type
- Fixes: ClassNotFoundException: ignite in forked process
@nddipiazza nddipiazza merged commit 34b60d6 into main Dec 29, 2025
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant