-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
When an application using a cache listener that writes to another Caffeine cache while running from a ForkJoinPool, it will eventually deadlock 100% of the time. This is very much an edge case, but it is very difficult to diagnose when it occurs. If your application includes a listener that uses a library that transitively writes a Caffeine cache, this can manifest as well.
The root cause appears to be that the EventDispatcher.awaitSynchronous() method checks for all pending Futures in a static ThreadLocal. When used outside of a ForkJoinPool, this is fine as the ThreadLocal will isolate any calling thread from others. But when used in a ForkJoinPool, the calling thread may be reused, which causes the listener to inadvertently block on its own Future.
I suspect changing the EventDispatcher.pending field to be non-static will skate around this issue. Here is a sample reproduction that will eventually always fail (though it may take a few hundred iterations to do so):
import javax.cache.Cache;
import javax.cache.Caching;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import java.io.Serializable;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toList;
public class Test {
public static class FallbackCache {
private final Cache<String, String> fallbackCache;
public FallbackCache() {
this.fallbackCache = Caching.getCachingProvider().getCacheManager()
.createCache("fallbackCache", new MutableConfiguration<>());
}
public String get(String key) {
return fallbackCache.get(key);
}
public void put(String key, String value) {
fallbackCache.put(key, value);
}
}
public static class PrimaryCacheEntryListener<K, V> implements CacheEntryCreatedListener<K, V>, Serializable {
FallbackCache fallbackRepo;
public PrimaryCacheEntryListener(FallbackCache fallbackRepo) {
this.fallbackRepo = fallbackRepo;
}
@Override
public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> iterable) throws CacheEntryListenerException {
for (CacheEntryEvent<? extends K, ? extends V> cacheEntryEvent : iterable) {
CacheEntryEvent<String, String> nextEvent = (CacheEntryEvent<String, String>) cacheEntryEvent;
fallbackRepo.put(nextEvent.getKey(), nextEvent.getValue());
}
}
}
public static class PrimaryCacheEntryEventFilter<K, V> implements CacheEntryEventFilter<K, V>, Serializable {
@Override
public boolean evaluate(CacheEntryEvent cacheEntryEvent) throws CacheEntryListenerException {
return true;
}
}
public static class DummyCacheLoader implements CacheLoader<String, String> {
private final HttpClient http = HttpClient.newHttpClient();
@Override
public String load(String key) throws CacheLoaderException {
try {
var resp = http.send(HttpRequest.newBuilder()
.uri(URI.create("https://google.com/"))
.GET()
.build(), HttpResponse.BodyHandlers.ofString());
return resp.body();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
return StreamSupport.stream(keys.spliterator(), false).collect(Collectors.toMap(k -> (String) k, this::load));
}
}
public static void main(String[] args) throws Exception {
var pool = Executors.newWorkStealingPool(5);
var fallbackCache = new FallbackCache();
var listener = new PrimaryCacheEntryListener<>(fallbackCache);
var filter = new PrimaryCacheEntryEventFilter<>();
var cache = Caching.getCachingProvider().getCacheManager().createCache("primaryCache",
new MutableConfiguration<String, String>()
.addCacheEntryListenerConfiguration(
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(listener),
FactoryBuilder.factoryOf(filter), true, true))
.setReadThrough(true)
.setCacheLoaderFactory(new FactoryBuilder.SingletonFactory<>(new DummyCacheLoader()))
.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(javax.cache.expiry.Duration.TEN_MINUTES)));
for (int i = 0; true; i++) {
var futures = IntStream.range(0, 5).mapToObj(c -> pool.submit(() -> {
cache.get("fake");
})).collect(toList());
for (var f : futures) {
f.get();
}
System.out.println("Iteration " + i);
cache.removeAll();
}
}
}