Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,12 @@ public void saveFetcher(SaveFetcherRequest request,
try {
String factoryName = findFactoryNameForClass(request.getFetcherClass());
ExtensionConfig config = new ExtensionConfig(request.getFetcherId(), factoryName, request.getFetcherConfigJson());

// Save to gRPC server's fetcher manager (for schema queries, etc.)
fetcherManager.saveFetcher(config);

// Also save to PipesClient so it propagates to the forked PipesServer
pipesClient.saveFetcher(config);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -331,7 +336,18 @@ public void getFetcherConfigJsonSchema(GetFetcherConfigJsonSchemaRequest request
}

/**
 * Deletes the fetcher with the given ID from the gRPC server's fetcher manager and then
 * from the PipesClient.
 * <p>
 * The two deletions run in that order inside one try block: if the fetcher manager
 * deletion succeeds but the PipesClient deletion throws, the method returns
 * {@code false} and the fetcher manager deletion is not rolled back.
 *
 * @param id the fetcher ID to delete
 * @return {@code true} if both deletions completed without throwing, {@code false} if
 *         either one threw an exception (the exception is logged, not rethrown)
 */
private boolean deleteFetcher(String id) {
    try {
        // Delete from gRPC server's fetcher manager
        fetcherManager.deleteFetcher(id);

        // Also delete from PipesClient so it propagates to the forked PipesServer
        pipesClient.deleteFetcher(id);

        LOG.info("Successfully deleted fetcher: {}", id);
        return true;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The catch-all would otherwise swallow the interrupt; restore the flag so
            // code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        LOG.error("Failed to delete fetcher: {}", id, e);
        return false;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,47 @@ public synchronized void saveComponent(ExtensionConfig config) throws TikaConfig
configStore.put(componentId, config);
}

/**
 * Removes the configuration stored under the given ID together with any cached
 * instance for that ID.
 * <p>
 * Checks run in this order: runtime modifications enabled, non-null ID, ID present in
 * the config store.
 *
 * @param componentId the ID of the component to remove
 * @throws TikaConfigException if runtime modifications are not allowed, or if no
 *         configuration is stored under {@code componentId}
 * @throws IllegalArgumentException if {@code componentId} is null
 */
public synchronized void deleteComponent(String componentId) throws TikaConfigException {
    if (!allowRuntimeModifications) {
        String ownerName = getClass().getSimpleName();
        String rawName = getComponentName();
        // Builds the public method name callers would have used, e.g. "deleteXxx()".
        String methodName = "delete" + rawName.substring(0, 1).toUpperCase(Locale.ROOT)
                + rawName.substring(1) + "()";
        throw new TikaConfigException("Runtime modifications are not allowed. " + ownerName
                + " must be loaded with allowRuntimeModifications=true to use " + methodName);
    }

    if (componentId == null) {
        throw new IllegalArgumentException("Component ID cannot be null");
    }

    boolean known = configStore.containsKey(componentId);
    if (!known) {
        String rawName = getComponentName();
        String capitalized = rawName.substring(0, 1).toUpperCase(Locale.ROOT) + rawName.substring(1);
        throw new TikaConfigException(capitalized + " with ID '" + componentId + "' not found");
    }

    // Drop the cached instance first, then the stored configuration.
    componentCache.remove(componentId);
    configStore.remove(componentId);
    LOG.debug("Deleted {} config: id={}", getComponentName(), componentId);
}

/**
 * Looks up the stored configuration for a single component.
 *
 * @param componentId the ID of the component whose configuration is wanted
 * @return the configuration held in the config store for that ID, or null if not found
 */
public ExtensionConfig getComponentConfig(String componentId) {
    final ExtensionConfig stored = configStore.get(componentId);
    return stored;
}

/**
* Returns the set of supported component IDs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -55,6 +57,7 @@
import org.slf4j.LoggerFactory;

import org.apache.tika.config.TikaTaskTimeout;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.parser.ParseContext;
Expand All @@ -64,6 +67,7 @@
import org.apache.tika.pipes.core.emitter.EmitDataImpl;
import org.apache.tika.pipes.core.server.IntermediateResult;
import org.apache.tika.pipes.core.server.PipesServer;
import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.ProcessUtils;
import org.apache.tika.utils.StringUtils;
Expand All @@ -77,7 +81,9 @@
public class PipesClient implements Closeable {

public enum COMMANDS {
PING, ACK, NEW_REQUEST, SHUT_DOWN;
PING, ACK, NEW_REQUEST, SHUT_DOWN,
SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER,
SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER;

public byte getByte() {
return (byte) (ordinal() + 1);
Expand Down Expand Up @@ -602,4 +608,277 @@ public static long getTimeoutMillis(PipesConfig pipesConfig, ParseContext parseC
return tikaTaskTimeout.getTimeoutMillis();
}

// ========== Fetcher Management API ==========

/**
 * Save (create or update) a fetcher configuration on the server.
 * <p>
 * The configuration is serialized before anything is written to the server stream, so
 * a serialization failure cannot leave the server waiting for the payload of a
 * half-sent {@code SAVE_FETCHER} command.
 * <p>
 * Wire format written: command byte, int payload length, serialized
 * {@link ExtensionConfig}. Response read: status byte (0 = success), int message
 * length, UTF-8 message bytes.
 *
 * @param config the fetcher configuration to send
 * @throws IOException if the configuration cannot be serialized, communication with the
 *         server fails, or the server reports a negative message length
 * @throws TikaException if the server returns a non-zero status; the server's message is
 *         included in the exception message
 * @throws InterruptedException if the operation is interrupted (nothing in this method
 *         throws it directly; presumably raised by {@code maybeInit()} - confirm)
 */
public void saveFetcher(ExtensionConfig config) throws IOException, TikaException, InterruptedException {
    // Serialize up front: if this throws, no command byte has been sent yet and the
    // client/server protocol stays in sync.
    UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(config);
    }
    byte[] bytes = bos.toByteArray();

    maybeInit();
    synchronized (lock) {
        serverTuple.output.write(COMMANDS.SAVE_FETCHER.getByte());
        serverTuple.output.flush();

        serverTuple.output.writeInt(bytes.length);
        serverTuple.output.write(bytes);
        serverTuple.output.flush();

        // Read response
        byte status = serverTuple.input.readByte();
        int msgLen = serverTuple.input.readInt();
        if (msgLen < 0) {
            // A negative length would otherwise surface as NegativeArraySizeException.
            throw new IOException("Invalid response message length from server: " + msgLen);
        }
        byte[] msgBytes = new byte[msgLen];
        serverTuple.input.readFully(msgBytes);
        String message = new String(msgBytes, StandardCharsets.UTF_8);

        if (status != 0) { // 0 = success, 1 = error
            throw new TikaException("Failed to save fetcher: " + message);
        }
        LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, config.id());
    }
}

/**
 * Delete a fetcher by its name/ID.
 * <p>
 * The ID is encoded before anything is written to the server stream, so a null ID
 * fails (with a {@link NullPointerException}) before a {@code DELETE_FETCHER} command
 * byte has been sent, rather than leaving the server waiting for the ID payload.
 * <p>
 * Wire format written: command byte, int ID length, UTF-8 ID bytes. Response read:
 * status byte (0 = success), int message length, UTF-8 message bytes.
 *
 * @param fetcherId the fetcher name/ID to delete; must not be null
 * @throws IOException if communication with the server fails or the server reports a
 *         negative message length
 * @throws TikaException if the server returns a non-zero status; the server's message is
 *         included in the exception message
 * @throws InterruptedException if the operation is interrupted (nothing in this method
 *         throws it directly; presumably raised by {@code maybeInit()} - confirm)
 */
public void deleteFetcher(String fetcherId) throws IOException, TikaException, InterruptedException {
    // Encode first: a null ID fails here, before any bytes reach the server.
    byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);

    maybeInit();
    synchronized (lock) {
        serverTuple.output.write(COMMANDS.DELETE_FETCHER.getByte());
        serverTuple.output.flush();

        serverTuple.output.writeInt(idBytes.length);
        serverTuple.output.write(idBytes);
        serverTuple.output.flush();

        // Read response
        byte status = serverTuple.input.readByte();
        int msgLen = serverTuple.input.readInt();
        if (msgLen < 0) {
            // A negative length would otherwise surface as NegativeArraySizeException.
            throw new IOException("Invalid response message length from server: " + msgLen);
        }
        byte[] msgBytes = new byte[msgLen];
        serverTuple.input.readFully(msgBytes);
        String message = new String(msgBytes, StandardCharsets.UTF_8);

        if (status != 0) {
            throw new TikaException("Failed to delete fetcher: " + message);
        }
        LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, fetcherId);
    }
}

