@@ -884,22 +884,35 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
884884 e .updateMigrationStage (ctx , onlineDDL .UUID , "post-sentry pos reached" )
885885 }
886886
887+ renameWasSuccessful := false
887888 lockConn , err := e .pool .Get (ctx , nil )
888889 if err != nil {
889890 return vterrors .Wrapf (err , "failed getting locking connection" )
890891 }
891892 defer lockConn .Recycle ()
893+ defer func () {
894+ // Always attempt UNLOCK TABLES first, as it releases locks immediately on this
895+ // connection. Then kill the connection as a fallback to guarantee any held locks
896+ // are released, even if UNLOCK TABLES were to fail.
897+ unlockCtx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
898+ defer cancel ()
899+ if _ , err := lockConn .Conn .Exec (unlockCtx , sqlUnlockTables , 1 , false ); err != nil {
900+ log .Warn (fmt .Sprintf ("Failed to UNLOCK TABLES in OnlineDDL migration %s: %v" , onlineDDL .UUID , err ))
901+ }
902+ if err := lockConn .Conn .Kill ("closing lock tables connection" , 0 ); err != nil {
903+ log .Warn (fmt .Sprintf ("Failed to kill lock tables connection in OnlineDDL migration %s: %v" , onlineDDL .UUID , err ))
904+ }
905+ }()
906+
892907 // Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
893908 // The code will ensure everything that needs to be terminated by `onlineDDL.CutOverThreshold` will be terminated.
894909 lockConnRestoreLockWaitTimeout , err := e .initConnectionLockWaitTimeout (ctx , lockConn .Conn , 3 * onlineDDL .CutOverThreshold )
895910 if err != nil {
896911 return vterrors .Wrapf (err , "failed setting lock_wait_timeout on locking connection" )
897912 }
898913 defer lockConnRestoreLockWaitTimeout ()
899- defer lockConn .Conn .Exec (ctx , sqlUnlockTables , 1 , false )
900914
901915 renameCompleteChan := make (chan error )
902- renameWasSuccessful := false
903916 renameConn , err := e .pool .Get (ctx , nil )
904917 if err != nil {
905918 return vterrors .Wrapf (err , "failed getting rename connection" )
@@ -1210,24 +1223,47 @@ func (e *Executor) initMigrationSQLMode(ctx context.Context, onlineDDL *schema.O
12101223 return deferFunc , nil
12111224}
12121225
1213- // initConnectionLockWaitTimeout sets the given lock_wait_timeout for the given connection, with a deferred value restoration function
1214- func (e * Executor ) initConnectionLockWaitTimeout (ctx context.Context , conn * connpool.Conn , lockWaitTimeout time.Duration ) (deferFunc func (), err error ) {
1226+ // initConnectionSessionTimeout saves the current session value of `variable` into a same-named user variable (`@<variable>`),
1227+ // sets the session variable to `timeout` truncated to whole seconds, and returns a restore function for the caller to defer. On any error the returned function is a no-op.
1228+ func (e * Executor ) initConnectionSessionTimeout (ctx context.Context , conn * connpool.Conn , variable string , timeout time.Duration ) (deferFunc func (), err error ) {
12151229 deferFunc = func () {} // no-op until the session variable has actually been changed
12161230
1217- if _ , err := conn .Exec (ctx , `set @lock_wait_timeout=@@session.lock_wait_timeout` , 0 , false ); err != nil {
1218- return deferFunc , vterrors .Errorf (vtrpcpb .Code_UNKNOWN , "could not read lock_wait_timeout: %v" , err )
1231+ saveQuery , err := sqlparser .ParseAndBind (
1232+ fmt .Sprintf ("set @%s=@@session.%s" , variable , variable ), // variable is interpolated verbatim; NOTE(review): callers must pass a trusted identifier
1233+ )
1234+ if err != nil {
1235+ return deferFunc , err
1236+ }
1237+ if _ , err := conn .Exec (ctx , saveQuery , 0 , false ); err != nil {
1238+ return deferFunc , vterrors .Wrapf (err , "could not read %s" , variable )
1239+ }
1240+ setQuery , err := sqlparser .ParseAndBind (
1241+ fmt .Sprintf ("set @@session.%s=%%a" , variable ), // "%%a" comes out of Sprintf as "%a"; presumably the placeholder ParseAndBind fills with the value below
1242+ sqltypes .Int64BindVariable (int64 (timeout .Seconds ())), // any sub-second part of timeout is dropped
1243+ )
1244+ if err != nil {
1245+ return deferFunc , err
12191246 }
1220- timeoutSeconds := int64 (lockWaitTimeout .Seconds ())
1221- setQuery := fmt .Sprintf ("set @@session.lock_wait_timeout=%d" , timeoutSeconds )
12221247 if _ , err := conn .Exec (ctx , setQuery , 0 , false ); err != nil {
12231248 return deferFunc , err
12241249 }
1250+ restoreQuery , err := sqlparser .ParseAndBind (
1251+ fmt .Sprintf ("set @@session.%s=@%s" , variable , variable ),
1252+ )
1253+ if err != nil {
1254+ return deferFunc , err
1255+ }
12251256 deferFunc = func () {
1226- conn .Exec (ctx , "set @@session.lock_wait_timeout=@lock_wait_timeout" , 0 , false )
1257+ conn .Exec (ctx , restoreQuery , 0 , false ) // best effort: result and error are discarded; runs with the same ctx passed to this function
12271258 }
12281259 return deferFunc , nil
12291260}
12301261
1262+ // initConnectionLockWaitTimeout sets the session `lock_wait_timeout` on the given connection by delegating to initConnectionSessionTimeout, and returns that call's restore function and error unchanged.
1263+ func (e * Executor ) initConnectionLockWaitTimeout (ctx context.Context , conn * connpool.Conn , timeout time.Duration ) (func (), error ) {
1264+ return e .initConnectionSessionTimeout (ctx , conn , "lock_wait_timeout" , timeout )
1265+ }
1266+
12311267// initDBConnectionLockWaitTimeout sets the given lock_wait_timeout for the given direct connection, with a deferred value restoration function.
12321268func (e * Executor ) initDBConnectionLockWaitTimeout (conn * dbconnpool.DBConnection , lockWaitTimeout time.Duration ) (deferFunc func (), err error ) {
12331269 deferFunc = func () {}
0 commit comments