Skip to content

Commit 33916df

Browse files
committed
vtctl/workflow: stop reverse replication before Complete drops sources
MoveTables Complete could rename or drop source tables while reverse vreplication was still applying, causing permanent errno 1146 errors. Validate reverse workflow state, wait for catchup, stop streams, then remove source tables. Fixes #20135 Signed-off-by: Harshit Katyal <harshit1121998@gmail.com> Signed-off-by: Harshit <harshit1121998@gmail.com>
1 parent cff3492 commit 33916df

8 files changed

Lines changed: 185 additions & 17 deletions

File tree

go/vt/vtctl/workflow/framework_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,9 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
365365
}
366366
}
367367
blsKs := tmc.env.sourceKeyspace
368-
if tmc.reverse.Load() && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName {
368+
if strings.HasSuffix(req.Workflow, reverseSuffix) && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName {
369+
blsKs = tmc.env.targetKeyspace
370+
} else if tmc.reverse.Load() && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName {
369371
blsKs = tmc.env.targetKeyspace
370372
}
371373
for i, shard := range blsKs.ShardNames {
@@ -380,7 +382,9 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
380382
},
381383
},
382384
}
383-
if tmc.frozen.Load() {
385+
if strings.HasSuffix(req.Workflow, reverseSuffix) {
386+
stream.State = binlogdatapb.VReplicationWorkflowState_Running
387+
} else if tmc.frozen.Load() {
384388
stream.Message = Frozen
385389
}
386390
res.Streams = append(res.Streams, stream)

go/vt/vtctl/workflow/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2513,6 +2513,11 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
25132513
return nil, err
25142514
}
25152515
}
2516+
if !wopts.ignoreSourceKeyspace {
2517+
if err := sw.stopAndDrainReverseVReplication(ctx, reverseReplicationDrainTimeout); err != nil {
2518+
return nil, err
2519+
}
2520+
}
25162521
if !keepData {
25172522
switch ts.MigrationType() {
25182523
case binlogdatapb.MigrationType_TABLES:

go/vt/vtctl/workflow/server_test.go

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,16 @@ func TestMoveTablesComplete(t *testing.T) {
199199
defer cancel()
200200

201201
workflowName := "wf1"
202+
stopReverseStreamQueries := []*queryResult{
203+
{
204+
query: "update _vt.vreplication set state='Stopped', message='stopped for complete' where id=1",
205+
result: &querypb.QueryResult{},
206+
},
207+
{
208+
query: "update _vt.vreplication set state='Stopped', message='stopped for complete' where id=2",
209+
result: &querypb.QueryResult{},
210+
},
211+
}
202212
table1Name := "t1"
203213
table2Name := "t1_2"
204214
table3Name := "t1_3"
@@ -259,25 +269,25 @@ func TestMoveTablesComplete(t *testing.T) {
259269
TargetKeyspace: targetKeyspaceName,
260270
Workflow: workflowName,
261271
},
262-
expectedSourceQueries: []*queryResult{
263-
{
272+
expectedSourceQueries: append(slices.Clone(stopReverseStreamQueries),
273+
&queryResult{
264274
query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table1Name),
265275
result: &querypb.QueryResult{},
266276
},
267-
{
277+
&queryResult{
268278
query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table2Name),
269279
result: &querypb.QueryResult{},
270280
},
271-
{
281+
&queryResult{
272282
query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table3Name),
273283
result: &querypb.QueryResult{},
274284
},
275-
{
285+
&queryResult{
276286
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
277287
sourceKeyspaceName, ReverseWorkflowName(workflowName)),
278288
result: &querypb.QueryResult{},
279289
},
280-
},
290+
),
281291
expectedTargetQueries: []*queryResult{
282292
{
283293
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
@@ -306,13 +316,13 @@ func TestMoveTablesComplete(t *testing.T) {
306316
KeepRoutingRules: true,
307317
KeepData: new(true),
308318
},
309-
expectedSourceQueries: []*queryResult{
310-
{
319+
expectedSourceQueries: append(slices.Clone(stopReverseStreamQueries),
320+
&queryResult{
311321
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
312322
sourceKeyspaceName, ReverseWorkflowName(workflowName)),
313323
result: &querypb.QueryResult{},
314324
},
315-
},
325+
),
316326
expectedTargetQueries: []*queryResult{
317327
{
318328
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
@@ -343,25 +353,25 @@ func TestMoveTablesComplete(t *testing.T) {
343353
Workflow: workflowName,
344354
RenameTables: true,
345355
},
346-
expectedSourceQueries: []*queryResult{
347-
{
356+
expectedSourceQueries: append(slices.Clone(stopReverseStreamQueries),
357+
&queryResult{
348358
query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table1Name, sourceKeyspaceName, table1Name),
349359
result: &querypb.QueryResult{},
350360
},
351-
{
361+
&queryResult{
352362
query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table2Name, sourceKeyspaceName, table2Name),
353363
result: &querypb.QueryResult{},
354364
},
355-
{
365+
&queryResult{
356366
query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table3Name, sourceKeyspaceName, table3Name),
357367
result: &querypb.QueryResult{},
358368
},
359-
{
369+
&queryResult{
360370
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
361371
sourceKeyspaceName, ReverseWorkflowName(workflowName)),
362372
result: &querypb.QueryResult{},
363373
},
364-
},
374+
),
365375
expectedTargetQueries: []*queryResult{
366376
{
367377
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
@@ -374,6 +384,46 @@ func TestMoveTablesComplete(t *testing.T) {
374384
workflowName, targetKeyspaceName),
375385
},
376386
},
387+
{
388+
name: "reverse workflow in error state",
389+
sourceKeyspace: &testKeyspace{
390+
KeyspaceName: sourceKeyspaceName,
391+
ShardNames: []string{"0"},
392+
},
393+
targetKeyspace: &testKeyspace{
394+
KeyspaceName: targetKeyspaceName,
395+
ShardNames: []string{"-80", "80-"},
396+
},
397+
req: &vtctldatapb.MoveTablesCompleteRequest{
398+
TargetKeyspace: targetKeyspaceName,
399+
Workflow: workflowName,
400+
},
401+
preFunc: func(t *testing.T, env *testEnv) {
402+
for _, tablet := range env.tablets[sourceKeyspaceName] {
403+
env.tmc.expectReadVReplicationWorkflowRequest(tablet.Alias.Uid, &readVReplicationWorkflowRequestResponse{
404+
req: &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
405+
Workflow: ReverseWorkflowName(workflowName),
406+
},
407+
res: &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
408+
Workflow: ReverseWorkflowName(workflowName),
409+
Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{
410+
{
411+
Id: 1,
412+
State: binlogdatapb.VReplicationWorkflowState_Error,
413+
Bls: &binlogdatapb.BinlogSource{
414+
Keyspace: targetKeyspaceName,
415+
Shard: "-80",
416+
},
417+
},
418+
},
419+
},
420+
})
421+
}
422+
},
423+
wantErr: "reverse vreplication stream 1 is in error state on tablet 100",
424+
postFunc: func(t *testing.T, env *testEnv) {
425+
},
426+
},
377427
{
378428
name: "ignore source keyspace",
379429
sourceKeyspace: &testKeyspace{

go/vt/vtctl/workflow/switcher.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ func (r *switcher) waitForCatchup(ctx context.Context, filteredReplicationWaitTi
116116
return r.ts.waitForCatchup(ctx, filteredReplicationWaitTime)
117117
}
118118

119+
func (r *switcher) stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error {
120+
return r.ts.stopAndDrainReverseVReplication(ctx, waitTime)
121+
}
122+
119123
func (r *switcher) stopSourceWrites(ctx context.Context) error {
120124
return r.ts.stopSourceWrites(ctx)
121125
}

go/vt/vtctl/workflow/switcher_dry_run.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,12 @@ func (dr *switcherDryRun) waitForCatchup(ctx context.Context, filteredReplicatio
261261
return nil
262262
}
263263

264+
func (dr *switcherDryRun) stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error {
265+
dr.drLog.Logf("Stop and drain reverse vreplication workflow %s for up to %v before removing source data",
266+
dr.ts.ReverseWorkflowName(), waitTime)
267+
return nil
268+
}
269+
264270
func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error {
265271
sources := maps.Values(dr.ts.Sources())
266272
// Sort the slice for deterministic output.

go/vt/vtctl/workflow/switcher_interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type iswitcher interface {
3131
stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
3232
stopSourceWrites(ctx context.Context) error
3333
waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
34+
stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error
3435
migrateStreams(ctx context.Context, sm *StreamMigrator) error
3536
createReverseVReplication(ctx context.Context) error
3637
createJournals(ctx context.Context, sourceWorkflows []string) error

go/vt/vtctl/workflow/traffic_switcher.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ const (
7575
// operation too.
7676
shardTabletRefreshTimeout = time.Duration(30 * time.Second)
7777

78+
// reverseReplicationDrainTimeout bounds how long Complete waits for reverse
79+
// streams to catch up to the target primary position before stopping them.
80+
reverseReplicationDrainTimeout = 30 * time.Second
81+
82+
stoppedForComplete = "stopped for complete"
83+
7884
// Use pt-osc's naming convention, this format also ensures vstreamer ignores such tables.
7985
renameTableTemplate = "_%.59s_old" // limit table name to 64 characters
8086

@@ -1029,6 +1035,64 @@ func (ts *trafficSwitcher) buildTenantPredicate(ctx context.Context) (*sqlparser
10291035
return tenantPredicate, nil
10301036
}
10311037

1038+
func (ts *trafficSwitcher) stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error {
1039+
ctx, cancel := context.WithTimeout(ctx, waitTime)
1040+
defer cancel()
1041+
1042+
targetPositions := make(map[string]string)
1043+
if err := ts.ForAllTargets(func(target *MigrationTarget) error {
1044+
pos, err := ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet)
1045+
if err != nil {
1046+
return err
1047+
}
1048+
targetPositions[target.GetShard().ShardName()] = pos
1049+
return nil
1050+
}); err != nil {
1051+
return err
1052+
}
1053+
1054+
return ts.ForAllSources(func(source *MigrationSource) error {
1055+
res, err := ts.ws.tmc.ReadVReplicationWorkflow(ctx, source.GetPrimary().Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
1056+
Workflow: ts.ReverseWorkflowName(),
1057+
})
1058+
if err != nil {
1059+
return err
1060+
}
1061+
if res == nil || len(res.Streams) == 0 {
1062+
return nil
1063+
}
1064+
for _, stream := range res.Streams {
1065+
if stream.Bls == nil {
1066+
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "reverse vreplication stream %d on %s has no binlog source",
1067+
stream.Id, topoproto.TabletAliasString(source.GetPrimary().GetAlias()))
1068+
}
1069+
targetShard := stream.Bls.Shard
1070+
pos, ok := targetPositions[targetShard]
1071+
if !ok {
1072+
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
1073+
"reverse vreplication stream %d on %s reads from unknown target shard %s",
1074+
stream.Id, topoproto.TabletAliasString(source.GetPrimary().GetAlias()), targetShard)
1075+
}
1076+
if stream.State == binlogdatapb.VReplicationWorkflowState_Running {
1077+
ts.Logger().Infof("Waiting for reverse stream %d on %s to catch up to target shard %s position %s",
1078+
stream.Id, topoproto.TabletAliasString(source.GetPrimary().GetAlias()), targetShard, pos)
1079+
if err := ts.TabletManagerClient().VReplicationWaitForPos(ctx, source.GetPrimary().Tablet, stream.Id, pos); err != nil {
1080+
return err
1081+
}
1082+
}
1083+
if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped {
1084+
ts.Logger().Infof("Stopping reverse stream %d on %s for complete",
1085+
stream.Id, topoproto.TabletAliasString(source.GetPrimary().GetAlias()))
1086+
if _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet,
1087+
binlogplayer.StopVReplication(stream.Id, stoppedForComplete)); err != nil {
1088+
return err
1089+
}
1090+
}
1091+
}
1092+
return nil
1093+
})
1094+
}
1095+
10321096
func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error {
10331097
ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
10341098
defer cancel()

go/vt/vtctl/workflow/utils.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,7 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er
579579
return nil
580580
})
581581
}
582+
validateReverseWorkflowForComplete(ctx, ts, &wg, &rec)
582583
wg.Wait()
583584

584585
if !ts.keepRoutingRules {
@@ -605,6 +606,39 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er
605606
return nil
606607
}
607608

609+
func validateReverseWorkflowForComplete(ctx context.Context, ts *trafficSwitcher, wg *sync.WaitGroup, rec *concurrency.AllErrorRecorder) {
610+
_ = ts.ForAllSources(func(source *MigrationSource) error {
611+
wg.Add(1)
612+
defer wg.Done()
613+
res, err := ts.ws.tmc.ReadVReplicationWorkflow(ctx, source.GetPrimary().Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
614+
Workflow: ts.ReverseWorkflowName(),
615+
})
616+
if err != nil {
617+
rec.RecordError(err)
618+
return nil
619+
}
620+
if res == nil || len(res.Streams) == 0 {
621+
return nil
622+
}
623+
for _, stream := range res.Streams {
624+
switch stream.State {
625+
case binlogdatapb.VReplicationWorkflowState_Running,
626+
binlogdatapb.VReplicationWorkflowState_Stopped:
627+
case binlogdatapb.VReplicationWorkflowState_Error:
628+
rec.RecordError(fmt.Errorf("reverse vreplication stream %d is in error state on tablet %d",
629+
stream.Id, source.GetPrimary().Alias.Uid))
630+
case binlogdatapb.VReplicationWorkflowState_Copying:
631+
rec.RecordError(fmt.Errorf("reverse vreplication stream %d is still copying on tablet %d",
632+
stream.Id, source.GetPrimary().Alias.Uid))
633+
default:
634+
rec.RecordError(fmt.Errorf("reverse vreplication stream %d is in state %s on tablet %d",
635+
stream.Id, stream.State, source.GetPrimary().Alias.Uid))
636+
}
637+
}
638+
return nil
639+
})
640+
}
641+
608642
// ReverseWorkflowName returns the "reversed" name of a workflow. For a
609643
// "forward" workflow, this is the workflow name with "_reverse" appended, and
610644
// for a "reversed" workflow, this is the workflow name with the "_reverse"

0 commit comments

Comments
 (0)