Skip to content

Commit ea24f45

Browse files
vitess-bot[bot]mhamza15timvaillancourt
authored
[release-23.0] vtorc: support analysis ordering, improve semi-sync rollout (#19427) (#19472)
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Mohamed Hamza <mhamza@fastmail.com> Co-authored-by: Tim Vaillancourt <tim@timvaillancourt.com>
1 parent 3c86665 commit ea24f45

12 files changed

Lines changed: 1162 additions & 252 deletions

File tree

go/test/endtoend/backup/vtbackup/main_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,7 @@ var (
4343
dbPassword = "VtDbaPass"
4444
shardKsName = fmt.Sprintf("%s/%s", keyspaceName, shardName)
4545
dbCredentialFile string
46-
commonTabletArg = []string{
47-
vtutils.GetFlagVariantForTests("--vreplication-retry-delay"), "1s",
48-
vtutils.GetFlagVariantForTests("--degraded-threshold"), "5s",
49-
vtutils.GetFlagVariantForTests("--lock-tables-timeout"), "5s",
50-
vtutils.GetFlagVariantForTests("--watch-replication-stream"),
51-
vtutils.GetFlagVariantForTests("--enable-replication-reporter"),
52-
vtutils.GetFlagVariantForTests("--serving-state-grace-period"), "1s"}
46+
commonTabletArg []string
5347
)
5448

5549
func TestMain(m *testing.M) {
@@ -59,6 +53,16 @@ func TestMain(m *testing.M) {
5953
localCluster = cluster.NewCluster(cell, hostname)
6054
defer localCluster.Teardown()
6155

56+
vttabletVer := localCluster.VtTabletMajorVersion
57+
commonTabletArg = []string{
58+
vtutils.GetFlagVariantForTestsByVersion("--vreplication-retry-delay", vttabletVer), "1s",
59+
vtutils.GetFlagVariantForTestsByVersion("--degraded-threshold", vttabletVer), "5s",
60+
vtutils.GetFlagVariantForTestsByVersion("--lock-tables-timeout", vttabletVer), "5s",
61+
vtutils.GetFlagVariantForTestsByVersion("--watch-replication-stream", vttabletVer),
62+
vtutils.GetFlagVariantForTestsByVersion("--enable-replication-reporter", vttabletVer),
63+
vtutils.GetFlagVariantForTestsByVersion("--serving-state-grace-period", vttabletVer), "1s",
64+
}
65+
6266
// Start topo server
6367
err := localCluster.StartTopo()
6468
if err != nil {

go/test/endtoend/cluster/cluster_process.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,10 @@ import (
6060

6161
// DefaultCell : If no cell name is passed, then use following
6262
const (
63-
DefaultCell = "zone1"
64-
DefaultStartPort = 6700
65-
DefaultVttestEnv = "VTTEST=endtoend"
63+
DefaultCell = "zone1"
64+
DefaultStartPort = 6700
65+
DefaultVttestEnv = "VTTEST=endtoend"
66+
DefaultVtorcsByCell = 1
6667
)
6768

6869
var (
@@ -298,7 +299,6 @@ func (cluster *LocalProcessCluster) StartUnshardedKeyspace(keyspace Keyspace, re
298299
}
299300

300301
func (cluster *LocalProcessCluster) startPartialKeyspace(keyspace Keyspace, shardNames []string, movedShard string, replicaCount int, rdonly bool, customizers ...any) (err error) {
301-
302302
cluster.HasPartialKeyspaces = true
303303
routedKeyspace := &Keyspace{
304304
Name: fmt.Sprintf("%s_routed", keyspace.Name),
@@ -806,7 +806,7 @@ func NewBareCluster(cell string, hostname string) *LocalProcessCluster {
806806
// path/to/whatever exists
807807
cluster.ReusingVTDATAROOT = true
808808
} else {
809-
err = createDirectory(cluster.CurrentVTDATAROOT, 0700)
809+
err = createDirectory(cluster.CurrentVTDATAROOT, 0o700)
810810
if err != nil {
811811
log.Fatal(err)
812812
}
@@ -1160,7 +1160,8 @@ func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessLis
11601160

11611161
// StartVtbackup starts a vtbackup
11621162
func (cluster *LocalProcessCluster) StartVtbackup(newInitDBFile string, initialBackup bool,
1163-
keyspace string, shard string, cell string, extraArgs ...string) error {
1163+
keyspace string, shard string, cell string, extraArgs ...string,
1164+
) error {
11641165
log.Info("Starting vtbackup")
11651166
cluster.VtbackupProcess = *VtbackupProcessInstance(
11661167
cluster.GetAndReserveTabletUID(),
@@ -1175,7 +1176,6 @@ func (cluster *LocalProcessCluster) StartVtbackup(newInitDBFile string, initialB
11751176
initialBackup)
11761177
cluster.VtbackupProcess.ExtraArgs = extraArgs
11771178
return cluster.VtbackupProcess.Setup()
1178-
11791179
}
11801180

11811181
// GetAndReservePort gives port for required process
@@ -1191,7 +1191,6 @@ func (cluster *LocalProcessCluster) GetAndReservePort() int {
11911191
cluster.nextPortForProcess = cluster.nextPortForProcess + 1
11921192
log.Infof("Attempting to reserve port: %v", cluster.nextPortForProcess)
11931193
ln, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(cluster.nextPortForProcess)))
1194-
11951194
if err != nil {
11961195
log.Errorf("Can't listen on port %v: %s, trying next port", cluster.nextPortForProcess, err)
11971196
continue
@@ -1214,7 +1213,7 @@ const portFileTimeout = 1 * time.Hour
12141213
// If yes, then return that port, and save port + 200 in the same file
12151214
// here, assumptions is 200 ports might be consumed for all tests in a package
12161215
func getPort() int {
1217-
portFile, err := os.OpenFile(path.Join(os.TempDir(), "endtoend.port"), os.O_CREATE|os.O_RDWR, 0644)
1216+
portFile, err := os.OpenFile(path.Join(os.TempDir(), "endtoend.port"), os.O_CREATE|os.O_RDWR, 0o644)
12181217
if err != nil {
12191218
panic(err)
12201219
}

go/test/endtoend/vtorc/general/vtorc_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package general
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"fmt"
23+
"strconv"
2224
"testing"
2325
"time"
2426

@@ -893,3 +895,85 @@ func TestFullStatusConnectionPooling(t *testing.T) {
893895
assert.Equal(t, 200, status)
894896
assert.Equal(t, "null", resp)
895897
}
898+
899+
// TestSemiSyncRecoveryOrdering verifies that when the durability policy changes
900+
// to semi_sync, VTOrc fixes ReplicaSemiSyncMustBeSet before PrimarySemiSyncMustBeSet.
901+
// This ordering is enforced by the AfterAnalyses/BeforeAnalyses dependencies.
902+
func TestSemiSyncRecoveryOrdering(t *testing.T) {
903+
defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance)
904+
// Start with durability "none" so no semi-sync is required initially.
905+
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 0, nil, cluster.VTOrcConfiguration{
906+
PreventCrossCellFailover: true,
907+
}, cluster.DefaultVtorcsByCell, policy.DurabilityNone)
908+
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
909+
shard0 := &keyspace.Shards[0]
910+
911+
// Wait for primary election and healthy replication.
912+
primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
913+
assert.NotNil(t, primary, "should have elected a primary")
914+
utils.CheckReplication(t, clusterInfo, primary, shard0.Vttablets, 10*time.Second)
915+
916+
vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
917+
utils.WaitForSuccessfulRecoveryCount(t, vtorc, logic.ElectNewPrimaryRecoveryName, keyspace.Name, shard0.Name, 1)
918+
919+
// Change durability to semi_sync. VTOrc should detect that replicas and primary
920+
// need semi-sync enabled, and fix them in the correct order.
921+
out, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
922+
"SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy="+policy.DurabilitySemiSync)
923+
require.NoError(t, err, out)
924+
925+
// Poll the database-state API to verify recovery ordering.
926+
// The topology_recovery table has auto-incremented recovery_id values that
927+
// reflect execution order. All ReplicaSemiSyncMustBeSet recovery_ids should
928+
// be less than any PrimarySemiSyncMustBeSet recovery_id.
929+
type tableState struct {
930+
TableName string
931+
Rows []map[string]any
932+
}
933+
934+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
935+
status, response, err := utils.MakeAPICall(t, vtorc, "/api/database-state")
936+
assert.NoError(c, err)
937+
assert.Equal(c, 200, status)
938+
939+
var tables []tableState
940+
if !assert.NoError(c, json.Unmarshal([]byte(response), &tables)) {
941+
return
942+
}
943+
944+
var maxReplicaRecoveryID, minPrimaryRecoveryID int
945+
var replicaCount, primaryCount int
946+
for _, table := range tables {
947+
if table.TableName != "topology_recovery" {
948+
continue
949+
}
950+
for _, row := range table.Rows {
951+
analysis, _ := row["analysis"].(string)
952+
recoveryIDStr, _ := row["recovery_id"].(string)
953+
recoveryID, err := strconv.Atoi(recoveryIDStr)
954+
if err != nil {
955+
continue
956+
}
957+
switch inst.AnalysisCode(analysis) {
958+
case inst.ReplicaSemiSyncMustBeSet:
959+
replicaCount++
960+
if replicaCount == 1 || recoveryID > maxReplicaRecoveryID {
961+
maxReplicaRecoveryID = recoveryID
962+
}
963+
case inst.PrimarySemiSyncMustBeSet:
964+
primaryCount++
965+
if primaryCount == 1 || recoveryID < minPrimaryRecoveryID {
966+
minPrimaryRecoveryID = recoveryID
967+
}
968+
}
969+
}
970+
}
971+
972+
assert.Greater(c, replicaCount, 0, "should have ReplicaSemiSyncMustBeSet recoveries")
973+
assert.Greater(c, primaryCount, 0, "should have PrimarySemiSyncMustBeSet recoveries")
974+
if replicaCount > 0 && primaryCount > 0 {
975+
assert.Less(c, maxReplicaRecoveryID, minPrimaryRecoveryID,
976+
"all ReplicaSemiSyncMustBeSet recoveries should have lower recovery_id than PrimarySemiSyncMustBeSet")
977+
}
978+
}, 30*time.Second, time.Second)
979+
}

go/vt/vtorc/inst/analysis.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
24+
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
2425
"vitess.io/vitess/go/vt/vtorc/config"
2526
)
2627

@@ -115,11 +116,13 @@ type DetectionAnalysis struct {
115116
CountReplicas uint
116117
CountValidReplicas uint
117118
CountValidReplicatingReplicas uint
119+
CountValidSemiSyncReplicatingReplicas uint
118120
ReplicationStopped bool
119121
ErrantGTID string
120122
ReplicaNetTimeout int32
121123
HeartbeatInterval float64
122124
Analysis AnalysisCode
125+
AnalysisMatchedProblems []*DetectionAnalysisProblemMeta
123126
Description string
124127
StructureAnalysis []StructureAnalysisCode
125128
OracleGTIDImmediateTopology bool
@@ -148,6 +151,16 @@ type DetectionAnalysis struct {
148151
IsDiskStalled bool
149152
}
150153

154+
// hasMinSemiSyncAckers returns true if there are a minimum number of semi-sync ackers enabled and replicating.
155+
// True is always returned if the durability policy does not require semi-sync ackers (eg: "none"). This gives
156+
// a useful signal if it is safe to enable semi-sync without risk of stalling ongoing PRIMARY writes.
157+
func hasMinSemiSyncAckers(durabler policy.Durabler, primary *topodatapb.Tablet, analysis *DetectionAnalysis) bool {
158+
if durabler == nil || analysis == nil {
159+
return false
160+
}
161+
return int(analysis.CountValidSemiSyncReplicatingReplicas) >= durabler.SemiSyncAckers(primary)
162+
}
163+
151164
func (detectionAnalysis *DetectionAnalysis) MarshalJSON() ([]byte, error) {
152165
i := struct {
153166
DetectionAnalysis

0 commit comments

Comments
 (0)