Skip to content

Commit 000c6c8

Browse files
authored
OnlineDDL: Improve Safety of CREATE/DROP TABLE phases (#19242)
Signed-off-by: Matt Lord <mattalord@gmail.com>
1 parent d2c0d46 commit 000c6c8

5 files changed

Lines changed: 249 additions & 35 deletions

File tree

go/vt/vttablet/onlineddl/executor.go

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,6 @@ import (
4848
"vitess.io/vitess/go/vt/binlog/binlogplayer"
4949
"vitess.io/vitess/go/vt/dbconnpool"
5050
"vitess.io/vitess/go/vt/log"
51-
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
52-
querypb "vitess.io/vitess/go/vt/proto/query"
53-
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
54-
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
5551
"vitess.io/vitess/go/vt/schema"
5652
"vitess.io/vitess/go/vt/schemadiff"
5753
"vitess.io/vitess/go/vt/servenv"
@@ -66,6 +62,11 @@ import (
6662
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
6763
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
6864
"vitess.io/vitess/go/vt/vttablet/tmclient"
65+
66+
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
67+
querypb "vitess.io/vitess/go/vt/proto/query"
68+
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
69+
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
6970
)
7071

7172
var (
@@ -79,12 +80,6 @@ var (
7980

8081
var staleMigrationMinutesStats = stats.NewGauge("OnlineDDLStaleMigrationMinutes", "longest stale migration in minutes")
8182

82-
// fixCompletedTimestampDone fixes a nil `completed_timestamp` columns, see
83-
// https://github.com/vitessio/vitess/issues/13927
84-
// The fix is in release-18.0
85-
// TODO: remove in release-19.0
86-
var fixCompletedTimestampDone bool
87-
8883
var (
8984
emptyResult = &sqltypes.Result{}
9085
acceptableDropTableIfExistsErrorCodes = []sqlerror.ErrorCode{sqlerror.ERCantFindFile, sqlerror.ERNoSuchTable}
@@ -118,7 +113,6 @@ func registerOnlineDDLFlags(fs *pflag.FlagSet) {
118113
}
119114

120115
const (
121-
maxPasswordLength = 32 // MySQL's *replication* password may not exceed 32 characters
122116
staleMigrationFailMinutes = 180
123117
staleMigrationWarningMinutes = 5
124118
progressPctStarted float64 = 0
@@ -468,7 +462,8 @@ func (e *Executor) getCreateTableStatement(ctx context.Context, tableName string
468462
return createTable, nil
469463
}
470464

471-
// executeDirectly runs a DDL query directly on the backend MySQL server
465+
// executeDirectly runs a DDL query directly on the backend MySQL server.
466+
// This is primarily used for CREATE/DROP/RENAME TABLE statements.
472467
func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.OnlineDDL, acceptableMySQLErrorCodes ...sqlerror.ErrorCode) (acceptableErrorCodeFound bool, err error) {
473468
conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB())
474469
if err != nil {
@@ -498,6 +493,11 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online
498493
}
499494
defer conn.ExecuteFetch("SET foreign_key_checks=@vt_onlineddl_foreign_key_checks", 0, false)
500495
}
496+
restoreLockWaitTimeout, err := e.initDBConnectionLockWaitTimeout(conn, onlineDDL.CutOverThreshold)
497+
if err != nil {
498+
return false, vterrors.Wrap(err, "failed to set lock_wait_timeout on direct connection")
499+
}
500+
defer restoreLockWaitTimeout()
501501
_, err = conn.ExecuteFetch(onlineDDL.SQL, 0, false)
502502
if err != nil {
503503
// let's see if this error is actually acceptable
@@ -1228,6 +1228,24 @@ func (e *Executor) initConnectionLockWaitTimeout(ctx context.Context, conn *conn
12281228
return deferFunc, nil
12291229
}
12301230

1231+
// initDBConnectionLockWaitTimeout sets the given lock_wait_timeout for the given direct connection, with a deferred value restoration function.
1232+
func (e *Executor) initDBConnectionLockWaitTimeout(conn *dbconnpool.DBConnection, lockWaitTimeout time.Duration) (deferFunc func(), err error) {
1233+
deferFunc = func() {}
1234+
1235+
if _, err := conn.ExecuteFetch("set @lock_wait_timeout=@@session.lock_wait_timeout", 0, false); err != nil {
1236+
return deferFunc, vterrors.Wrap(err, "could not read lock_wait_timeout")
1237+
}
1238+
timeoutSeconds := int64(lockWaitTimeout.Seconds())
1239+
setQuery := fmt.Sprintf("set @@session.lock_wait_timeout=%d", timeoutSeconds)
1240+
if _, err := conn.ExecuteFetch(setQuery, 0, false); err != nil {
1241+
return deferFunc, err
1242+
}
1243+
deferFunc = func() {
1244+
conn.ExecuteFetch("set @@session.lock_wait_timeout=@lock_wait_timeout", 0, false)
1245+
}
1246+
return deferFunc, nil
1247+
}
1248+
12311249
// createDuplicateTableLike creates the table named by `newTableName` in the likeness of onlineDDL.Table
12321250
// This function emulates MySQL's `CREATE TABLE LIKE ...` statement. The difference is that this function takes control over the generated CONSTRAINT names,
12331251
// if any, such that they are deterministic across shards, as well as preserve original names where possible.
@@ -3532,17 +3550,6 @@ func (e *Executor) gcArtifacts(ctx context.Context) error {
35323550
e.migrationMutex.Lock()
35333551
defer e.migrationMutex.Unlock()
35343552

3535-
// v18 fix. Remove in v19
3536-
if !fixCompletedTimestampDone {
3537-
if _, err := e.execQuery(ctx, sqlFixCompletedTimestamp); err != nil {
3538-
// This query fixes a bug where stale migrations were marked as 'cancelled' or 'failed' without updating 'completed_timestamp'
3539-
// Running this query retroactively sets completed_timestamp
3540-
// This fix is created in v18 and can be removed in v19
3541-
return err
3542-
}
3543-
fixCompletedTimestampDone = true
3544-
}
3545-
35463553
query, err := sqlparser.ParseAndBind(sqlSelectUncollectedArtifacts,
35473554
sqltypes.Int64BindVariable(int64((retainOnlineDDLTables).Seconds())),
35483555
)

go/vt/vttablet/onlineddl/executor_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,26 @@ Functionality of this Executor is tested in go/test/endtoend/onlineddl/...
2121
package onlineddl
2222

2323
import (
24+
"context"
2425
"testing"
2526
"time"
2627

2728
"github.com/stretchr/testify/assert"
2829
"github.com/stretchr/testify/require"
2930

31+
"vitess.io/vitess/go/mysql/fakesqldb"
32+
"vitess.io/vitess/go/sqltypes"
33+
"vitess.io/vitess/go/timer"
34+
"vitess.io/vitess/go/vt/dbconfigs"
35+
"vitess.io/vitess/go/vt/dbconnpool"
3036
"vitess.io/vitess/go/vt/schema"
37+
"vitess.io/vitess/go/vt/topo/memorytopo"
38+
"vitess.io/vitess/go/vt/vtenv"
39+
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
40+
"vitess.io/vitess/go/vt/vttablet/tmclient"
41+
"vitess.io/vitess/go/vt/vttablet/tmclienttest"
42+
43+
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3144
)
3245

3346
func TestShouldCutOverAccordingToBackoff(t *testing.T) {
@@ -249,3 +262,90 @@ func TestGetInOrderCompletionPendingCount(t *testing.T) {
249262
require.Equal(t, uint64(3), getInOrderCompletionPendingCount(onlineDDL, pendingMigrationsUUIDs))
250263
}
251264
}
265+
266+
func TestInitDBConnectionLockWaitTimeout(t *testing.T) {
267+
db := fakesqldb.New(t)
268+
defer db.Close()
269+
params := db.ConnParams()
270+
connector := dbconfigs.NewTestDBConfigs(*params, *params, params.DbName).DbaWithDB()
271+
conn, err := dbconnpool.NewDBConnection(context.Background(), connector)
272+
require.NoError(t, err)
273+
defer conn.Close()
274+
275+
db.AddQuery("set @lock_wait_timeout=@@session.lock_wait_timeout", &sqltypes.Result{})
276+
db.AddQuery("set @@session.lock_wait_timeout=5", &sqltypes.Result{})
277+
db.AddQuery("set @@session.lock_wait_timeout=@lock_wait_timeout", &sqltypes.Result{})
278+
279+
executor := &Executor{}
280+
deferFunc, err := executor.initDBConnectionLockWaitTimeout(conn, 5*time.Second)
281+
require.NoError(t, err)
282+
queryLog := db.QueryLog()
283+
assert.Contains(t, queryLog, "set @lock_wait_timeout=@@session.lock_wait_timeout")
284+
assert.Contains(t, queryLog, "set @@session.lock_wait_timeout=5")
285+
286+
deferFunc()
287+
assert.Contains(t, db.QueryLog(), "set @@session.lock_wait_timeout=@lock_wait_timeout")
288+
}
289+
290+
func TestExecuteDirectlySetsLockWaitTimeout(t *testing.T) {
291+
ctx := t.Context()
292+
db := fakesqldb.New(t)
293+
defer db.Close()
294+
params := db.ConnParams()
295+
connector := dbconfigs.NewTestDBConfigs(*params, *params, params.DbName).DbaWithDB()
296+
conn, err := dbconnpool.NewDBConnection(ctx, connector)
297+
require.NoError(t, err)
298+
defer conn.Close()
299+
300+
db.AddQuery("set @lock_wait_timeout=@@session.lock_wait_timeout", &sqltypes.Result{})
301+
db.AddQuery("set @@session.lock_wait_timeout=5", &sqltypes.Result{})
302+
db.AddQuery("set @@session.lock_wait_timeout=@lock_wait_timeout", &sqltypes.Result{})
303+
db.AddQuery("create table test_lock_wait(id int)", &sqltypes.Result{})
304+
305+
venv := vtenv.NewTestEnv()
306+
cfg := tabletenv.NewDefaultConfig()
307+
cfg.DB = dbconfigs.NewTestDBConfigs(*params, *params, params.DbName)
308+
protocolName := t.Name()
309+
resetProtocol := tmclienttest.SetProtocol(t.Name(), protocolName)
310+
defer resetProtocol()
311+
tmclient.RegisterTabletManagerClientFactory(protocolName, func() tmclient.TabletManagerClient {
312+
return &fakeTabletManagerClient{}
313+
})
314+
alias := &topodatapb.TabletAlias{Cell: "cell", Uid: 1}
315+
ts := memorytopo.NewServer(ctx, "cell")
316+
err = ts.CreateTablet(ctx, &topodatapb.Tablet{
317+
Alias: alias,
318+
Keyspace: "ks",
319+
Shard: "0",
320+
Type: topodatapb.TabletType_PRIMARY,
321+
})
322+
require.NoError(t, err)
323+
executor := &Executor{
324+
env: tabletenv.NewEnv(venv, cfg, "ExecutorTest"),
325+
ts: ts,
326+
tabletAlias: alias,
327+
execQuery: func(ctx context.Context, query string) (*sqltypes.Result, error) {
328+
return &sqltypes.Result{}, nil
329+
},
330+
ticks: timer.NewTimer(migrationCheckInterval),
331+
}
332+
333+
onlineDDL := &schema.OnlineDDL{SQL: "create table test_lock_wait(id int)", CutOverThreshold: 5 * time.Second, UUID: "uuid"}
334+
_, err = executor.executeDirectly(ctx, onlineDDL)
335+
require.NoError(t, err)
336+
337+
queryLog := db.QueryLog()
338+
assert.Contains(t, queryLog, "set @lock_wait_timeout=@@session.lock_wait_timeout")
339+
assert.Contains(t, queryLog, "set @@session.lock_wait_timeout=5")
340+
assert.Contains(t, queryLog, "set @@session.lock_wait_timeout=@lock_wait_timeout")
341+
}
342+
343+
type fakeTabletManagerClient struct {
344+
tmclient.TabletManagerClient
345+
}
346+
347+
func (fakeTabletManagerClient) Close() {}
348+
349+
func (fakeTabletManagerClient) ReloadSchema(ctx context.Context, tablet *topodatapb.Tablet, waitPosition string) error {
350+
return nil
351+
}

go/vt/vttablet/tabletserver/gc/tablegc.go

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ package gc
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
23+
"maps"
2224
"sort"
25+
"strings"
2326
"sync"
2427
"sync/atomic"
2528
"time"
@@ -29,6 +32,7 @@ import (
2932
"vitess.io/vitess/go/mysql/capabilities"
3033
"vitess.io/vitess/go/mysql/sqlerror"
3134

35+
"vitess.io/vitess/go/sqltypes"
3236
"vitess.io/vitess/go/timer"
3337
"vitess.io/vitess/go/vt/dbconnpool"
3438
"vitess.io/vitess/go/vt/log"
@@ -37,6 +41,7 @@ import (
3741
"vitess.io/vitess/go/vt/sqlparser"
3842
"vitess.io/vitess/go/vt/topo"
3943
"vitess.io/vitess/go/vt/utils"
44+
"vitess.io/vitess/go/vt/vterrors"
4045
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
4146
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
4247
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
@@ -127,6 +132,11 @@ type TableGC struct {
127132
lifecycleStates map[schema.TableGCState]bool
128133
}
129134

135+
type capabilityConn interface {
136+
ExecuteFetch(query string, maxrows int, wantfields bool) (*sqltypes.Result, error)
137+
SupportsCapability(capabilities.FlavorCapability) (bool, error)
138+
}
139+
130140
// Status published some status values from the collector
131141
type Status struct {
132142
Keyspace string
@@ -191,19 +201,11 @@ func (collector *TableGC) Open() (err error) {
191201
return err
192202
}
193203
defer conn.Close()
194-
serverSupportsFastDrops, err := conn.SupportsCapability(capabilities.FastDropTableFlavorCapability)
204+
collector.lifecycleStates, err = adjustLifecycleForFastDrops(conn, collector.lifecycleStates)
195205
if err != nil {
196206
return err
197207
}
198-
if serverSupportsFastDrops {
199-
// MySQL 8.0.23 and onwards supports fast DROP TABLE operations. This means we don't have to
200-
// go through the purging & evac cycle: once the table has been held for long enough, we can just
201-
// move on to dropping it. Dropping a large table in 8.0.23 is expected to take several seconds, and
202-
// should not block other queries or place any locks on the buffer pool.
203-
delete(collector.lifecycleStates, schema.PurgeTableGCState)
204-
delete(collector.lifecycleStates, schema.EvacTableGCState)
205-
}
206-
log.Infof("TableGC: MySQL version=%v, serverSupportsFastDrops=%v, lifecycleStates=%v", conn.ServerVersion, serverSupportsFastDrops, collector.lifecycleStates)
208+
log.Infof("TableGC: MySQL version=%v, lifecycleStates=%v", conn.ServerVersion, collector.lifecycleStates)
207209

208210
ctx := context.Background()
209211
ctx, collector.cancelOperation = context.WithCancel(ctx)
@@ -212,6 +214,44 @@ func (collector *TableGC) Open() (err error) {
212214
return nil
213215
}
214216

217+
// adjustLifecycleForFastDrops adjusts the default lifecycle phases because MySQL 8.0.23 and onwards
218+
// supports fast DROP TABLE operations. This means we don't have to go through the purge & evac
219+
// cycles: once the table has been held for long enough, we can just move on to dropping it. Dropping
220+
// a large table in 8.0.23 is expected to take several seconds, and should not block other queries
221+
// or place any locks on the buffer pool.
222+
func adjustLifecycleForFastDrops(conn capabilityConn, lifecycleStates map[schema.TableGCState]bool) (map[schema.TableGCState]bool, error) {
223+
if len(lifecycleStates) == 0 {
224+
return lifecycleStates, nil
225+
}
226+
227+
serverSupportsFastDrops, err := conn.SupportsCapability(capabilities.FastDropTableFlavorCapability)
228+
if err != nil {
229+
return lifecycleStates, err
230+
}
231+
if !serverSupportsFastDrops {
232+
return lifecycleStates, nil
233+
}
234+
235+
// Unfortunately you can still encounter problems if the Adaptive Hash Indexes are enabled: https://bugs.mysql.com/bug.php?id=113312
236+
// So if AHI is enabled, we cannot safely skip PURGE and EVAC even on MySQL versions that support fast DROP TABLE.
237+
res, err := conn.ExecuteFetch("SELECT variable_value FROM performance_schema.global_variables WHERE variable_name = 'innodb_adaptive_hash_index'", 1, false)
238+
if err != nil {
239+
return nil, vterrors.Wrap(err, "failed to check innodb_adaptive_hash_index setting")
240+
}
241+
if res == nil || len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
242+
return nil, errors.New("failed to check innodb_adaptive_hash_index setting: unexpected result")
243+
}
244+
if strings.ToLower(res.Rows[0][0].ToString()) == "on" {
245+
return lifecycleStates, nil
246+
}
247+
248+
updated := make(map[schema.TableGCState]bool, len(lifecycleStates))
249+
maps.Copy(updated, lifecycleStates)
250+
delete(updated, schema.PurgeTableGCState)
251+
delete(updated, schema.EvacTableGCState)
252+
return updated, nil
253+
}
254+
215255
// Close frees resources
216256
func (collector *TableGC) Close() {
217257
log.Infof("TableGC - started execution of Close. Acquiring initMutex lock")

0 commit comments

Comments
 (0)