Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2061,6 +2061,155 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusCancelled)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusCancelled)
})
t.Run("cleanup migrations by context", func(t *testing.T) {
// Submit two migrations with the same context and a non-zero retain-artifacts window so
// they remain eligible for the CLEANUP query after completion.
t1uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT1Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --retain-artifacts=1h", executeStrategy: "vtctl", migrationContext: "ctx-cleanup-by-context", skipWait: true})
t2uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT2Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --retain-artifacts=1h", executeStrategy: "vtctl", migrationContext: "ctx-cleanup-by-context", skipWait: true})
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusComplete)

// A non-matching context cleans up nothing.
onlineddl.CheckCleanupContextMigrations(t, &vtParams, "ctx-cleanup-by-context-other", 0)

// Cleanup by context: both migrations must be marked for cleanup.
onlineddl.CheckCleanupContextMigrations(t, &vtParams, "ctx-cleanup-by-context", 2)
})
t.Run("complete migrations by context", func(t *testing.T) {
// Submit two migrations with the same context, both postponed so they stay running.
t1uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT1Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-completion", executeStrategy: "vtctl", migrationContext: "ctx-complete-by-context", skipWait: true})
t2uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT2Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-completion", executeStrategy: "vtctl", migrationContext: "ctx-complete-by-context", skipWait: true})
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusRunning)

// A non-matching context completes nothing.
onlineddl.CheckCompleteContextMigrations(t, &vtParams, "ctx-complete-by-context-other", 0)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning)

// Complete by context: both migrations must be completed.
onlineddl.CheckCompleteContextMigrations(t, &vtParams, "ctx-complete-by-context", 2)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusComplete)
})
t.Run("postpone-complete migrations by context", func(t *testing.T) {
// Use --postpone-launch so migrations stay in queued state until we explicitly launch them,
// giving us time to set postpone_completion first.
t1uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT1Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-launch", executeStrategy: "vtctl", migrationContext: "ctx-postpone-complete-by-context", skipWait: true})
t2uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT2Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-launch", executeStrategy: "vtctl", migrationContext: "ctx-postpone-complete-by-context", skipWait: true})
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusQueued)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusQueued)

// A non-matching context postpones nothing.
onlineddl.CheckPostponeCompleteContextMigrations(t, &vtParams, "ctx-postpone-complete-by-context-other", 0)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusQueued)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued)

// Postpone-complete by context: both migrations must be postponed.
onlineddl.CheckPostponeCompleteContextMigrations(t, &vtParams, "ctx-postpone-complete-by-context", 2)

// Now launch the migrations. They will run but not complete because postpone_completion is set.
onlineddl.CheckLaunchContextMigrations(t, &vtParams, "ctx-postpone-complete-by-context", 2)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusRunning)

// Migrations must stay running: postpone_completion prevents them from completing.
time.Sleep(ensureStateNotChangedTime)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning)

// Complete by context: both migrations must now complete.
onlineddl.CheckCompleteContextMigrations(t, &vtParams, "ctx-postpone-complete-by-context", 2)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusComplete)
})
t.Run("force-cutover migrations by context", func(t *testing.T) {
// Use --postpone-launch so migrations stay in queued state. Using --postpone-completion
// instead risks the migration completing between FORCE_CUTOVER CONTEXT and CANCEL CONTEXT,
// causing WaitForMigrationStatus(cancelled) to hang for the full timeout.
t1uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT1Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-launch", executeStrategy: "vtctl", migrationContext: "ctx-force-cutover-by-context", skipWait: true})
t2uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT2Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-launch", executeStrategy: "vtctl", migrationContext: "ctx-force-cutover-by-context", skipWait: true})
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusQueued)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusQueued)

// A non-matching context force-cuts-over nothing.
onlineddl.CheckForceCutOverContextMigrations(t, &vtParams, "ctx-force-cutover-by-context-other", 0)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusQueued)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued)

// Force-cutover by context: both migrations must be marked.
onlineddl.CheckForceCutOverContextMigrations(t, &vtParams, "ctx-force-cutover-by-context", 2)