/**
 * List the fetcher IDs reported by the server.
 * <p>
 * Wire format written: command byte. Response read: int count, then for each entry an
 * int length followed by that many UTF-8 bytes.
 *
 * @return the set of fetcher IDs read from the server response
 * @throws IOException if communication with the server fails, or the server reports a
 *         negative count or a negative ID length
 * @throws InterruptedException if the operation is interrupted (nothing in this method
 *         throws it directly; presumably raised by {@code maybeInit()} - confirm)
 */
public Set<String> listFetchers() throws IOException, InterruptedException {
    maybeInit();
    synchronized (lock) {
        serverTuple.output.write(COMMANDS.LIST_FETCHERS.getByte());
        serverTuple.output.flush();

        // Read response
        int count = serverTuple.input.readInt();
        if (count < 0) {
            // HashSet would reject a negative capacity with IllegalArgumentException;
            // report it as the protocol error it is.
            throw new IOException("Invalid fetcher count from server: " + count);
        }
        Set<String> fetcherIds = new HashSet<>(count);
        for (int i = 0; i < count; i++) {
            int len = serverTuple.input.readInt();
            if (len < 0) {
                throw new IOException("Invalid fetcher ID length from server: " + len);
            }
            byte[] bytes = new byte[len];
            serverTuple.input.readFully(bytes);
            fetcherIds.add(new String(bytes, StandardCharsets.UTF_8));
        }
        LOG.debug("pipesClientId={}: listed {} fetchers", pipesClientId, count);
        return fetcherIds;
    }
}

