vtctl/workflow: stop reverse replication before Complete drops sources #20188
harshit2017 wants to merge 3 commits into
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR adds safety checks and shutdown behavior for reverse VReplication during workflow completion to prevent completing (and potentially removing source data) while reverse streams are unhealthy or not drained.
Changes:
- Validate reverse workflow stream states during `doValidateWorkflowHasCompleted`.
- Add `stopAndDrainReverseVReplication` to stop reverse streams after waiting for them to catch up to target primary positions.
- Update workflow completion tests to expect reverse stream stop queries and add an error-state test case.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| go/vt/vtctl/workflow/utils.go | Adds validation of reverse workflow stream states during completion validation. |
| go/vt/vtctl/workflow/traffic_switcher.go | Implements draining (wait-for-pos) and stopping reverse VReplication streams prior to source removal. |
| go/vt/vtctl/workflow/switcher_interface.go | Extends the switcher interface with reverse drain/stop capability. |
| go/vt/vtctl/workflow/switcher_dry_run.go | Adds dry-run logging for reverse drain/stop behavior. |
| go/vt/vtctl/workflow/switcher.go | Wires switcher call-through to trafficSwitcher implementation. |
| go/vt/vtctl/workflow/server.go | Calls reverse drain/stop before dropping sources (unless ignoreSourceKeyspace). |
| go/vt/vtctl/workflow/server_test.go | Updates expected queries to include stopping reverse streams; adds error-state coverage. |
| go/vt/vtctl/workflow/framework_test.go | Adjusts test TM client behavior to better model reverse workflows. |
func validateReverseWorkflowForComplete(ctx context.Context, ts *trafficSwitcher, wg *sync.WaitGroup, rec *concurrency.AllErrorRecorder) {
	_ = ts.ForAllSources(func(source *MigrationSource) error {
		wg.Add(1)
		defer wg.Done()
		res, err := ts.ws.tmc.ReadVReplicationWorkflow(ctx, source.GetPrimary().Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
			Workflow: ts.ReverseWorkflowName(),
		})
		if err != nil {
			rec.RecordError(err)
			return nil
		}
		if res == nil || len(res.Streams) == 0 {
			return nil
		}
		for _, stream := range res.Streams {
			switch stream.State {
			case binlogdatapb.VReplicationWorkflowState_Running,
				binlogdatapb.VReplicationWorkflowState_Stopped:
			case binlogdatapb.VReplicationWorkflowState_Error:
				rec.RecordError(fmt.Errorf("reverse vreplication stream %d is in error state on tablet %d",
					stream.Id, source.GetPrimary().Alias.Uid))
			case binlogdatapb.VReplicationWorkflowState_Copying:
				rec.RecordError(fmt.Errorf("reverse vreplication stream %d is still copying on tablet %d",
					stream.Id, source.GetPrimary().Alias.Uid))
			default:
				rec.RecordError(fmt.Errorf("reverse vreplication stream %d is in state %s on tablet %d",
					stream.Id, stream.State, source.GetPrimary().Alias.Uid))
			}
		}
		return nil
	})
}
for _, stream := range res.Streams {
	if stream.Bls == nil {
		return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "reverse vreplication stream %d on %s has no binlog source",
			stream.Id, topoproto.TabletAliasString(source.GetPrimary().GetAlias()))
	}
	targetShard := stream.Bls.Shard
	pos, ok := targetPositions[targetShard]
	if !ok {
		return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
			"reverse vreplication stream %d on %s reads from unknown target shard %s",
			stream.Id, topoproto.TabletAliasString(source.GetPrimary().GetAlias()), targetShard)
	}
	if stream.State == binlogdatapb.VReplicationWorkflowState_Running {
		ts.Logger().Infof("Waiting for reverse stream %d on %s to catch up to target shard %s position %s",
			stream.Id, topoproto.TabletAliasString(source.GetPrimary().GetAlias()), targetShard, pos)
		if err := ts.TabletManagerClient().VReplicationWaitForPos(ctx, source.GetPrimary().Tablet, stream.Id, pos); err != nil {
			return err
		}
	}
	if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped {
		ts.Logger().Infof("Stopping reverse stream %d on %s for complete",
			stream.Id, topoproto.TabletAliasString(source.GetPrimary().GetAlias()))
		if _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet,
			binlogplayer.StopVReplication(stream.Id, stoppedForComplete)); err != nil {
			return err
		}
	}
}
	// reverseReplicationDrainTimeout bounds how long Complete waits for reverse
	// streams to catch up to the target primary position before stopping them.
	reverseReplicationDrainTimeout = 30 * time.Second
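To make the effect of a drain timeout concrete, here is a minimal sketch of bounding a catch-up wait with `context.WithTimeout`. Everything in it (`waitForCatchup`, `drainWithTimeout`, `demo`, the polling interval, the 30 ms timeout) is a hypothetical illustration, not the PR's implementation, which waits through the tablet manager client instead of polling a callback.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForCatchup polls caughtUp until it reports true or ctx is done.
// It is a hypothetical stand-in for a position wait.
func waitForCatchup(ctx context.Context, caughtUp func() bool, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if caughtUp() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// drainWithTimeout bounds the whole wait by timeout, so a stalled
// stream surfaces as a deadline error instead of blocking forever.
func drainWithTimeout(parent context.Context, timeout time.Duration, caughtUp func() bool) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return waitForCatchup(ctx, caughtUp, 5*time.Millisecond)
}

// demo runs one stalled wait and one wait that succeeds immediately.
func demo() (stalledErr, caughtUpErr error) {
	never := func() bool { return false }
	always := func() bool { return true }
	stalledErr = drainWithTimeout(context.Background(), 30*time.Millisecond, never)
	caughtUpErr = drainWithTimeout(context.Background(), 30*time.Millisecond, always)
	return stalledErr, caughtUpErr
}

func main() {
	stalled, caughtUp := demo()
	fmt.Println("stalled:", stalled)
	fmt.Println("caught up:", caughtUp)
}
```

The deadline caps how long the caller holds its locks, but a stream that never catches up still fails the operation, which is the kind of failure path that needs handling.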
Promptless prepared a documentation update related to this change. Triggered by PR #20188 This PR adds validation of reverse workflow state before Review: Document reverse workflow validation in MoveTables Complete
MoveTables Complete could rename or drop source tables while reverse vreplication was still applying, causing permanent errno 1146 errors. Validate reverse workflow state, wait for catchup, stop streams, then remove source tables.

Fixes vitessio#20135

Signed-off-by: Harshit Katyal <harshit1121998@gmail.com>
Signed-off-by: Harshit <harshit1121998@gmail.com>
d02e4a8 to 33916df
…sCompleted

Remove the redundant outer WaitGroup; ForAllSources and ForAllTargets already run callbacks concurrently and wait internally. Propagate ForAllSources aggregate errors into the error recorder instead of discarding them, and return the error from validateReverseWorkflowForComplete.

Signed-off-by: Harshit <harshit1121998@gmail.com>
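The pattern this commit message describes can be sketched as follows: a helper that already runs callbacks concurrently and waits internally needs no outer WaitGroup, and its aggregate error should be returned, not assigned to `_`. The `forAll` and `validateAll` functions and the shard names are hypothetical stand-ins for illustration only; the real `ForAllSources` is assumed, per the commit message, to behave similarly.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// forAll is a hypothetical helper: it runs f for every item
// concurrently, waits for all callbacks, and returns the joined errors.
func forAll(items []string, f func(string) error) error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, item := range items {
		wg.Add(1)
		go func(item string) {
			defer wg.Done()
			if err := f(item); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(item)
	}
	wg.Wait() // The helper waits internally; callers need no outer WaitGroup.
	return errors.Join(errs...)
}

// validateAll propagates the aggregate error instead of discarding it
// with `_ = forAll(...)`.
func validateAll(shards []string) error {
	return forAll(shards, func(shard string) error {
		if shard == "80-" {
			return fmt.Errorf("reverse stream on shard %s is unhealthy", shard)
		}
		return nil
	})
}

func main() {
	fmt.Println(validateAll([]string{"-80", "80-"}))
	fmt.Println(validateAll([]string{"-80"}))
}
```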
func (ts *trafficSwitcher) stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, waitTime)
	defer cancel()
Signed-off-by: Harshit <harshit1121998@gmail.com>
},
expectedSourceQueries: []*queryResult{
{
expectedSourceQueries: append(slices.Clone(stopReverseStreamQueries),
Why are these changes necessary? It's best to eliminate unnecessary changes if we can.
}

func (ts *trafficSwitcher) stopAndDrainReverseVReplication(ctx context.Context, waitTime time.Duration) error {
	targetPositions := make(map[string]string)
I don't think that we should do all of this work to "drain" the stream. The tables are normally deleted. In this case we are renaming them. But new data is ONLY in the keyspace where they were moved. We are holding onto locks and can potentially introduce a whole new set of edge cases and failure scenarios by doing this work and having it take a long time, time out, or error.
I think that we should simply stop the workflow / delete the record for the reverse workflow. i.e. I think it's better to rename the function to just stop, and then only do that, stop it.
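For illustration only, the stop-only alternative suggested in this comment could look roughly like the sketch below: no position wait, just stop every stream that is not already stopped. The `stopper` interface, `recordingStopper`, `stream`, and `stopReverseStreams` are hypothetical names invented for this example and do not correspond to Vitess APIs.

```go
package main

import "fmt"

// stream is a hypothetical, minimal view of one reverse stream.
type stream struct {
	ID      int32
	Stopped bool
}

// stopper is a hypothetical abstraction over whatever issues the stop.
type stopper interface {
	Stop(id int32, message string) error
}

// recordingStopper records stop calls so the sketch can run standalone.
type recordingStopper struct {
	stopped []int32
}

func (r *recordingStopper) Stop(id int32, _ string) error {
	r.stopped = append(r.stopped, id)
	return nil
}

// stopReverseStreams stops every stream that is not already stopped.
// It does no draining, so there is no timeout path to handle.
func stopReverseStreams(s stopper, streams []stream) error {
	for _, st := range streams {
		if st.Stopped {
			continue
		}
		if err := s.Stop(st.ID, "stopped for complete"); err != nil {
			return fmt.Errorf("stopping reverse stream %d: %w", st.ID, err)
		}
	}
	return nil
}

func main() {
	rec := &recordingStopper{}
	err := stopReverseStreams(rec, []stream{{ID: 1}, {ID: 2, Stopped: true}, {ID: 3}})
	fmt.Println(rec.stopped, err)
}
```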
	return nil
}

func validateReverseWorkflowForComplete(ctx context.Context, ts *trafficSwitcher, rec *concurrency.AllErrorRecorder) error {
Why do we need to validate it? Why can't we simply stop it / clean it up?
Codecov Report
❌ Patch coverage is

| | main | #20188 | +/- |
|---|---|---|---|
| Coverage | 69.67% | 72.66% | +2.99% |
| Files | 1614 | 22 | -1592 |
| Lines | 216793 | 7742 | -209051 |
| Hits | 151044 | 5626 | -145418 |
| Misses | 65749 | 2116 | -63633 |
Description
MoveTables Complete could rename or drop source tables while reverse vreplication was still applying, causing permanent errno 1146 errors. Validate reverse workflow state, wait for catchup, stop streams, then remove source tables.
Related Issue(s)
Fixes #20135
Checklist
Deployment Notes
AI Disclosure
Tests were written by AI