// Clean up.
onlineddl.CheckCancelContextMigrations(t, &vtParams, "ctx-force-cutover-by-context", 2)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusCancelled, schema.OnlineDDLStatusFailed)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusCancelled, schema.OnlineDDLStatusFailed)
})
t.Run("launch migrations by context", func(t *testing.T) {
// Submit two migrations with the same context, launch-postponed so they sit in queued state.
t1uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT1Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-launch", executeStrategy: "vtctl", migrationContext: "ctx-launch-by-context", skipWait: true})
t2uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT2Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-launch", executeStrategy: "vtctl", migrationContext: "ctx-launch-by-context", skipWait: true})
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusQueued)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusQueued)

// A non-matching context launches nothing.
onlineddl.CheckLaunchContextMigrations(t, &vtParams, "ctx-launch-by-context-other", 0)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusQueued)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued)

// Launch by context: both migrations must be launched.
onlineddl.CheckLaunchContextMigrations(t, &vtParams, "ctx-launch-by-context", 2)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusComplete)
})
t.Run("throttle/unthrottle migrations by context", func(t *testing.T) {
// Submit two migrations with the same context, both postponed so they stay running.
t1uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT1Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-completion", executeStrategy: "vtctl", migrationContext: "ctx-throttle-by-context", skipWait: true})
t2uuid = testOnlineDDLStatement(t, &testOnlineDDLStatementParams{ddlStatement: trivialAlterT2Statement, ddlStrategy: ddlStrategy + " --allow-concurrent --postpone-completion", executeStrategy: "vtctl", migrationContext: "ctx-throttle-by-context", skipWait: true})
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusRunning)

// Throttle by context.
onlineddl.ThrottleContextMigrations(t, &vtParams, "ctx-throttle-by-context")
time.Sleep(ensureStateNotChangedTime)
// Verify both migrations are throttled.
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
assert.EqualValues(t, 1.0, row.AsFloat64("user_throttle_ratio", 0))
}
rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
assert.EqualValues(t, 1.0, row.AsFloat64("user_throttle_ratio", 0))
}

// Unthrottle by context.
onlineddl.UnthrottleContextMigrations(t, &vtParams, "ctx-throttle-by-context")
time.Sleep(ensureStateNotChangedTime)
// Verify both migrations are unthrottled.
rs = onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
assert.EqualValues(t, 0, row.AsFloat64("user_throttle_ratio", 0))
}
rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
assert.EqualValues(t, 0, row.AsFloat64("user_throttle_ratio", 0))
}

// Clean up: cancel the postponed migrations.
onlineddl.CheckCancelContextMigrations(t, &vtParams, "ctx-throttle-by-context", 2)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusCancelled, schema.OnlineDDLStatusFailed)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusCancelled, schema.OnlineDDLStatusFailed)
})
}

func testSingleton(t *testing.T) {
Expand Down
68 changes: 68 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ func CheckLaunchMigration(t *testing.T, vtParams *mysql.ConnParams, shards []clu
}
}

// CheckCompleteContextMigrations completes all pending migrations with a given context and expects number of affected rows.
// A negative value for expectCount indicates "don't care, no need to check"
func CheckCompleteContextMigrations(t *testing.T, vtParams *mysql.ConnParams, migrationContext string, expectCount int) {
query := fmt.Sprintf("alter vitess_migration complete context '%s'", migrationContext)
Comment thread
shlomi-noach marked this conversation as resolved.
r := VtgateExecQuery(t, vtParams, query, "")

if expectCount >= 0 {
assert.Equal(t, expectCount, int(r.RowsAffected))
}
}

// CheckCompleteAllMigrations completes all pending migrations and expect number of affected rows
// A negative value for expectCount indicates "don't care, no need to check"
func CheckCompleteAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCount int) {
Expand All @@ -235,6 +246,17 @@ func CheckCompleteAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expect
}
}

// CheckPostponeCompleteContextMigrations postpones completion of all pending migrations with a given context and expects number of affected rows.
// A negative value for expectCount indicates "don't care, no need to check"
func CheckPostponeCompleteContextMigrations(t *testing.T, vtParams *mysql.ConnParams, migrationContext string, expectCount int) {
query := fmt.Sprintf("alter vitess_migration postpone complete context '%s'", migrationContext)
r := VtgateExecQuery(t, vtParams, query, "")

if expectCount >= 0 {
assert.Equal(t, expectCount, int(r.RowsAffected))
}
}