/**
 * Get the configuration for a specific fetcher.
 * <p>
 * The ID is encoded before anything is written to the server stream, so a null ID
 * fails (with a {@link NullPointerException}) before a {@code GET_FETCHER} command
 * byte has been sent.
 * <p>
 * Wire format written: command byte, int ID length, UTF-8 ID bytes. Response read:
 * a "found" byte (0 = not found); if non-zero, an int length followed by a serialized
 * {@link ExtensionConfig}.
 *
 * @param fetcherId the fetcher ID; must not be null
 * @return the fetcher configuration, or null if the server reports it was not found
 * @throws IOException if communication with the server fails, the server reports a
 *         negative payload length, or the payload cannot be deserialized
 * @throws InterruptedException if the operation is interrupted (nothing in this method
 *         throws it directly; presumably raised by {@code maybeInit()} - confirm)
 */
public ExtensionConfig getFetcherConfig(String fetcherId) throws IOException, InterruptedException {
    // Encode first: a null ID fails here, before any bytes reach the server.
    byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);

    maybeInit();
    synchronized (lock) {
        serverTuple.output.write(COMMANDS.GET_FETCHER.getByte());
        serverTuple.output.flush();

        serverTuple.output.writeInt(idBytes.length);
        serverTuple.output.write(idBytes);
        serverTuple.output.flush();

        // Read response
        byte found = serverTuple.input.readByte();
        if (found == 0) {
            return null;
        }

        int len = serverTuple.input.readInt();
        if (len < 0) {
            // A negative length would otherwise surface as NegativeArraySizeException.
            throw new IOException("Invalid fetcher config length from server: " + len);
        }
        byte[] bytes = new byte[len];
        serverTuple.input.readFully(bytes);

        // NOTE(review): Java-native deserialization of bytes produced by the server
        // process. Consider an ObjectInputFilter if that process is not fully trusted.
        try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) {
            return (ExtensionConfig) ois.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to deserialize ExtensionConfig", e);
        }
    }
}

