Skip to content

Commit e113b93

Browse files
authored
TIKA-4575 -- fix race condition with unzipping plugins (#2458)
Generated-by: Claude Opus 4.5 (model ID: claude-opus-4-5-20251101)
1 parent 36c7d49 commit e113b93

File tree

10 files changed

+253
-67
lines changed

10 files changed

+253
-67
lines changed

tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ private static BasicContentHandlerFactory.HANDLER_TYPE getHandlerType(String t)
259259

260260
private static void processWithTikaConfig(PipesIterator pipesIterator, Path tikaConfigPath, SimpleAsyncConfig asyncConfig) throws Exception {
261261
long start = System.currentTimeMillis();
262-
try (AsyncProcessor processor = new AsyncProcessor(tikaConfigPath, pipesIterator)) {
262+
try (AsyncProcessor processor = AsyncProcessor.load(tikaConfigPath, pipesIterator)) {
263263

264264
for (FetchEmitTuple t : pipesIterator) {
265265
configureExtractBytes(t, asyncConfig);

tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void setUp() throws Exception {
114114
public void testRecursiveUnpacking() throws Exception {
115115
// TikaAsyncCLI cli = new TikaAsyncCLI();
116116
// cli.main(new String[]{ configDir.resolve("tika-config.xml").toAbsolutePath().toString()});
117-
AsyncProcessor processor = new AsyncProcessor(configDir.resolve("tika-config.json"));
117+
AsyncProcessor processor = AsyncProcessor.load(configDir.resolve("tika-config.json"));
118118

119119
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = new EmbeddedDocumentBytesConfig(true);
120120
embeddedDocumentBytesConfig.setIncludeOriginal(true);
@@ -163,7 +163,7 @@ public void testRecursiveUnpacking() throws Exception {
163163
public void testStopsOnApplicationError() throws Exception {
164164
// Test that AsyncProcessor stops processing when an application error occurs
165165
// (TIKA-4570)
166-
AsyncProcessor processor = new AsyncProcessor(configDir.resolve("tika-config.json"));
166+
AsyncProcessor processor = AsyncProcessor.load(configDir.resolve("tika-config.json"));
167167

168168
// Create a tuple with a non-existent fetcher - this will cause FETCHER_NOT_FOUND
169169
// which is a TASK_EXCEPTION but will stop processing in CLI mode (default)

tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,59 @@
2424
import java.util.concurrent.ArrayBlockingQueue;
2525
import java.util.concurrent.TimeUnit;
2626

27+
import org.apache.tika.config.loader.TikaJsonConfig;
28+
import org.apache.tika.exception.TikaConfigException;
2729
import org.apache.tika.pipes.api.FetchEmitTuple;
2830
import org.apache.tika.pipes.api.PipesResult;
31+
import org.apache.tika.plugins.TikaPluginManager;
2932

3033
public class PipesParser implements Closeable {
3134

35+
/**
36+
* Loads a PipesParser from a configuration file path.
37+
* <p>
38+
* This method:
39+
* <ol>
40+
* <li>Loads the JSON configuration</li>
41+
* <li>Pre-extracts plugins before spawning child processes</li>
42+
* <li>Creates the PipesParser with the loaded configuration</li>
43+
* </ol>
44+
*
45+
* @param tikaConfigPath path to the tika-config.json file
46+
* @return a new PipesParser instance
47+
* @throws IOException if reading config or extraction fails
48+
* @throws TikaConfigException if configuration is invalid
49+
*/
50+
public static PipesParser load(Path tikaConfigPath) throws IOException, TikaConfigException {
51+
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
52+
PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
53+
return load(tikaJsonConfig, pipesConfig, tikaConfigPath);
54+
}
55+
56+
/**
57+
* Loads a PipesParser from pre-loaded configuration objects.
58+
* <p>
59+
* Use this method when you need to modify the PipesConfig before creating
60+
* the parser (e.g., to override emit strategy).
61+
*
62+
* @param tikaJsonConfig the pre-loaded JSON configuration
63+
* @param pipesConfig the pipes configuration (may be modified by caller)
64+
* @param tikaConfigPath path to the config file (passed to child processes)
65+
* @return a new PipesParser instance
66+
* @throws IOException if plugin extraction fails
67+
*/
68+
public static PipesParser load(TikaJsonConfig tikaJsonConfig, PipesConfig pipesConfig,
69+
Path tikaConfigPath) throws IOException {
70+
TikaPluginManager.preExtractPlugins(tikaJsonConfig);
71+
return new PipesParser(pipesConfig, tikaConfigPath);
72+
}
3273

3374
private final PipesConfig pipesConfig;
3475
private final Path tikaConfigPath;
3576
private final List<PipesClient> clients = new ArrayList<>();
36-
private final ArrayBlockingQueue<PipesClient> clientQueue ;
37-
77+
private final ArrayBlockingQueue<PipesClient> clientQueue;
3878

39-
public PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) {
79+
private PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) {
4080
this.pipesConfig = pipesConfig;
4181
this.tikaConfigPath = tikaConfigPath;
4282
this.clientQueue = new ArrayBlockingQueue<>(pipesConfig.getNumClients());

tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,42 @@ public class AsyncProcessor implements Closeable {
7979
private boolean addedEmitterSemaphores = false;
8080
boolean isShuttingDown = false;
8181

82-
public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException {
83-
this(tikaConfigPath, null);
82+
/**
83+
* Loads an AsyncProcessor from a configuration file path.
84+
* <p>
85+
* This method pre-extracts plugins before loading, ensuring child processes
86+
* don't race to extract the same plugins.
87+
*
88+
* @param tikaConfigPath path to the tika-config.json file
89+
* @return a new AsyncProcessor instance
90+
* @throws IOException if reading config or plugin extraction fails
91+
* @throws TikaException if configuration is invalid
92+
*/
93+
public static AsyncProcessor load(Path tikaConfigPath) throws TikaException, IOException {
94+
return load(tikaConfigPath, null);
8495
}
8596

86-
public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws TikaException, IOException {
97+
/**
98+
* Loads an AsyncProcessor from a configuration file path with a custom PipesIterator.
99+
* <p>
100+
* This method pre-extracts plugins before loading, ensuring child processes
101+
* don't race to extract the same plugins.
102+
*
103+
* @param tikaConfigPath path to the tika-config.json file
104+
* @param pipesIterator optional custom pipes iterator (may be null)
105+
* @return a new AsyncProcessor instance
106+
* @throws IOException if reading config or plugin extraction fails
107+
* @throws TikaException if configuration is invalid
108+
*/
109+
public static AsyncProcessor load(Path tikaConfigPath, PipesIterator pipesIterator)
110+
throws TikaException, IOException {
87111
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
112+
TikaPluginManager.preExtractPlugins(tikaJsonConfig);
113+
return new AsyncProcessor(tikaConfigPath, pipesIterator, tikaJsonConfig);
114+
}
115+
116+
private AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator,
117+
TikaJsonConfig tikaJsonConfig) throws TikaException, IOException {
88118
TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig);
89119
MetadataFilter metadataFilter = TikaLoader.load(tikaConfigPath).loadMetadataFilters();
90120
this.asyncConfig = PipesConfig.load(tikaJsonConfig);

tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.fasterxml.jackson.databind.ObjectMapper;
2828
import com.fasterxml.jackson.databind.SerializationFeature;
2929

30+
import org.apache.tika.exception.TikaConfigException;
3031
import org.apache.tika.exception.TikaException;
3132
import org.apache.tika.io.TikaInputStream;
3233
import org.apache.tika.metadata.Metadata;
@@ -119,8 +120,9 @@ public class PipesForkParser implements Closeable {
119120
* Creates a new PipesForkParser with default configuration.
120121
*
121122
* @throws IOException if the temporary config file cannot be created
123+
* @throws TikaConfigException if configuration is invalid
122124
*/
123-
public PipesForkParser() throws IOException {
125+
public PipesForkParser() throws IOException, TikaConfigException {
124126
this(new PipesForkParserConfig());
125127
}
126128

@@ -129,11 +131,12 @@ public PipesForkParser() throws IOException {
129131
*
130132
* @param config the configuration for this parser
131133
* @throws IOException if the temporary config file cannot be created
134+
* @throws TikaConfigException if configuration is invalid
132135
*/
133-
public PipesForkParser(PipesForkParserConfig config) throws IOException {
136+
public PipesForkParser(PipesForkParserConfig config) throws IOException, TikaConfigException {
134137
this.config = config;
135138
this.tikaConfigPath = createTikaConfigFile();
136-
this.pipesParser = new PipesParser(config.getPipesConfig(), tikaConfigPath);
139+
this.pipesParser = PipesParser.load(tikaConfigPath);
137140
}
138141

139142
/**

tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ private void writeLarge(Path resolve) throws IOException {
132132

133133
@Test
134134
public void testBasic(@TempDir Path tmpDir) throws Exception {
135-
AsyncProcessor processor = new AsyncProcessor(setUp(tmpDir, false));
135+
AsyncProcessor processor = AsyncProcessor.load(setUp(tmpDir, false));
136136
for (int i = 0; i < totalFiles; i++) {
137137
FetchEmitTuple t = new FetchEmitTuple("myId-" + i,
138138
new FetchKey(fetcherPluginId, i + ".xml"),
@@ -164,7 +164,7 @@ public void testBasic(@TempDir Path tmpDir) throws Exception {
164164

165165
@Test
166166
public void testEmitIntermediate(@TempDir Path tmpDir) throws Exception {
167-
AsyncProcessor processor = new AsyncProcessor(setUp(tmpDir, true));
167+
AsyncProcessor processor = AsyncProcessor.load(setUp(tmpDir, true));
168168
for (int i = 0; i < totalFiles; i++) {
169169
FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey(fetcherPluginId, i + ".xml"),
170170
new EmitKey(emitterPluginId, "emit-" + i), new Metadata());

tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java

Lines changed: 116 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,85 +16,150 @@
1616
*/
1717
package org.apache.tika.plugins;
1818

19-
import java.io.File;
2019
import java.io.IOException;
21-
import java.io.RandomAccessFile;
22-
import java.nio.channels.FileChannel;
23-
import java.nio.channels.FileLock;
20+
import java.nio.file.AtomicMoveNotSupportedException;
21+
import java.nio.file.DirectoryNotEmptyException;
22+
import java.nio.file.FileAlreadyExistsException;
2423
import java.nio.file.Files;
2524
import java.nio.file.Path;
26-
import java.util.ArrayList;
27-
import java.util.List;
25+
import java.nio.file.StandardCopyOption;
26+
import java.util.Comparator;
27+
import java.util.UUID;
28+
import java.util.stream.Stream;
2829

2930
import org.pf4j.util.Unzip;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

34+
/**
35+
* Thread-safe and process-safe plugin unzipper using atomic rename.
36+
* <p>
37+
* This avoids file locking issues on Windows by using a simple strategy:
38+
* <ol>
39+
* <li>Check if destination directory exists with completion marker - if yes, already extracted</li>
40+
* <li>Extract to a temporary directory with a unique name</li>
41+
* <li>Create a completion marker file in the temp directory</li>
42+
* <li>Atomically rename temp dir to final destination</li>
43+
* <li>If rename fails (another process won), clean up temp dir</li>
44+
* </ol>
45+
* <p>
46+
* The completion marker ensures that even if atomic move is not supported,
47+
* other processes won't attempt to load a partially-moved directory.
48+
*/
3349
public class ThreadSafeUnzipper {
3450
private static final Logger LOG = LoggerFactory.getLogger(TikaPluginManager.class);
51+
private static final String COMPLETE_MARKER = ".tika-extraction-complete";
3552

36-
private static final long MAX_WAIT_MS = 60000;
37-
38-
public static synchronized void unzipPlugin(Path source) throws IOException {
39-
if (! source.getFileName().toString().endsWith(".zip")) {
53+
/**
54+
* Unzips a plugin zip file to a directory with the same name (minus .zip extension).
55+
* Safe for concurrent calls from multiple threads or processes. See
56+
* documentation at the head of this class for how it works.
57+
*
58+
* @param source path to the .zip file
59+
* @throws IOException if extraction fails
60+
*/
61+
public static void unzipPlugin(Path source) throws IOException {
62+
if (!source.getFileName().toString().endsWith(".zip")) {
4063
throw new IllegalArgumentException("source file name must end in '.zip'");
4164
}
42-
File lockFile = new File(source.toAbsolutePath() + ".lock");
43-
FileChannel fileChannel = null;
44-
FileLock fileLock = null;
45-
List<IOException> exceptions = new ArrayList<>();
65+
66+
Path destination = getDestination(source);
67+
68+
// Already extracted - check for both directory AND completion marker
69+
if (isExtractionComplete(destination)) {
70+
LOG.debug("{} is already extracted", source);
71+
return;
72+
}
73+
74+
// Extract to a unique temp directory
75+
Path tempDir = destination.resolveSibling(
76+
destination.getFileName() + ".tmp." + UUID.randomUUID());
77+
4678
try {
47-
fileChannel = new RandomAccessFile(lockFile, "rw").getChannel();
48-
LOG.debug("acquiring lock");
49-
fileLock = fileChannel.lock();
50-
LOG.debug("acquired lock");
51-
if (isExtracted(source)) {
52-
LOG.debug("{} is already extracted", source);
53-
return;
54-
}
55-
extract(source);
56-
} finally {
57-
if (fileLock != null && fileLock.isValid()) {
58-
try {
59-
fileLock.release();
60-
} catch (IOException e) {
61-
LOG.warn("failed to release the lock");
62-
exceptions.add(e);
63-
}
64-
}
65-
if (fileChannel != null) {
79+
LOG.debug("extracting {} to temp dir {}", source, tempDir);
80+
new Unzip(source.toFile(), tempDir.toFile()).extract();
81+
82+
// Create completion marker in temp dir before moving
83+
Files.createFile(tempDir.resolve(COMPLETE_MARKER));
84+
85+
// Atomically rename to final destination
86+
try {
87+
Files.move(tempDir, destination, StandardCopyOption.ATOMIC_MOVE);
88+
LOG.debug("successfully extracted {}", destination);
89+
} catch (FileAlreadyExistsException | DirectoryNotEmptyException e) {
90+
// Another process extracted it first - wait for completion marker
91+
LOG.debug("plugin already extracted by another process: {}", destination);
92+
waitForExtractionComplete(destination);
93+
} catch (AtomicMoveNotSupportedException e) {
94+
// Filesystem doesn't support atomic move, try regular move
6695
try {
67-
fileChannel.close();
68-
} catch (IOException e) {
69-
LOG.warn("failed to close the file channel");
70-
exceptions.add(e);
96+
Files.move(tempDir, destination);
97+
LOG.debug("successfully extracted {} (non-atomic)", destination);
98+
} catch (FileAlreadyExistsException | DirectoryNotEmptyException e2) {
99+
// Another process extracted it first - wait for completion marker
100+
LOG.debug("plugin already extracted by another process: {}", destination);
101+
waitForExtractionComplete(destination);
71102
}
72103
}
73-
boolean isDeleted = lockFile.delete();
74-
if (! isDeleted) {
75-
LOG.warn("failed to delete the lock file");
76-
exceptions.add(new IOException("failed to delete lock file: " + lockFile));
104+
} finally {
105+
// Clean up temp dir if it still exists (we lost the race or there was an error)
106+
if (Files.exists(tempDir)) {
107+
deleteRecursively(tempDir);
77108
}
78109
}
79-
if (! exceptions.isEmpty()) {
80-
throw exceptions.get(0);
81-
}
82110
}
83111

84-
private static void extract(Path source) throws IOException {
85-
Path destination = getDestination(source);
86-
Unzip unzip = new Unzip(source.toFile(), destination.toFile());
87-
unzip.extract();
112+
/**
113+
* Checks if extraction is complete by verifying both directory exists and completion marker is present.
114+
*/
115+
private static boolean isExtractionComplete(Path destination) {
116+
return Files.isDirectory(destination) && Files.exists(destination.resolve(COMPLETE_MARKER));
88117
}
89118

90-
private static boolean isExtracted(Path source) {
91-
Path destination = getDestination(source);
92-
return Files.isDirectory(destination);
119+
/**
120+
* Waits for extraction to complete by polling for the completion marker.
121+
* This is called when we detect another process is extracting.
122+
*/
123+
private static void waitForExtractionComplete(Path destination) throws IOException {
124+
long maxWaitMs = 60000; // 1 minute max wait
125+
long pollIntervalMs = 100;
126+
long waited = 0;
127+
128+
while (waited < maxWaitMs) {
129+
if (isExtractionComplete(destination)) {
130+
LOG.debug("extraction completed by another process: {}", destination);
131+
return;
132+
}
133+
try {
134+
Thread.sleep(pollIntervalMs);
135+
} catch (InterruptedException e) {
136+
Thread.currentThread().interrupt();
137+
throw new IOException("interrupted while waiting for extraction to complete", e);
138+
}
139+
waited += pollIntervalMs;
140+
}
141+
142+
throw new IOException("timed out waiting for extraction to complete: " + destination);
93143
}
94144

95145
private static Path getDestination(Path source) {
96146
String fName = source.getFileName().toString();
97147
fName = fName.substring(0, fName.length() - 4);
98148
return source.toAbsolutePath().getParent().resolve(fName);
99149
}
150+
151+
private static void deleteRecursively(Path path) {
152+
try (Stream<Path> walk = Files.walk(path)) {
153+
walk.sorted(Comparator.reverseOrder())
154+
.forEach(p -> {
155+
try {
156+
Files.delete(p);
157+
} catch (IOException e) {
158+
LOG.warn("failed to delete temp file: {}", p, e);
159+
}
160+
});
161+
} catch (IOException e) {
162+
LOG.warn("failed to clean up temp directory: {}", path, e);
163+
}
164+
}
100165
}

0 commit comments

Comments
 (0)