@@ -22,7 +22,8 @@ import (
2222 "github.com/openark/golib/sqlutils"
2323)
2424
25- const startSlavePostWaitMilliseconds = 500 * time .Millisecond
// Timing knobs used while waiting for replication to come back after a restart.
const (
	// startReplicationPostWait is the pause between two consecutive checks of
	// the replication status.
	startReplicationPostWait = 250 * time.Millisecond
	// startReplicationMaxWait is the longest total time to keep polling before
	// giving up on replication being reported as running.
	startReplicationMaxWait = 2 * time.Second
)
2627
2728// Inspector reads data from the read-MySQL-server (typically a replica, but can be the master)
2829// It is used for gaining initial status and structure, and later also to follow up on progress and changelog
@@ -302,12 +303,50 @@ func (this *Inspector) restartReplication() error {
302303 if startError != nil {
303304 return startError
304305 }
305- time .Sleep (startSlavePostWaitMilliseconds )
306+
307+ // loop until replication is running unless we hit a max timeout.
308+ startTime := time .Now ()
309+ for {
310+ replicationRunning , err := this .validateReplicationRestarted ()
311+ if err != nil {
312+ return fmt .Errorf ("Failed to validate if replication had been restarted: %w" , err )
313+ }
314+ if replicationRunning {
315+ break
316+ }
317+ if time .Since (startTime ) > startReplicationMaxWait {
318+ return fmt .Errorf ("Replication did not restart within the maximum wait time of %s" , startReplicationMaxWait )
319+ }
320+ this .migrationContext .Log .Debugf ("Replication not yet restarted, waiting..." )
321+ time .Sleep (startReplicationPostWait )
322+ }
306323
307324 this .migrationContext .Log .Debugf ("Replication restarted" )
308325 return nil
309326}
310327
328+ // validateReplicationRestarted checks that the Slave_IO_Running and Slave_SQL_Running are both 'Yes'
329+ // returns true if both are 'Yes', false otherwise
330+ func (this * Inspector ) validateReplicationRestarted () (bool , error ) {
331+ errNotRunning := fmt .Errorf ("Replication not running on %s" , this .connectionConfig .Key .String ())
332+ query := `show /* gh-ost */ slave status`
333+ err := sqlutils .QueryRowsMap (this .db , query , func (rowMap sqlutils.RowMap ) error {
334+ if rowMap .GetString ("Slave_IO_Running" ) != "Yes" || rowMap .GetString ("Slave_SQL_Running" ) != "Yes" {
335+ return errNotRunning
336+ }
337+ return nil
338+ })
339+
340+ if err != nil {
341+ // If the error is that replication is not running, return that and not an error
342+ if errors .Is (err , errNotRunning ) {
343+ return false , nil
344+ }
345+ return false , err
346+ }
347+ return true , nil
348+ }
349+
311350// applyBinlogFormat sets ROW binlog format and restarts replication to make
312351// the replication thread apply it.
313352func (this * Inspector ) applyBinlogFormat () error {
0 commit comments