Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,9 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
}
}
blsKs := tmc.env.sourceKeyspace
if tmc.reverse.Load() && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName {
if strings.HasSuffix(req.Workflow, reverseSuffix) && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName {
blsKs = tmc.env.targetKeyspace
} else if tmc.reverse.Load() && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName {
blsKs = tmc.env.targetKeyspace
}
for i, shard := range blsKs.ShardNames {
Expand All @@ -380,7 +382,9 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
},
},
}
if tmc.frozen.Load() {
if strings.HasSuffix(req.Workflow, reverseSuffix) {
stream.State = binlogdatapb.VReplicationWorkflowState_Running
} else if tmc.frozen.Load() {
stream.Message = Frozen
}
res.Streams = append(res.Streams, stream)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2513,6 +2513,11 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
return nil, err
}
}
if !wopts.ignoreSourceKeyspace {
if err := sw.stopAndDrainReverseVReplication(ctx, reverseReplicationDrainTimeout); err != nil {
return nil, err
}
}
if !keepData {
switch ts.MigrationType() {
case binlogdatapb.MigrationType_TABLES:
Expand Down
80 changes: 65 additions & 15 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@ func TestMoveTablesComplete(t *testing.T) {
defer cancel()

workflowName := "wf1"
stopReverseStreamQueries := []*queryResult{
{
query: "update _vt.vreplication set state='Stopped', message='stopped for complete' where id=1",
result: &querypb.QueryResult{},
},
{
query: "update _vt.vreplication set state='Stopped', message='stopped for complete' where id=2",
result: &querypb.QueryResult{},
},
}
table1Name := "t1"
table2Name := "t1_2"
table3Name := "t1_3"
Expand Down Expand Up @@ -259,25 +269,25 @@ func TestMoveTablesComplete(t *testing.T) {
TargetKeyspace: targetKeyspaceName,
Workflow: workflowName,
},
expectedSourceQueries: []*queryResult{
{
expectedSourceQueries: append(slices.Clone(stopReverseStreamQueries),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these changes necessary? It's best to eliminate unnecessary changes if we can.

&queryResult{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table1Name),
result: &querypb.QueryResult{},
},
{
&queryResult{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table2Name),
result: &querypb.QueryResult{},
},
{
&queryResult{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table3Name),
result: &querypb.QueryResult{},
},
{
&queryResult{
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
sourceKeyspaceName, ReverseWorkflowName(workflowName)),
result: &querypb.QueryResult{},
},
},
),
expectedTargetQueries: []*queryResult{
{
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
Expand Down Expand Up @@ -306,13 +316,13 @@ func TestMoveTablesComplete(t *testing.T) {
KeepRoutingRules: true,
KeepData: new(true),
},
expectedSourceQueries: []*queryResult{
{
expectedSourceQueries: append(slices.Clone(stopReverseStreamQueries),
&queryResult{
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
sourceKeyspaceName, ReverseWorkflowName(workflowName)),
result: &querypb.QueryResult{},
},
},
),
expectedTargetQueries: []*queryResult{
{
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
Expand Down Expand Up @@ -343,25 +353,25 @@ func TestMoveTablesComplete(t *testing.T) {
Workflow: workflowName,
RenameTables: true,
},
expectedSourceQueries: []*queryResult{
{
expectedSourceQueries: append(slices.Clone(stopReverseStreamQueries),
&queryResult{
query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table1Name, sourceKeyspaceName, table1Name),
result: &querypb.QueryResult{},
},
{
&queryResult{
query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table2Name, sourceKeyspaceName, table2Name),
result: &querypb.QueryResult{},
},
{
&queryResult{
query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table3Name, sourceKeyspaceName, table3Name),
result: &querypb.QueryResult{},
},
{
&queryResult{
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
sourceKeyspaceName, ReverseWorkflowName(workflowName)),
result: &querypb.QueryResult{},
},
},
),
expectedTargetQueries: []*queryResult{
{
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
Expand All @@ -374,6 +384,46 @@ func TestMoveTablesComplete(t *testing.T) {
workflowName, targetKeyspaceName),
},
},
{
name: "reverse workflow in error state",
sourceKeyspace: &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
},
targetKeyspace: &testKeyspace{
KeyspaceName: targetKeyspaceName,
ShardNames: []string{"-80", "80-"},
},
req: &vtctldatapb.MoveTablesCompleteRequest{
TargetKeyspace: targetKeyspaceName,
Workflow: workflowName,
},
preFunc: func(t *testing.T, env *testEnv) {
for _, tablet := range env.tablets[sourceKeyspaceName] {
env.tmc.expectReadVReplicationWorkflowRequest(tablet.Alias.Uid, &readVReplicationWorkflowRequestResponse{
req: &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
Workflow: ReverseWorkflowName(workflowName),
},
res: &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
Workflow: ReverseWorkflowName(workflowName),
Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{
{
Id: 1,
State: binlogdatapb.VReplicationWorkflowState_Error,
Bls: &binlogdatapb.BinlogSource{
Keyspace: targetKeyspaceName,
Shard: "-80",
},
},
},
},
})
}
},
wantErr: "reverse vreplication stream 1 is in error state on cell-0000000100",
postFunc: func(t *testing.T, env *testEnv) {
},
},
{
name: "ignore source keyspace",
sourceKeyspace: &testKeyspace{
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtctl/workflow/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (r *switcher) waitForCatchup(ctx context.Context, filteredReplicationWaitTi
return r.ts.waitForCatchup(ctx, filteredReplicationWaitTime)
}

// stopAndDrainReverseVReplication forwards the call to
// r.ts.stopAndDrainReverseVReplication, passing ctx and waitTime through
// unchanged and returning that call's error as-is.
func (r *switcher) stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error {
return r.ts.stopAndDrainReverseVReplication(ctx, waitTime)
}

func (r *switcher) stopSourceWrites(ctx context.Context) error {
return r.ts.stopSourceWrites(ctx)
}
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ func (dr *switcherDryRun) waitForCatchup(ctx context.Context, filteredReplicatio
return nil
}

// stopAndDrainReverseVReplication is the dry-run variant of the step: it only
// writes one line to the dry-run log naming the reverse workflow and the wait
// time, and always returns nil. ctx is not used.
func (dr *switcherDryRun) stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error {
	reverseWorkflow := dr.ts.ReverseWorkflowName()
	dr.drLog.Logf(
		"Stop and drain reverse vreplication workflow %s for up to %v before removing source data",
		reverseWorkflow,
		waitTime,
	)
	return nil
}

func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error {
sources := maps.Values(dr.ts.Sources())
// Sort the slice for deterministic output.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type iswitcher interface {
stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
stopSourceWrites(ctx context.Context) error
waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error
migrateStreams(ctx context.Context, sm *StreamMigrator) error
createReverseVReplication(ctx context.Context) error
createJournals(ctx context.Context, sourceWorkflows []string) error
Expand Down
74 changes: 74 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ const (
// operation too.
shardTabletRefreshTimeout = time.Duration(30 * time.Second)

// reverseReplicationDrainTimeout bounds how long Complete waits for reverse
// streams to catch up to the target primary position before stopping them.
reverseReplicationDrainTimeout = 30 * time.Second
Comment on lines +78 to +80

stoppedForComplete = "stopped for complete"

// Use pt-osc's naming convention, this format also ensures vstreamer ignores such tables.
renameTableTemplate = "_%.59s_old" // limit table name to 64 characters

Expand Down Expand Up @@ -1029,6 +1035,74 @@ func (ts *trafficSwitcher) buildTenantPredicate(ctx context.Context) (*sqlparser
return tenantPredicate, nil
}

// stopAndDrainReverseVReplication waits for the running reverse workflow
// streams on the source primaries to reach the current target primary
// positions and then stops them.
//
// Steps, as implemented below:
//  1. Record the primary position of every target, keyed by shard name. These
//     PrimaryPosition calls use the caller's ctx; waitTime is not yet applied.
//  2. Derive a context bounded by waitTime for everything that follows.
//  3. For every source, read the reverse workflow from the source primary. A
//     nil response or one with no streams is treated as nothing to do.
//  4. For each stream: a missing binlog source is an INTERNAL error; a binlog
//     source shard with no recorded target position, or a stream in the
//     Error, Copying or any other unexpected state, is a FAILED_PRECONDITION
//     error. Streams already Stopped are skipped (no stop statement is
//     issued). Running streams are waited on with VReplicationWaitForPos and
//     then stopped with the stoppedForComplete message.
//
// It returns the error from ForAllTargets if collecting positions fails,
// otherwise whatever ForAllSources returns.
func (ts *trafficSwitcher) stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error {
// NOTE(review): this map is written from inside the ForAllTargets callback
// without any locking. If ForAllTargets invokes its callback concurrently
// (not visible from here), these writes need a mutex — TODO confirm.
targetPositions := make(map[string]string)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that we should do all of this work to "drain" the stream. The tables are normally deleted. In this case we are renaming them. But new data is ONLY in the keyspace where they were moved. We are holding onto locks and can potentially introduce a whole new set of edge cases and failure scenarios by doing this work and having it take a long time, time out, or error.

I think that we should simply stop the workflow / delete the record for the reverse workflow. i.e. I think it's better to rename the function to just stop, and then only do that, stop it.

// Snapshot each target primary's position, keyed by the target shard name.
if err := ts.ForAllTargets(func(target *MigrationTarget) error {
pos, err := ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet)
if err != nil {
return err
}
targetPositions[target.GetShard().ShardName()] = pos
return nil
}); err != nil {
return err
}

// From here on every call shares a single deadline of waitTime.
ctx, cancel := context.WithTimeout(ctx, waitTime)
defer cancel()

return ts.ForAllSources(func(source *MigrationSource) error {
tabletAlias := topoproto.TabletAliasString(source.GetPrimary().GetAlias())
res, err := ts.ws.tmc.ReadVReplicationWorkflow(ctx, source.GetPrimary().Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
Workflow: ts.ReverseWorkflowName(),
})
if err != nil {
return vterrors.Wrapf(err, "reading reverse workflow %s on %s", ts.ReverseWorkflowName(), tabletAlias)
}
// No reverse streams reported for this source: nothing to drain or stop.
if res == nil || len(res.Streams) == 0 {
return nil
}
for _, stream := range res.Streams {
if stream.Bls == nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "reverse vreplication stream %d on %s has no binlog source",
stream.Id, tabletAlias)
}
// The stream's binlog source shard selects which recorded target
// position the stream has to reach.
targetShard := stream.Bls.Shard
pos, ok := targetPositions[targetShard]
if !ok {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"reverse vreplication stream %d on %s reads from unknown target shard %s",
stream.Id, tabletAlias, targetShard)
}
Comment on lines +1066 to +1076
switch stream.State {
case binlogdatapb.VReplicationWorkflowState_Error:
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"reverse vreplication stream %d on %s is in error state", stream.Id, tabletAlias)
case binlogdatapb.VReplicationWorkflowState_Copying:
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"reverse vreplication stream %d on %s is still copying", stream.Id, tabletAlias)
case binlogdatapb.VReplicationWorkflowState_Stopped:
// Already stopped: skip both the wait and the stop statement below.
continue
case binlogdatapb.VReplicationWorkflowState_Running:
ts.Logger().Infof("Waiting for reverse stream %d on %s to catch up to target shard %s position %s",
stream.Id, tabletAlias, targetShard, pos)
if err := ts.TabletManagerClient().VReplicationWaitForPos(ctx, source.GetPrimary().Tablet, stream.Id, pos); err != nil {
return err
}
Comment on lines +1089 to +1091
default:
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"reverse vreplication stream %d on %s is in state %s", stream.Id, tabletAlias, stream.State)
}
// Only streams that were Running (and have caught up) reach this point.
ts.Logger().Infof("Stopping reverse stream %d on %s for complete", stream.Id, tabletAlias)
if _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet,
binlogplayer.StopVReplication(stream.Id, stoppedForComplete)); err != nil {
return err
}
}
Comment on lines +1065 to +1101
return nil
})
}

