Skip to content

Replay nondurable #4264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 58 files
+4 −4 accord-core/src/main/java/accord/api/AsyncExecutor.java
+3 −2 accord-core/src/main/java/accord/api/DataStore.java
+10 −17 accord-core/src/main/java/accord/api/Journal.java
+5 −0 accord-core/src/main/java/accord/api/LocalListeners.java
+4 −0 accord-core/src/main/java/accord/api/ProgressLog.java
+0 −1 accord-core/src/main/java/accord/coordinate/Invalidate.java
+26 −1 accord-core/src/main/java/accord/impl/AbstractReplayer.java
+6 −0 accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+27 −0 accord-core/src/main/java/accord/impl/InMemoryAgent.java
+133 −77 accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+3 −4 accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java
+148 −93 accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+2 −40 accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLogs.java
+3 −3 accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+4 −4 accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+4 −0 accord-core/src/main/java/accord/local/Cleanup.java
+12 −0 accord-core/src/main/java/accord/local/Command.java
+19 −27 accord-core/src/main/java/accord/local/CommandStore.java
+2 −3 accord-core/src/main/java/accord/local/Commands.java
+0 −1 accord-core/src/main/java/accord/local/Node.java
+0 −1 accord-core/src/main/java/accord/local/PreLoadContext.java
+21 −9 accord-core/src/main/java/accord/local/RedundantBefore.java
+25 −7 accord-core/src/main/java/accord/local/RedundantStatus.java
+1 −0 accord-core/src/main/java/accord/local/SafeCommand.java
+2 −3 accord-core/src/main/java/accord/local/SafeCommandStore.java
+7 −1 accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+1 −0 accord-core/src/main/java/accord/local/cfk/Serialize.java
+12 −4 accord-core/src/main/java/accord/local/durability/ShardDurability.java
+11 −1 accord-core/src/main/java/accord/messages/Accept.java
+20 −5 accord-core/src/main/java/accord/messages/Apply.java
+14 −4 accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+3 −3 accord-core/src/main/java/accord/messages/BeginInvalidation.java
+1 −1 accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
+1 −1 accord-core/src/main/java/accord/messages/StableThenRead.java
+17 −1 accord-core/src/main/java/accord/primitives/SaveStatus.java
+4 −5 accord-core/src/main/java/accord/utils/ArrayBuffers.java
+1 −2 accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java
+1 −1 accord-core/src/main/java/accord/utils/Invariants.java
+10 −0 accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
+32 −0 accord-core/src/main/java/accord/utils/async/AsyncResults.java
+46 −0 accord-core/src/main/java/accord/utils/async/RunnableWithResult.java
+4 −2 accord-core/src/test/java/accord/burn/BurnTestBase.java
+0 −8 accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+3 −1 accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+41 −66 accord-core/src/test/java/accord/impl/basic/Cluster.java
+3 −2 accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+0 −2 accord-core/src/test/java/accord/impl/basic/NodeSink.java
+1 −0 accord-core/src/test/java/accord/impl/basic/Pending.java
+2 −2 accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
+69 −0 accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java
+27 −7 accord-core/src/test/java/accord/impl/list/ListAgent.java
+1 −1 accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+10 −5 accord-core/src/test/java/accord/impl/list/ListRequest.java
+19 −85 accord-core/src/test/java/accord/impl/list/ListStore.java
+135 −0 accord-core/src/test/java/accord/impl/list/Snapshotter.java
+5 −4 accord-core/src/test/java/accord/impl/mock/MockStore.java
+3 −1 accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+6 −7 accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
12 changes: 2 additions & 10 deletions src/java/org/apache/cassandra/config/AccordSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,11 @@ public enum QueueShardModel
/**
* Same number of threads as queue shards, but the shard lock is held only while managing the queue,
* so that submitting threads may queue load/save work.
*
* The global READ and WRITE stages are used for IO.
*/
THREAD_PER_SHARD,

/**
* Same number of threads as shards, and the shard lock is held for the duration of serving requests.
* The global READ and WRITE stages are used for IO.
*/
THREAD_PER_SHARD_SYNC_QUEUE,

