Skip to content

Commit 831de21

Browse files
committed
Limit replay to those records not durably persisted to both command store and data store
Also fix: - Limit truncation to TruncatedApplyWithOutcome until data is persisted durably to local store - IntervalUpdater not invoking super.close() - Do not invoke preRunExclusive without holding lock - IntervalUpdater not correctly initialising BranchBuilder.inUse - AccordExecutor should notify if there is more work on unlock of caches - Relax paranoid CFK validation during restart Also improve: - Flush logs before System.exit - Start/stop progress log explicitly - Limit progress log concurrency - Clear heavy fields in some messages once processed. Patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20780
1 parent 418e3a4 commit 831de21

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

46 files changed

+1138
-745
lines changed

modules/accord

Submodule accord updated 58 files

src/java/org/apache/cassandra/config/AccordSpec.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,11 @@ public enum QueueShardModel
5757
/**
5858
* Same number of threads as queue shards, but the shard lock is held only while managing the queue,
5959
* so that submitting threads may queue load/save work.
60-
*
61-
* The global READ and WRITE stages are used for IO.
6260
*/
6361
THREAD_PER_SHARD,
6462

6563
/**
6664
* Same number of threads as shards, and the shard lock is held for the duration of serving requests.
67-
* The global READ and WRITE stages are used for IO.
6865
*/
6966
THREAD_PER_SHARD_SYNC_QUEUE,
7067

@@ -73,12 +70,6 @@ public enum QueueShardModel
7370
* Fewer shards is generally better, until queue-contention is encountered.
7471
*/
7572
THREAD_POOL_PER_SHARD,
76-
77-
/**
78-
* More threads than shards. Threads update transaction state only, relying on READ and WRITE stages for IO.
79-
* Fewer shards is generally better, until queue-contention is encountered.
80-
*/
81-
THREAD_POOL_PER_SHARD_EXCLUDES_IO,
8273
}
8374

8475
public enum QueueSubmissionModel
@@ -130,6 +121,7 @@ public enum QueueSubmissionModel
130121

131122
public volatile OptionaldPositiveInt max_queued_loads = OptionaldPositiveInt.UNDEFINED;
132123
public volatile OptionaldPositiveInt max_queued_range_loads = OptionaldPositiveInt.UNDEFINED;
124+
public volatile OptionaldPositiveInt max_progress_log_concurrency = OptionaldPositiveInt.UNDEFINED;
133125

134126
public DataStorageSpec.LongMebibytesBound cache_size = null;
135127
public DataStorageSpec.LongMebibytesBound working_set_size = null;
@@ -158,8 +150,8 @@ public enum QueueSubmissionModel
158150

159151
public volatile DurationSpec.IntSecondsBound fast_path_update_delay = null;
160152

161-
public volatile DurationSpec.IntSecondsBound gc_delay = new DurationSpec.IntSecondsBound("5m");
162153
public volatile int shard_durability_target_splits = 16;
154+
public volatile int shard_durability_max_splits = 128;
163155
public volatile DurationSpec.IntSecondsBound durability_txnid_lag = new DurationSpec.IntSecondsBound(5);
164156
public volatile DurationSpec.IntSecondsBound shard_durability_cycle = new DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);
165157
public volatile DurationSpec.IntSecondsBound global_durability_cycle = new DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5372,7 +5372,6 @@ public static int getAccordQueueShardCount()
53725372
case THREAD_PER_SHARD_SYNC_QUEUE:
53735373
return conf.accord.queue_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
53745374
case THREAD_POOL_PER_SHARD:
5375-
case THREAD_POOL_PER_SHARD_EXCLUDES_IO:
53765375
int defaultMax = getAccordQueueSubmissionModel() == AccordSpec.QueueSubmissionModel.SYNC ? 8 : 4;
53775376
return conf.accord.queue_shard_count.or(Math.min(defaultMax, DatabaseDescriptor.getAvailableProcessors()));
53785377
}
@@ -5393,6 +5392,11 @@ public static int getAccordMaxQueuedRangeLoadCount()
53935392
return conf.accord.max_queued_range_loads.or(Math.max(4, getAccordConcurrentOps() / 4));
53945393
}
53955394

