diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java index 47dd3d86e9983..08067379bc0a9 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java @@ -120,7 +120,7 @@ public int count() { * @return Current stats about this cache */ public CacheStats getCacheStats() { - Cache.CacheStats stats = cache.stats(); + Cache.Stats stats = cache.stats(); return new CacheStats( cache.count(), stats.getHits(), diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index d57e087a56a25..f447782ddac3f 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -9,368 +9,43 @@ package org.elasticsearch.common.cache; -import org.elasticsearch.core.Tuple; - -import java.lang.reflect.Array; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.ToLongBiFunction; /** - * A simple concurrent cache. - *

- * Cache is a simple concurrent cache that supports time-based and weight-based evictions, with notifications for all - * evictions. The design goals for this cache were simplicity and read performance. This means that we are willing to - * accept reduced write performance in exchange for easy-to-understand code. Cache statistics for hits, misses and - * evictions are exposed. - *

- * The design of the cache is relatively simple. The cache is segmented into 256 segments which are backed by HashMaps.
- * Each segment is protected by a re-entrant read/write lock. The read/write locks permit multiple concurrent readers
- * without contention, and the segments gives us write throughput without impacting readers (so readers are blocked only
- * if they are reading a segment that a writer is writing to).
+ * Interface for cache implementations. It is currently still quite tied to {@link LRUCache}; decoupling the two is an ongoing effort.
+ * <p>
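Not part of the diff, but for orientation: a minimal sketch of how callers obtain a cache through this interface, assuming the existing builder API exercised elsewhere in this PR (builder(), setMaximumWeight, weigher, build):

```java
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;

class CacheUsageSketch {
    static Cache<String, byte[]> build() {
        // Callers program against the Cache interface; CacheBuilder picks the
        // implementation (currently LRUCache, per the CacheBuilder hunk below).
        return CacheBuilder.<String, byte[]>builder()
            .setMaximumWeight(1024)                 // weight-based eviction threshold
            .weigher((key, value) -> value.length)  // external weigher, a ToLongBiFunction
            .build();
    }
}
```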

- * The LRU functionality is backed by a single doubly-linked list chaining the entries in order of insertion. This - * LRU list is protected by a lock that serializes all writes to it. There are opportunities for improvements - * here if write throughput is a concern. - *

- * <ol>
- * <li>LRU list mutations could be inserted into a blocking queue that a single thread is reading from
- * and applying to the LRU list.</li>
- * <li>Promotions could be deferred for entries that were "recently" promoted.</li>
- * <li>Locks on the list could be taken per node being modified instead of globally.</li>
- * </ol>
- * <p>

- * Evictions only occur after a mutation to the cache (meaning an entry promotion, a cache insertion, or a manual
- * invalidation) or an explicit call to {@link #refresh()}.
+ * Implementations are expected to notify through a {@link RemovalListener}, but the interface does not feature a method for
+ * registering a listener; how a listener is registered is left to the implementation, which could for example accept one through a
+ * constructor. If an implementation offers a means of modifying the listener(s), it should specify its guarantees with respect to
+ * delivery of notifications. If the implementation features an eviction strategy, and entries are evicted, a removal notification with
+ * {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} EVICTED should be emitted. If {@link #invalidate(Key)},
+ * {@link #invalidate(Key, Value)}, or {@link #invalidateAll()} is used, a removal notification with
+ * {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED should be emitted.
+ *
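As an illustration of the notification contract above, a hedged sketch using the removalListener hook that the CacheBuilder hunk below exposes; the exact moment an eviction notification fires is an LRUCache implementation detail:

```java
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;

class RemovalNotificationSketch {
    public static void main(String[] args) {
        Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder()
            .setMaximumWeight(1) // the default weigher gives each entry weight 1
            .removalListener(n -> System.out.println(n.getRemovalReason() + " -> " + n.getKey()))
            .build();

        cache.put(1, "a");
        cache.put(2, "b");   // exceeds the maximum weight; the LRU tail is pruned: "EVICTED -> 1"
        cache.invalidate(2); // explicit removal: "INVALIDATED -> 2"
    }
}
```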

* - * @param The type of the keys - * @param The type of the values + * @param Type of keys to lookup values + * @param Type of values stored in the cache */ -public class Cache { - - private final LongAdder hits = new LongAdder(); - - private final LongAdder misses = new LongAdder(); - - private final LongAdder evictions = new LongAdder(); - - // positive if entries have an expiration - private long expireAfterAccessNanos = -1; - - // true if entries can expire after access - private boolean entriesExpireAfterAccess; - - // positive if entries have an expiration after write - private long expireAfterWriteNanos = -1; - - // true if entries can expire after initial insertion - private boolean entriesExpireAfterWrite; - - // the number of entries in the cache - private int count = 0; - - // the weight of the entries in the cache - private long weight = 0; - - // the maximum weight that this cache supports - private long maximumWeight = -1; - - // the weigher of entries - private ToLongBiFunction weigher = (k, v) -> 1; - - // the removal callback - private RemovalListener removalListener = notification -> {}; - - // use CacheBuilder to construct - Cache() {} - - void setExpireAfterAccessNanos(long expireAfterAccessNanos) { - if (expireAfterAccessNanos <= 0) { - throw new IllegalArgumentException("expireAfterAccessNanos <= 0"); - } - this.expireAfterAccessNanos = expireAfterAccessNanos; - this.entriesExpireAfterAccess = true; - } - - // public for testing - public long getExpireAfterAccessNanos() { - return this.expireAfterAccessNanos; - } - - void setExpireAfterWriteNanos(long expireAfterWriteNanos) { - if (expireAfterWriteNanos <= 0) { - throw new IllegalArgumentException("expireAfterWriteNanos <= 0"); - } - this.expireAfterWriteNanos = expireAfterWriteNanos; - this.entriesExpireAfterWrite = true; - } - - // pkg-private for testing - long getExpireAfterWriteNanos() { - return this.expireAfterWriteNanos; - } - - void setMaximumWeight(long maximumWeight) { - if (maximumWeight < 0) { - throw new IllegalArgumentException("maximumWeight < 0"); - } - this.maximumWeight = maximumWeight; - } - - void setWeigher(ToLongBiFunction weigher) { - Objects.requireNonNull(weigher); - this.weigher = weigher; - } - - void setRemovalListener(RemovalListener removalListener) { - Objects.requireNonNull(removalListener); - this.removalListener = removalListener; - } - - /** - * The relative time used to track time-based evictions. - * - * @return the current relative time - */ - protected long now() { - // System.nanoTime takes non-negligible time, so we only use it if we need it - // use System.nanoTime because we want relative time, not absolute time - return entriesExpireAfterAccess || entriesExpireAfterWrite ? System.nanoTime() : 0; - } - - // the state of an entry in the LRU list - enum State { - NEW, - EXISTING, - DELETED - } - - private static final class Entry { - final K key; - final V value; - final long writeTime; - volatile long accessTime; - Entry before; - Entry after; - State state = State.NEW; - - Entry(K key, V value, long writeTime) { - this.key = key; - this.value = value; - this.writeTime = this.accessTime = writeTime; - } - } - - /** - * A cache segment. - *

- * A CacheSegment is backed by a HashMap and is protected by a read/write lock. - */ - private final class CacheSegment { - // read/write lock protecting mutations to the segment - final ReadWriteLock segmentLock = new ReentrantReadWriteLock(); - - final Lock readLock = segmentLock.readLock(); - final Lock writeLock = segmentLock.writeLock(); - - Map>> map; - - /** - * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is - * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback - * - * @param key the key of the entry to get from the cache - * @param now the access time of this entry - * @param eagerEvict whether entries should be eagerly evicted on expiration - * @return the entry if there was one, otherwise null - */ - Entry get(K key, long now, boolean eagerEvict) { - CompletableFuture> future; - readLock.lock(); - try { - future = map == null ? null : map.get(key); - } finally { - readLock.unlock(); - } - if (future != null) { - Entry entry; - try { - entry = future.get(); - } catch (ExecutionException e) { - assert future.isCompletedExceptionally(); - misses.increment(); - return null; - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - if (isExpired(entry, now)) { - misses.increment(); - if (eagerEvict) { - lruLock.lock(); - try { - evictEntry(entry); - } finally { - lruLock.unlock(); - } - } - return null; - } else { - hits.increment(); - entry.accessTime = now; - return entry; - } - } else { - misses.increment(); - return null; - } - } - - /** - * put an entry into the segment - * - * @param key the key of the entry to add to the cache - * @param value the value of the entry to add to the cache - * @param now the access time of this entry - * @return a tuple of the new entry and the existing entry, if there was one otherwise null - */ - Tuple, Entry> put(K key, V value, long now) { - Entry entry = new Entry<>(key, value, now); - Entry existing = null; - writeLock.lock(); - try { - try { - if (map == null) { - map = new HashMap<>(); - } - CompletableFuture> future = map.put(key, CompletableFuture.completedFuture(entry)); - if (future != null) { - existing = future.handle((ok, ex) -> ok).get(); - } - } catch (ExecutionException | InterruptedException e) { - throw new IllegalStateException(e); - } - } finally { - writeLock.unlock(); - } - return Tuple.tuple(entry, existing); - } - - /** - * remove an entry from the segment - * - * @param key the key of the entry to remove from the cache - */ - void remove(K key) { - CompletableFuture> future; - writeLock.lock(); - try { - if (map == null) { - future = null; - } else { - future = map.remove(key); - if (map.isEmpty()) { - map = null; - } - } - } finally { - writeLock.unlock(); - } - if (future != null) { - evictions.increment(); - notifyWithInvalidated(future); - } - } - - /** - * remove an entry from the segment iff the future is done and the value is equal to the - * expected value - * - * @param key the key of the entry to remove from the cache - * @param value the value expected to be associated with the key - * @param notify whether to trigger a removal notification if the entry has been removed - */ - void remove(K key, V value, boolean notify) { - CompletableFuture> future; - boolean removed = false; - writeLock.lock(); - try { - future = map == null ? 
null : map.get(key); - try { - if (future != null) { - if (future.isDone()) { - Entry entry = future.get(); - if (Objects.equals(value, entry.value)) { - removed = map.remove(key, future); - if (map.isEmpty()) { - map = null; - } - } - } - } - } catch (ExecutionException | InterruptedException e) { - throw new IllegalStateException(e); - } - } finally { - writeLock.unlock(); - } - - if (future != null && removed) { - evictions.increment(); - if (notify) { - notifyWithInvalidated(future); - } - } - } - - } - - public static final int NUMBER_OF_SEGMENTS = 256; - @SuppressWarnings("unchecked") - private final CacheSegment[] segments = (CacheSegment[]) Array.newInstance(CacheSegment.class, NUMBER_OF_SEGMENTS); - - { - for (int i = 0; i < segments.length; i++) { - segments[i] = new CacheSegment(); - } - } - - Entry head; - Entry tail; - - // lock protecting mutations to the LRU list - private final ReentrantLock lruLock = new ReentrantLock(); - +public interface Cache { /** * Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key. * * @param key the key whose associated value is to be returned * @return the value to which the specified key is mapped, or null if this map contains no mapping for the key */ - public V get(K key) { - return get(key, now(), false); - } + Value get(Key key); - private V get(K key, long now, boolean eagerEvict) { - CacheSegment segment = getCacheSegment(key); - Entry entry = segment.get(key, now, eagerEvict); - if (entry == null) { - return null; - } else { - promote(entry, now); - return entry.value; - } - } + /** + * Associates the specified value with the specified key in this map. If the map previously contained a mapping for + * the key, the old value is replaced. + * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + */ + void put(Key key, Value value); /** * If the specified key is not already associated with a value (or is mapped to null), attempts to compute its @@ -386,136 +61,7 @@ private V get(K key, long now, boolean eagerEvict) { * @return the current (existing or computed) non-null value associated with the specified key * @throws ExecutionException thrown if loader throws an exception or returns a null value */ - public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionException { - long now = now(); - // we have to eagerly evict expired entries or our putIfAbsent call below will fail - V value = get(key, now, true); - if (value == null) { - // we need to synchronize loading of a value for a given key; however, holding the segment lock while - // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we - // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding - // the segment lock; to do this, we atomically put a future in the map that can load the value, and then - // get the value from this future on the thread that won the race to place the future into the segment map - final CacheSegment segment = getCacheSegment(key); - CompletableFuture> future; - CompletableFuture> completableFuture = new CompletableFuture<>(); - - segment.writeLock.lock(); - try { - if (segment.map == null) { - segment.map = new HashMap<>(); - } - future = segment.map.putIfAbsent(key, completableFuture); - } finally { - segment.writeLock.unlock(); - } - - BiFunction, Throwable, ? 
extends V> handler = (ok, ex) -> { - if (ok != null) { - promote(ok, now); - return ok.value; - } else { - segment.writeLock.lock(); - try { - CompletableFuture> sanity = segment.map == null ? null : segment.map.get(key); - if (sanity != null && sanity.isCompletedExceptionally()) { - segment.map.remove(key); - if (segment.map.isEmpty()) { - segment.map = null; - } - } - } finally { - segment.writeLock.unlock(); - } - return null; - } - }; - - CompletableFuture completableValue; - if (future == null) { - future = completableFuture; - completableValue = future.handle(handler); - V loaded; - try { - loaded = loader.load(key); - } catch (Exception e) { - future.completeExceptionally(e); - throw new ExecutionException(e); - } - if (loaded == null) { - NullPointerException npe = new NullPointerException("loader returned a null value"); - future.completeExceptionally(npe); - throw new ExecutionException(npe); - } else { - future.complete(new Entry<>(key, loaded, now)); - } - } else { - completableValue = future.handle(handler); - } - - try { - value = completableValue.get(); - // check to ensure the future hasn't been completed with an exception - if (future.isCompletedExceptionally()) { - future.get(); // call get to force the exception to be thrown for other concurrent callers - throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - return value; - } - - /** - * Associates the specified value with the specified key in this map. If the map previously contained a mapping for - * the key, the old value is replaced. - * - * @param key key with which the specified value is to be associated - * @param value value to be associated with the specified key - */ - public void put(K key, V value) { - long now = now(); - put(key, value, now); - } - - private void put(K key, V value, long now) { - CacheSegment segment = getCacheSegment(key); - Tuple, Entry> tuple = segment.put(key, value, now); - boolean replaced = false; - lruLock.lock(); - try { - if (tuple.v2() != null && tuple.v2().state == State.EXISTING) { - if (unlink(tuple.v2())) { - replaced = true; - } - } - promote(tuple.v1(), now); - } finally { - lruLock.unlock(); - } - if (replaced) { - removalListener.onRemoval( - new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalNotification.RemovalReason.REPLACED) - ); - } - } - - private void notifyWithInvalidated(CompletableFuture> f) { - try { - Entry entry = f.get(); - lruLock.lock(); - try { - delete(entry, RemovalNotification.RemovalReason.INVALIDATED); - } finally { - lruLock.unlock(); - } - } catch (ExecutionException e) { - // ok - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } + Value computeIfAbsent(Key key, CacheLoader loader) throws ExecutionException; /** * Invalidate the association for the specified key. A removal notification will be issued for invalidated @@ -523,10 +69,7 @@ private void notifyWithInvalidated(CompletableFuture> f) { * * @param key the key whose mapping is to be invalidated from the cache */ - public void invalidate(K key) { - CacheSegment segment = getCacheSegment(key); - segment.remove(key); - } + void invalidate(Key key); /** * Invalidate the entry for the specified key and value. 
If the value provided is not equal to the value in
@@ -536,230 +79,85 @@ public void invalidate(K key) {
      * @param key   the key whose mapping is to be invalidated from the cache
      * @param value the expected value that should be associated with the key
      */
-    public void invalidate(K key, V value) {
-        CacheSegment segment = getCacheSegment(key);
-        segment.remove(key, value, true);
-    }
+    void invalidate(Key key, Value value);
 
     /**
      * Invalidate all cache entries. A removal notification will be issued for invalidated entries with
      * {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
      */
-    public void invalidateAll() {
-        Entry<K, V> h;
-
-        boolean[] haveSegmentLock = new boolean[NUMBER_OF_SEGMENTS];
-        lruLock.lock();
-        try {
-            try {
-                for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) {
-                    segments[i].segmentLock.writeLock().lock();
-                    haveSegmentLock[i] = true;
-                }
-                h = head;
-                for (CacheSegment segment : segments) {
-                    segment.map = null;
-                }
-                Entry<K, V> current = head;
-                while (current != null) {
-                    current.state = State.DELETED;
-                    current = current.after;
-                }
-                head = tail = null;
-                count = 0;
-                weight = 0;
-            } finally {
-                for (int i = NUMBER_OF_SEGMENTS - 1; i >= 0; i--) {
-                    if (haveSegmentLock[i]) {
-                        segments[i].segmentLock.writeLock().unlock();
-                    }
-                }
-            }
-        } finally {
-            lruLock.unlock();
-        }
-        while (h != null) {
-            removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED));
-            h = h.after;
-        }
-    }
+    void invalidateAll();
 
     /**
-     * Force any outstanding size-based and time-based evictions to occur
+     * Blocking call that evaluates all entries against the cache's requirements and evicts those that no longer meet them, based on
+     * the eviction strategy if one is provided / supported by the implementation; a no-op otherwise.
+     * A removal notification will be issued for evicted entries with
+     * {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} EVICTED
      */
-    public void refresh() {
-        long now = now();
-        lruLock.lock();
-        try {
-            evict(now);
-        } finally {
-            lruLock.unlock();
-        }
-    }
+    default void refresh() {}
 
     /**
      * The number of entries in the cache.
      *
      * @return the number of entries in the cache
      */
-    public int count() {
-        return count;
-    }
+    int count();
 
     /**
-     * The weight of the entries in the cache.
+     * The total weight of all entries in the cache. This interface does not specify the property that gives an entry its weight;
+     * implementations are expected to allow the user to specify an external weigher in the form of a {@link ToLongBiFunction}.
      *
-     * @return the weight of the entries in the cache
+     * @return the weight of all the entries in the cache
      */
-    public long weight() {
-        return weight;
-    }
+    long weight();
 
     /**
-     * An LRU sequencing of the keys in the cache that supports removal. This sequence is not protected from mutations
-     * to the cache (except for {@link Iterator#remove()}. The result of iteration under any other mutation is
-     * undefined.
+     * An Iterable that allows traversing all keys in the cache. Modifications might be visible, and no guarantee is made on the
+     * ordering of these modifications, as new entries might end up in the already traversed part of the cache. So, while traversing,
+     * if A and B are added to the cache in that order, we might only observe B. Implementations should allow the cache to be modified
+     * while traversing, but only {@link Iterator#remove()} is guaranteed to not affect further traversal.
+     * Implementations might guarantee a specific sequencing or other stronger guarantees.
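A short sketch of the traversal contract just described, assuming a CacheBuilder-built cache; Iterator.remove() is the only mutation guaranteed not to affect further traversal:

```java
import java.util.Iterator;
import org.elasticsearch.common.cache.Cache;

class KeyTraversalSketch {
    static void dropNegativeKeys(Cache<Integer, String> cache) {
        Iterator<Integer> it = cache.keys().iterator();
        while (it.hasNext()) {
            if (it.next() < 0) {
                // The one traversal-safe mutation: removes the entry and emits
                // an INVALIDATED removal notification.
                it.remove();
            }
        }
    }
}
```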
+     * {@link Iterator#remove()} issues a removal notification with
+     * {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
      *
-     * @return an LRU-ordered {@link Iterable} over the keys in the cache
+     * @return an {@link Iterable} over the keys in the cache
      */
-    public Iterable<K> keys() {
-        return () -> new Iterator<>() {
-            private final CacheIterator iterator = new CacheIterator(head);
-
-            @Override
-            public boolean hasNext() {
-                return iterator.hasNext();
-            }
-
-            @Override
-            public K next() {
-                return iterator.next().key;
-            }
-
-            @Override
-            public void remove() {
-                iterator.remove();
-            }
-        };
-    }
+    Iterable<Key> keys();
 
     /**
-     * An LRU sequencing of the values in the cache. This sequence is not protected from mutations
-     * to the cache (except for {@link Iterator#remove()}. The result of iteration under any other mutation is
-     * undefined.
+     * An Iterable that allows traversing all values in the cache. Modifications might be visible, and no guarantee is made on the
+     * ordering of these modifications, as new entries might end up in the already traversed part of the cache. So, while traversing,
+     * if A and B are added to the cache, in that order, we might only observe B. Implementations should allow the cache to be
+     * modified while traversing, but only {@link Iterator#remove()} is guaranteed to not affect further traversal.
+     * Implementations might guarantee a specific sequencing or other stronger guarantees.
+     * {@link Iterator#remove()} issues a removal notification with
+     * {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
      *
-     * @return an LRU-ordered {@link Iterable} over the values in the cache
+     * @return an {@link Iterable} over the values in the cache
      */
-    public Iterable<V> values() {
-        return () -> new Iterator<>() {
-            private final CacheIterator iterator = new CacheIterator(head);
-
-            @Override
-            public boolean hasNext() {
-                return iterator.hasNext();
-            }
-
-            @Override
-            public V next() {
-                return iterator.next().value;
-            }
-
-            @Override
-            public void remove() {
-                iterator.remove();
-            }
-        };
-    }
+    Iterable<Value> values();
 
     /**
-     * Performs an action for each cache entry in the cache. While iterating over the cache entries this method is protected from mutations
-     * that occurs within the same cache segment by acquiring the segment's read lock during all the iteration. As such, the specified
-     * consumer should not try to modify the cache. Modifications that occur in already traveled segments won't been seen by the consumer
-     * but modification that occur in non yet traveled segments should be.
+     * The cache statistics tracking hits, misses and evictions. These are captured on a best-effort basis.
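For illustration, a sketch of reading the best-effort statistics and using the forEach contract described above; Cache.Stats and its getters come straight from this diff:

```java
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;

class StatsSketch {
    public static void main(String[] args) {
        Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
        cache.put(1, "one");
        cache.get(1); // counted as a hit
        cache.get(2); // counted as a miss

        Cache.Stats stats = cache.stats(); // best-effort, point-in-time snapshot
        System.out.println(stats.getHits() + " hits, " + stats.getMisses() + " misses, " + stats.getEvictions() + " evictions");

        // forEach may hold internal locks, so the consumer must not mutate the cache.
        cache.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```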
* - * @param consumer the {@link Consumer} + * @return the current cache statistics */ - public void forEach(BiConsumer consumer) { - for (CacheSegment segment : segments) { - segment.readLock.lock(); - try { - if (segment.map == null) { - continue; - } - for (CompletableFuture> future : segment.map.values()) { - try { - if (future != null && future.isDone()) { - final Entry entry = future.get(); - consumer.accept(entry.key, entry.value); - } - } catch (ExecutionException | InterruptedException e) { - throw new IllegalStateException(e); - } - } - } finally { - segment.readLock.unlock(); - } - } - } - - private class CacheIterator implements Iterator> { - private Entry current; - private Entry next; - - CacheIterator(Entry head) { - current = null; - next = head; - } - - @Override - public boolean hasNext() { - return next != null; - } - - @Override - public Entry next() { - current = next; - next = next.after; - return current; - } - - @Override - public void remove() { - Entry entry = current; - if (entry != null) { - CacheSegment segment = getCacheSegment(entry.key); - segment.remove(entry.key, entry.value, false); - lruLock.lock(); - try { - current = null; - delete(entry, RemovalNotification.RemovalReason.INVALIDATED); - } finally { - lruLock.unlock(); - } - } - } - } + Stats stats(); /** - * The cache statistics tracking hits, misses and evictions. These are taken on a best-effort basis meaning that - * they could be out-of-date mid-flight. + * Performs an action for each cache entry in the cache. While iterating over the cache entries this method might use locks. As such, + * the specified consumer should not try to modify the cache. Visibility of modifications might or might not be seen by the consumer. * - * @return the current cache statistics + * @param consumer the {@link BiConsumer} */ - public CacheStats stats() { - return new CacheStats(this.hits.sum(), misses.sum(), evictions.sum()); - } - - public static class CacheStats { - private final long hits; - private final long misses; - private final long evictions; + void forEach(BiConsumer consumer); - public CacheStats(long hits, long misses, long evictions) { - this.hits = hits; - this.misses = misses; - this.evictions = evictions; - } + /** + * Point in time capture of cache statistics + * @param hits number of times a cached value was hit + * @param misses number of times no cached value could be found + * @param evictions number of entries that have been evicted + */ + record Stats(long hits, long misses, long evictions) { public long getHits() { return hits; @@ -773,133 +171,4 @@ public long getEvictions() { return evictions; } } - - private void promote(Entry entry, long now) { - boolean promoted = true; - lruLock.lock(); - try { - switch (entry.state) { - case DELETED -> promoted = false; - case EXISTING -> relinkAtHead(entry); - case NEW -> linkAtHead(entry); - } - if (promoted) { - evict(now); - } - } finally { - lruLock.unlock(); - } - } - - private void evict(long now) { - assert lruLock.isHeldByCurrentThread(); - - while (tail != null && shouldPrune(tail, now)) { - evictEntry(tail); - } - } - - private void evictEntry(Entry entry) { - assert lruLock.isHeldByCurrentThread(); - - CacheSegment segment = getCacheSegment(entry.key); - if (segment != null) { - segment.remove(entry.key, entry.value, false); - } - delete(entry, RemovalNotification.RemovalReason.EVICTED); - } - - private void delete(Entry entry, RemovalNotification.RemovalReason removalReason) { - assert lruLock.isHeldByCurrentThread(); - - if 
(unlink(entry)) { - removalListener.onRemoval(new RemovalNotification<>(entry.key, entry.value, removalReason)); - } - } - - private boolean shouldPrune(Entry entry, long now) { - return exceedsWeight() || isExpired(entry, now); - } - - private boolean exceedsWeight() { - return maximumWeight != -1 && weight > maximumWeight; - } - - private boolean isExpired(Entry entry, long now) { - return (entriesExpireAfterAccess && now - entry.accessTime > expireAfterAccessNanos) - || (entriesExpireAfterWrite && now - entry.writeTime > expireAfterWriteNanos); - } - - private boolean unlink(Entry entry) { - assert lruLock.isHeldByCurrentThread(); - - if (entry.state == State.EXISTING) { - final Entry before = entry.before; - final Entry after = entry.after; - - if (before == null) { - // removing the head - assert head == entry; - head = after; - if (head != null) { - head.before = null; - } - } else { - // removing inner element - before.after = after; - entry.before = null; - } - - if (after == null) { - // removing tail - assert tail == entry; - tail = before; - if (tail != null) { - tail.after = null; - } - } else { - // removing inner element - after.before = before; - entry.after = null; - } - - count--; - weight -= weigher.applyAsLong(entry.key, entry.value); - entry.state = State.DELETED; - return true; - } else { - return false; - } - } - - private void linkAtHead(Entry entry) { - assert lruLock.isHeldByCurrentThread(); - - Entry h = head; - entry.before = null; - entry.after = head; - head = entry; - if (h == null) { - tail = entry; - } else { - h.before = entry; - } - - count++; - weight += weigher.applyAsLong(entry.key, entry.value); - entry.state = State.EXISTING; - } - - private void relinkAtHead(Entry entry) { - assert lruLock.isHeldByCurrentThread(); - - if (head != entry) { - unlink(entry); - linkAtHead(entry); - } - } - - private CacheSegment getCacheSegment(K key) { - return segments[key.hashCode() & 0xff]; - } } diff --git a/server/src/main/java/org/elasticsearch/common/cache/CacheBuilder.java b/server/src/main/java/org/elasticsearch/common/cache/CacheBuilder.java index 088ae3594a10e..5dfc9066cd143 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/CacheBuilder.java +++ b/server/src/main/java/org/elasticsearch/common/cache/CacheBuilder.java @@ -80,7 +80,7 @@ public CacheBuilder removalListener(RemovalListener removalListener) } public Cache build() { - Cache cache = new Cache<>(); + LRUCache cache = new LRUCache<>(); if (maximumWeight != -1) { cache.setMaximumWeight(maximumWeight); } diff --git a/server/src/main/java/org/elasticsearch/common/cache/LRUCache.java b/server/src/main/java/org/elasticsearch/common/cache/LRUCache.java new file mode 100644 index 0000000000000..fc93a268cce56 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/cache/LRUCache.java @@ -0,0 +1,881 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.common.cache; + +import org.elasticsearch.core.Tuple; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.ToLongBiFunction; + +/** + * A simple concurrent cache. + *

+ * Cache is a simple concurrent cache that supports time-based and weight-based evictions, with notifications for all + * evictions. The design goals for this cache were simplicity and read performance. This means that we are willing to + * accept reduced write performance in exchange for easy-to-understand code. Cache statistics for hits, misses and + * evictions are exposed. + *

+ * The design of the cache is relatively simple. The cache is segmented into 256 segments which are backed by HashMaps.
+ * Each segment is protected by a re-entrant read/write lock. The read/write locks permit multiple concurrent readers
+ * without contention, and the segments give us write throughput without impacting readers (so readers are blocked only
+ * if they are reading a segment that a writer is writing to).
+ * <p>
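The segment routing mentioned above is implemented by getCacheSegment at the bottom of the new file; in isolation it amounts to:

```java
class SegmentRoutingSketch {
    static final int NUMBER_OF_SEGMENTS = 256;

    // Mirrors getCacheSegment(...) in the new LRUCache: masking with 0xff keeps the
    // low 8 bits of the hash, yielding an index in 0..255 even for negative hash codes.
    static int segmentIndex(Object key) {
        return key.hashCode() & 0xff;
    }
}
```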

+ * The LRU functionality is backed by a single doubly-linked list chaining the entries in order of insertion. This + * LRU list is protected by a lock that serializes all writes to it. There are opportunities for improvements + * here if write throughput is a concern. + *

+ * <ol>
+ * <li>LRU list mutations could be inserted into a blocking queue that a single thread is reading from
+ * and applying to the LRU list.</li>
+ * <li>Promotions could be deferred for entries that were "recently" promoted.</li>
+ * <li>Locks on the list could be taken per node being modified instead of globally.</li>
+ * </ol>
+ * <p>
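None of the three ideas above is implemented by this change; purely as a hypothetical sketch, the first one could funnel LRU list mutations through a queue drained by a single writer thread (all names invented):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of improvement idea (1) above; not part of this change.
class LruMutationPump {
    private final BlockingQueue<Runnable> mutations = new LinkedBlockingQueue<>();

    // Called by reader threads: cheap, and free of LRU-lock contention from their side.
    void enqueue(Runnable lruListMutation) {
        mutations.add(lruListMutation);
    }

    // A single dedicated thread drains the queue, so LRU list writes never race.
    void drainLoop() throws InterruptedException {
        while (true) {
            mutations.take().run();
        }
    }
}
```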

+ * Evictions only occur after a mutation to the cache (meaning an entry promotion, a cache insertion, or a manual + * invalidation) or an explicit call to {@link #refresh()}. + * + * @param The type of the keys + * @param The type of the values + */ +public class LRUCache implements Cache { + + private final LongAdder hits = new LongAdder(); + + private final LongAdder misses = new LongAdder(); + + private final LongAdder evictions = new LongAdder(); + + // positive if entries have an expiration + private long expireAfterAccessNanos = -1; + + // true if entries can expire after access + private boolean entriesExpireAfterAccess; + + // positive if entries have an expiration after write + private long expireAfterWriteNanos = -1; + + // true if entries can expire after initial insertion + private boolean entriesExpireAfterWrite; + + // the number of entries in the cache + private int count = 0; + + // the weight of the entries in the cache + private long weight = 0; + + // the maximum weight that this cache supports + private long maximumWeight = -1; + + // the weigher of entries + private ToLongBiFunction weigher = (k, v) -> 1; + + // the removal callback + private RemovalListener removalListener = notification -> {}; + + // use CacheBuilder to construct + LRUCache() {} + + void setExpireAfterAccessNanos(long expireAfterAccessNanos) { + if (expireAfterAccessNanos <= 0) { + throw new IllegalArgumentException("expireAfterAccessNanos <= 0"); + } + this.expireAfterAccessNanos = expireAfterAccessNanos; + this.entriesExpireAfterAccess = true; + } + + // public for testing + public long getExpireAfterAccessNanos() { + return this.expireAfterAccessNanos; + } + + void setExpireAfterWriteNanos(long expireAfterWriteNanos) { + if (expireAfterWriteNanos <= 0) { + throw new IllegalArgumentException("expireAfterWriteNanos <= 0"); + } + this.expireAfterWriteNanos = expireAfterWriteNanos; + this.entriesExpireAfterWrite = true; + } + + // pkg-private for testing + long getExpireAfterWriteNanos() { + return this.expireAfterWriteNanos; + } + + void setMaximumWeight(long maximumWeight) { + if (maximumWeight < 0) { + throw new IllegalArgumentException("maximumWeight < 0"); + } + this.maximumWeight = maximumWeight; + } + + void setWeigher(ToLongBiFunction weigher) { + Objects.requireNonNull(weigher); + this.weigher = weigher; + } + + void setRemovalListener(RemovalListener removalListener) { + Objects.requireNonNull(removalListener); + this.removalListener = removalListener; + } + + /** + * The relative time used to track time-based evictions. + * + * @return the current relative time + */ + protected long now() { + // System.nanoTime takes non-negligible time, so we only use it if we need it + // use System.nanoTime because we want relative time, not absolute time + return entriesExpireAfterAccess || entriesExpireAfterWrite ? System.nanoTime() : 0; + } + + // the state of an entry in the LRU list + enum State { + NEW, + EXISTING, + DELETED + } + + private static final class Entry { + final K key; + final V value; + final long writeTime; + volatile long accessTime; + Entry before; + Entry after; + State state = State.NEW; + + Entry(K key, V value, long writeTime) { + this.key = key; + this.value = value; + this.writeTime = this.accessTime = writeTime; + } + } + + /** + * A cache segment. + *

+ * A CacheSegment is backed by a HashMap and is protected by a read/write lock. + */ + private final class CacheSegment { + // read/write lock protecting mutations to the segment + final ReadWriteLock segmentLock = new ReentrantReadWriteLock(); + + final Lock readLock = segmentLock.readLock(); + final Lock writeLock = segmentLock.writeLock(); + + Map>> map; + + /** + * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is + * pruned or a manual {@link LRUCache#refresh()} is performed however a caller can take action using the provided callback + * + * @param key the key of the entry to get from the cache + * @param now the access time of this entry + * @param eagerEvict whether entries should be eagerly evicted on expiration + * @return the entry if there was one, otherwise null + */ + Entry get(K key, long now, boolean eagerEvict) { + CompletableFuture> future; + readLock.lock(); + try { + future = map == null ? null : map.get(key); + } finally { + readLock.unlock(); + } + if (future != null) { + Entry entry; + try { + entry = future.get(); + } catch (ExecutionException e) { + assert future.isCompletedExceptionally(); + misses.increment(); + return null; + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (isExpired(entry, now)) { + misses.increment(); + if (eagerEvict) { + lruLock.lock(); + try { + evictEntry(entry); + } finally { + lruLock.unlock(); + } + } + return null; + } else { + hits.increment(); + entry.accessTime = now; + return entry; + } + } else { + misses.increment(); + return null; + } + } + + /** + * put an entry into the segment + * + * @param key the key of the entry to add to the cache + * @param value the value of the entry to add to the cache + * @param now the access time of this entry + * @return a tuple of the new entry and the existing entry, if there was one otherwise null + */ + Tuple, Entry> put(K key, V value, long now) { + Entry entry = new Entry<>(key, value, now); + Entry existing = null; + writeLock.lock(); + try { + try { + if (map == null) { + map = new HashMap<>(); + } + CompletableFuture> future = map.put(key, CompletableFuture.completedFuture(entry)); + if (future != null) { + existing = future.handle((ok, ex) -> ok).get(); + } + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException(e); + } + } finally { + writeLock.unlock(); + } + return Tuple.tuple(entry, existing); + } + + /** + * remove an entry from the segment + * + * @param key the key of the entry to remove from the cache + */ + void remove(K key) { + CompletableFuture> future; + writeLock.lock(); + try { + if (map == null) { + future = null; + } else { + future = map.remove(key); + if (map.isEmpty()) { + map = null; + } + } + } finally { + writeLock.unlock(); + } + if (future != null) { + evictions.increment(); + notifyWithInvalidated(future); + } + } + + /** + * remove an entry from the segment iff the future is done and the value is equal to the + * expected value + * + * @param key the key of the entry to remove from the cache + * @param value the value expected to be associated with the key + * @param notify whether to trigger a removal notification if the entry has been removed + */ + void remove(K key, V value, boolean notify) { + CompletableFuture> future; + boolean removed = false; + writeLock.lock(); + try { + future = map == null ? 
null : map.get(key); + try { + if (future != null) { + if (future.isDone()) { + Entry entry = future.get(); + if (Objects.equals(value, entry.value)) { + removed = map.remove(key, future); + if (map.isEmpty()) { + map = null; + } + } + } + } + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException(e); + } + } finally { + writeLock.unlock(); + } + + if (future != null && removed) { + evictions.increment(); + if (notify) { + notifyWithInvalidated(future); + } + } + } + + } + + public static final int NUMBER_OF_SEGMENTS = 256; + @SuppressWarnings("unchecked") + private final CacheSegment[] segments = (CacheSegment[]) Array.newInstance(CacheSegment.class, NUMBER_OF_SEGMENTS); + + { + for (int i = 0; i < segments.length; i++) { + segments[i] = new CacheSegment(); + } + } + + Entry head; + Entry tail; + + // lock protecting mutations to the LRU list + private final ReentrantLock lruLock = new ReentrantLock(); + + /** + * Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key. + * + * @param key the key whose associated value is to be returned + * @return the value to which the specified key is mapped, or null if this map contains no mapping for the key + */ + public V get(K key) { + return get(key, now(), false); + } + + private V get(K key, long now, boolean eagerEvict) { + CacheSegment segment = getCacheSegment(key); + Entry entry = segment.get(key, now, eagerEvict); + if (entry == null) { + return null; + } else { + promote(entry, now); + return entry.value; + } + } + + /** + * If the specified key is not already associated with a value (or is mapped to null), attempts to compute its + * value using the given mapping function and enters it into this map unless null. The load method for a given key + * will be invoked at most once. + * + * Use of different {@link CacheLoader} implementations on the same key concurrently may result in only the first + * loader function being called and the second will be returned the result provided by the first including any exceptions + * thrown during the execution of the first. 
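A small usage sketch of the at-most-once loading contract described above. The file-size loader is an invented example; CacheLoader.load may throw, which surfaces to callers as an ExecutionException:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.common.cache.Cache;

class ComputeIfAbsentSketch {
    static long cachedFileSize(Cache<String, Long> cache, String path) throws ExecutionException {
        // The loader runs at most once per key even under concurrent callers; the losers of
        // the race block on the winner's future and see the same value (or the same exception).
        return cache.computeIfAbsent(path, key -> Files.size(Path.of(key)));
    }
}
```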
+ * + * @param key the key whose associated value is to be returned or computed for if non-existent + * @param loader the function to compute a value given a key + * @return the current (existing or computed) non-null value associated with the specified key + * @throws ExecutionException thrown if loader throws an exception or returns a null value + */ + public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionException { + long now = now(); + // we have to eagerly evict expired entries or our putIfAbsent call below will fail + V value = get(key, now, true); + if (value == null) { + // we need to synchronize loading of a value for a given key; however, holding the segment lock while + // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we + // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding + // the segment lock; to do this, we atomically put a future in the map that can load the value, and then + // get the value from this future on the thread that won the race to place the future into the segment map + final CacheSegment segment = getCacheSegment(key); + CompletableFuture> future; + CompletableFuture> completableFuture = new CompletableFuture<>(); + + segment.writeLock.lock(); + try { + if (segment.map == null) { + segment.map = new HashMap<>(); + } + future = segment.map.putIfAbsent(key, completableFuture); + } finally { + segment.writeLock.unlock(); + } + + BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { + if (ok != null) { + promote(ok, now); + return ok.value; + } else { + segment.writeLock.lock(); + try { + CompletableFuture> sanity = segment.map == null ? null : segment.map.get(key); + if (sanity != null && sanity.isCompletedExceptionally()) { + segment.map.remove(key); + if (segment.map.isEmpty()) { + segment.map = null; + } + } + } finally { + segment.writeLock.unlock(); + } + return null; + } + }; + + CompletableFuture completableValue; + if (future == null) { + future = completableFuture; + completableValue = future.handle(handler); + V loaded; + try { + loaded = loader.load(key); + } catch (Exception e) { + future.completeExceptionally(e); + throw new ExecutionException(e); + } + if (loaded == null) { + NullPointerException npe = new NullPointerException("loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Entry<>(key, loaded, now)); + } + } else { + completableValue = future.handle(handler); + } + + try { + value = completableValue.get(); + // check to ensure the future hasn't been completed with an exception + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + return value; + } + + /** + * Associates the specified value with the specified key in this map. If the map previously contained a mapping for + * the key, the old value is replaced. 
+ * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + */ + public void put(K key, V value) { + long now = now(); + put(key, value, now); + } + + private void put(K key, V value, long now) { + CacheSegment segment = getCacheSegment(key); + Tuple, Entry> tuple = segment.put(key, value, now); + boolean replaced = false; + lruLock.lock(); + try { + if (tuple.v2() != null && tuple.v2().state == State.EXISTING) { + if (unlink(tuple.v2())) { + replaced = true; + } + } + promote(tuple.v1(), now); + } finally { + lruLock.unlock(); + } + if (replaced) { + removalListener.onRemoval( + new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalNotification.RemovalReason.REPLACED) + ); + } + } + + private void notifyWithInvalidated(CompletableFuture> f) { + try { + Entry entry = f.get(); + lruLock.lock(); + try { + delete(entry, RemovalNotification.RemovalReason.INVALIDATED); + } finally { + lruLock.unlock(); + } + } catch (ExecutionException e) { + // ok + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + /** + * Invalidate the association for the specified key. A removal notification will be issued for invalidated + * entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + * + * @param key the key whose mapping is to be invalidated from the cache + */ + public void invalidate(K key) { + CacheSegment segment = getCacheSegment(key); + segment.remove(key); + } + + /** + * Invalidate the entry for the specified key and value. If the value provided is not equal to the value in + * the cache, no removal will occur. A removal notification will be issued for invalidated + * entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + * + * @param key the key whose mapping is to be invalidated from the cache + * @param value the expected value that should be associated with the key + */ + public void invalidate(K key, V value) { + CacheSegment segment = getCacheSegment(key); + segment.remove(key, value, true); + } + + /** + * Invalidate all cache entries. A removal notification will be issued for invalidated entries with + * {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + */ + public void invalidateAll() { + Entry h; + + boolean[] haveSegmentLock = new boolean[NUMBER_OF_SEGMENTS]; + lruLock.lock(); + try { + try { + for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) { + segments[i].segmentLock.writeLock().lock(); + haveSegmentLock[i] = true; + } + h = head; + for (CacheSegment segment : segments) { + segment.map = null; + } + Entry current = head; + while (current != null) { + current.state = State.DELETED; + current = current.after; + } + head = tail = null; + count = 0; + weight = 0; + } finally { + for (int i = NUMBER_OF_SEGMENTS - 1; i >= 0; i--) { + if (haveSegmentLock[i]) { + segments[i].segmentLock.writeLock().unlock(); + } + } + } + } finally { + lruLock.unlock(); + } + while (h != null) { + removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED)); + h = h.after; + } + } + + /** + * Force any outstanding size-based and time-based evictions to occur + */ + public void refresh() { + long now = now(); + lruLock.lock(); + try { + evict(now); + } finally { + lruLock.unlock(); + } + } + + /** + * The number of entries in the cache. 
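For illustration, a hedged sketch of time-based expiry interacting with refresh(), using the setExpireAfterAccess builder method exercised in the tests below; the timings are illustrative only:

```java
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.core.TimeValue;

class ExpirySketch {
    public static void main(String[] args) throws InterruptedException {
        Cache<String, String> cache = CacheBuilder.<String, String>builder()
            .setExpireAfterAccess(TimeValue.timeValueMillis(50))
            .build();
        cache.put("k", "v");

        Thread.sleep(100);   // let the entry expire
        cache.refresh();     // force the outstanding time-based eviction to run now
        System.out.println(cache.count()); // 0: the expired entry has been pruned
    }
}
```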
+ * + * @return the number of entries in the cache + */ + public int count() { + return count; + } + + /** + * The weight of the entries in the cache. + * + * @return the weight of the entries in the cache + */ + public long weight() { + return weight; + } + + /** + * An LRU sequencing of the keys in the cache that supports removal. This sequence is not protected from mutations + * to the cache (except for {@link Iterator#remove()}. The result of iteration under any other mutation is + * undefined. + * + * @return an LRU-ordered {@link Iterable} over the keys in the cache + */ + public Iterable keys() { + return () -> new Iterator<>() { + private final CacheIterator iterator = new CacheIterator(head); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public K next() { + return iterator.next().key; + } + + @Override + public void remove() { + iterator.remove(); + } + }; + } + + /** + * An LRU sequencing of the values in the cache. This sequence is not protected from mutations + * to the cache (except for {@link Iterator#remove()}. The result of iteration under any other mutation is + * undefined. + * + * @return an LRU-ordered {@link Iterable} over the values in the cache + */ + public Iterable values() { + return () -> new Iterator<>() { + private final CacheIterator iterator = new CacheIterator(head); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public V next() { + return iterator.next().value; + } + + @Override + public void remove() { + iterator.remove(); + } + }; + } + + /** + * Performs an action for each cache entry in the cache. While iterating over the cache entries this method is protected from mutations + * that occurs within the same cache segment by acquiring the segment's read lock during all the iteration. As such, the specified + * consumer should not try to modify the cache. Modifications that occur in already traveled segments won't been seen by the consumer + * but modification that occur in non yet traveled segments should be. + * + * @param consumer the {@link Consumer} + */ + public void forEach(BiConsumer consumer) { + for (CacheSegment segment : segments) { + segment.readLock.lock(); + try { + if (segment.map == null) { + continue; + } + for (CompletableFuture> future : segment.map.values()) { + try { + if (future != null && future.isDone()) { + final Entry entry = future.get(); + consumer.accept(entry.key, entry.value); + } + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + } finally { + segment.readLock.unlock(); + } + } + } + + private class CacheIterator implements Iterator> { + private Entry current; + private Entry next; + + CacheIterator(Entry head) { + current = null; + next = head; + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Entry next() { + current = next; + next = next.after; + return current; + } + + @Override + public void remove() { + Entry entry = current; + if (entry != null) { + CacheSegment segment = getCacheSegment(entry.key); + segment.remove(entry.key, entry.value, false); + lruLock.lock(); + try { + current = null; + delete(entry, RemovalNotification.RemovalReason.INVALIDATED); + } finally { + lruLock.unlock(); + } + } + } + } + + /** + * The cache statistics tracking hits, misses and evictions. These are taken on a best-effort basis meaning that + * they could be out-of-date mid-flight. 
+ * + * @return the current cache statistics + */ + public Stats stats() { + return new Stats(hits.sum(), misses.sum(), evictions.sum()); + } + + private void promote(Entry entry, long now) { + boolean promoted = true; + lruLock.lock(); + try { + switch (entry.state) { + case DELETED -> promoted = false; + case EXISTING -> relinkAtHead(entry); + case NEW -> linkAtHead(entry); + } + if (promoted) { + evict(now); + } + } finally { + lruLock.unlock(); + } + } + + private void evict(long now) { + assert lruLock.isHeldByCurrentThread(); + + while (tail != null && shouldPrune(tail, now)) { + evictEntry(tail); + } + } + + private void evictEntry(Entry entry) { + assert lruLock.isHeldByCurrentThread(); + + CacheSegment segment = getCacheSegment(entry.key); + if (segment != null) { + segment.remove(entry.key, entry.value, false); + } + delete(entry, RemovalNotification.RemovalReason.EVICTED); + } + + private void delete(Entry entry, RemovalNotification.RemovalReason removalReason) { + assert lruLock.isHeldByCurrentThread(); + + if (unlink(entry)) { + removalListener.onRemoval(new RemovalNotification<>(entry.key, entry.value, removalReason)); + } + } + + private boolean shouldPrune(Entry entry, long now) { + return exceedsWeight() || isExpired(entry, now); + } + + private boolean exceedsWeight() { + return maximumWeight != -1 && weight > maximumWeight; + } + + private boolean isExpired(Entry entry, long now) { + return (entriesExpireAfterAccess && now - entry.accessTime > expireAfterAccessNanos) + || (entriesExpireAfterWrite && now - entry.writeTime > expireAfterWriteNanos); + } + + private boolean unlink(Entry entry) { + assert lruLock.isHeldByCurrentThread(); + + if (entry.state == State.EXISTING) { + final Entry before = entry.before; + final Entry after = entry.after; + + if (before == null) { + // removing the head + assert head == entry; + head = after; + if (head != null) { + head.before = null; + } + } else { + // removing inner element + before.after = after; + entry.before = null; + } + + if (after == null) { + // removing tail + assert tail == entry; + tail = before; + if (tail != null) { + tail.after = null; + } + } else { + // removing inner element + after.before = before; + entry.after = null; + } + + count--; + weight -= weigher.applyAsLong(entry.key, entry.value); + entry.state = State.DELETED; + return true; + } else { + return false; + } + } + + private void linkAtHead(Entry entry) { + assert lruLock.isHeldByCurrentThread(); + + Entry h = head; + entry.before = null; + entry.after = head; + head = entry; + if (h == null) { + tail = entry; + } else { + h.before = entry; + } + + count++; + weight += weigher.applyAsLong(entry.key, entry.value); + entry.state = State.EXISTING; + } + + private void relinkAtHead(Entry entry) { + assert lruLock.isHeldByCurrentThread(); + + if (head != entry) { + unlink(entry); + linkAtHead(entry); + } + } + + private CacheSegment getCacheSegment(K key) { + return segments[key.hashCode() & 0xff]; + } +} diff --git a/server/src/main/java/org/elasticsearch/common/cache/RemovalNotification.java b/server/src/main/java/org/elasticsearch/common/cache/RemovalNotification.java index 3958b99b9f683..3f17847eea60d 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/RemovalNotification.java +++ b/server/src/main/java/org/elasticsearch/common/cache/RemovalNotification.java @@ -37,4 +37,9 @@ public V getValue() { public RemovalReason getRemovalReason() { return removalReason; } + + @Override + public String toString() { + return "RemovalNotification{" + 
"key=" + key + ", value=" + value + ", removalReason=" + removalReason + '}'; + } } diff --git a/server/src/test/java/org/elasticsearch/common/cache/CacheBuilderTests.java b/server/src/test/java/org/elasticsearch/common/cache/CacheBuilderTests.java index 56edbed100234..61038e0456436 100644 --- a/server/src/test/java/org/elasticsearch/common/cache/CacheBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/common/cache/CacheBuilderTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; public class CacheBuilderTests extends ESTestCase { @@ -26,7 +27,8 @@ public void testSettingExpireAfterAccess() { assertThat(iae.getMessage(), containsString("expireAfterAccess <=")); final TimeValue timeValue = randomPositiveTimeValue(); Cache cache = CacheBuilder.builder().setExpireAfterAccess(timeValue).build(); - assertEquals(timeValue.getNanos(), cache.getExpireAfterAccessNanos()); + assertThat("Type is LRUCache as test depends on it", cache, instanceOf(LRUCache.class)); + assertEquals(timeValue.getNanos(), ((LRUCache) cache).getExpireAfterAccessNanos()); } public void testSettingExpireAfterWrite() { @@ -39,6 +41,7 @@ public void testSettingExpireAfterWrite() { assertThat(iae.getMessage(), containsString("expireAfterWrite <=")); final TimeValue timeValue = randomPositiveTimeValue(); Cache cache = CacheBuilder.builder().setExpireAfterWrite(timeValue).build(); - assertEquals(timeValue.getNanos(), cache.getExpireAfterWriteNanos()); + assertThat("Type is LRUCache as test depends on it", cache, instanceOf(LRUCache.class)); + assertEquals(timeValue.getNanos(), ((LRUCache) cache).getExpireAfterWriteNanos()); } } diff --git a/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java index a9741b44a1e45..3fbab0312e09d 100644 --- a/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -9,815 +9,336 @@ package org.elasticsearch.common.cache; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.junit.Before; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.stream.Collectors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.ToLongBiFunction; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; +import static org.elasticsearch.common.cache.RemovalNotification.RemovalReason.EVICTED; +import static 
org.elasticsearch.common.cache.RemovalNotification.RemovalReason.INVALIDATED; public class CacheTests extends ESTestCase { - private int numberOfEntries; + + private List> knownImplementations; @Before public void setUp() throws Exception { super.setUp(); - numberOfEntries = randomIntBetween(1000, 10000); - logger.debug("numberOfEntries: {}", numberOfEntries); + knownImplementations = List.of((removalListener, weigher) -> { + CacheBuilder builder = CacheBuilder.builder(); + weigher.ifPresent(builder::weigher); + removalListener.ifPresent(builder::removalListener); + return builder.build(); + }); + logger.debug("Testing known implementations: {}", knownImplementations); } - // cache some entries, then randomly lookup keys that do not exist, then check the stats - public void testCacheStats() { - AtomicLong evictions = new AtomicLong(); - Set keys = new HashSet<>(); - Cache cache = CacheBuilder.builder() - .setMaximumWeight(numberOfEntries / 2) - .removalListener(notification -> { - keys.remove(notification.getKey()); - evictions.incrementAndGet(); - }) - .build(); - - for (int i = 0; i < numberOfEntries; i++) { - // track the keys, which will be removed upon eviction (see the RemovalListener) - keys.add(i); - cache.put(i, Integer.toString(i)); + @FunctionalInterface + interface CacheSupplier { + default Cache supply() { + return supply(Optional.empty(), Optional.empty()); } - long hits = 0; - long misses = 0; - Integer missingKey = 0; - for (Integer key : keys) { - --missingKey; - if (rarely()) { - misses++; - cache.get(missingKey); - } else { - hits++; - cache.get(key); - } - } - assertEquals(hits, cache.stats().getHits()); - assertEquals(misses, cache.stats().getMisses()); - assertEquals((long) Math.ceil(numberOfEntries / 2.0), evictions.get()); - assertEquals(evictions.get(), cache.stats().getEvictions()); - } - // cache some entries in batches of size maximumWeight; for each batch, touch the even entries to affect the - // ordering; upon the next caching of entries, the entries from the previous batch will be evicted; we can then - // check that the evicted entries were evicted in LRU order (first the odds in a batch, then the evens in a batch) - // for each batch - public void testCacheEvictions() { - int maximumWeight = randomIntBetween(1, numberOfEntries); - AtomicLong evictions = new AtomicLong(); - List evictedKeys = new ArrayList<>(); - Cache cache = CacheBuilder.builder() - .setMaximumWeight(maximumWeight) - .removalListener(notification -> { - evictions.incrementAndGet(); - evictedKeys.add(notification.getKey()); - }) - .build(); - // cache entries up to numberOfEntries - maximumWeight; all of these entries will ultimately be evicted in - // batches of size maximumWeight, first the odds in the batch, then the evens in the batch - List expectedEvictions = new ArrayList<>(); - int iterations = (int) Math.ceil((numberOfEntries - maximumWeight) / (1.0 * maximumWeight)); - for (int i = 0; i < iterations; i++) { - for (int j = i * maximumWeight; j < (i + 1) * maximumWeight && j < numberOfEntries - maximumWeight; j++) { - cache.put(j, Integer.toString(j)); - if (j % 2 == 1) { - expectedEvictions.add(j); - } - } - for (int j = i * maximumWeight; j < (i + 1) * maximumWeight && j < numberOfEntries - maximumWeight; j++) { - if (j % 2 == 0) { - cache.get(j); - expectedEvictions.add(j); - } - } + default Cache supply(RemovalListener removalListener) { + return supply(Optional.ofNullable(removalListener), Optional.empty()); } - // finish filling the cache - for (int i = numberOfEntries - 
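The generic signatures are elided in this rendering of the hunk; spelled out, the supplier helper presumably has roughly the following shape (a reconstruction, with String keys and values as the tests use):

    import java.util.Optional;
    import java.util.function.ToLongBiFunction;

    import org.elasticsearch.common.cache.Cache;
    import org.elasticsearch.common.cache.RemovalListener;

    @FunctionalInterface
    interface CacheSupplier {
        // the one abstract method; each registered implementation decides how
        // to apply the optional collaborators
        Cache<String, String> supply(
            Optional<RemovalListener<String, String>> removalListener,
            Optional<ToLongBiFunction<String, String>> weigher
        );

        // the no-arg and single-arg convenience overloads shown in the hunk
        // all default into the method above via Optional.empty()/ofNullable(...)
        default Cache<String, String> supply() {
            return supply(Optional.empty(), Optional.empty());
        }
    }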
maximumWeight; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - assertEquals(numberOfEntries - maximumWeight, evictions.get()); - assertEquals(evictions.get(), cache.stats().getEvictions()); - - // assert that the keys were evicted in LRU order - Set keys = new HashSet<>(); - List remainingKeys = new ArrayList<>(); - for (Integer key : cache.keys()) { - keys.add(key); - remainingKeys.add(key); - } - assertEquals(expectedEvictions.size(), evictedKeys.size()); - for (int i = 0; i < expectedEvictions.size(); i++) { - assertFalse(keys.contains(expectedEvictions.get(i))); - assertEquals(expectedEvictions.get(i), evictedKeys.get(i)); - } - for (int i = numberOfEntries - maximumWeight; i < numberOfEntries; i++) { - assertTrue(keys.contains(i)); - assertEquals( - numberOfEntries - i + (numberOfEntries - maximumWeight) - 1, - (int) remainingKeys.get(i - (numberOfEntries - maximumWeight)) - ); - } - } - // cache some entries and exceed the maximum weight, then check that the cache has the expected weight and the - // expected evictions occurred - public void testWeigher() { - int maximumWeight = 2 * numberOfEntries; - int weight = randomIntBetween(2, 10); - AtomicLong evictions = new AtomicLong(); - Cache cache = CacheBuilder.builder() - .setMaximumWeight(maximumWeight) - .weigher((k, v) -> weight) - .removalListener(notification -> evictions.incrementAndGet()) - .build(); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); + default Cache supply(ToLongBiFunction weigher) { + return supply(Optional.empty(), Optional.ofNullable(weigher)); } - // cache weight should be the largest multiple of weight less than maximumWeight - assertEquals(weight * (maximumWeight / weight), cache.weight()); - // the number of evicted entries should be the number of entries that fit in the excess weight - assertEquals((int) Math.ceil((weight - 2) * numberOfEntries / (1.0 * weight)), evictions.get()); + default Cache supply(RemovalListener removalListener, ToLongBiFunction weigher) { + return supply(Optional.ofNullable(removalListener), Optional.ofNullable(weigher)); + } - assertEquals(evictions.get(), cache.stats().getEvictions()); + Cache supply(Optional> removalListener, Optional> weigher); } - // cache some entries, randomly invalidate some of them, then check that the weight of the cache is correct - public void testWeight() { - Cache cache = CacheBuilder.builder().weigher((k, v) -> k).build(); - int weight = 0; - for (int i = 0; i < numberOfEntries; i++) { - weight += i; - cache.put(i, Integer.toString(i)); - } - for (int i = 0; i < numberOfEntries; i++) { - if (rarely()) { - weight -= i; - cache.invalidate(i); - } + /** + * Tests the put and get functionality across all known implementations of the Cache. 
+ */ + public void testPutAndGet() { + for (CacheSupplier supplier : knownImplementations) { + Cache cache = supplier.supply(); + String key = randomAlphanumericOfLength(10); + String value = randomAlphanumericOfLength(50); + + cache.put(key, value); + assertEquals(value, cache.get(key)); } - assertEquals(weight, cache.weight()); } - // cache some entries, randomly invalidate some of them, then check that the number of cached entries is correct public void testCount() { - Cache cache = CacheBuilder.builder().build(); - int count = 0; - for (int i = 0; i < numberOfEntries; i++) { - count++; - cache.put(i, Integer.toString(i)); - } - for (int i = 0; i < numberOfEntries; i++) { - if (rarely()) { - count--; - cache.invalidate(i); - } - } - assertEquals(count, cache.count()); - } + for (CacheSupplier supplier : knownImplementations) { + Cache cache = supplier.supply(); - // cache some entries, step the clock forward, cache some more entries, step the clock forward and then check that - // the first batch of cached entries expired and were removed - public void testExpirationAfterAccess() { - AtomicLong now = new AtomicLong(); - Cache cache = new Cache() { - @Override - protected long now() { - return now.get(); + int expectedCount = randomIntBetween(3, 10); + for (int i = 0; i < expectedCount; i++) { + cache.put(randomAlphaOfLength(10), randomAlphaOfLength(50)); } - }; - cache.setExpireAfterAccessNanos(1); - List evictedKeys = new ArrayList<>(); - cache.setRemovalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason()); - evictedKeys.add(notification.getKey()); - }); - now.set(0); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - now.set(1); - for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - now.set(2); - cache.refresh(); - assertEquals(numberOfEntries, cache.count()); - for (int i = 0; i < evictedKeys.size(); i++) { - assertEquals(i, (int) evictedKeys.get(i)); - } - Set remainingKeys = new HashSet<>(); - for (Integer key : cache.keys()) { - remainingKeys.add(key); - } - for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) { - assertTrue(remainingKeys.contains(i)); + assertEquals(expectedCount, cache.count()); } } - public void testSimpleExpireAfterAccess() { - AtomicLong now = new AtomicLong(); - Cache cache = new Cache() { - @Override - protected long now() { - return now.get(); - } - }; - cache.setExpireAfterAccessNanos(1); - now.set(0); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - for (int i = 0; i < numberOfEntries; i++) { - assertEquals(cache.get(i), Integer.toString(i)); - } - now.set(2); - for (int i = 0; i < numberOfEntries; i++) { - assertNull(cache.get(i)); - } - } + public void testWeight() { + for (CacheSupplier supplier : knownImplementations) { + ToLongBiFunction weigher = (key, value) -> value.length(); + Cache cache = supplier.supply(weigher); - public void testExpirationAfterWrite() { - AtomicLong now = new AtomicLong(); - Cache cache = new Cache() { - @Override - protected long now() { - return now.get(); - } - }; - cache.setExpireAfterWriteNanos(1); - List evictedKeys = new ArrayList<>(); - cache.setRemovalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason()); - evictedKeys.add(notification.getKey()); - }); - now.set(0); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - 
now.set(1);
- for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
- cache.put(i, Integer.toString(i));
- }
- now.set(2);
- for (int i = 0; i < numberOfEntries; i++) {
- cache.get(i);
- }
- cache.refresh();
- assertEquals(numberOfEntries, cache.count());
- for (int i = 0; i < evictedKeys.size(); i++) {
- assertEquals(i, (int) evictedKeys.get(i));
- }
- Set remainingKeys = new HashSet<>();
- for (Integer key : cache.keys()) {
- remainingKeys.add(key);
- }
- for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
- assertTrue(remainingKeys.contains(i));
+ int weight1 = randomIntBetween(10, 50);
+ cache.put(randomAlphaOfLength(10), randomAlphaOfLength(weight1));
+ int weight2 = randomIntBetween(10, 50);
+ cache.put(randomAlphaOfLength(10), randomAlphaOfLength(weight2));
+ assertEquals(weight1 + weight2, cache.weight()); // weight is the sum of the two random value lengths
 }
 }
- public void testComputeIfAbsentAfterExpiration() throws ExecutionException {
- AtomicLong now = new AtomicLong();
- Cache cache = new Cache() {
- @Override
- protected long now() {
- return now.get();
- }
- };
- cache.setExpireAfterAccessNanos(1);
- now.set(0);
- for (int i = 0; i < numberOfEntries; i++) {
- cache.put(i, Integer.toString(i) + "-first");
- }
- now.set(2);
- for (int i = 0; i < numberOfEntries; i++) {
- cache.computeIfAbsent(i, k -> Integer.toString(k) + "-second");
- }
- for (int i = 0; i < numberOfEntries; i++) {
- assertEquals(i + "-second", cache.get(i));
- }
- assertEquals(numberOfEntries, cache.stats().getEvictions());
- }
+ public void testInvalidate() throws ExecutionException, InterruptedException, TimeoutException {
+ for (CacheSupplier supplier : knownImplementations) {
+ String key = randomAlphaOfLength(10);
+ String value = randomAlphaOfLength(50);
+ CompletableFuture notified = new CompletableFuture<>();
- public void testComputeIfAbsentDeadlock() throws InterruptedException {
- final Cache cache = CacheBuilder.builder()
- .setExpireAfterAccess(TimeValue.timeValueNanos(1))
- .build();
- startInParallel(randomIntBetween(2, 32), i -> {
- for (int j = 0; j < numberOfEntries; j++) {
- try {
- cache.computeIfAbsent(0, k -> Integer.toString(k));
- } catch (final ExecutionException e) {
- throw new AssertionError(e);
- }
- }
- });
- }
+ RemovalListener removalListener = (notification) -> {
+ assertEquals(key, notification.getKey());
+ assertEquals(value, notification.getValue());
+ assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason());
+ notified.complete(true);
+ };
+ Cache cache = supplier.supply(removalListener);
- // randomly promote some entries, step the clock forward, then check that the promoted entries remain and the
- // non-promoted entries were removed
- public void testPromotion() {
- AtomicLong now = new AtomicLong();
- Cache cache = new Cache() {
- @Override
- protected long now() {
- return now.get();
- }
- };
- cache.setExpireAfterAccessNanos(1);
- now.set(0);
- for (int i = 0; i < numberOfEntries; i++) {
- cache.put(i, Integer.toString(i));
- }
- now.set(1);
- Set promotedKeys = new HashSet<>();
- for (int i = 0; i < numberOfEntries; i++) {
- if (rarely()) {
- cache.get(i);
- promotedKeys.add(i);
- }
- }
- now.set(2);
- cache.refresh();
- assertEquals(promotedKeys.size(), cache.count());
- for (int i = 0; i < numberOfEntries; i++) {
- if (promotedKeys.contains(i)) {
- assertNotNull(cache.get(i));
- } else {
- assertNull(cache.get(i));
- }
- }
- }
+ cache.put(key, value);
+ cache.invalidate(key);
- // randomly invalidate some cached entries, then check that a lookup for
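Aside on testWeight above: weight() is simply the weigher summed over the live entries, recomputed on put, replace and invalidate. A compact worked example against the builder API used elsewhere in this diff (fixed values instead of random ones, for arithmetic's sake):

    import org.elasticsearch.common.cache.Cache;
    import org.elasticsearch.common.cache.CacheBuilder;

    Cache<String, String> cache = CacheBuilder.<String, String>builder()
        .weigher((k, v) -> v.length())  // weigh each entry by its value length
        .build();
    cache.put("a", "four");             // weight() == 4
    cache.put("b", "sixsix");           // weight() == 4 + 6 == 10
    cache.put("a", "x");                // replace recomputes: 10 - 4 + 1 == 7
    cache.invalidate("b");              // weight() == 7 - 6 == 1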
each of those and only those keys is null - public void testInvalidate() { - Cache cache = CacheBuilder.builder().build(); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - Set keys = new HashSet<>(); - for (Integer key : cache.keys()) { - if (rarely()) { - cache.invalidate(key); - keys.add(key); - } - } - for (int i = 0; i < numberOfEntries; i++) { - if (keys.contains(i)) { - assertNull(cache.get(i)); - } else { - assertNotNull(cache.get(i)); - } + assertTrue(notified.get(3, TimeUnit.SECONDS)); } } - // randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only - // those entries - public void testNotificationOnInvalidate() { - Set notifications = new HashSet<>(); - Cache cache = CacheBuilder.builder().removalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); - notifications.add(notification.getKey()); - }).build(); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - Set invalidated = new HashSet<>(); - for (int i = 0; i < numberOfEntries; i++) { - if (rarely()) { - cache.invalidate(i); - invalidated.add(i); - } - } - assertEquals(notifications, invalidated); - } + public void testInvalidateAll() throws InterruptedException { + for (CacheSupplier supplier : knownImplementations) { + LongAdder notified = new LongAdder(); + RemovalListener removalListener = (notification) -> { + assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + notified.increment(); + }; + Cache cache = supplier.supply(removalListener); - // randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null - public void testInvalidateWithValue() { - Cache cache = CacheBuilder.builder().build(); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - Set keys = new HashSet<>(); - for (Integer key : cache.keys()) { - if (rarely()) { - if (randomBoolean()) { - cache.invalidate(key, key.toString()); - keys.add(key); - } else { - // invalidate with incorrect value - cache.invalidate(key, Integer.toString(key + randomIntBetween(2, 10))); - } - } - } - for (int i = 0; i < numberOfEntries; i++) { - if (keys.contains(i)) { - assertNull(cache.get(i)); - } else { - assertNotNull(cache.get(i)); + int numberOfEntries = randomIntBetween(10, 50); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(randomAlphaOfLength(10), randomAlphaOfLength(50)); } - } - } + cache.invalidateAll(); - // randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only - // those entries - public void testNotificationOnInvalidateWithValue() { - Set notifications = new HashSet<>(); - Cache cache = CacheBuilder.builder().removalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); - notifications.add(notification.getKey()); - }).build(); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - Set invalidated = new HashSet<>(); - for (int i = 0; i < numberOfEntries; i++) { - if (rarely()) { - if (randomBoolean()) { - cache.invalidate(i, Integer.toString(i)); - invalidated.add(i); - } else { - // invalidate with incorrect value - cache.invalidate(i, Integer.toString(i + randomIntBetween(2, 10))); + for (int tries = 1; tries <= 10; tries++) { + if (notified.longValue() == numberOfEntries) { 
+ break;
+ }
+ Thread.sleep(100); // wait(100) without owning the monitor would throw IllegalMonitorStateException
+ }
+ assertEquals(numberOfEntries, notified.longValue());
+ assertEquals(0, cache.count());
 }
- }
 }
- // invalidate all cached entries, then check that the cache is empty
- public void testInvalidateAll() {
- Cache cache = CacheBuilder.builder().build();
- for (int i = 0; i < numberOfEntries; i++) {
- cache.put(i, Integer.toString(i));
- }
- cache.invalidateAll();
- assertEquals(0, cache.count());
- assertEquals(0, cache.weight());
- }
-
- // invalidate all cached entries, then check that we receive invalidate notifications for all entries
- public void testNotificationOnInvalidateAll() {
- Set notifications = new HashSet<>();
- Cache cache = CacheBuilder.builder().removalListener(notification -> {
- assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason());
- notifications.add(notification.getKey());
- }).build();
- Set invalidated = new HashSet<>();
- for (int i = 0; i < numberOfEntries; i++) {
- cache.put(i, Integer.toString(i));
- invalidated.add(i);
- }
- cache.invalidateAll();
- assertEquals(invalidated, notifications);
- }
-
- // randomly replace some entries, increasing the weight by 1 for each replacement, then count that the cache size
- // is correct
- public void testReplaceRecomputesSize() {
- class Value {
- private String value;
- private long weight;
-
- Value(String value, long weight) {
- this.value = value;
- this.weight = weight;
+ public void testStats() {
+ for (CacheSupplier supplier : knownImplementations) {
+ Map entries = new HashMap<>();
+ for (int i = 0; i < randomIntBetween(25, 100); i++) {
+ entries.put(randomAlphaOfLength(10), randomAlphaOfLength(50));
 }
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ Cache cache = supplier.supply();
+ entries.forEach(cache::put);
- Value that = (Value) o;
-
- return value.equals(that.value);
+ int expectedHits = 0;
+ int expectedMisses = 0;
+ for (int i = 0; i < randomIntBetween(25, 100); i++) {
+ String key = randomAlphaOfLength(10);
+ if (entries.containsKey(key)) {
+ expectedHits++;
+ assertEquals(entries.get(key), cache.get(key));
+ } else {
+ expectedMisses++;
+ assertNull(cache.get(key));
+ }
 }
- @Override
- public int hashCode() {
- return value.hashCode();
+ int invalidations = randomIntBetween(10, entries.size() - 1);
+ for (int i = 0; i < invalidations; i++) {
+ String entry = randomFrom(entries.keySet());
+ entries.remove(entry);
+ cache.invalidate(entry);
 }
- }
- Cache cache = CacheBuilder.builder().weigher((k, s) -> s.weight).build();
- for (int i = 0; i < numberOfEntries; i++) {
- cache.put(i, new Value(Integer.toString(i), 1));
- }
- assertEquals(numberOfEntries, cache.count());
- assertEquals(numberOfEntries, cache.weight());
- int replaced = 0;
- for (int i = 0; i < numberOfEntries; i++) {
- if (rarely()) {
- replaced++;
- cache.put(i, new Value(Integer.toString(i), 2));
- }
- }
- assertEquals(numberOfEntries, cache.count());
- assertEquals(numberOfEntries + replaced, cache.weight());
- }
-
- // randomly replace some entries, then check that we received replacement notifications for those and only those
- // entries
- public void testNotificationOnReplace() {
- Set notifications = new HashSet<>();
- Cache cache = CacheBuilder.builder().removalListener(notification -> {
- assertEquals(RemovalNotification.RemovalReason.REPLACED, notification.getRemovalReason());
- notifications.add(notification.getKey());
- }).build();
- for
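On the loop just fixed above: a CountDownLatch states the same condition more directly and fails fast on timeout. A sketch of that variant (assumes it runs inside this ESTestCase, so the random*/assert* helpers exist):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.elasticsearch.common.cache.Cache;
    import org.elasticsearch.common.cache.CacheBuilder;

    int numberOfEntries = randomIntBetween(10, 50);
    CountDownLatch allNotified = new CountDownLatch(numberOfEntries);
    Cache<String, String> cache = CacheBuilder.<String, String>builder()
        .removalListener(notification -> allNotified.countDown())  // one count per removal
        .build();
    for (int i = 0; i < numberOfEntries; i++) {
        cache.put(randomAlphaOfLength(10), randomAlphaOfLength(50));
    }
    cache.invalidateAll();
    // blocks until every notification has arrived, or fails the test after 3 seconds
    assertTrue("timed out waiting for removal notifications", allNotified.await(3, TimeUnit.SECONDS));
    assertEquals(0, cache.count());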
(int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); - } - Set replacements = new HashSet<>(); - for (int i = 0; i < numberOfEntries; i++) { - if (rarely()) { - cache.put(i, Integer.toString(i) + Integer.toString(i)); - replacements.add(i); - } + Cache.Stats stats = cache.stats(); + assertEquals(expectedHits, stats.getHits()); + assertEquals(expectedMisses, stats.getMisses()); + assertEquals(invalidations, stats.getEvictions()); } - assertEquals(replacements, notifications); } - public void testComputeIfAbsentLoadsSuccessfully() { - Map map = new HashMap<>(); - Cache cache = CacheBuilder.builder().build(); - for (int i = 0; i < numberOfEntries; i++) { - try { - cache.computeIfAbsent(i, k -> { - int value = randomInt(); - map.put(k, value); - return value; - }); - } catch (ExecutionException e) { - throw new AssertionError(e); + public void testKeys() { + for (CacheSupplier supplier : knownImplementations) { + Map entries = new HashMap<>(); + for (int i = 0; i < randomIntBetween(25, 100); i++) { + entries.put(randomAlphaOfLength(10), randomAlphaOfLength(11)); } - } - for (int i = 0; i < numberOfEntries; i++) { - assertEquals(map.get(i), cache.get(i)); - } - } - public void testComputeIfAbsentCallsOnce() throws InterruptedException { - final Cache cache = CacheBuilder.builder().build(); - AtomicReferenceArray flags = new AtomicReferenceArray<>(numberOfEntries); - for (int j = 0; j < numberOfEntries; j++) { - flags.set(j, false); - } - CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); - startInParallel(randomIntBetween(2, 32), i -> { - for (int j = 0; j < numberOfEntries; j++) { - try { - cache.computeIfAbsent(j, key -> { - assertTrue(flags.compareAndSet(key, false, true)); - return Integer.toString(key); - }); - } catch (ExecutionException e) { - failures.add(e); - break; - } - } - }); - assertThat(failures, is(empty())); - } + Cache cache = supplier.supply(); + entries.forEach(cache::put); - public void testComputeIfAbsentThrowsExceptionIfLoaderReturnsANullValue() { - final Cache cache = CacheBuilder.builder().build(); - try { - cache.computeIfAbsent(1, k -> null); - fail("expected ExecutionException"); - } catch (ExecutionException e) { - assertThat(e.getCause(), instanceOf(NullPointerException.class)); + Set expectedKeys = new HashSet<>(entries.keySet()); + cache.keys().forEach(key -> assertTrue(expectedKeys.remove(key))); + assertTrue(expectedKeys.isEmpty()); } } - public void testDependentKeyDeadlock() throws InterruptedException { - class Key { - private final int key; - - Key(int key) { - this.key = key; + public void testKeysIterator() { + for (CacheSupplier supplier : knownImplementations) { + Map entries = new HashMap<>(); + for (int i = 0; i < randomIntBetween(25, 100); i++) { + entries.put(randomAlphaOfLength(10), randomAlphaOfLength(11)); } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Key key1 = (Key) o; - - return key == key1.key; - - } - - @Override - public int hashCode() { - return key % 2; - } - } - - int numberOfThreads = randomIntBetween(2, 32); - final Cache cache = CacheBuilder.builder().build(); - - CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); - AtomicBoolean reachedTimeLimit = new AtomicBoolean(); - - CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads); - List threads = new ArrayList<>(); - for (int i = 0; i < numberOfThreads; i++) { - 
Thread thread = new Thread(() -> { - try { - safeAwait(barrier); - Random random = new Random(random().nextLong()); - for (int j = 0; j < numberOfEntries && reachedTimeLimit.get() == false; j++) { - Key key = new Key(random.nextInt(numberOfEntries)); - try { - cache.computeIfAbsent(key, k -> { - if (k.key == 0) { - return 0; - } else { - Integer value = cache.get(new Key(k.key / 2)); - return value != null ? value : 0; - } - }); - } catch (ExecutionException e) { - failures.add(e); - break; - } - } - } finally { - // successfully avoided deadlock, release the main thread - deadlockLatch.countDown(); - } + LongAdder notificationsReceived = new LongAdder(); + Cache cache = supplier.supply((notification) -> { + assertEquals(notification.getRemovalReason(), INVALIDATED); + notificationsReceived.increment(); }); - threads.add(thread); - thread.start(); - } - - AtomicBoolean deadlock = new AtomicBoolean(); - assert deadlock.get() == false; - - // start a watchdog service - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - scheduler.scheduleAtFixedRate(() -> { - Set ids = threads.stream().map(t -> t.getId()).collect(Collectors.toSet()); - ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); - long[] deadlockedThreads = mxBean.findDeadlockedThreads(); - if (deadlock.get() == false && deadlockedThreads != null) { - for (long deadlockedThread : deadlockedThreads) { - // ensure that we detected deadlock on our threads - if (ids.contains(deadlockedThread)) { - deadlock.set(true); - // release the main test thread to fail the test - for (int i = 0; i < numberOfThreads; i++) { - deadlockLatch.countDown(); - } - break; + entries.forEach(cache::put); + + { + Set expectedKeys = new HashSet<>(entries.keySet()); + Set removedKeys = new HashSet<>(); + Iterator iterator = cache.keys().iterator(); + while (iterator.hasNext()) { + String entry = iterator.next(); + assertTrue(expectedKeys.remove(entry)); + if (randomBoolean()) { + iterator.remove(); + removedKeys.add(entry); } } + assertTrue(expectedKeys.isEmpty()); + assertEquals(cache.count(), entries.size() - removedKeys.size()); + assertEquals(notificationsReceived.longValue(), removedKeys.size()); + removedKeys.forEach(entries::remove); } - }, 1, 1, TimeUnit.SECONDS); - // everything is setup, release the hounds - safeAwait(barrier); - - // run the test for a limited amount of time; if threads are still running after that, let them know and exit gracefully - if (deadlockLatch.await(1, TimeUnit.SECONDS) == false) { - reachedTimeLimit.set(true); + { + Set expectedKeys = new HashSet<>(entries.keySet()); + cache.keys().forEach(key -> assertTrue(expectedKeys.remove(key))); + assertTrue(expectedKeys.isEmpty()); + } } + } - // wait for either deadlock to be detected or the threads to terminate (end operations or time limit reached) - safeAwait(deadlockLatch); - - // shutdown the watchdog service - scheduler.shutdown(); + public void testValues() { + for (CacheSupplier supplier : knownImplementations) { + Map entries = new HashMap<>(); + for (int i = 0; i < randomIntBetween(25, 100); i++) { + entries.put(randomAlphaOfLength(10), randomAlphaOfLength(11)); + } - assertThat(failures, is(empty())); + Cache cache = supplier.supply(); + entries.forEach(cache::put); - assertFalse("deadlock", deadlock.get()); + Collection expectedValues = new ArrayList<>(entries.values()); + cache.values().forEach(key -> assertTrue(expectedValues.remove(key))); + assertTrue(expectedValues.isEmpty()); + } } - public void testCachePollution() throws 
InterruptedException {
- int numberOfThreads = randomIntBetween(2, 32);
- final Cache cache = CacheBuilder.builder().build();
- startInParallel(numberOfThreads, i -> {
- Random random = new Random(random().nextLong());
- for (int j = 0; j < numberOfEntries; j++) {
- Integer key = random.nextInt(numberOfEntries);
- boolean first;
- boolean second;
- do {
- first = random.nextBoolean();
- second = random.nextBoolean();
- } while (first && second);
- if (first) {
- try {
- cache.computeIfAbsent(key, k -> {
- if (random.nextBoolean()) {
- return Integer.toString(k);
- } else {
- throw new Exception("testCachePollution");
- }
- });
- } catch (ExecutionException e) {
- assertNotNull(e.getCause());
- assertThat(e.getCause(), instanceOf(Exception.class));
- assertEquals(e.getCause().getMessage(), "testCachePollution");
+ public void testValuesIterator() {
+ for (CacheSupplier supplier : knownImplementations) {
+ Map entries = new HashMap<>();
+ for (int i = 0; i < randomIntBetween(25, 100); i++) {
+ entries.put(randomAlphaOfLength(10), randomAlphaOfLength(11));
+ }
+
+ LongAdder notificationsReceived = new LongAdder();
+ Cache cache = supplier.supply((notification) -> {
+ assertEquals(notification.getRemovalReason(), INVALIDATED);
+ notificationsReceived.increment();
+ });
+ entries.forEach(cache::put);
+
+ { // checking Iterator#remove()
+ Collection expectedValues = new ArrayList<>(entries.values());
+ Collection removedValues = new ArrayList<>();
+ Iterator iterator = cache.values().iterator();
+ while (iterator.hasNext()) {
+ String entry = iterator.next();
+ assertTrue(expectedValues.remove(entry));
+ if (randomBoolean()) {
+ iterator.remove();
+ removedValues.add(entry);
 }
- } else if (second) {
- cache.invalidate(key);
- } else {
- cache.get(key);
 }
+ assertTrue(expectedValues.isEmpty());
+ assertEquals(cache.count(), entries.size() - removedValues.size());
+ assertEquals(notificationsReceived.longValue(), removedValues.size());
+ // values are not necessarily unique: removing "by value" below may drop a different key that shares the
+ // same value, but the remaining multiset of values still matches the cache, which is all the next check needs
+ removedValues.forEach(value -> entries.values().remove(value)); } - }); - } - public void testExceptionThrownDuringConcurrentComputeIfAbsent() throws InterruptedException { - final Cache cache = CacheBuilder.builder().build(); - final String key = randomAlphaOfLengthBetween(2, 32); - startInParallel(randomIntBetween(2, 32), i -> { - for (int j = 0; j < numberOfEntries; j++) { - try { - String value = cache.computeIfAbsent(key, k -> { throw new RuntimeException("failed to load"); }); - fail("expected exception but got: " + value); - } catch (ExecutionException e) { - assertNotNull(e.getCause()); - assertThat(e.getCause(), instanceOf(RuntimeException.class)); - assertEquals(e.getCause().getMessage(), "failed to load"); - } + { // checking if a new Iterable is consistent after the modifications + Collection expectedValues = new ArrayList<>(entries.values()); + cache.values().forEach(key -> assertTrue(expectedValues.remove(key))); + assertTrue(expectedValues.isEmpty()); } - }); + } } - // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key - // here be dragons: this test did catch one subtle bug during development; do not remove lightly - public void testTorture() throws InterruptedException { - final Cache cache = CacheBuilder.builder().setMaximumWeight(1000).weigher((k, v) -> 2).build(); - startInParallel(randomIntBetween(2, 32), i -> { - Random random = new Random(random().nextLong()); - for (int j = 0; j < numberOfEntries; j++) { - Integer key = random.nextInt(numberOfEntries); - cache.put(key, Integer.toString(j)); + public void testForEach() { + for (CacheSupplier supplier : knownImplementations) { + Map entries = new HashMap<>(); + for (int i = 0; i < randomIntBetween(25, 100); i++) { + entries.put(randomAlphaOfLength(10), randomAlphaOfLength(50)); } - }); - cache.refresh(); - assertEquals(500, cache.count()); - } - public void testRemoveUsingValuesIterator() { - final List> removalNotifications = new ArrayList<>(); - Cache cache = CacheBuilder.builder() - .setMaximumWeight(numberOfEntries) - .removalListener(removalNotifications::add) - .build(); + Cache cache = supplier.supply(); + entries.forEach(cache::put); - for (int i = 0; i < numberOfEntries; i++) { - cache.put(i, Integer.toString(i)); + Map expectedEntries = new HashMap<>(entries); + cache.forEach((key, value) -> assertTrue(expectedEntries.remove(key, value))); + assertTrue(expectedEntries.isEmpty()); } + } - assertThat(removalNotifications.size(), is(0)); - final List expectedRemovals = new ArrayList<>(); - Iterator valueIterator = cache.values().iterator(); - while (valueIterator.hasNext()) { - String value = valueIterator.next(); - if (randomBoolean()) { - valueIterator.remove(); - expectedRemovals.add(value); + public void testRefresh() { + for (CacheSupplier supplier : knownImplementations) { + Map entries = new HashMap<>(); + for (int i = 0; i < randomIntBetween(25, 100); i++) { + entries.put(randomAlphaOfLength(10), randomAlphaOfLength(50)); } - } - assertEquals(expectedRemovals.size(), removalNotifications.size()); - for (int i = 0; i < expectedRemovals.size(); i++) { - assertEquals(expectedRemovals.get(i), removalNotifications.get(i).getValue()); - assertEquals(RemovalNotification.RemovalReason.INVALIDATED, removalNotifications.get(i).getRemovalReason()); + Cache cache = supplier.supply(notification -> { + assertEquals(notification.getRemovalReason(), EVICTED); + logger.debug("Eviction happened for key [{}], value [{}]", notification.getKey(), 
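The testKeysIterator and testValuesIterator cases above pin down the mutation contract of the keys()/values() views; distilled into one sketch (reusing the builder API and the new RemovalNotification#toString from earlier in this diff):

    import java.util.Iterator;

    import org.elasticsearch.common.cache.Cache;
    import org.elasticsearch.common.cache.CacheBuilder;

    Cache<String, String> cache = CacheBuilder.<String, String>builder()
        // the new toString() makes these notifications readable in test logs
        .removalListener(notification -> System.out.println(notification))
        .build();
    cache.put("a", "1");
    cache.put("b", "2");
    Iterator<String> values = cache.values().iterator();
    while (values.hasNext()) {
        if ("1".equals(values.next())) {
            values.remove();  // behaves like invalidate: count() drops, reason is INVALIDATED
        }
    }
    // only "b" is left here: cache.count() == 1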
notification.getValue()); + }, + (key, value) -> Long.MAX_VALUE // maximize chance of triggering Eviction. + ); + entries.forEach(cache::put); + + logger.debug("Calling refresh"); + cache.refresh(); } } } diff --git a/server/src/test/java/org/elasticsearch/common/cache/LRUCacheTests.java b/server/src/test/java/org/elasticsearch/common/cache/LRUCacheTests.java new file mode 100644 index 0000000000000..6572b95d6ce0f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/cache/LRUCacheTests.java @@ -0,0 +1,823 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.cache; + +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class LRUCacheTests extends ESTestCase { + private int numberOfEntries; + + @Before + public void setUp() throws Exception { + super.setUp(); + numberOfEntries = randomIntBetween(1000, 10000); + logger.debug("numberOfEntries: {}", numberOfEntries); + } + + // cache some entries, then randomly lookup keys that do not exist, then check the stats + public void testCacheStats() { + AtomicLong evictions = new AtomicLong(); + Set keys = new HashSet<>(); + Cache cache = CacheBuilder.builder() + .setMaximumWeight(numberOfEntries / 2) + .removalListener(notification -> { + keys.remove(notification.getKey()); + evictions.incrementAndGet(); + }) + .build(); + + for (int i = 0; i < numberOfEntries; i++) { + // track the keys, which will be removed upon eviction (see the RemovalListener) + keys.add(i); + cache.put(i, Integer.toString(i)); + } + long hits = 0; + long misses = 0; + Integer missingKey = 0; + for (Integer key : keys) { + --missingKey; + if (rarely()) { + misses++; + cache.get(missingKey); + } else { + hits++; + cache.get(key); + } + } + assertEquals(hits, cache.stats().getHits()); + assertEquals(misses, cache.stats().getMisses()); + assertEquals((long) Math.ceil(numberOfEntries / 2.0), evictions.get()); + assertEquals(evictions.get(), cache.stats().getEvictions()); + } + + // cache some entries in batches of size maximumWeight; for each batch, touch the even entries to affect the + // ordering; upon the 
next caching of entries, the entries from the previous batch will be evicted; we can then + // check that the evicted entries were evicted in LRU order (first the odds in a batch, then the evens in a batch) + // for each batch + public void testCacheEvictions() { + int maximumWeight = randomIntBetween(1, numberOfEntries); + AtomicLong evictions = new AtomicLong(); + List evictedKeys = new ArrayList<>(); + Cache cache = CacheBuilder.builder() + .setMaximumWeight(maximumWeight) + .removalListener(notification -> { + evictions.incrementAndGet(); + evictedKeys.add(notification.getKey()); + }) + .build(); + // cache entries up to numberOfEntries - maximumWeight; all of these entries will ultimately be evicted in + // batches of size maximumWeight, first the odds in the batch, then the evens in the batch + List expectedEvictions = new ArrayList<>(); + int iterations = (int) Math.ceil((numberOfEntries - maximumWeight) / (1.0 * maximumWeight)); + for (int i = 0; i < iterations; i++) { + for (int j = i * maximumWeight; j < (i + 1) * maximumWeight && j < numberOfEntries - maximumWeight; j++) { + cache.put(j, Integer.toString(j)); + if (j % 2 == 1) { + expectedEvictions.add(j); + } + } + for (int j = i * maximumWeight; j < (i + 1) * maximumWeight && j < numberOfEntries - maximumWeight; j++) { + if (j % 2 == 0) { + cache.get(j); + expectedEvictions.add(j); + } + } + } + // finish filling the cache + for (int i = numberOfEntries - maximumWeight; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + assertEquals(numberOfEntries - maximumWeight, evictions.get()); + assertEquals(evictions.get(), cache.stats().getEvictions()); + + // assert that the keys were evicted in LRU order + Set keys = new HashSet<>(); + List remainingKeys = new ArrayList<>(); + for (Integer key : cache.keys()) { + keys.add(key); + remainingKeys.add(key); + } + assertEquals(expectedEvictions.size(), evictedKeys.size()); + for (int i = 0; i < expectedEvictions.size(); i++) { + assertFalse(keys.contains(expectedEvictions.get(i))); + assertEquals(expectedEvictions.get(i), evictedKeys.get(i)); + } + for (int i = numberOfEntries - maximumWeight; i < numberOfEntries; i++) { + assertTrue(keys.contains(i)); + assertEquals( + numberOfEntries - i + (numberOfEntries - maximumWeight) - 1, + (int) remainingKeys.get(i - (numberOfEntries - maximumWeight)) + ); + } + } + + // cache some entries and exceed the maximum weight, then check that the cache has the expected weight and the + // expected evictions occurred + public void testWeigher() { + int maximumWeight = 2 * numberOfEntries; + int weight = randomIntBetween(2, 10); + AtomicLong evictions = new AtomicLong(); + Cache cache = CacheBuilder.builder() + .setMaximumWeight(maximumWeight) + .weigher((k, v) -> weight) + .removalListener(notification -> evictions.incrementAndGet()) + .build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + // cache weight should be the largest multiple of weight less than maximumWeight + assertEquals(weight * (maximumWeight / weight), cache.weight()); + + // the number of evicted entries should be the number of entries that fit in the excess weight + assertEquals((int) Math.ceil((weight - 2) * numberOfEntries / (1.0 * weight)), evictions.get()); + + assertEquals(evictions.get(), cache.stats().getEvictions()); + } + + // cache some entries, randomly invalidate some of them, then check that the weight of the cache is correct + public void testWeight() { + Cache cache = CacheBuilder.builder().weigher((k, v) -> 
k).build(); + int weight = 0; + for (int i = 0; i < numberOfEntries; i++) { + weight += i; + cache.put(i, Integer.toString(i)); + } + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + weight -= i; + cache.invalidate(i); + } + } + assertEquals(weight, cache.weight()); + } + + // cache some entries, randomly invalidate some of them, then check that the number of cached entries is correct + public void testCount() { + Cache cache = CacheBuilder.builder().build(); + int count = 0; + for (int i = 0; i < numberOfEntries; i++) { + count++; + cache.put(i, Integer.toString(i)); + } + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + count--; + cache.invalidate(i); + } + } + assertEquals(count, cache.count()); + } + + // cache some entries, step the clock forward, cache some more entries, step the clock forward and then check that + // the first batch of cached entries expired and were removed + public void testExpirationAfterAccess() { + AtomicLong now = new AtomicLong(); + LRUCache cache = new LRUCache<>() { + @Override + protected long now() { + return now.get(); + } + }; + cache.setExpireAfterAccessNanos(1); + List evictedKeys = new ArrayList<>(); + cache.setRemovalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason()); + evictedKeys.add(notification.getKey()); + }); + now.set(0); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + now.set(1); + for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + now.set(2); + cache.refresh(); + assertEquals(numberOfEntries, cache.count()); + for (int i = 0; i < evictedKeys.size(); i++) { + assertEquals(i, (int) evictedKeys.get(i)); + } + Set remainingKeys = new HashSet<>(); + for (Integer key : cache.keys()) { + remainingKeys.add(key); + } + for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) { + assertTrue(remainingKeys.contains(i)); + } + } + + public void testSimpleExpireAfterAccess() { + AtomicLong now = new AtomicLong(); + LRUCache cache = new LRUCache<>() { + @Override + protected long now() { + return now.get(); + } + }; + cache.setExpireAfterAccessNanos(1); + now.set(0); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + for (int i = 0; i < numberOfEntries; i++) { + assertEquals(cache.get(i), Integer.toString(i)); + } + now.set(2); + for (int i = 0; i < numberOfEntries; i++) { + assertNull(cache.get(i)); + } + } + + public void testExpirationAfterWrite() { + AtomicLong now = new AtomicLong(); + LRUCache cache = new LRUCache<>() { + @Override + protected long now() { + return now.get(); + } + }; + cache.setExpireAfterWriteNanos(1); + List evictedKeys = new ArrayList<>(); + cache.setRemovalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason()); + evictedKeys.add(notification.getKey()); + }); + now.set(0); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + now.set(1); + for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + now.set(2); + for (int i = 0; i < numberOfEntries; i++) { + cache.get(i); + } + cache.refresh(); + assertEquals(numberOfEntries, cache.count()); + for (int i = 0; i < evictedKeys.size(); i++) { + assertEquals(i, (int) evictedKeys.get(i)); + } + Set remainingKeys = new HashSet<>(); + for (Integer key : cache.keys()) { + remainingKeys.add(key); + } + for (int i = 
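The anonymous-subclass pattern used throughout these tests (overriding LRUCache#now()) is what makes expiry deterministic; the template in isolation:

    import java.util.concurrent.atomic.AtomicLong;

    import org.elasticsearch.common.cache.LRUCache;

    AtomicLong fakeNanos = new AtomicLong();
    LRUCache<Integer, String> cache = new LRUCache<>() {
        @Override
        protected long now() {
            return fakeNanos.get();  // virtual clock, advanced explicitly by the test
        }
    };
    cache.setExpireAfterAccessNanos(1);
    cache.put(1, "one");             // written and last accessed at t=0
    fakeNanos.set(2);                // jump past the 1 ns expiry horizon
    assertNull(cache.get(1));        // the entry now reads as expired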
numberOfEntries; i < 2 * numberOfEntries; i++) { + assertTrue(remainingKeys.contains(i)); + } + } + + public void testComputeIfAbsentAfterExpiration() throws ExecutionException { + AtomicLong now = new AtomicLong(); + LRUCache cache = new LRUCache<>() { + @Override + protected long now() { + return now.get(); + } + }; + cache.setExpireAfterAccessNanos(1); + now.set(0); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i) + "-first"); + } + now.set(2); + for (int i = 0; i < numberOfEntries; i++) { + cache.computeIfAbsent(i, k -> Integer.toString(k) + "-second"); + } + for (int i = 0; i < numberOfEntries; i++) { + assertEquals(i + "-second", cache.get(i)); + } + assertEquals(numberOfEntries, cache.stats().getEvictions()); + } + + public void testComputeIfAbsentDeadlock() throws InterruptedException { + final Cache cache = CacheBuilder.builder() + .setExpireAfterAccess(TimeValue.timeValueNanos(1)) + .build(); + startInParallel(randomIntBetween(2, 32), i -> { + for (int j = 0; j < numberOfEntries; j++) { + try { + cache.computeIfAbsent(0, k -> Integer.toString(k)); + } catch (final ExecutionException e) { + throw new AssertionError(e); + } + } + }); + } + + // randomly promote some entries, step the clock forward, then check that the promoted entries remain and the + // non-promoted entries were removed + public void testPromotion() { + AtomicLong now = new AtomicLong(); + LRUCache cache = new LRUCache<>() { + @Override + protected long now() { + return now.get(); + } + }; + cache.setExpireAfterAccessNanos(1); + now.set(0); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + now.set(1); + Set promotedKeys = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + cache.get(i); + promotedKeys.add(i); + } + } + now.set(2); + cache.refresh(); + assertEquals(promotedKeys.size(), cache.count()); + for (int i = 0; i < numberOfEntries; i++) { + if (promotedKeys.contains(i)) { + assertNotNull(cache.get(i)); + } else { + assertNull(cache.get(i)); + } + } + } + + // randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null + public void testInvalidate() { + Cache cache = CacheBuilder.builder().build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set keys = new HashSet<>(); + for (Integer key : cache.keys()) { + if (rarely()) { + cache.invalidate(key); + keys.add(key); + } + } + for (int i = 0; i < numberOfEntries; i++) { + if (keys.contains(i)) { + assertNull(cache.get(i)); + } else { + assertNotNull(cache.get(i)); + } + } + } + + // randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only + // those entries + public void testNotificationOnInvalidate() { + Set notifications = new HashSet<>(); + Cache cache = CacheBuilder.builder().removalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + notifications.add(notification.getKey()); + }).build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set invalidated = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + cache.invalidate(i); + invalidated.add(i); + } + } + assertEquals(notifications, invalidated); + } + + // randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null + public void testInvalidateWithValue() { + 
Cache cache = CacheBuilder.builder().build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set keys = new HashSet<>(); + for (Integer key : cache.keys()) { + if (rarely()) { + if (randomBoolean()) { + cache.invalidate(key, key.toString()); + keys.add(key); + } else { + // invalidate with incorrect value + cache.invalidate(key, Integer.toString(key + randomIntBetween(2, 10))); + } + } + } + for (int i = 0; i < numberOfEntries; i++) { + if (keys.contains(i)) { + assertNull(cache.get(i)); + } else { + assertNotNull(cache.get(i)); + } + } + } + + // randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only + // those entries + public void testNotificationOnInvalidateWithValue() { + Set notifications = new HashSet<>(); + Cache cache = CacheBuilder.builder().removalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + notifications.add(notification.getKey()); + }).build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set invalidated = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + if (randomBoolean()) { + cache.invalidate(i, Integer.toString(i)); + invalidated.add(i); + } else { + // invalidate with incorrect value + cache.invalidate(i, Integer.toString(i + randomIntBetween(2, 10))); + } + } + } + assertEquals(notifications, invalidated); + } + + // invalidate all cached entries, then check that the cache is empty + public void testInvalidateAll() { + Cache cache = CacheBuilder.builder().build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + cache.invalidateAll(); + assertEquals(0, cache.count()); + assertEquals(0, cache.weight()); + } + + // invalidate all cached entries, then check that we receive invalidate notifications for all entries + public void testNotificationOnInvalidateAll() { + Set notifications = new HashSet<>(); + Cache cache = CacheBuilder.builder().removalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + notifications.add(notification.getKey()); + }).build(); + Set invalidated = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + invalidated.add(i); + } + cache.invalidateAll(); + assertEquals(invalidated, notifications); + } + + // randomly replace some entries, increasing the weight by 1 for each replacement, then count that the cache size + // is correct + public void testReplaceRecomputesSize() { + class Value { + private String value; + private long weight; + + Value(String value, long weight) { + this.value = value; + this.weight = weight; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Value that = (Value) o; + + return value.equals(that.value); + } + + @Override + public int hashCode() { + return value.hashCode(); + } + } + Cache cache = CacheBuilder.builder().weigher((k, s) -> s.weight).build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, new Value(Integer.toString(i), 1)); + } + assertEquals(numberOfEntries, cache.count()); + assertEquals(numberOfEntries, cache.weight()); + int replaced = 0; + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + replaced++; + cache.put(i, new Value(Integer.toString(i), 2)); + } + } + 
assertEquals(numberOfEntries, cache.count()); + assertEquals(numberOfEntries + replaced, cache.weight()); + } + + // randomly replace some entries, then check that we received replacement notifications for those and only those + // entries + public void testNotificationOnReplace() { + Set notifications = new HashSet<>(); + Cache cache = CacheBuilder.builder().removalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.REPLACED, notification.getRemovalReason()); + notifications.add(notification.getKey()); + }).build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set replacements = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + cache.put(i, Integer.toString(i) + Integer.toString(i)); + replacements.add(i); + } + } + assertEquals(replacements, notifications); + } + + public void testComputeIfAbsentLoadsSuccessfully() { + Map map = new HashMap<>(); + Cache cache = CacheBuilder.builder().build(); + for (int i = 0; i < numberOfEntries; i++) { + try { + cache.computeIfAbsent(i, k -> { + int value = randomInt(); + map.put(k, value); + return value; + }); + } catch (ExecutionException e) { + throw new AssertionError(e); + } + } + for (int i = 0; i < numberOfEntries; i++) { + assertEquals(map.get(i), cache.get(i)); + } + } + + public void testComputeIfAbsentCallsOnce() throws InterruptedException { + final Cache cache = CacheBuilder.builder().build(); + AtomicReferenceArray flags = new AtomicReferenceArray<>(numberOfEntries); + for (int j = 0; j < numberOfEntries; j++) { + flags.set(j, false); + } + CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); + startInParallel(randomIntBetween(2, 32), i -> { + for (int j = 0; j < numberOfEntries; j++) { + try { + cache.computeIfAbsent(j, key -> { + assertTrue(flags.compareAndSet(key, false, true)); + return Integer.toString(key); + }); + } catch (ExecutionException e) { + failures.add(e); + break; + } + } + }); + assertThat(failures, is(empty())); + } + + public void testComputeIfAbsentThrowsExceptionIfLoaderReturnsANullValue() { + final Cache cache = CacheBuilder.builder().build(); + try { + cache.computeIfAbsent(1, k -> null); + fail("expected ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(NullPointerException.class)); + } + } + + public void testDependentKeyDeadlock() throws InterruptedException { + class Key { + private final int key; + + Key(int key) { + this.key = key; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Key key1 = (Key) o; + + return key == key1.key; + + } + + @Override + public int hashCode() { + return key % 2; + } + } + + int numberOfThreads = randomIntBetween(2, 32); + final Cache cache = CacheBuilder.builder().build(); + + CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); + AtomicBoolean reachedTimeLimit = new AtomicBoolean(); + + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads); + List threads = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + Thread thread = new Thread(() -> { + try { + safeAwait(barrier); + Random random = new Random(random().nextLong()); + for (int j = 0; j < numberOfEntries && reachedTimeLimit.get() == false; j++) { + Key key = new Key(random.nextInt(numberOfEntries)); + try { + cache.computeIfAbsent(key, k -> { + if (k.key == 0) { + return 0; 
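The computeIfAbsent tests in this file pin down the loader contract; condensed into one sketch (runs inside an ESTestCase, hence expectThrows; the method name is mine):

    import java.util.concurrent.ExecutionException;

    import org.elasticsearch.common.cache.Cache;
    import org.elasticsearch.common.cache.CacheBuilder;

    void computeIfAbsentContract() throws ExecutionException {
        Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
        // miss: the loader runs exactly once and its result is cached
        assertEquals("loaded-1", cache.computeIfAbsent(1, k -> "loaded-" + k));
        // hit: the loader is not invoked again for a present key
        assertEquals("loaded-1", cache.computeIfAbsent(1, k -> "never"));
        // null results are rejected, surfaced as an ExecutionException wrapping an NPE
        ExecutionException e = expectThrows(ExecutionException.class, () -> cache.computeIfAbsent(2, k -> null));
        assertThat(e.getCause(), instanceOf(NullPointerException.class));
    }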
+ } else { + Integer value = cache.get(new Key(k.key / 2)); + return value != null ? value : 0; + } + }); + } catch (ExecutionException e) { + failures.add(e); + break; + } + } + } finally { + // successfully avoided deadlock, release the main thread + deadlockLatch.countDown(); + } + }); + threads.add(thread); + thread.start(); + } + + AtomicBoolean deadlock = new AtomicBoolean(); + assert deadlock.get() == false; + + // start a watchdog service + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(() -> { + Set ids = threads.stream().map(t -> t.getId()).collect(Collectors.toSet()); + ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); + long[] deadlockedThreads = mxBean.findDeadlockedThreads(); + if (deadlock.get() == false && deadlockedThreads != null) { + for (long deadlockedThread : deadlockedThreads) { + // ensure that we detected deadlock on our threads + if (ids.contains(deadlockedThread)) { + deadlock.set(true); + // release the main test thread to fail the test + for (int i = 0; i < numberOfThreads; i++) { + deadlockLatch.countDown(); + } + break; + } + } + } + }, 1, 1, TimeUnit.SECONDS); + + // everything is setup, release the hounds + safeAwait(barrier); + + // run the test for a limited amount of time; if threads are still running after that, let them know and exit gracefully + if (deadlockLatch.await(1, TimeUnit.SECONDS) == false) { + reachedTimeLimit.set(true); + } + + // wait for either deadlock to be detected or the threads to terminate (end operations or time limit reached) + safeAwait(deadlockLatch); + + // shutdown the watchdog service + scheduler.shutdown(); + + assertThat(failures, is(empty())); + + assertFalse("deadlock", deadlock.get()); + } + + public void testCachePollution() throws InterruptedException { + int numberOfThreads = randomIntBetween(2, 32); + final Cache cache = CacheBuilder.builder().build(); + startInParallel(numberOfThreads, i -> { + Random random = new Random(random().nextLong()); + for (int j = 0; j < numberOfEntries; j++) { + Integer key = random.nextInt(numberOfEntries); + boolean first; + boolean second; + do { + first = random.nextBoolean(); + second = random.nextBoolean(); + } while (first && second); + if (first) { + try { + cache.computeIfAbsent(key, k -> { + if (random.nextBoolean()) { + return Integer.toString(k); + } else { + throw new Exception("testCachePollution"); + } + }); + } catch (ExecutionException e) { + assertNotNull(e.getCause()); + assertThat(e.getCause(), instanceOf(Exception.class)); + assertEquals(e.getCause().getMessage(), "testCachePollution"); + } + } else if (second) { + cache.invalidate(key); + } else { + cache.get(key); + } + } + }); + } + + public void testExceptionThrownDuringConcurrentComputeIfAbsent() throws InterruptedException { + final Cache cache = CacheBuilder.builder().build(); + final String key = randomAlphaOfLengthBetween(2, 32); + startInParallel(randomIntBetween(2, 32), i -> { + for (int j = 0; j < numberOfEntries; j++) { + try { + String value = cache.computeIfAbsent(key, k -> { throw new RuntimeException("failed to load"); }); + fail("expected exception but got: " + value); + } catch (ExecutionException e) { + assertNotNull(e.getCause()); + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertEquals(e.getCause().getMessage(), "failed to load"); + } + } + }); + } + + // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key + // here be dragons: this test did catch one subtle 
+    public void testTorture() throws InterruptedException {
+        final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().setMaximumWeight(1000).weigher((k, v) -> 2).build();
+        startInParallel(randomIntBetween(2, 32), i -> {
+            Random random = new Random(random().nextLong());
+            for (int j = 0; j < numberOfEntries; j++) {
+                Integer key = random.nextInt(numberOfEntries);
+                cache.put(key, Integer.toString(j));
+            }
+        });
+        cache.refresh();
+        assertEquals(500, cache.count());
+    }
+
+    public void testRemoveUsingValuesIterator() {
+        final List<RemovalNotification<Integer, String>> removalNotifications = new ArrayList<>();
+        Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder()
+            .setMaximumWeight(numberOfEntries)
+            .removalListener(removalNotifications::add)
+            .build();
+
+        for (int i = 0; i < numberOfEntries; i++) {
+            cache.put(i, Integer.toString(i));
+        }
+
+        assertThat(removalNotifications.size(), is(0));
+        final List<String> expectedRemovals = new ArrayList<>();
+        Iterator<String> valueIterator = cache.values().iterator();
+        while (valueIterator.hasNext()) {
+            String value = valueIterator.next();
+            if (randomBoolean()) {
+                valueIterator.remove();
+                expectedRemovals.add(value);
+            }
+        }
+
+        assertEquals(expectedRemovals.size(), removalNotifications.size());
+        for (int i = 0; i < expectedRemovals.size(); i++) {
+            assertEquals(expectedRemovals.get(i), removalNotifications.get(i).getValue());
+            assertEquals(RemovalNotification.RemovalReason.INVALIDATED, removalNotifications.get(i).getRemovalReason());
+        }
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java b/server/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java
index d55f712cb91b6..db891561e1090 100644
--- a/server/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java
@@ -21,6 +21,7 @@
 import org.apache.lucene.store.ByteBuffersDirectory;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.common.cache.LRUCache;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexService;
@@ -60,6 +61,7 @@
 import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.SHORT;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -375,13 +377,15 @@ public void testFieldDataCacheExpire() {
             Settings settings = Settings.EMPTY;
             IndicesFieldDataCache cache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
             });
-            assertThat(cache.getCache().getExpireAfterAccessNanos(), equalTo(3_600_000_000_000L));
+            assertThat("Type is LRUCache as test depends on it", cache.getCache(), instanceOf(LRUCache.class));
+            assertThat(((LRUCache<?, ?>) cache.getCache()).getExpireAfterAccessNanos(), equalTo(3_600_000_000_000L));
         }
         {
             Settings settings = Settings.builder().put(IndicesFieldDataCache.INDICES_FIELDDATA_CACHE_EXPIRE.getKey(), "5s").build();
             IndicesFieldDataCache cache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
             });
-            assertThat(cache.getCache().getExpireAfterAccessNanos(), equalTo(5_000_000_000L));
+            assertThat("Type is LRUCache as test depends on it", cache.getCache(), instanceOf(LRUCache.class));
+            assertThat(((LRUCache<?, ?>) cache.getCache()).getExpireAfterAccessNanos(), equalTo(5_000_000_000L));
         }
     }
 }
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsCache.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsCache.java
index 46261937a0228..57de4496aa1a0 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsCache.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsCache.java
@@ -49,7 +49,7 @@ public FieldPermissionsCache(Settings settings) {
             .build();
     }
 
-    public Cache.CacheStats getCacheStats() {
+    public Cache.Stats getCacheStats() {
         return cache.stats();
     }
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorTests.java
index b15ab80ba6c2e..1bd0206beccf7 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorTests.java
@@ -578,7 +578,7 @@ public void testParsingFieldPermissionsUsesCache() throws IOException {
         FieldPermissionsCache fieldPermissionsCache = new FieldPermissionsCache(Settings.EMPTY);
         RoleDescriptor.setFieldPermissionsCache(fieldPermissionsCache);
 
-        final Cache.CacheStats beforeStats = fieldPermissionsCache.getCacheStats();
+        final Cache.Stats beforeStats = fieldPermissionsCache.getCacheStats();
 
         final String json = """
             {
@@ -604,7 +604,7 @@ public void testParsingFieldPermissionsUsesCache() throws IOException {
         RoleDescriptor.parserBuilder().build().parse("test", new BytesArray(json), XContentType.JSON);
 
         final int numberOfFieldSecurityBlocks = 2;
-        final Cache.CacheStats betweenStats = fieldPermissionsCache.getCacheStats();
+        final Cache.Stats betweenStats = fieldPermissionsCache.getCacheStats();
 
         assertThat(betweenStats.getMisses(), equalTo(beforeStats.getMisses() + numberOfFieldSecurityBlocks));
         assertThat(betweenStats.getHits(), equalTo(beforeStats.getHits()));
@@ -613,7 +613,7 @@ public void testParsingFieldPermissionsUsesCache() throws IOException {
             RoleDescriptor.parserBuilder().build().parse("test", new BytesArray(json), XContentType.JSON);
         }
 
-        final Cache.CacheStats afterStats = fieldPermissionsCache.getCacheStats();
+        final Cache.Stats afterStats = fieldPermissionsCache.getCacheStats();
         assertThat(afterStats.getMisses(), equalTo(betweenStats.getMisses()));
         assertThat(afterStats.getHits(), equalTo(beforeStats.getHits() + numberOfFieldSecurityBlocks * iterations));
     }
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java
index 6a621d1539d55..59b454b65f005 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java
@@ -137,13 +137,13 @@ void put(CacheKey cacheKey, CacheValue cacheValue) {
     }
 
     public EnrichStatsAction.Response.CacheStats getStats(String localNodeId) {
-        Cache.CacheStats cacheStats = cache.stats();
+        Cache.Stats stats = cache.stats();
         return new EnrichStatsAction.Response.CacheStats(
             localNodeId,
             cache.count(),
-            cacheStats.getHits(),
-            cacheStats.getMisses(),
-            cacheStats.getEvictions(),
+            stats.getHits(),
+            stats.getMisses(),
+            stats.getEvictions(),
             TimeValue.nsecToMSec(hitsTimeInNanos.get()),
             TimeValue.nsecToMSec(missesTimeInNanos.get()),
             sizeInBytes.get()
diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java
index 9fa5934a887f5..c9bc74039e57b 100644
--- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java
+++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java
@@ -252,7 +252,7 @@ public ApiKeyService(
         this.threadPool = threadPool;
         this.cacheHasher = Hasher.resolve(CACHE_HASH_ALGO_SETTING.get(settings));
         final TimeValue ttl = CACHE_TTL_SETTING.get(settings);
-        final int maximumWeight = CACHE_MAX_KEYS_SETTING.get(settings);
+        final long maximumWeight = CACHE_MAX_KEYS_SETTING.get(settings);
         if (ttl.getNanos() > 0) {
             this.apiKeyAuthCache = CacheBuilder.<String, ListenableFuture<CachedApiKeyHashResult>>builder()
                 .setExpireAfterAccess(ttl)
@@ -2340,9 +2340,9 @@ private static VersionedApiKeyDoc convertSearchHitToVersionedApiKeyDoc(SearchHit
 
     private record VersionedApiKeyDoc(ApiKeyDoc doc, String id, long seqNo, long primaryTerm) {}
 
-    private RemovalListener<String, ListenableFuture<CachedApiKeyHashResult>> getAuthCacheRemovalListener(int maximumWeight) {
+    private RemovalListener<String, ListenableFuture<CachedApiKeyHashResult>> getAuthCacheRemovalListener(long maximumWeight) {
         return notification -> {
-            if (RemovalReason.EVICTED == notification.getRemovalReason() && getApiKeyAuthCache().count() >= maximumWeight) {
+            if (RemovalReason.EVICTED == notification.getRemovalReason() && getApiKeyAuthCache().weight() >= maximumWeight) {
                 evictionCounter.increment();
                 logger.trace(
                     "API key with ID [{}] was evicted from the authentication cache, " + "possibly due to cache size limit",
@@ -2683,7 +2683,7 @@ private static final class ApiKeyDocCache {
         private final Cache<String, BytesReference> roleDescriptorsBytesCache;
         private final LockingAtomicCounter lockingAtomicCounter;
 
-        ApiKeyDocCache(TimeValue ttl, int maximumWeight) {
+        ApiKeyDocCache(TimeValue ttl, long maximumWeight) {
             this.docCache = CacheBuilder.<String, CachedApiKeyDoc>builder()
                 .setMaximumWeight(maximumWeight)
                 .setExpireAfterWrite(ttl)
@@ -2693,7 +2693,7 @@ private static final class ApiKeyDocCache {
             // multiple API keys, so we cache for longer and rely on the weight to manage the cache size.
             this.roleDescriptorsBytesCache = CacheBuilder.<String, BytesReference>builder()
                 .setExpireAfterAccess(TimeValue.timeValueHours(1))
-                .setMaximumWeight(maximumWeight * 2L)
+                .setMaximumWeight(maximumWeight < Long.MAX_VALUE / 2 ? maximumWeight * 2L : Long.MAX_VALUE)
                 .build();
             this.lockingAtomicCounter = new LockingAtomicCounter();
         }
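
Note on the overflow guard introduced in ApiKeyDocCache above: a minimal, self-contained sketch of the pattern (the class and helper names below are hypothetical, illustrative only, and not part of this change). With the previous int parameter, maximumWeight * 2L could never overflow a long; once the parameter widens to long, an unguarded doubling of a sufficiently large setting value would wrap to a negative maximum weight for the role descriptor bytes cache.

    // SaturatingDoubleDemo.java -- illustrative sketch, not part of this diff
    public final class SaturatingDoubleDemo {

        // Same expression as the inline ternary used for roleDescriptorsBytesCache:
        // double the weight, but clamp at Long.MAX_VALUE instead of wrapping negative.
        static long saturatingDouble(long weight) {
            return weight < Long.MAX_VALUE / 2 ? weight * 2L : Long.MAX_VALUE;
        }

        public static void main(String[] args) {
            System.out.println(saturatingDouble(1_000L));         // 2000
            System.out.println(saturatingDouble(Long.MAX_VALUE)); // clamped to Long.MAX_VALUE
            System.out.println(Long.MAX_VALUE * 2L);              // wraps to -2: the bug the guard prevents
        }
    }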