Expand All @@ -73,12 +70,6 @@ public enum QueueShardModel
* Fewer shards is generally better, until queue-contention is encountered.
*/
THREAD_POOL_PER_SHARD,

/**
* More threads than shards. Threads update transaction state only, relying on READ and WRITE stages for IO.
* Fewer shards is generally better, until queue-contention is encountered.
*/
THREAD_POOL_PER_SHARD_EXCLUDES_IO,
}

public enum QueueSubmissionModel
Expand Down Expand Up @@ -130,6 +121,7 @@ public enum QueueSubmissionModel

public volatile OptionaldPositiveInt max_queued_loads = OptionaldPositiveInt.UNDEFINED;
public volatile OptionaldPositiveInt max_queued_range_loads = OptionaldPositiveInt.UNDEFINED;
public volatile OptionaldPositiveInt max_progress_log_concurrency = OptionaldPositiveInt.UNDEFINED;

public DataStorageSpec.LongMebibytesBound cache_size = null;
public DataStorageSpec.LongMebibytesBound working_set_size = null;
Expand Down Expand Up @@ -158,8 +150,8 @@ public enum QueueSubmissionModel

public volatile DurationSpec.IntSecondsBound fast_path_update_delay = null;

public volatile DurationSpec.IntSecondsBound gc_delay = new DurationSpec.IntSecondsBound("5m");
public volatile int shard_durability_target_splits = 16;
public volatile int shard_durability_max_splits = 128;
public volatile DurationSpec.IntSecondsBound durability_txnid_lag = new DurationSpec.IntSecondsBound(5);
public volatile DurationSpec.IntSecondsBound shard_durability_cycle = new DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);
public volatile DurationSpec.IntSecondsBound global_durability_cycle = new DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);
Expand Down
14 changes: 9 additions & 5 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5372,7 +5372,6 @@ public static int getAccordQueueShardCount()
case THREAD_PER_SHARD_SYNC_QUEUE:
return conf.accord.queue_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
case THREAD_POOL_PER_SHARD:
case THREAD_POOL_PER_SHARD_EXCLUDES_IO:
int defaultMax = getAccordQueueSubmissionModel() == AccordSpec.QueueSubmissionModel.SYNC ? 8 : 4;
return conf.accord.queue_shard_count.or(Math.min(defaultMax, DatabaseDescriptor.getAvailableProcessors()));
}
Expand All @@ -5393,6 +5392,11 @@ public static int getAccordMaxQueuedRangeLoadCount()
return conf.accord.max_queued_range_loads.or(Math.max(4, getAccordConcurrentOps() / 4));
}

/**
 * Reads the {@code max_progress_log_concurrency} setting from the Accord configuration.
 *
 * @return the result of applying {@code or(64)} to the configured value; presumably this
 *         yields 64 when the option is left undefined (verify against OptionaldPositiveInt)
 */
public static int getAccordProgressLogMaxConcurrency()
{
    // Fallback handed to or(...) for the case where no value is configured.
    final int defaultMaxConcurrency = 64;
    return conf.accord.max_progress_log_concurrency.or(defaultMaxConcurrency);
}

public static boolean getAccordCacheShrinkingOn()
{
return conf.accord.shrink_cache_entries_before_eviction;
Expand Down Expand Up @@ -5426,14 +5430,14 @@ public static long getAccordFastPathUpdateDelayMillis()
return bound == null ? -1 : bound.to(TimeUnit.MILLISECONDS);
}

public static long getAccordGCDelay(TimeUnit unit)
public static int getAccordShardDurabilityTargetSplits()
{
return conf.accord.gc_delay.to(unit);
return conf.accord.shard_durability_target_splits;
}

public static int getAccordShardDurabilityTargetSplits()
public static int getAccordShardDurabilityMaxSplits()
{
return conf.accord.shard_durability_target_splits;
return conf.accord.shard_durability_max_splits;
}

