@nddipiazza
Contributor

Summary

Adds public API to PipesClient for dynamically managing fetchers and emitters at runtime through PipesServer's ConfigStore.

Changes

Protocol Enhancements

  • Extended PipesClient.COMMANDS enum with 8 new commands:
    • SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER
    • SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER
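As a rough illustration of how such a command set could be carried over a socket, the sketch below maps each command to a single byte and back. Only the eight constant names come from this PR. The enum name `CommandSketch`, the `toByte`/`fromByte` helpers, and the ordinal-plus-one encoding are assumptions made for the example and may not match the real PipesClient.COMMANDS.

```java
// Hypothetical stand-in for PipesClient.COMMANDS; only the constant names are from the PR.
enum CommandSketch {
    SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER,
    SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER;

    // Assumed wire encoding: ordinal + 1, so that byte 0 never maps to a command.
    byte toByte() {
        return (byte) (ordinal() + 1);
    }

    // Reverse lookup; rejects bytes that do not correspond to any command.
    static CommandSketch fromByte(byte b) {
        int index = b - 1;
        CommandSketch[] all = values();
        if (index < 0 || index >= all.length) {
            throw new IllegalArgumentException("Unknown command byte: " + b);
        }
        return all[index];
    }
}
```

Reserving byte 0 is one way to make an uninitialized or truncated read fail loudly instead of being mistaken for the first command.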

PipesClient API

Added public methods:

  • saveFetcher(ExtensionConfig) - Create/update fetcher
  • deleteFetcher(String) - Remove fetcher
  • listFetchers() - List all fetcher IDs
  • getFetcherConfig(String) - Get fetcher configuration
  • The equivalent four methods for emitters
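The call sequence these methods enable (save, list, get, delete) can be sketched with an in-memory stand-in. This is not the real PipesClient: `ConfigSketch` is a hypothetical simplification of ExtensionConfig, and the return types shown are assumptions, since the PR description lists only method names and parameter types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ExtensionConfig: an id plus an opaque JSON body.
final class ConfigSketch {
    final String id;
    final String json;

    ConfigSketch(String id, String json) {
        this.id = id;
        this.json = json;
    }
}

// Hypothetical in-memory client exposing the four fetcher operations named in the PR.
class RuntimeFetcherClientSketch {
    private final Map<String, ConfigSketch> fetchers = new LinkedHashMap<>();

    // Create or update: saving an existing id replaces its config.
    void saveFetcher(ConfigSketch config) {
        fetchers.put(config.id, config);
    }

    // Returns true only if a fetcher with this id existed (assumed return type).
    boolean deleteFetcher(String id) {
        return fetchers.remove(id) != null;
    }

    // Returns a copy of the ids so callers cannot mutate internal state.
    List<String> listFetchers() {
        return new ArrayList<>(fetchers.keySet());
    }

    // Returns null for an unknown id (assumed behavior).
    ConfigSketch getFetcherConfig(String id) {
        return fetchers.get(id);
    }
}
```

Saving the same id twice leaves a single entry holding the newer config, which matches the "Create/update" wording above.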

PipesServer Handlers

Implemented request handlers for all 8 commands with proper error handling and serialization.

Core Infrastructure

  • Added deleteComponent() and getComponentConfig() to AbstractComponentManager
  • Added wrapper methods to FetcherManager and EmitterManager
  • Added remove() method to ConfigStore interface and implementations
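To make the ConfigStore change concrete, here is a minimal sketch of a key-value store with a `remove()` operation. The interface `SimpleConfigStore`, its method signatures, and the choice to store configs as JSON strings are all assumptions for illustration. The actual ConfigStore interface in Tika may look different.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the ConfigStore interface.
interface SimpleConfigStore {
    void put(String id, String jsonConfig);

    String get(String id);

    Set<String> keys();

    // The kind of operation this PR adds: returns the removed config, or null if the id was unknown.
    String remove(String id);
}

// In-memory implementation backed by a concurrent map, so concurrent handlers can share it.
class InMemoryConfigStoreSketch implements SimpleConfigStore {
    private final Map<String, String> configs = new ConcurrentHashMap<>();

    @Override
    public void put(String id, String jsonConfig) {
        configs.put(id, jsonConfig);
    }

    @Override
    public String get(String id) {
        return configs.get(id);
    }

    @Override
    public Set<String> keys() {
        // Snapshot copy: later changes to the store do not affect the returned set.
        return new HashSet<>(configs.keySet());
    }

    @Override
    public String remove(String id) {
        return configs.remove(id);
    }
}
```

A manager-level deleteComponent() could then delegate to `remove()` and treat a null result as "no such component", though whether the real AbstractComponentManager does so is not stated in this PR.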

Benefits

  • Users can add/modify fetchers and emitters without restarting
  • Supports multi-tenant scenarios with isolated configs
  • Enables programmatic configuration
  • Maintains backwards compatibility with static config

Testing

  • All existing tests pass
  • No breaking changes
  • Ready for integration testing

JIRA

https://issues.apache.org/jira/browse/TIKA-4595

- Added SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER commands
- Added SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER commands
- Implemented PipesClient public API methods for runtime configuration
- Implemented PipesServer command handlers
- Added deleteComponent() and getComponentConfig() to AbstractComponentManager
- Added wrapper methods to FetcherManager and EmitterManager
- Added remove() method to ConfigStore interface and implementations
- All tests passing
- The gRPC server's saveFetcher now calls both fetcherManager.saveFetcher() and pipesClient.saveFetcher()
- This ensures fetchers are available in the forked PipesServer process
- Implemented deleteFetcher to call both managers as well
- Fixes FetcherNotFoundException when using dynamic fetchers via gRPC

The issue was that fetchers saved via gRPC were only stored in the gRPC
server's FetcherManager, but when pipesClient.process() forks a new
PipesServer process, that process has its own FetcherManager and doesn't
have access to the dynamically created fetchers. Now both are updated.
@nddipiazza
Contributor Author

🔧 Critical Fix Added

Fixed a bug where dynamically created fetchers via gRPC were not available in the forked PipesServer process.

The Problem

  • saveFetcher was only saving to the gRPC server's local FetcherManager
  • When pipesClient.process() forks a new PipesServer, that process has its own FetcherManager
  • Result: FetcherNotFoundException: Can't find fetcher for id=defaultFetcher

The Solution

  • saveFetcher now calls both fetcherManager.saveFetcher() AND pipesClient.saveFetcher()
  • This propagates the fetcher to the forked PipesServer via the socket protocol
  • Also implemented deleteFetcher properly (was previously a no-op)
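The dual-write pattern described above can be sketched as follows. Every type here is a hypothetical stand-in: `FetcherSink` abstracts "something that stores fetcher configs", one instance plays the role of the gRPC server's local FetcherManager, and the other plays the role of PipesClient forwarding to the forked PipesServer. No real socket or gRPC code is involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical abstraction over a component that can store and drop fetcher configs.
interface FetcherSink {
    void saveFetcher(String id, String jsonConfig);

    void deleteFetcher(String id);
}

// Simple map-backed sink used for both roles in this sketch.
class MapBackedSink implements FetcherSink {
    final Map<String, String> configs = new HashMap<>();

    @Override
    public void saveFetcher(String id, String jsonConfig) {
        configs.put(id, jsonConfig);
    }

    @Override
    public void deleteFetcher(String id) {
        configs.remove(id);
    }
}

// Stand-in for the gRPC service: every change is applied to both sinks.
class DualWriteFetcherService {
    private final FetcherSink localManager; // role of the gRPC server's FetcherManager
    private final FetcherSink forkedServer; // role of PipesClient talking to PipesServer

    DualWriteFetcherService(FetcherSink localManager, FetcherSink forkedServer) {
        this.localManager = localManager;
        this.forkedServer = forkedServer;
    }

    void saveFetcher(String id, String jsonConfig) {
        localManager.saveFetcher(id, jsonConfig);
        forkedServer.saveFetcher(id, jsonConfig);
    }

    void deleteFetcher(String id) {
        localManager.deleteFetcher(id);
        forkedServer.deleteFetcher(id);
    }
}
```

One design consideration this sketch leaves open is what to do if the second write fails after the first succeeds. The PR description does not say how the real implementation handles that case.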

Testing

The e2e test should now pass - fetchers created via gRPC will be available for document processing.

@nddipiazza nddipiazza closed this Dec 28, 2025