@@ -632,14 +632,22 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err
 // connections with open transactions, holding locks on the table.
 // This is done on a best-effort basis, by issuing `KILL` and `KILL QUERY` commands. As MySQL goes,
 // it is not guaranteed that the queries/transactions will terminate in a timely manner.
-func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableName string) error {
-	log.Infof("killTableLockHoldersAndAccessors: %v", tableName)
+func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, uuid string, tableName string, excludeIds ...int64) error {
+	log.Infof("killTableLockHoldersAndAccessors %v:, table-%v", uuid, tableName)
 	conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB())
 	if err != nil {
 		return err
 	}
 	defer conn.Close()

+	skipKill := func(threadId int64) bool {
+		for _, excludeId := range excludeIds {
+			if threadId == excludeId {
+				return true
+			}
+		}
+		return false
+	}
 	{
 		// First, let's look at PROCESSLIST for queries that _might_ be operating on our table. This may have
 		// plenty false positives as we're simply looking for the table name as a query substring.
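The new excludeIds variadic lets the caller protect specific connections (in practice its own LOCK and RENAME connections, passed in later in this diff) from being killed; skipKill is a plain linear scan over that short list. On Go 1.21+ the same check could be written with the standard library's slices package; a minimal, equivalent sketch of just the closure (not what the diff uses):

// Equivalent to the skipKill closure above: true if threadId is one of the
// connection IDs the caller asked us not to kill.
skipKill := func(threadId int64) bool {
	return slices.Contains(excludeIds, threadId)
}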
@@ -653,10 +661,14 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
 			return vterrors.Wrapf(err, "finding queries potentially operating on table")
 		}

-		log.Infof("killTableLockHoldersAndAccessors: found %v potential queries", len(rs.Rows))
+		log.Infof("killTableLockHoldersAndAccessors %v: found %v potential queries", uuid, len(rs.Rows))
 		// Now that we have some list of queries, we actually parse them to find whether the query actually references our table:
 		for _, row := range rs.Named().Rows {
 			threadId := row.AsInt64("id", 0)
+			if skipKill(threadId) {
+				log.Infof("killTableLockHoldersAndAccessors %v: skipping thread %v as it is excluded", uuid, threadId)
+				continue
+			}
 			infoQuery := row.AsString("info", "")
 			stmt, err := e.env.Environment().Parser().Parse(infoQuery)
 			if err != nil {
@@ -683,7 +695,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
 			}, stmt)

 			if queryUsesTable {
-				log.Infof("killTableLockHoldersAndAccessors: killing query %v: %.100s", threadId, infoQuery)
+				log.Infof("killTableLockHoldersAndAccessors %v: killing query %v: %.100s", uuid, threadId, infoQuery)
 				killQuery := fmt.Sprintf("KILL QUERY %d", threadId)
 				if _, err := conn.Conn.ExecuteFetch(killQuery, 1, false); err != nil {
 					log.Error(vterrors.Errorf(vtrpcpb.Code_ABORTED, "could not kill query %v. Ignoring", threadId))
@@ -708,14 +720,18 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
 		if err != nil {
 			return vterrors.Wrapf(err, "finding transactions locking table `%s` %s", tableName, description)
 		}
-		log.Infof("terminateTransactions: found %v transactions locking table `%s` %s", len(rs.Rows), tableName, description)
+		log.Infof("terminateTransactions %v: found %v transactions locking table `%s` %s", uuid, len(rs.Rows), tableName, description)
 		for _, row := range rs.Named().Rows {
 			threadId := row.AsInt64(column, 0)
-			log.Infof("terminateTransactions: killing connection %v with transaction locking table `%s` %s", threadId, tableName, description)
+			if skipKill(threadId) {
+				log.Infof("terminateTransactions %v: skipping thread %v as it is excluded", uuid, threadId)
+				continue
+			}
+			log.Infof("terminateTransactions %v: killing connection %v with transaction locking table `%s` %s", uuid, threadId, tableName, description)
 			killConnection := fmt.Sprintf("KILL %d", threadId)
 			_, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
 			if err != nil {
-				log.Errorf("terminateTransactions: unable to kill the connection %d locking table `%s` %s: %v", threadId, tableName, description, err)
+				log.Errorf("terminateTransactions %v: unable to kill the connection %d locking table `%s` %s: %v", uuid, threadId, tableName, description, err)
 			}
 		}
 		return nil
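This hunk only shows the kill loop; the query that finds "transactions locking table" (and the `column`/`description` values it is parameterized with) is defined elsewhere in the file and is not part of the diff. For orientation only: on MySQL 8.0+ connections holding InnoDB locks on a given table can be listed from performance_schema. A sketch along those lines, using the `%a` placeholder style that goes with sqlparser.BuildParsedQuery elsewhere in this file; this is an assumption, not necessarily the query Vitess actually runs:

// Sketch only: processlist IDs of connections holding InnoDB locks on a table (MySQL 8.0+).
const sqlFindTableLockHolders = `
	SELECT DISTINCT th.processlist_id
	FROM performance_schema.data_locks dl
	JOIN performance_schema.threads th ON th.thread_id = dl.thread_id
	WHERE dl.object_schema = database() AND dl.object_name = %a
`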
@@ -861,7 +877,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
 	// impacts query serving so we wait for a multiple of the cutover threshold here, with
 	// that variable primarily serving to limit the max time we later spend waiting for
 	// a position again AFTER we've taken the locks and table access is blocked.
-	if err := waitForPos(s, postSentryPos, onlineDDL.CutOverThreshold*3); err != nil {
+	if err := waitForPos(s, postSentryPos, 3*onlineDDL.CutOverThreshold); err != nil {
 		return vterrors.Wrapf(err, "failed waiting for pos after sentry creation")
 	}
 	e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached")
@@ -874,7 +890,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
 	defer lockConn.Recycle()
 	// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
 	// The code will ensure everything that needs to be terminated by `onlineDDL.CutOverThreshold` will be terminated.
-	lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 5*onlineDDL.CutOverThreshold)
+	lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 3*onlineDDL.CutOverThreshold)
 	if err != nil {
 		return vterrors.Wrapf(err, "failed setting lock_wait_timeout on locking connection")
 	}
@@ -889,7 +905,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
 	}
 	// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
 	// The code will ensure everything that needs to be terminated by `onlineDDL.CutOverThreshold` will be terminated.
-	renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 5*onlineDDL.CutOverThreshold*4)
+	renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 2*onlineDDL.CutOverThreshold)
 	if err != nil {
 		return vterrors.Wrapf(err, "failed setting lock_wait_timeout on rename connection")
 	}
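These two hunks tighten the session lock_wait_timeout on the locking and rename connections: for example, with a 10s cut-over threshold the locking connection now waits up to 30s (was 50s) and the rename connection up to 20s (was 200s). initConnectionLockWaitTimeout itself is not part of the diff; judging by its call sites it sets a generous session @@lock_wait_timeout and returns a restore function for the caller to defer. A hypothetical, self-contained sketch of that shape (helper name and queries are assumptions, not the Vitess implementation):

// Sketch only: save the session lock_wait_timeout, set a larger one,
// and hand back a restore function for the caller to defer.
func setSessionLockWaitTimeout(conn *dbconnpool.DBConnection, d time.Duration) (restore func(), err error) {
	if _, err := conn.Conn.ExecuteFetch("SET @prev_lock_wait_timeout := @@session.lock_wait_timeout", 1, false); err != nil {
		return nil, err
	}
	setQuery := fmt.Sprintf("SET @@session.lock_wait_timeout = %d", int64(d.Seconds()))
	if _, err := conn.Conn.ExecuteFetch(setQuery, 1, false); err != nil {
		return nil, err
	}
	restore = func() {
		_, _ = conn.Conn.ExecuteFetch("SET @@session.lock_wait_timeout = @prev_lock_wait_timeout", 1, false)
	}
	return restore, nil
}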
@@ -999,7 +1015,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
 		if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil {
 			return vterrors.Wrapf(err, "checking prepared pool for table")
 		}
-		if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
+		if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.UUID, onlineDDL.Table); err != nil {
 			return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
 		}
 	}
@@ -1019,25 +1035,36 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
 		// real production

 		e.updateMigrationStage(ctx, onlineDDL.UUID, "locking tables")
-		lockCtx, cancel := context.WithTimeout(ctx, onlineDDL.CutOverThreshold)
-		defer cancel()
+		lockCtx, killWhileRenamingCancel := context.WithTimeout(ctx, onlineDDL.CutOverThreshold)
+		defer killWhileRenamingCancel()
 		lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table)
 		if _, err := lockConn.Conn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil {
 			return vterrors.Wrapf(err, "failed locking tables")
 		}

 		e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables")
+		killWhileRenamingContext, killWhileRenamingCancel := context.WithCancel(ctx)
+		defer killWhileRenamingCancel()
+		// We run the RENAME in a goroutine, so that we can wait for it to block behind the table lock and,
+		// when force cut-over is requested, kill whatever is holding it up in the meantime.
 		go func() {
 			defer close(renameCompleteChan)
 			_, err := renameConn.Conn.Exec(ctx, renameQuery.Query, 1, false)
 			renameCompleteChan <- err
+			killWhileRenamingCancel() // RENAME is done, no need to kill queries anymore
 		}()
 		// the rename should block, because of the LOCK. Wait for it to show up.
 		e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block")
 		if err := waitForRenameProcess(); err != nil {
 			return vterrors.Wrapf(err, "failed waiting for rename process")
 		}
 		e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found")
+
+		if shouldForceCutOver {
+			log.Infof("cutOverVReplMigration %v: force cut-over requested, killing table lock holders and accessors while RENAME is in place", s.workflow)
+			if err := e.killTableLockHoldersAndAccessors(killWhileRenamingContext, onlineDDL.UUID, onlineDDL.Table, lockConn.Conn.ID(), renameConn.Conn.ID()); err != nil {
+				return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
+			}
+		}
 	}

 	e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos")
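The ordering above is the crux of the change: the RENAME is issued on its own connection in a goroutine, it blocks behind the LOCK TABLES held by lockConn, and only then, and only under force cut-over, are other lock holders killed, with the lock and rename connections excluded and with a context that dies as soon as the RENAME returns. A stripped-down, self-contained sketch of that pattern (all names here are illustrative, not the Vitess code):

// Illustrative only: run a blocking statement in a goroutine and kill its blockers
// with a context that is cancelled the moment the statement completes.
func runBlockingAndKillBlockers(
	ctx context.Context,
	runBlocking func(context.Context) error, // e.g. the RENAME, expected to block
	killBlockers func(context.Context) error, // e.g. killing lock holders on the table
) error {
	done := make(chan error, 1)
	killCtx, cancelKill := context.WithCancel(ctx)
	defer cancelKill()

	go func() {
		err := runBlocking(ctx)
		done <- err
		cancelKill() // statement finished; the killer must not outlive it
	}()

	// While the statement is blocked, terminate whatever holds it up.
	if err := killBlockers(killCtx); err != nil {
		return err
	}
	return <-done
}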
@@ -1113,7 +1140,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
 			if err := <-renameCompleteChan; err != nil {
 				return vterrors.Wrapf(err, "failed waiting for rename to complete")
 			}
-			renameWasSuccessful = true
+			renameWasSuccessful = true // Migration effectively successful
 		}
 	}
 }
@@ -2524,7 +2551,7 @@ func (e *Executor) executeSpecialAlterDirectDDLActionMigration(ctx context.Conte

 	if forceCutOverAfter > 0 {
 		// Irrespective of the --force-cut-over-after flag value, as long as it's nonzero, we now terminate
-		// connections adn transactions on the migrated table.
+		// connections and transactions on the migrated table.
 		// --force-cut-over-after was designed to work with `vitess` migrations, that could cut-over multiple times,
 		// and was meant to set a limit to the overall duration of the attempts, for example 1 hour.
 		// With INSTANT DDL or other quick operations, this becomes meaningless. Once we begin the operation, there
@@ -2537,7 +2564,7 @@ func (e *Executor) executeSpecialAlterDirectDDLActionMigration(ctx context.Conte
 		if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil {
 			return vterrors.Wrapf(err, "checking prepared pool for table")
 		}
-		if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
+		if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.UUID, onlineDDL.Table); err != nil {
 			return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
 		}
 	}
@@ -3025,11 +3052,16 @@ func shouldCutOverAccordingToBackoff(
 	// is beyond the --force-cut-over-after setting, or the column `force_cutover` is "1", and this means:
 	// - we do not want to backoff, we want to cutover asap
 	// - we agree to brute-force KILL any pending queries on the migrated table so as to ensure it's unlocked.
-	if forceCutOverAfter > 0 && sinceReadyToComplete > forceCutOverAfter {
-		// time since migration was ready to complete is beyond the --force-cut-over-after setting
-		return true, true
+	if forceCutOverAfter > 0 {
+		if sinceReadyToComplete > forceCutOverAfter {
+			// time since migration was ready to complete is beyond the --force-cut-over-after setting
+			return true, true
+		}
+		if forceCutOverAfter <= time.Millisecond {
+			// --force-cut-over-after is set so low that it is effectively "now", even if "sinceReadyToComplete" is lower.
+			return true, true
+		}
 	}
-
 	// Backoff mechanism. Do not attempt to cut-over every single minute. Check how much time passed since last cut-over attempt
 	desiredTimeSinceLastCutover := cutoverIntervals[len(cutoverIntervals)-1]
 	if int(cutoverAttempts) < len(cutoverIntervals) {
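The restructured condition changes behavior in one case: previously a very small --force-cut-over-after still required sinceReadyToComplete to exceed it before forcing; now any value at or below one millisecond forces the cut-over immediately, per the new comment. A condensed, hypothetical helper capturing just this branch (the real function also weighs cut-over attempts and backoff intervals), with example outcomes:

// Hypothetical condensation of the force-cut-over branch above.
func forceDecision(forceCutOverAfter, sinceReadyToComplete time.Duration) (shouldCutOver, shouldForceCutOver bool) {
	if forceCutOverAfter > 0 {
		if sinceReadyToComplete > forceCutOverAfter {
			return true, true
		}
		if forceCutOverAfter <= time.Millisecond {
			return true, true
		}
	}
	return false, false // defer to the regular backoff logic
}

// forceDecision(time.Millisecond, 0)        == (true, true)   cut over now
// forceDecision(time.Hour, 10*time.Minute)  == (false, false) keep backing off
// forceDecision(time.Hour, 2*time.Hour)     == (true, true)   deadline exceeded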
@@ -3210,7 +3242,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
 		}
 		if err := e.cutOverVReplMigration(ctx, s, shouldForceCutOver); err != nil {
 			_ = e.updateMigrationMessage(ctx, uuid, err.Error())
-			log.Errorf("cutOverVReplMigration failed: err=%v", err)
+			log.Errorf("cutOverVReplMigration failed %s: err=%v", onlineDDL.UUID, err)

 			if sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError); isSQLErr && sqlErr != nil {
 				// let's see if this error is actually acceptable