public static long getAccordScheduleDurabilityTxnIdLag(TimeUnit unit)
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,7 @@ public Collection<SSTableReader> flushMemtable(ColumnFamilyStore cfs, Memtable m
}
}
cfs.replaceFlushed(memtable, sstables);
memtable.notifyFlushed();
reclaim(memtable);
cfs.compactionStrategyManager.compactionLogger.flush(sstables);
logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
Expand Down
34 changes: 34 additions & 0 deletions src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

package org.apache.cassandra.db.memtable;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
Expand All @@ -51,6 +55,7 @@ public abstract class AbstractMemtable implements Memtable
// The smallest local deletion time for all partitions in this memtable
protected AtomicLong minLocalDeletionTime = new AtomicLong(Long.MAX_VALUE);
private final long id = nextId.incrementAndGet();
private Map<Object, Consumer<TableMetadata>> onFlush = ImmutableMap.of();
// Note: statsCollector has corresponding statistics to the two above, but starts with an epoch value which is not
// correct for their usage.

Expand Down Expand Up @@ -147,6 +152,35 @@ public LifecycleTransaction setFlushTransaction(LifecycleTransaction flushTransa
return this.flushTransaction.getAndSet(flushTransaction);
}

/**
 * Looks up the flush listener registered under {@code key}, creating and registering one
 * from {@code factory} when none exists yet.
 *
 * The listener map is replaced with a fresh immutable copy on every registration, and the
 * whole operation runs while holding this memtable's monitor.
 *
 * @param key     identity under which the listener is stored
 * @param factory invoked (under the monitor) only when no listener is registered for {@code key}
 * @return the existing or newly created listener, or {@code null} when the listener map has
 *         already been cleared by {@link #notifyFlushed()}
 */
@Override
public synchronized <T extends Consumer<TableMetadata>> T ensureFlushListener(Object key, Supplier<T> factory)
{
    // A null map means notifyFlushed() has already run; no further listeners are accepted.
    if (onFlush == null)
        return null;

    T existing = (T) onFlush.get(key);
    if (existing != null)
        return existing;

    // Nothing registered for this key: build the listener and publish a new map containing it.
    T created = factory.get();
    ImmutableMap.Builder<Object, Consumer<TableMetadata>> next = ImmutableMap.builder();
    next.putAll(onFlush);
    next.put(key, created);
    onFlush = next.build();
    return created;
}

/**
 * Invokes every registered flush listener with this memtable's metadata, then stops
 * accepting new listeners.
 *
 * The listener map is detached (set to {@code null}) while holding the monitor, and the
 * listeners themselves run after the monitor is released. Calling this method again after
 * the listeners have been detached is a no-op rather than a {@link NullPointerException}.
 */
public void notifyFlushed()
{
    Collection<Consumer<TableMetadata>> run;
    synchronized (this)
    {
        // Already notified: the map was cleared by an earlier call, nothing left to run.
        if (onFlush == null)
            return;

        run = onFlush.values();
        onFlush = null;
    }
    // Run callbacks outside the lock so listener code never executes under our monitor.
    run.forEach(c -> c.accept(metadata()));
}

protected static class ColumnsCollector
{
private final HashMap<ColumnMetadata, AtomicBoolean> predefined = new HashMap<>();
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/memtable/Memtable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.cassandra.db.memtable;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;

import org.apache.cassandra.db.ColumnFamilyStore;
Expand Down Expand Up @@ -420,6 +422,10 @@ default boolean shouldSwitch(ColumnFamilyStore.FlushReason reason)
return shouldSwitch(reason, metadata());
}

/**
 * Obtains the flush listener associated with {@code key}, using {@code factory} to supply
 * one when needed.
 *
 * @return the listener, or {@code null} if this memtable has already been flushed
 */
<T extends Consumer<TableMetadata>> T ensureFlushListener(Object key, Supplier<T> factory);

/**
 * Signals that this memtable has been flushed.
 * NOTE(review): presumably implementations run the listeners registered through
 * {@code ensureFlushListener} here — confirm against each implementation.
 */
void notifyFlushed();

/**
* Called when the table's metadata is updated. The memtable's metadata reference now points to the new version.
* This will not be called if {@link #shouldSwitch } (SCHEMA_CHANGE) returns true, as the memtable will be swapped out
Expand Down
Loading