Skip to content

Commit 844e570

Browse files
committed
CASSANDRA-20798: Fix non-deterministic reads in Transient Replication
1 parent 6fd1194 commit 844e570

File tree

5 files changed

+526
-9
lines changed

5 files changed

+526
-9
lines changed

doc/modules/cassandra/pages/managing/operating/transientreplication.adoc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ can be discarded.
4545

4646
Transient replication is not enabled by default. Transient replication
4747
must be enabled on each node in a cluster separately by setting the
48-
following configuration property in `cassandra.yaml`.
48+
following configuration properties in `cassandra.yaml`. Transient replication is not allowed on nodes with multiple tokens, so `num_tokens` must be set to `1`.
4949

5050
....
5151
transient_replication_enabled: true
52+
num_tokens: 1
5253
....
5354

5455
Transient replication may be configured with both `SimpleStrategy` and
@@ -59,7 +60,7 @@ As an example, create a keyspace with replication factor (RF) 3.
5960

6061
....
6162
CREATE KEYSPACE CassandraKeyspaceSimple WITH replication = {'class': 'SimpleStrategy',
62-
'replication_factor' : 4/1};
63+
'replication_factor' : '4/1'};
6364
....
6465

6566
As another example, the `some_keysopace` keyspace will have 3 replicas in
@@ -68,12 +69,21 @@ transient:
6869

6970
....
7071
CREATE KEYSPACE some_keysopace WITH replication = {'class': 'NetworkTopologyStrategy',
71-
'DC1' : '3/1'', 'DC2' : '5/2'};
72+
'DC1' : '3/1', 'DC2' : '5/2'};
7273
....
7374

7475
Transiently replicated keyspaces only support tables with `read_repair`
7576
set to `NONE`.
7677

78+
....
79+
CREATE TABLE users (
80+
user_id varchar PRIMARY KEY,
81+
first varchar,
82+
last varchar,
83+
age int
84+
) WITH read_repair = 'NONE';
85+
....
86+
7787
Important Restrictions:
7888

7989
* RF cannot be altered while some endpoints are not in a normal state
@@ -124,7 +134,7 @@ quorum write will be upgraded to include transient replicas.
124134

125135
Pending ranges refers to the movement of token ranges between transient
126136
replicas. When a transient range is moved, there will be a period of
127-
time where both transient replicas would need to receive any write
137+
time when both transient replicas would need to receive any write
128138
intended for the logical transient replica so that after the movement
129139
takes effect a read quorum is able to return a response. Nodes are _not_
130140
temporarily transient replicas during expansion. They stream data like a

src/java/org/apache/cassandra/exceptions/UnavailableException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ public static UnavailableException create(ConsistencyLevel consistency, int requ
3636
if (required > alive)
3737
return new UnavailableException(String.format("Cannot achieve consistency level %s. Required %s but only %s alive.", consistency, required, alive),
3838
consistency, required, alive);
39-
assert requiredFull < aliveFull;
39+
assert aliveFull < requiredFull;
4040
return new UnavailableException("Insufficient full replicas", consistency, required, alive);
4141
}
4242

4343
public static UnavailableException create(ConsistencyLevel consistency, String dc, int required, int requiredFull, int alive, int aliveFull)
4444
{
4545
if (required > alive)
4646
return new UnavailableException("Cannot achieve consistency level " + consistency + " in DC " + dc, consistency, required, alive);
47-
assert requiredFull < aliveFull;
47+
assert aliveFull < requiredFull;
4848
return new UnavailableException("Insufficient full replicas in DC " + dc, consistency, required, alive);
4949
}
5050

src/java/org/apache/cassandra/locator/ReplicaPlans.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ static void assureSufficientLiveReplicas(Locator locator, AbstractReplicationStr
186186
total += dcCount.allReplicas();
187187
}
188188
if (totalFull < blockForFullReplicas)
189-
throw UnavailableException.create(consistencyLevel, blockFor, total, blockForFullReplicas, totalFull);
189+
throw UnavailableException.create(consistencyLevel, blockFor, blockForFullReplicas, total, totalFull);
190190
break;
191191
}
192192
// Fallthrough on purpose for SimpleStrategy
@@ -812,7 +812,8 @@ private static <E extends Endpoints<E>> E contactForEachQuorumRead(Locator locat
812812
});
813813
}
814814

815-
private static <E extends Endpoints<E>> E contactForRead(Locator locator, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
815+
@VisibleForTesting
816+
public static <E extends Endpoints<E>> E contactForRead(Locator locator, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
816817
{
817818
/*
818819
* If we are doing an each quorum query, we have to make sure that the endpoints we select
@@ -823,13 +824,45 @@ private static <E extends Endpoints<E>> E contactForRead(Locator locator, Abstra
823824
*
824825
* TODO: this is still very inconsistently managed between {LOCAL,EACH}_QUORUM and other consistency levels - should address this in a follow-up
825826
*/
827+
candidates = reorderWithOneFullReplicaFirst(candidates);
826828
if (consistencyLevel == EACH_QUORUM && replicationStrategy instanceof NetworkTopologyStrategy)
827829
return contactForEachQuorumRead(locator, (NetworkTopologyStrategy) replicationStrategy, candidates);
828830

829831
int count = consistencyLevel.blockFor(replicationStrategy) + (alwaysSpeculate ? 1 : 0);
830832
return candidates.subList(0, Math.min(count, candidates.size()));
831833
}
832834

835+
/**
836+
* Reorders the provided list of replicas such that, if any transient replicas are present,
837+
* a single full replica (if available) is placed first. The remaining replicas retain their original order (ordered by NodeProximity).
838+
*
839+
* @param endpoints the original set of replicas
840+
* @param <E> a subtype of Endpoints
841+
* @return the same set if there are no transient replicas or no full replica is found,
842+
* otherwise a reordered set with the first full replica placed first
843+
*/
844+
private static <E extends Endpoints<E>> E reorderWithOneFullReplicaFirst(E endpoints) {
845+
boolean hasTransient = false;
846+
Replica firstFull = null;
847+
848+
for (Replica r : endpoints) {
849+
if (r.isTransient()) hasTransient = true;
850+
if (firstFull == null && r.isFull()) firstFull = r;
851+
}
852+
853+
if (!hasTransient || firstFull == null)
854+
return endpoints;
855+
856+
ReplicaCollection.Builder<E> builder = endpoints.newBuilder(endpoints.size());
857+
builder.add(firstFull);
858+
for (Replica r : endpoints) {
859+
if (!r.equals(firstFull)) {
860+
builder.add(r);
861+
}
862+
}
863+
864+
return builder.build();
865+
}
833866

834867
/**
835868
* Construct a plan for reading from a single node - this permits no speculation or read-repair

0 commit comments

Comments
 (0)