func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
defer cancel()
Expand Down
53 changes: 43 additions & 10 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,21 +548,18 @@ func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int
}

func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) error {
wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
if ts.MigrationType() == binlogdatapb.MigrationType_SHARDS {
_ = ts.ForAllSources(func(source *MigrationSource) error {
wg.Add(1)
if err := ts.ForAllSources(func(source *MigrationSource) error {
if source.GetShard().IsPrimaryServing {
rec.RecordError(fmt.Errorf("shard %s is still serving", source.GetShard().ShardName()))
}
wg.Done()
return nil
})
}); err != nil {
rec.RecordError(err)
}
} else {
_ = ts.ForAllTargets(func(target *MigrationTarget) error {
wg.Add(1)
defer wg.Done()
if err := ts.ForAllTargets(func(target *MigrationTarget) error {
res, err := ts.ws.tmc.ReadVReplicationWorkflow(ctx, target.GetPrimary().Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
Workflow: ts.WorkflowName(),
})
Expand All @@ -577,9 +574,13 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er
}
}
return nil
})
}); err != nil {
rec.RecordError(err)
}
}
if err := validateReverseWorkflowForComplete(ctx, ts, &rec); err != nil {
rec.RecordError(err)
}
wg.Wait()

if !ts.keepRoutingRules {
// Check if table is routable.
Expand All @@ -605,6 +606,38 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er
return nil
}

