@@ -7,6 +7,7 @@ package logic
77
88import (
99 "context"
10+ "errors"
1011 "fmt"
1112 "io"
1213 "math"
@@ -21,6 +22,10 @@ import (
2122 "github.com/github/gh-ost/go/sql"
2223)
2324
25+ var (
26+ ErrMigratorUnsupportedRenameAlter = errors .New ("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost." )
27+ )
28+
2429type ChangelogState string
2530
2631const (
@@ -224,28 +229,22 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
224229 case Migrated , ReadMigrationRangeValues :
225230 // no-op event
226231 case GhostTableMigrated :
227- {
228- this .ghostTableMigrated <- true
229- }
232+ this .ghostTableMigrated <- true
230233 case AllEventsUpToLockProcessed :
231- {
232- var applyEventFunc tableWriteFunc = func () error {
233- this .allEventsUpToLockProcessed <- changelogStateString
234- return nil
235- }
236- // at this point we know all events up to lock have been read from the streamer,
237- // because the streamer works sequentially. So those events are either already handled,
238- // or have event functions in applyEventsQueue.
239- // So as not to create a potential deadlock, we write this func to applyEventsQueue
240- // asynchronously, understanding it doesn't really matter.
241- go func () {
242- this .applyEventsQueue <- newApplyEventStructByFunc (& applyEventFunc )
243- }()
234+ var applyEventFunc tableWriteFunc = func () error {
235+ this .allEventsUpToLockProcessed <- changelogStateString
236+ return nil
244237 }
238+ // at this point we know all events up to lock have been read from the streamer,
239+ // because the streamer works sequentially. So those events are either already handled,
240+ // or have event functions in applyEventsQueue.
241+ // So as not to create a potential deadlock, we write this func to applyEventsQueue
242+ // asynchronously, understanding it doesn't really matter.
243+ go func () {
244+ this .applyEventsQueue <- newApplyEventStructByFunc (& applyEventFunc )
245+ }()
245246 default :
246- {
247- return fmt .Errorf ("Unknown changelog state: %+v" , changelogState )
248- }
247+ return fmt .Errorf ("Unknown changelog state: %+v" , changelogState )
249248 }
250249 this .migrationContext .Log .Infof ("Handled changelog state %s" , changelogState )
251250 return nil
@@ -270,13 +269,13 @@ func (this *Migrator) listenOnPanicAbort() {
270269 this .teardown ()
271270}
272271
273- // validateStatement validates the `alter` statement meets criteria.
272+ // validateAlterStatement validates the `alter` statement meets criteria.
274273// At this time this means:
275274// - column renames are approved
276275// - no table rename allowed
277- func (this * Migrator ) validateStatement () (err error ) {
276+ func (this * Migrator ) validateAlterStatement () (err error ) {
278277 if this .parser .IsRenameTable () {
279- return fmt . Errorf ( "ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost." )
278+ return ErrMigratorUnsupportedRenameAlter
280279 }
281280 if this .parser .HasNonTrivialRenames () && ! this .migrationContext .SkipRenamedColumns {
282281 this .migrationContext .ColumnRenameMap = this .parser .GetNonTrivialRenames ()
@@ -355,7 +354,7 @@ func (this *Migrator) Migrate() (err error) {
355354 if err := this .parser .ParseAlterStatement (this .migrationContext .AlterStatement ); err != nil {
356355 return err
357356 }
358- if err := this .validateStatement (); err != nil {
357+ if err := this .validateAlterStatement (); err != nil {
359358 return err
360359 }
361360
@@ -904,72 +903,49 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
904903 }
905904}
906905
907- // printStatus prints the progress status, and optionally additionally detailed
908- // dump of configuration.
909- // `rule` indicates the type of output expected.
910- // By default the status is written to standard output, but other writers can
911- // be used as well.
912- func (this * Migrator ) printStatus (rule PrintStatusRule , writers ... io.Writer ) {
913- if rule == NoPrintStatusRule {
914- return
915- }
916- writers = append (writers , os .Stdout )
917-
918- elapsedTime := this .migrationContext .ElapsedTime ()
919- elapsedSeconds := int64 (elapsedTime .Seconds ())
920- totalRowsCopied := this .migrationContext .GetTotalRowsCopied ()
921- rowsEstimate := atomic .LoadInt64 (& this .migrationContext .RowsEstimate ) + atomic .LoadInt64 (& this .migrationContext .RowsDeltaEstimate )
922- if atomic .LoadInt64 (& this .rowCopyCompleteFlag ) == 1 {
923- // Done copying rows. The totalRowsCopied value is the de-facto number of rows,
924- // and there is no further need to keep updating the value.
925- rowsEstimate = totalRowsCopied
926- }
927- var progressPct float64
928- if rowsEstimate == 0 {
929- progressPct = 100.0
930- } else {
931- progressPct = 100.0 * float64 (totalRowsCopied ) / float64 (rowsEstimate )
932- }
933- // we take the opportunity to update migration context with progressPct
934- this .migrationContext .SetProgressPct (progressPct )
935- // Before status, let's see if we should print a nice reminder for what exactly we're doing here.
936- shouldPrintMigrationStatusHint := (elapsedSeconds % 600 == 0 )
937- if rule == ForcePrintStatusAndHintRule {
938- shouldPrintMigrationStatusHint = true
939- }
940- if rule == ForcePrintStatusOnlyRule {
941- shouldPrintMigrationStatusHint = false
942- }
943- if shouldPrintMigrationStatusHint {
944- this .printMigrationStatusHint (writers ... )
906+ // getProgressPercent returns an estimate of migration progess as a percent.
907+ func (this * Migrator ) getProgressPercent (rowsEstimate int64 ) (progressPct float64 ) {
908+ progressPct = 100.0
909+ if rowsEstimate > 0 {
910+ progressPct *= float64 (this .migrationContext .GetTotalRowsCopied ()) / float64 (rowsEstimate )
945911 }
912+ return progressPct
913+ }
946914
947- var etaSeconds float64 = math .MaxFloat64
948- var etaDuration = time .Duration (base .ETAUnknown )
915+ // getMigrationETA returns the estimated duration of the migration
916+ func (this * Migrator ) getMigrationETA (rowsEstimate int64 ) (eta string , duration time.Duration ) {
917+ duration = time .Duration (base .ETAUnknown )
918+ progressPct := this .getProgressPercent (rowsEstimate )
949919 if progressPct >= 100.0 {
950- etaDuration = 0
920+ duration = 0
951921 } else if progressPct >= 0.1 {
922+ totalRowsCopied := this .migrationContext .GetTotalRowsCopied ()
952923 elapsedRowCopySeconds := this .migrationContext .ElapsedRowCopyTime ().Seconds ()
953924 totalExpectedSeconds := elapsedRowCopySeconds * float64 (rowsEstimate ) / float64 (totalRowsCopied )
954- etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
925+ etaSeconds : = totalExpectedSeconds - elapsedRowCopySeconds
955926 if etaSeconds >= 0 {
956- etaDuration = time .Duration (etaSeconds ) * time .Second
927+ duration = time .Duration (etaSeconds ) * time .Second
957928 } else {
958- etaDuration = 0
929+ duration = 0
959930 }
960931 }
961- this .migrationContext .SetETADuration (etaDuration )
962- var eta string
963- switch etaDuration {
932+
933+ switch duration {
964934 case 0 :
965935 eta = "due"
966936 case time .Duration (base .ETAUnknown ):
967937 eta = "N/A"
968938 default :
969- eta = base .PrettifyDurationOutput (etaDuration )
939+ eta = base .PrettifyDurationOutput (duration )
970940 }
971941
972- state := "migrating"
942+ return eta , duration
943+ }
944+
945+ // getMigrationStateAndETA returns the state and eta of the migration.
946+ func (this * Migrator ) getMigrationStateAndETA (rowsEstimate int64 ) (state , eta string , etaDuration time.Duration ) {
947+ eta , etaDuration = this .getMigrationETA (rowsEstimate )
948+ state = "migrating"
973949 if atomic .LoadInt64 (& this .migrationContext .CountingRowsFlag ) > 0 && ! this .migrationContext .ConcurrentCountTableRows {
974950 state = "counting rows"
975951 } else if atomic .LoadInt64 (& this .migrationContext .IsPostponingCutOver ) > 0 {
@@ -978,27 +954,78 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
978954 } else if isThrottled , throttleReason , _ := this .migrationContext .IsThrottled (); isThrottled {
979955 state = fmt .Sprintf ("throttled, %s" , throttleReason )
980956 }
957+ return state , eta , etaDuration
958+ }
981959
982- var shouldPrintStatus bool
983- if rule == HeuristicPrintStatusRule {
984- if elapsedSeconds <= 60 {
985- shouldPrintStatus = true
986- } else if etaSeconds <= 60 {
987- shouldPrintStatus = true
988- } else if etaSeconds <= 180 {
989- shouldPrintStatus = (elapsedSeconds % 5 == 0 )
990- } else if elapsedSeconds <= 180 {
991- shouldPrintStatus = (elapsedSeconds % 5 == 0 )
992- } else if this .migrationContext .TimeSincePointOfInterest ().Seconds () <= 60 {
993- shouldPrintStatus = (elapsedSeconds % 5 == 0 )
994- } else {
995- shouldPrintStatus = (elapsedSeconds % 30 == 0 )
996- }
960+ // shouldPrintStatus returns true when the migrator is due to print status info.
961+ func (this * Migrator ) shouldPrintStatus (rule PrintStatusRule , elapsedSeconds int64 , etaDuration time.Duration ) (shouldPrint bool ) {
962+ if rule != HeuristicPrintStatusRule {
963+ return true
964+ }
965+
966+ etaSeconds := etaDuration .Seconds ()
967+ if elapsedSeconds <= 60 {
968+ shouldPrint = true
969+ } else if etaSeconds <= 60 {
970+ shouldPrint = true
971+ } else if etaSeconds <= 180 {
972+ shouldPrint = (elapsedSeconds % 5 == 0 )
973+ } else if elapsedSeconds <= 180 {
974+ shouldPrint = (elapsedSeconds % 5 == 0 )
975+ } else if this .migrationContext .TimeSincePointOfInterest ().Seconds () <= 60 {
976+ shouldPrint = (elapsedSeconds % 5 == 0 )
997977 } else {
998- // Not heuristic
999- shouldPrintStatus = true
978+ shouldPrint = (elapsedSeconds % 30 == 0 )
979+ }
980+
981+ return shouldPrint
982+ }
983+
984+ // shouldPrintMigrationStatus returns true when the migrator is due to print the migration status hint
985+ func (this * Migrator ) shouldPrintMigrationStatusHint (rule PrintStatusRule , elapsedSeconds int64 ) (shouldPrint bool ) {
986+ if elapsedSeconds % 600 == 0 {
987+ shouldPrint = true
988+ } else if rule == ForcePrintStatusAndHintRule {
989+ shouldPrint = true
990+ }
991+ return shouldPrint
992+ }
993+
994+ // printStatus prints the progress status, and optionally additionally detailed
995+ // dump of configuration.
996+ // `rule` indicates the type of output expected.
997+ // By default the status is written to standard output, but other writers can
998+ // be used as well.
999+ func (this * Migrator ) printStatus (rule PrintStatusRule , writers ... io.Writer ) {
1000+ if rule == NoPrintStatusRule {
1001+ return
1002+ }
1003+ writers = append (writers , os .Stdout )
1004+
1005+ elapsedTime := this .migrationContext .ElapsedTime ()
1006+ elapsedSeconds := int64 (elapsedTime .Seconds ())
1007+ totalRowsCopied := this .migrationContext .GetTotalRowsCopied ()
1008+ rowsEstimate := atomic .LoadInt64 (& this .migrationContext .RowsEstimate ) + atomic .LoadInt64 (& this .migrationContext .RowsDeltaEstimate )
1009+ if atomic .LoadInt64 (& this .rowCopyCompleteFlag ) == 1 {
1010+ // Done copying rows. The totalRowsCopied value is the de-facto number of rows,
1011+ // and there is no further need to keep updating the value.
1012+ rowsEstimate = totalRowsCopied
1013+ }
1014+
1015+ // we take the opportunity to update migration context with progressPct
1016+ progressPct := this .getProgressPercent (rowsEstimate )
1017+ this .migrationContext .SetProgressPct (progressPct )
1018+
1019+ // Before status, let's see if we should print a nice reminder for what exactly we're doing here.
1020+ if this .shouldPrintMigrationStatusHint (rule , elapsedSeconds ) {
1021+ this .printMigrationStatusHint (writers ... )
10001022 }
1001- if ! shouldPrintStatus {
1023+
1024+ // Get state + ETA
1025+ state , eta , etaDuration := this .getMigrationStateAndETA (rowsEstimate )
1026+ this .migrationContext .SetETADuration (etaDuration )
1027+
1028+ if ! this .shouldPrintStatus (rule , elapsedSeconds , etaDuration ) {
10021029 return
10031030 }
10041031
@@ -1017,7 +1044,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
10171044 )
10181045 this .applier .WriteChangelog (
10191046 fmt .Sprintf ("copy iteration %d at %d" , this .migrationContext .GetIteration (), time .Now ().Unix ()),
1020- status ,
1047+ state ,
10211048 )
10221049 w := io .MultiWriter (writers ... )
10231050 fmt .Fprintln (w , status )
0 commit comments