5395+
public static int getAccordProgressLogMaxConcurrency()
5396+
{
5397+
return conf.accord.max_progress_log_concurrency.or(64);
5398+
}
5399+
53965400
public static boolean getAccordCacheShrinkingOn()
53975401
{
53985402
return conf.accord.shrink_cache_entries_before_eviction;
@@ -5426,14 +5430,14 @@ public static long getAccordFastPathUpdateDelayMillis()
54265430
return bound == null ? -1 : bound.to(TimeUnit.MILLISECONDS);
54275431
}
54285432

5429-
public static long getAccordGCDelay(TimeUnit unit)
5433+
public static int getAccordShardDurabilityTargetSplits()
54305434
{
5431-
return conf.accord.gc_delay.to(unit);
5435+
return conf.accord.shard_durability_target_splits;
54325436
}
54335437

5434-
public static int getAccordShardDurabilityTargetSplits()
5438+
public static int getAccordShardDurabilityMaxSplits()
54355439
{
5436-
return conf.accord.shard_durability_target_splits;
5440+
return conf.accord.shard_durability_max_splits;
54375441
}
54385442

54395443
public static long getAccordScheduleDurabilityTxnIdLag(TimeUnit unit)

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,7 @@ public Collection<SSTableReader> flushMemtable(ColumnFamilyStore cfs, Memtable m
13741374
}
13751375
}
13761376
cfs.replaceFlushed(memtable, sstables);
1377+
memtable.notifyFlushed();
13771378
reclaim(memtable);
13781379
cfs.compactionStrategyManager.compactionLogger.flush(sstables);
13791380
logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",

src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818

1919
package org.apache.cassandra.db.memtable;
2020

21+
import java.util.Collection;
2122
import java.util.HashMap;
2223
import java.util.Map;
2324
import java.util.concurrent.ConcurrentSkipListSet;
2425
import java.util.concurrent.atomic.AtomicBoolean;
2526
import java.util.concurrent.atomic.AtomicInteger;
2627
import java.util.concurrent.atomic.AtomicLong;
2728
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.function.Consumer;
30+
import java.util.function.Supplier;
2831

2932
import com.google.common.annotations.VisibleForTesting;
33+
import com.google.common.collect.ImmutableMap;
3034

3135
import org.apache.cassandra.db.RegularAndStaticColumns;
3236
import org.apache.cassandra.db.commitlog.CommitLogPosition;
@@ -51,6 +55,7 @@ public abstract class AbstractMemtable implements Memtable
5155
// The smallest local deletion time for all partitions in this memtable
5256
protected AtomicLong minLocalDeletionTime = new AtomicLong(Long.MAX_VALUE);
5357
private final long id = nextId.incrementAndGet();
58+
private Map<Object, Consumer<TableMetadata>> onFlush = ImmutableMap.of();
5459
// Note: statsCollector has corresponding statistics to the two above, but starts with an epoch value which is not
5560
// correct for their usage.
5661

@@ -147,6 +152,29 @@ public LifecycleTransaction setFlushTransaction(LifecycleTransaction flushTransa
147152
return this.flushTransaction.getAndSet(flushTransaction);
148153
}
149154

155+
@Override
156+
public synchronized <T extends Consumer<TableMetadata>> T ensureFlushListener(Object key, Supplier<T> factory)
157+
{
158+
if (onFlush == null)
159+
return null;
160+
161+
T listener;
162+
if (null == (listener = (T)onFlush.get(key)))
163+
onFlush = ImmutableMap.<Object, Consumer<TableMetadata>>builder().putAll(onFlush).put(key, listener = factory.get()).build();
164+
return listener;
165+
}
166+
167+
public void notifyFlushed()
168+
{
169+
Collection<Consumer<TableMetadata>> run;
170+
synchronized (this)
171+
{
172+
run = onFlush.values();
173+
onFlush = null;
174+
}
175+
run.forEach(c -> c.accept(metadata()));
176+
}
177+
150178
protected static class ColumnsCollector
151179
{
152180
private final HashMap<ColumnMetadata, AtomicBoolean> predefined = new HashMap<>();

src/java/org/apache/cassandra/db/memtable/Memtable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.cassandra.db.memtable;
2020

2121
import java.util.concurrent.atomic.AtomicReference;
22+
import java.util.function.Consumer;
23+
import java.util.function.Supplier;
2224
import javax.annotation.concurrent.NotThreadSafe;
2325

2426
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -420,6 +422,10 @@ default boolean shouldSwitch(ColumnFamilyStore.FlushReason reason)
420422
return shouldSwitch(reason, metadata());
421423
}
422424