// validateReverseWorkflowForComplete inspects the reverse workflow's streams
// on every source primary and records each problem it finds in rec.
//
// For each source it reads the workflow named ts.ReverseWorkflowName(). A read
// failure is recorded in rec (wrapped with the workflow name and tablet alias)
// and the callback still returns nil. A nil response or one with no streams is
// accepted silently. Streams in the Running or Stopped state are accepted;
// streams in the Error, Copying or any other state are recorded in rec.
//
// The callback itself never returns an error, so the returned error is
// whatever ForAllSources produces; findings are reported through rec.
func validateReverseWorkflowForComplete(ctx context.Context, ts *trafficSwitcher, rec *concurrency.AllErrorRecorder) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to validate it? Why can't we simply stop it / clean it up?

return ts.ForAllSources(func(source *MigrationSource) error {
tabletAlias := topoproto.TabletAliasString(source.GetPrimary().GetAlias())
res, err := ts.ws.tmc.ReadVReplicationWorkflow(ctx, source.GetPrimary().Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
Workflow: ts.ReverseWorkflowName(),
})
if err != nil {
// Record the failure and return nil rather than the error itself.
rec.RecordError(vterrors.Wrapf(err, "reading reverse workflow %s on %s", ts.ReverseWorkflowName(), tabletAlias))
return nil
}
Comment on lines +612 to +618
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if res == nil || len(res.Streams) == 0 {
return nil
}
for _, stream := range res.Streams {
switch stream.State {
case binlogdatapb.VReplicationWorkflowState_Running,
binlogdatapb.VReplicationWorkflowState_Stopped:
// Acceptable states: nothing is recorded.
case binlogdatapb.VReplicationWorkflowState_Error:
rec.RecordError(fmt.Errorf("reverse vreplication stream %d is in error state on %s",
stream.Id, tabletAlias))
case binlogdatapb.VReplicationWorkflowState_Copying:
rec.RecordError(fmt.Errorf("reverse vreplication stream %d is still copying on %s",
stream.Id, tabletAlias))
default:
rec.RecordError(fmt.Errorf("reverse vreplication stream %d is in state %s on %s",
stream.Id, stream.State, tabletAlias))
}
}
return nil
})
}

// ReverseWorkflowName returns the "reversed" name of a workflow. For a
// "forward" workflow, this is the workflow name with "_reverse" appended, and
// for a "reversed" workflow, this is the workflow name with the "_reverse"
Expand Down
Loading