// ========== Emitter Management API ==========

/**
 * Save (create or update) an emitter configuration on the server.
 * <p>
 * The configuration is serialized before anything is written to the server stream, so
 * a serialization failure cannot leave the server waiting for the payload of a
 * half-sent {@code SAVE_EMITTER} command.
 * <p>
 * Wire format written: command byte, int payload length, serialized
 * {@link ExtensionConfig}. Response read: status byte (0 = success), int message
 * length, UTF-8 message bytes.
 *
 * @param config the emitter configuration to send
 * @throws IOException if the configuration cannot be serialized, communication with the
 *         server fails, or the server reports a negative message length
 * @throws TikaException if the server returns a non-zero status; the server's message is
 *         included in the exception message
 * @throws InterruptedException if the operation is interrupted (nothing in this method
 *         throws it directly; presumably raised by {@code maybeInit()} - confirm)
 */
public void saveEmitter(ExtensionConfig config) throws IOException, TikaException, InterruptedException {
    // Serialize up front: if this throws, no command byte has been sent yet and the
    // client/server protocol stays in sync.
    UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(config);
    }
    byte[] bytes = bos.toByteArray();

    maybeInit();
    synchronized (lock) {
        serverTuple.output.write(COMMANDS.SAVE_EMITTER.getByte());
        serverTuple.output.flush();

        serverTuple.output.writeInt(bytes.length);
        serverTuple.output.write(bytes);
        serverTuple.output.flush();

        // Read response
        byte status = serverTuple.input.readByte();
        int msgLen = serverTuple.input.readInt();
        if (msgLen < 0) {
            // A negative length would otherwise surface as NegativeArraySizeException.
            throw new IOException("Invalid response message length from server: " + msgLen);
        }
        byte[] msgBytes = new byte[msgLen];
        serverTuple.input.readFully(msgBytes);
        String message = new String(msgBytes, StandardCharsets.UTF_8);

        if (status != 0) { // 0 = success
            throw new TikaException("Failed to save emitter: " + message);
        }
        LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, config.id());
    }
}

/**
 * Delete an emitter by its name/ID.
 * <p>
 * The ID is encoded before anything is written to the server stream, so a null ID
 * fails (with a {@link NullPointerException}) before a {@code DELETE_EMITTER} command
 * byte has been sent, rather than leaving the server waiting for the ID payload.
 * <p>
 * Wire format written: command byte, int ID length, UTF-8 ID bytes. Response read:
 * status byte (0 = success), int message length, UTF-8 message bytes.
 *
 * @param emitterId the emitter name/ID to delete; must not be null
 * @throws IOException if communication with the server fails or the server reports a
 *         negative message length
 * @throws TikaException if the server returns a non-zero status; the server's message is
 *         included in the exception message
 * @throws InterruptedException if the operation is interrupted (nothing in this method
 *         throws it directly; presumably raised by {@code maybeInit()} - confirm)
 */
public void deleteEmitter(String emitterId) throws IOException, TikaException, InterruptedException {
    // Encode first: a null ID fails here, before any bytes reach the server.
    byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);

    maybeInit();
    synchronized (lock) {
        serverTuple.output.write(COMMANDS.DELETE_EMITTER.getByte());
        serverTuple.output.flush();

        serverTuple.output.writeInt(idBytes.length);
        serverTuple.output.write(idBytes);
        serverTuple.output.flush();

        // Read response
        byte status = serverTuple.input.readByte();
        int msgLen = serverTuple.input.readInt();
        if (msgLen < 0) {
            // A negative length would otherwise surface as NegativeArraySizeException.
            throw new IOException("Invalid response message length from server: " + msgLen);
        }
        byte[] msgBytes = new byte[msgLen];
        serverTuple.input.readFully(msgBytes);
        String message = new String(msgBytes, StandardCharsets.UTF_8);

        if (status != 0) {
            throw new TikaException("Failed to delete emitter: " + message);
        }
        LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, emitterId);
    }
}