// CheckPostponeCompleteAllMigrations postpones all pending migrations and expect number of affected rows
// A negative value for expectCount indicates "don't care, no need to check"
func CheckPostponeCompleteAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCount int) {
Expand Down Expand Up @@ -268,6 +290,18 @@ func CheckCancelContextMigrations(t *testing.T, vtParams *mysql.ConnParams, migr
}
}

// CheckCleanupContextMigrations cleans up terminal migrations with a given context and expects number of affected rows.
// A negative value for expectCount indicates "don't care, no need to check"
func CheckCleanupContextMigrations(t *testing.T, vtParams *mysql.ConnParams, migrationContext string, expectCount int) uint64 {
query := fmt.Sprintf("alter vitess_migration cleanup context '%s'", migrationContext)
r := VtgateExecQuery(t, vtParams, query, "")

if expectCount >= 0 {
assert.Equal(t, expectCount, int(r.RowsAffected))
}
return r.RowsAffected
}

// CheckCleanupAllMigrations cleans up all applicable migrations and expect number of affected rows
// A negative value for expectCount indicates "don't care, no need to check"
func CheckCleanupAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCount int) uint64 {
Expand All @@ -280,6 +314,17 @@ func CheckCleanupAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectC
return r.RowsAffected
}

// CheckLaunchContextMigrations launches all queued postponed migrations with a given context and expects number of affected rows.
// A negative value for expectCount indicates "don't care, no need to check"
func CheckLaunchContextMigrations(t *testing.T, vtParams *mysql.ConnParams, migrationContext string, expectCount int) {
query := fmt.Sprintf("alter vitess_migration launch context '%s'", migrationContext)
r := VtgateExecQuery(t, vtParams, query, "")

if expectCount >= 0 {
assert.Equal(t, expectCount, int(r.RowsAffected))
}
}

// CheckLaunchAllMigrations launches all queued posponed migrations and expect number of affected rows
// A negative value for expectCount indicates "don't care, no need to check"
func CheckLaunchAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCount int) {
Expand All @@ -291,6 +336,17 @@ func CheckLaunchAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCo
}
}

// CheckForceCutOverContextMigrations marks all pending migrations with a given context for forced cut-over and expects number of affected rows.
// A negative value for expectCount indicates "don't care, no need to check"
func CheckForceCutOverContextMigrations(t *testing.T, vtParams *mysql.ConnParams, migrationContext string, expectCount int) {
query := fmt.Sprintf("alter vitess_migration force_cutover context '%s'", migrationContext)
r := VtgateExecQuery(t, vtParams, query, "")

if expectCount >= 0 {
assert.Equal(t, expectCount, int(r.RowsAffected))
}
}

// CheckForceMigrationCutOver marks a migration for forced cut-over, and expects success by counting affected rows.
func CheckForceMigrationCutOver(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectPossible bool) {
query, err := sqlparser.ParseAndBind("alter vitess_migration %a force_cutover",
Expand Down Expand Up @@ -431,12 +487,24 @@ func ThrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) {
_ = VtgateExecQuery(t, vtParams, query, "")
}

// ThrottleContextMigrations throttles all pending migrations with a given context.
func ThrottleContextMigrations(t *testing.T, vtParams *mysql.ConnParams, migrationContext string) {
query := fmt.Sprintf("alter vitess_migration throttle context '%s' expire '24h' ratio 1", migrationContext)
_ = VtgateExecQuery(t, vtParams, query, "")
}

// UnthrottleAllMigrations cancels migration throttling
func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) {
query := "alter vitess_migration unthrottle all"
_ = VtgateExecQuery(t, vtParams, query, "")
}

// UnthrottleContextMigrations unthrottles all pending migrations with a given context.
func UnthrottleContextMigrations(t *testing.T, vtParams *mysql.ConnParams, migrationContext string) {
query := fmt.Sprintf("alter vitess_migration unthrottle context '%s'", migrationContext)
_ = VtgateExecQuery(t, vtParams, query, "")
}

// CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list
func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) bool {
ctx, cancel := context.WithTimeout(context.Background(), ThrottledAppsTimeout)
Expand Down
Loading
Loading