@@ -568,6 +568,11 @@ func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueK
568568 return err
569569 }
570570 }
571+ // Save a snapshot copy of the initial MigrationRangeMaxValues
572+ abstractValues := make ([]interface {}, len (this .migrationContext .MigrationRangeMaxValues .AbstractValues ()))
573+ copy (abstractValues , this .migrationContext .MigrationRangeMaxValues .AbstractValues ())
574+ this .migrationContext .MigrationRangeMaxValuesInitial = sql .ToColumnValues (abstractValues )
575+
571576 this .migrationContext .Log .Infof ("Migration max values: [%s]" , this .migrationContext .MigrationRangeMaxValues )
572577
573578 return rows .Err ()
@@ -611,6 +616,63 @@ func (this *Applier) ReadMigrationRangeValues() error {
611616 return tx .Commit ()
612617}
613618
619+ // ResetMigrationRangeMaxValues updates the MigrationRangeMaxValues with new values
620+ func (this * Applier ) ResetMigrationRangeMaxValues (uniqueKeyAbstractValues []interface {}) {
621+ abstractValues := make ([]interface {}, len (uniqueKeyAbstractValues ))
622+ copy (abstractValues , uniqueKeyAbstractValues )
623+ this .migrationContext .MigrationRangeMaxValues = sql .ToColumnValues (abstractValues )
624+ this .migrationContext .Log .Debugf ("Reset migration max values: [%s]" , this .migrationContext .MigrationRangeMaxValues )
625+ }
626+
627+ // LockMigrationRangeMaxValues locks the MigrationRangeMaxValues to prevent further updates
628+ func (this * Applier ) LockMigrationRangeMaxValues () {
629+ if this .migrationContext .IsMigrationRangeMaxValuesLocked {
630+ return
631+ }
632+ this .migrationContext .IsMigrationRangeMaxValuesLocked = true
633+ this .migrationContext .Log .Infof ("Lock migration max values: [%s]" , this .migrationContext .MigrationRangeMaxValues )
634+ }
635+
636+ // AttemptToLockMigrationRangeMaxValues attempts to lock MigrationRangeMaxValues to prevent endless copying.
637+ // To avoid infinite updates of MigrationRangeMaxValues causing the copy to never end,
638+ // we need a strategy to stop updates. When the initial copy target is achieved,
639+ // MigrationRangeMaxValues will be locked.
640+ func (this * Applier ) AttemptToLockMigrationRangeMaxValues () {
641+ if this .migrationContext .IsMigrationRangeMaxValuesLocked {
642+ return
643+ }
644+
645+ // Currently only supports single-column unique index of int type
646+ uniqueKeyCols := this .migrationContext .UniqueKey .Columns .Columns ()
647+ if len (uniqueKeyCols ) != 1 {
648+ this .LockMigrationRangeMaxValues ()
649+ return
650+ }
651+ uniqueKeyCol := uniqueKeyCols [0 ]
652+ if uniqueKeyCol .CompareValueFunc == nil {
653+ this .LockMigrationRangeMaxValues ()
654+ return
655+ }
656+
657+ // Compare MigrationIterationRangeMinValues with MigrationRangeMaxValuesInitial to determine copy progress
658+ if this .migrationContext .MigrationIterationRangeMinValues == nil {
659+ return
660+ }
661+ than , err := uniqueKeyCol .CompareValueFunc (
662+ this .migrationContext .MigrationIterationRangeMinValues .AbstractValues ()[0 ],
663+ this .migrationContext .MigrationRangeMaxValuesInitial .AbstractValues ()[0 ],
664+ )
665+ if err != nil {
666+ // If comparison fails, fallback to locking MigrationRangeMaxValues
667+ this .migrationContext .Log .Errore (err )
668+ this .LockMigrationRangeMaxValues ()
669+ return
670+ }
671+ if than >= 0 {
672+ this .LockMigrationRangeMaxValues ()
673+ }
674+ }
675+
614676// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
615677// which will be used for copying the next chunk of rows. Ir returns "false" if there is
616678// no further chunk to work through, i.e. we're past the last chunk and are done with
@@ -620,6 +682,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
620682 if this .migrationContext .MigrationIterationRangeMinValues == nil {
621683 this .migrationContext .MigrationIterationRangeMinValues = this .migrationContext .MigrationRangeMinValues
622684 }
685+ this .AttemptToLockMigrationRangeMaxValues ()
623686 for i := 0 ; i < 2 ; i ++ {
624687 buildFunc := sql .BuildUniqueKeyRangeEndPreparedQueryViaOffset
625688 if i == 1 {
@@ -661,6 +724,8 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
661724 }
662725 }
663726 this .migrationContext .Log .Debugf ("Iteration complete: no further range to iterate" )
727+ // Ensure MigrationRangeMaxValues is locked after iteration is complete
728+ this .LockMigrationRangeMaxValues ()
664729 return hasFurtherRange , nil
665730}
666731
@@ -1282,7 +1347,14 @@ func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{})
12821347 case than < 0 :
12831348 return true , nil
12841349 case than > 0 :
1285- return false , nil
1350+ // When the value is greater than MigrationRangeMaxValues boundary, attempt to dynamically expand MigrationRangeMaxValues
1351+ // After expand, treat this comparison as equal, otherwise it cannot be ignored
1352+ if ! this .migrationContext .IsMigrationRangeMaxValuesLocked {
1353+ this .ResetMigrationRangeMaxValues (uniqueKeyArgs )
1354+ return true , nil
1355+ } else {
1356+ return false , nil
1357+ }
12861358 default :
12871359 // Since rowcopy is left-open-right-closed, when it is equal to the MigrationRangeMaxValues boundary value, it can be ignored.
12881360 return true , nil
0 commit comments