@@ -364,7 +364,7 @@ func (this *Migrator) Migrate() (err error) {
364364 return err
365365 }
366366 // If we are resuming, we will initiateStreaming later when we know
367- // the coordinates to resume streaming.
367+ // the binlog coordinates to resume streaming from .
368368 // If not resuming, the streamer must be initiated before the applier,
369369 // so that the "GhostTableMigrated" event gets processed.
370370 if ! this .migrationContext .Resume {
@@ -504,7 +504,7 @@ func (this *Migrator) Migrate() (err error) {
504504 }
505505 atomic .StoreInt64 (& this .migrationContext .CutOverCompleteFlag , 1 )
506506
507- if this .migrationContext .Checkpoint {
507+ if this .migrationContext .Checkpoint && ! this . migrationContext . Noop {
508508 cutoverChk , err := this .CheckpointAfterCutOver ()
509509 if err != nil {
510510 this .migrationContext .Log .Warningf ("failed to checkpoint after cutover: %+v" , err )
@@ -527,7 +527,6 @@ func (this *Migrator) Migrate() (err error) {
527527// after the original cutover, then doing another cutover to swap the tables back.
528528// The steps are similar to Migrate(), but without row copying.
529529func (this * Migrator ) Revert () error {
530- //TODO: add hooks
531530 this .migrationContext .Log .Infof ("Reverting %s.%s from %s.%s" ,
532531 sql .EscapeName (this .migrationContext .DatabaseName ), sql .EscapeName (this .migrationContext .OriginalTableName ),
533532 sql .EscapeName (this .migrationContext .DatabaseName ), sql .EscapeName (this .migrationContext .OldTableName ))
@@ -599,11 +598,17 @@ func (this *Migrator) Revert() error {
599598 } else {
600599 retrier = this .retryOperation
601600 }
601+ if err := this .hooksExecutor .onBeforeCutOver (); err != nil {
602+ return err
603+ }
602604 if err := retrier (this .cutOver ); err != nil {
603605 return err
604606 }
605607 atomic .StoreInt64 (& this .migrationContext .CutOverCompleteFlag , 1 )
606- this .migrationContext .Log .Infof ("Reverted %s.%s" , sql .EscapeName (this .migrationContext .DatabaseName ), sql .EscapeName (this .migrationContext .OriginalTableName ))
608+ if err := this .hooksExecutor .onSuccess (); err != nil {
609+ return err
610+ }
611+ this .migrationContext .Log .Infof ("Done reverting %s.%s" , sql .EscapeName (this .migrationContext .DatabaseName ), sql .EscapeName (this .migrationContext .OriginalTableName ))
607612 return nil
608613}
609614
@@ -749,6 +754,7 @@ func (this *Migrator) waitForEventsUpToLock() error {
749754 if lockProcessed .state == allEventsUpToLockProcessedChallenge {
750755 this .migrationContext .Log .Infof ("Waiting for events up to lock: got %s" , lockProcessed .state )
751756 found = true
757+ this .lastLockProcessed = lockProcessed
752758 } else {
753759 this .migrationContext .Log .Infof ("Waiting for events up to lock: skipping %s" , lockProcessed .state )
754760 }
@@ -757,7 +763,6 @@ func (this *Migrator) waitForEventsUpToLock() error {
757763 }
758764 waitForEventsUpToLockDuration := time .Since (waitForEventsUpToLockStartTime )
759765
760- this .lastLockProcessed = lockProcessed
761766 this .migrationContext .Log .Infof ("Done waiting for events up to lock; duration=%+v" , waitForEventsUpToLockDuration )
762767 this .printStatus (ForcePrintStatusAndHintRule )
763768
0 commit comments