/**
 * List the emitter IDs reported by the server.
 * <p>
 * Wire format written: command byte. Response read: int count, then for each entry an
 * int length followed by that many UTF-8 bytes.
 *
 * @return the set of emitter IDs read from the server response
 * @throws IOException if communication with the server fails, or the server reports a
 *         negative count or a negative ID length
 * @throws InterruptedException if the operation is interrupted (nothing in this method
 *         throws it directly; presumably raised by {@code maybeInit()} - confirm)
 */
public Set<String> listEmitters() throws IOException, InterruptedException {
    maybeInit();
    synchronized (lock) {
        serverTuple.output.write(COMMANDS.LIST_EMITTERS.getByte());
        serverTuple.output.flush();

        // Read response
        int count = serverTuple.input.readInt();
        if (count < 0) {
            // HashSet would reject a negative capacity with IllegalArgumentException;
            // report it as the protocol error it is.
            throw new IOException("Invalid emitter count from server: " + count);
        }
        Set<String> emitterIds = new HashSet<>(count);
        for (int i = 0; i < count; i++) {
            int len = serverTuple.input.readInt();
            if (len < 0) {
                throw new IOException("Invalid emitter ID length from server: " + len);
            }
            byte[] bytes = new byte[len];
            serverTuple.input.readFully(bytes);
            emitterIds.add(new String(bytes, StandardCharsets.UTF_8));
        }
        LOG.debug("pipesClientId={}: listed {} emitters", pipesClientId, count);
        return emitterIds;
    }
}

/**
 * Get the configuration for a specific emitter.
 * <p>
 * The ID is encoded before anything is written to the server stream, so a null ID
 * fails (with a {@link NullPointerException}) before a {@code GET_EMITTER} command
 * byte has been sent.
 * <p>
 * Wire format written: command byte, int ID length, UTF-8 ID bytes. Response read:
 * a "found" byte (0 = not found); if non-zero, an int length followed by a serialized
 * {@link ExtensionConfig}.
 *
 * @param emitterId the emitter ID; must not be null
 * @return the emitter configuration, or null if the server reports it was not found
 * @throws IOException if communication with the server fails, the server reports a
 *         negative payload length, or the payload cannot be deserialized
 * @throws InterruptedException if the operation is interrupted (nothing in this method
 *         throws it directly; presumably raised by {@code maybeInit()} - confirm)
 */
public ExtensionConfig getEmitterConfig(String emitterId) throws IOException, InterruptedException {
    // Encode first: a null ID fails here, before any bytes reach the server.
    byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);

    maybeInit();
    synchronized (lock) {
        serverTuple.output.write(COMMANDS.GET_EMITTER.getByte());
        serverTuple.output.flush();

        serverTuple.output.writeInt(idBytes.length);
        serverTuple.output.write(idBytes);
        serverTuple.output.flush();

        // Read response
        byte found = serverTuple.input.readByte();
        if (found == 0) {
            return null;
        }

        int len = serverTuple.input.readInt();
        if (len < 0) {
            // A negative length would otherwise surface as NegativeArraySizeException.
            throw new IOException("Invalid emitter config length from server: " + len);
        }
        byte[] bytes = new byte[len];
        serverTuple.input.readFully(bytes);

        // NOTE(review): Java-native deserialization of bytes produced by the server
        // process. Consider an ObjectInputFilter if that process is not fully trusted.
        try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) {
            return (ExtensionConfig) ois.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to deserialize ExtensionConfig", e);
        }
    }
}

// Monitor held by the fetcher/emitter management methods above while they write a
// command to serverTuple.output and read its response from serverTuple.input, so one
// command/response exchange completes before another management call starts.
// NOTE(review): whether other users of serverTuple also synchronize on this object is
// not visible here - confirm.
private final Object[] lock = new Object[0];

}
Loading
Loading