425+
// returns null if already flushed
426+
<T extends Consumer<TableMetadata>> T ensureFlushListener(Object key, Supplier<T> factory);
427+
void notifyFlushed();
428+
423429
/**
424430
* Called when the table's metadata is updated. The memtable's metadata reference now points to the new version.
425431
* This will not be called if {@link #shouldSwitch } (SCHEMA_CHANGE) returns true, as the memtable will be swapped out

src/java/org/apache/cassandra/service/accord/AccordCache.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@
5252
import accord.utils.Invariants;
5353
import accord.utils.QuadFunction;
5454
import accord.utils.TriFunction;
55-
import accord.utils.async.Cancellable;
5655
import org.agrona.collections.Object2ObjectHashMap;
5756
import org.apache.cassandra.cache.CacheSize;
5857
import org.apache.cassandra.config.DatabaseDescriptor;
5958
import org.apache.cassandra.exceptions.UnknownTableException;
6059
import org.apache.cassandra.io.util.DataInputBuffer;
6160
import org.apache.cassandra.metrics.AccordCacheMetrics;
6261
import org.apache.cassandra.metrics.CacheAccessMetrics;
62+
import org.apache.cassandra.service.accord.AccordCacheEntry.LoadExecutor;
6363
import org.apache.cassandra.service.accord.AccordCacheEntry.Status;
6464
import org.apache.cassandra.service.accord.events.CacheEvents;
6565
import org.apache.cassandra.service.accord.serializers.Version;
@@ -72,6 +72,8 @@
7272
import static org.apache.cassandra.service.accord.AccordCacheEntry.Status.EVICTED;
7373
import static org.apache.cassandra.service.accord.AccordCacheEntry.Status.LOADED;
7474
import static org.apache.cassandra.service.accord.AccordCacheEntry.Status.MODIFIED;
75+
import static org.apache.cassandra.service.accord.AccordCacheEntry.Status.SAVING;
76+
import static org.apache.cassandra.service.accord.AccordCacheEntry.Status.WAITING_TO_SAVE;
7577

7678
/**
7779
* Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
@@ -100,6 +102,7 @@ public interface Adapter<K, V, S>
100102
{
101103
@Nullable V load(AccordCommandStore commandStore, K key);
102104
@Nullable Runnable save(AccordCommandStore commandStore, K key, @Nullable V value, @Nullable Object shrunk);
105+
default boolean canSave(@Nullable V value, @Nullable Object shrunk) { return true; }
103106
// a result of null means we can immediately evict, without saving
104107
@Nullable V quickShrink(V value);
105108
// a result of null means we cannot shrink, and should save/evict as appropriate
@@ -138,9 +141,7 @@ public ImmutableStats(Stats stats)
138141
}
139142

140143
private final List<Type<?, ?, ?>> types = new CopyOnWriteArrayList<>();
141-
private final Function<Runnable, Cancellable> saveExecutor;
142-
private final AccordCacheEntry.OnSaved onSaved;
143-
// TODO (required): monitor this queue and periodically clean up entries, or implement an eviction deadline system
144+
final AccordCacheEntry.SaveExecutor saveExecutor;
144145
private final IntrusiveLinkedList<AccordCacheEntry<?,?>> evictQueue = new IntrusiveLinkedList<>();
145146
private final IntrusiveLinkedList<AccordCacheEntry<?,?>> noEvictQueue = new IntrusiveLinkedList<>();
146147

@@ -155,10 +156,9 @@ public ImmutableStats(Stats stats)
155156
final AccordCacheMetrics metrics;
156157
final Stats stats = new Stats();
157158

158-
public AccordCache(Function<Runnable, Cancellable> saveExecutor, AccordCacheEntry.OnSaved onSaved, long maxSizeInBytes, AccordCacheMetrics metrics)
159+
public AccordCache(AccordCacheEntry.SaveExecutor saveExecutor, long maxSizeInBytes, AccordCacheMetrics metrics)
159160
{
160161
this.saveExecutor = saveExecutor;
161-
this.onSaved = onSaved;
162162
this.maxSizeInBytes = maxSizeInBytes;
163163
this.metrics = metrics;
164164
}
@@ -265,14 +265,21 @@ public <K, V> void tryEvict(AccordCacheEntry<K, V> node)
265265
evict(node, true);
266266
break;
267267
case MODIFIED:
268-
Type<K, V, ?> parent = node.owner.parent();
269-
node.save(saveExecutor, parent.adapter, onSaved);
268+
node.save();
270269
boolean evict = node.status() == LOADED;
271270
node.unlink();
272271
if (evict) evict(node, true);
273272
}
274273
}
275274

275+
public void saveWhenReadyExclusive(AccordCacheEntry<?, ?> entry, Runnable onSuccess)
276+
{
277+
if (!entry.isSavingOrWaiting() && !entry.saveWhenReady())
278+
onSuccess.run();
279+
else
280+
entry.savingOrWaitingToSave().identity.onSuccess(onSuccess);
281+
}
282+
276283
private void evict(AccordCacheEntry<?, ?> node, boolean updateUnreferenced)
277284
{
278285
if (logger.isTraceEnabled())
@@ -302,10 +309,10 @@ private void evict(AccordCacheEntry<?, ?> node, boolean updateUnreferenced)
302309
node.evicted();
303310
}
304311

305-
<P, K, V> Collection<AccordTask<?>> load(BiFunction<P, Runnable, Cancellable> loadExecutor, P param, AccordCacheEntry<K, V> node, AccordCacheEntry.OnLoaded onLoaded)
312+
<P1, P2, K, V> Collection<AccordTask<?>> load(LoadExecutor<P1, P2> loadExecutor, P1 p1, P2 p2, AccordCacheEntry<K, V> node)
306313
{
307314
Type<K, V, ?> parent = node.owner.parent();
308-
return node.load(loadExecutor, param, parent.adapter, onLoaded).waiters();
315+
return node.load(loadExecutor, p1, p2).waiters();
309316
}
310317

311318
<K, V> void loaded(AccordCacheEntry<K, V> node, V value)
@@ -329,7 +336,7 @@ <K, V> void failedToLoad(AccordCacheEntry<K, V> node)
329336

330337
<K, V> void saved(AccordCacheEntry<K, V> node, Object identity, Throwable fail)
331338
{
332-
if (node.saved(identity, fail) && node.references() == 0)
339+
if (node.saved(identity, fail) && node.references() == 0 && node.isUnqueued())
333340
evictQueue.addFirst(node); // add to front since we have just saved, so we were eligible for eviction
334341
}
335342

@@ -539,6 +546,7 @@ else if (node.isLoadingOrWaiting())
539546
switch (status)
540547
{
541548
default: throw new IllegalStateException("Unhandled status " + status);
549+
case WAITING_TO_SAVE:
542550
case WAITING_TO_LOAD:
543551
case LOADING:
544552
case LOADED:
@@ -560,15 +568,15 @@ public Stream<AccordCacheEntry<K, V>> stream()
560568
return cache.values().stream();
561569
}
562570

563-
Type<K, V, S> parent()
571+
final Type<K, V, S> parent()
564572
{
565573
return Type.this;
566574
}
567575

568576
@Override
569577
public Iterator<AccordCacheEntry<K, V>> iterator()
570578
{
571-
return stream().iterator();
579+
return cache.values().iterator();
572580
}
573581

574582
void validateLoadEvicted(AccordCacheEntry<?, ?> node)
@@ -672,7 +680,6 @@ public boolean tryUnregister(Listener<K, V> l)
672680
listeners = null;
673681
return true;
674682
}
675-
676683
}
677684

678685
private final Class<K> keyClass;
@@ -733,7 +740,7 @@ private void incrementCacheMisses()
733740
AccordCache.this.stats.misses++;
734741
}
735742

736-
AccordCache parent()
743+
final AccordCache parent()
737744
{
738745
return AccordCache.this;
739746
}
@@ -768,7 +775,7 @@ public BiFunction<AccordCommandStore, K, V> unsafeGetLoadFunction()
768775
return ((SettableWrapper<K, V, S>)adapter).load;
769776
}
770777

771-
Adapter<K, V, S> adapter()
778+
final Adapter<K, V, S> adapter()
772779
{
773780
return adapter;
774781
}
@@ -1120,6 +1127,12 @@ public Runnable save(AccordCommandStore commandStore, RoutingKey key, @Nullable
11201127
return commandStore.saveCommandsForKey(key, value, serialized);
11211128
}
11221129

1130+
@Override
1131+
public boolean canSave(@Nullable CommandsForKey value, @Nullable Object serialized)
1132+
{
1133+
return value == null || !value.isLoadingPruned();
1134+
}
1135+
11231136
@Override
11241137
public CommandsForKey quickShrink(CommandsForKey value)
11251138
{

0 commit comments

Comments
 (0)