|
| 1 | +/* |
| 2 | +Copyright 2026 The Vitess Authors. |
| 3 | +
|
| 4 | +Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +you may not use this file except in compliance with the License. |
| 6 | +You may obtain a copy of the License at |
| 7 | +
|
| 8 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +
|
| 10 | +Unless required by applicable law or agreed to in writing, software |
| 11 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +See the License for the specific language governing permissions and |
| 14 | +limitations under the License. |
| 15 | +*/ |
| 16 | + |
| 17 | +package cli |
| 18 | + |
| 19 | +import ( |
| 20 | + "context" |
| 21 | + "testing" |
| 22 | + "testing/synctest" |
| 23 | + |
| 24 | + "github.com/stretchr/testify/assert" |
| 25 | + "github.com/stretchr/testify/require" |
| 26 | + |
| 27 | + "vitess.io/vitess/go/mysql/replication" |
| 28 | + "vitess.io/vitess/go/vt/mysqlctl" |
| 29 | + "vitess.io/vitess/go/vt/topo" |
| 30 | + "vitess.io/vitess/go/vt/topo/memorytopo" |
| 31 | + |
| 32 | + topodatapb "vitess.io/vitess/go/vt/proto/topodata" |
| 33 | +) |
| 34 | + |
| 35 | +func TestCatchUpReplicationForBackupClearsLastErrWhenReplicationBecomesHealthy(t *testing.T) { |
| 36 | + synctest.Test(t, func(t *testing.T) { |
| 37 | + ctx := t.Context() |
| 38 | + |
| 39 | + oldInitKeyspace := initKeyspace |
| 40 | + oldInitShard := initShard |
| 41 | + initKeyspace = "test_keyspace" |
| 42 | + initShard = "0" |
| 43 | + t.Cleanup(func() { |
| 44 | + initKeyspace = oldInitKeyspace |
| 45 | + initShard = oldInitShard |
| 46 | + }) |
| 47 | + |
| 48 | + ts := memorytopo.NewServer(ctx, "zone1") |
| 49 | + t.Cleanup(ts.Close) |
| 50 | + require.NoError(t, ts.CreateKeyspace(ctx, initKeyspace, &topodatapb.Keyspace{})) |
| 51 | + require.NoError(t, ts.CreateShard(ctx, initKeyspace, initShard)) |
| 52 | + primaryAlias := &topodatapb.TabletAlias{Cell: "zone1", Uid: 100} |
| 53 | + require.NoError(t, ts.CreateTablet(ctx, &topodatapb.Tablet{ |
| 54 | + Alias: primaryAlias, |
| 55 | + Keyspace: initKeyspace, |
| 56 | + Shard: initShard, |
| 57 | + Hostname: "primary.test", |
| 58 | + MysqlHostname: "primary-mysql.test", |
| 59 | + MysqlPort: 3306, |
| 60 | + Type: topodatapb.TabletType_PRIMARY, |
| 61 | + })) |
| 62 | + _, err := ts.UpdateShardFields(ctx, initKeyspace, initShard, func(si *topo.ShardInfo) error { |
| 63 | + si.PrimaryAlias = primaryAlias |
| 64 | + return nil |
| 65 | + }) |
| 66 | + require.NoError(t, err) |
| 67 | + |
| 68 | + restorePos := testCatchupPosition(1) |
| 69 | + primaryPos := testCatchupPosition(3) |
| 70 | + statuses := []replication.ReplicationStatus{ |
| 71 | + { |
| 72 | + Position: restorePos, |
| 73 | + IOState: replication.ReplicationStateConnecting, |
| 74 | + LastIOError: "Replica I/O for channel '': Error reconnecting to source 'vt_test@192.0.2.10:3306'. This was attempt 1/300, with a delay of 10 seconds between attempts. Message: Can't connect to MySQL server on '192.0.2.10:3306' (111), Error_code: MY-002003", |
| 75 | + SQLState: replication.ReplicationStateRunning, |
| 76 | + }, |
| 77 | + } |
| 78 | + for range int(timeoutWaitingForReplicationStatus.Seconds()) + 1 { |
| 79 | + statuses = append(statuses, replication.ReplicationStatus{ |
| 80 | + Position: restorePos, |
| 81 | + IOState: replication.ReplicationStateRunning, |
| 82 | + SQLState: replication.ReplicationStateRunning, |
| 83 | + }) |
| 84 | + } |
| 85 | + statuses = append( |
| 86 | + statuses, |
| 87 | + replication.ReplicationStatus{ |
| 88 | + Position: primaryPos, |
| 89 | + IOState: replication.ReplicationStateRunning, |
| 90 | + SQLState: replication.ReplicationStateRunning, |
| 91 | + }, |
| 92 | + replication.ReplicationStatus{ |
| 93 | + Position: primaryPos, |
| 94 | + IOState: replication.ReplicationStateRunning, |
| 95 | + SQLState: replication.ReplicationStateRunning, |
| 96 | + }, |
| 97 | + ) |
| 98 | + mysqld := &catchupReplicationMysqlDaemon{ |
| 99 | + statuses: statuses, |
| 100 | + } |
| 101 | + |
| 102 | + status, err := catchUpReplicationForBackup(ctx, ts, mysqld, restorePos, primaryPos) |
| 103 | + |
| 104 | + require.NoError(t, err) |
| 105 | + assert.True(t, status.Position.Equal(primaryPos)) |
| 106 | + assert.Equal(t, 1, mysqld.setReplicationSourceCalls) |
| 107 | + assert.Equal(t, 1, mysqld.stopReplicationCalls) |
| 108 | + }) |
| 109 | +} |
| 110 | + |
| 111 | +type catchupReplicationMysqlDaemon struct { |
| 112 | + mysqlctl.MysqlDaemon |
| 113 | + |
| 114 | + statuses []replication.ReplicationStatus |
| 115 | + statusCalls int |
| 116 | + setReplicationSourceCalls int |
| 117 | + stopReplicationCalls int |
| 118 | +} |
| 119 | + |
| 120 | +func (m *catchupReplicationMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.ReplicationStatus, error) { |
| 121 | + status := m.statuses[m.statusCalls] |
| 122 | + m.statusCalls++ |
| 123 | + return status, nil |
| 124 | +} |
| 125 | + |
| 126 | +func (m *catchupReplicationMysqlDaemon) SetReplicationSource(ctx context.Context, host string, port int32, heartbeatInterval float64, stopReplicationBefore bool, startReplicationAfter bool) error { |
| 127 | + m.setReplicationSourceCalls++ |
| 128 | + return nil |
| 129 | +} |
| 130 | + |
| 131 | +func (m *catchupReplicationMysqlDaemon) StopReplication(ctx context.Context, hookExtraEnv map[string]string) error { |
| 132 | + m.stopReplicationCalls++ |
| 133 | + return nil |
| 134 | +} |
| 135 | + |
| 136 | +func testCatchupPosition(pos uint64) replication.Position { |
| 137 | + return replication.Position{GTIDSet: replication.FilePosGTID{File: "source-bin.000001", Pos: pos}} |
| 138 | +} |
0 commit comments