@@ -32,11 +32,6 @@ type Coordinator struct {
3232 // Atomic counter for number of active workers (not in workerQueue)
3333 busyWorkers atomic.Int64
3434
35- // Mutex protecting currentCoordinates
36- currentCoordinatesMutex sync.Mutex
37- // The binlog coordinates of the low water mark transaction.
38- currentCoordinates mysql.BinlogCoordinates
39-
4035 // Mutex to protect the fields below
4136 mu sync.Mutex
4237
@@ -50,7 +45,7 @@ type Coordinator struct {
5045 // This is a map of completed jobs by their sequence numbers.
5146 // This is used when updating the low water mark.
5247 // It records the binlog coordinates of the completed transaction.
53- completedJobs map [int64 ]* mysql. BinlogCoordinates
48+ completedJobs map [int64 ]struct {}
5449
5550 // These are the jobs that are waiting for a previous job to complete.
5651 // They are indexed by the sequence number of the job they are waiting for.
@@ -256,21 +251,18 @@ func NewCoordinator(migrationContext *base.MigrationContext, applier *Applier, t
256251
257252 throttler : throttler ,
258253
259- currentCoordinates : mysql.BinlogCoordinates {},
260-
261254 binlogReader : binlog .NewGoMySQLReader (migrationContext ),
262255
263256 lowWaterMark : 0 ,
264- completedJobs : make (map [int64 ]* mysql. BinlogCoordinates ),
257+ completedJobs : make (map [int64 ]struct {} ),
265258 waitingJobs : make (map [int64 ][]chan struct {}),
266259
267260 events : make (chan * replication.BinlogEvent , 1000 ),
268261 }
269262}
270263
271- func (c * Coordinator ) StartStreaming (ctx context.Context , canStopStreaming func () bool ) error {
272- coords := c .GetCurrentBinlogCoordinates ()
273- err := c .binlogReader .ConnectBinlogStreamer (* coords )
264+ func (c * Coordinator ) StartStreaming (ctx context.Context , coords mysql.BinlogCoordinates , canStopStreaming func () bool ) error {
265+ err := c .binlogReader .ConnectBinlogStreamer (coords )
274266 if err != nil {
275267 return err
276268 }
@@ -297,10 +289,11 @@ func (c *Coordinator) StartStreaming(ctx context.Context, canStopStreaming func(
297289 }
298290 c .migrationContext .Log .Infof ("Reconnecting... Will resume at %+v" , coords )
299291
300- // We reconnect at the position of the last low water mark.
301- // Some jobs after the low water mark may have already applied, but
302- // it's OK to reapply them since the DML operations are idempotent.
303- coords := c .GetCurrentBinlogCoordinates ()
292+ // We reconnect from the event that was last emitted to the stream.
293+ // This ensures we don't miss any events, and we don't process any events twice.
294+ // Processing events twice messes up the transaction tracking and
295+ // will cause data corruption.
296+ coords := c .binlogReader .GetCurrentBinlogCoordinates ()
304297 if err := c .binlogReader .ConnectBinlogStreamer (* coords ); err != nil {
305298 return err
306299 }
@@ -385,10 +378,7 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
385378 }
386379 c .mu .Unlock ()
387380 case * replication.RotateEvent :
388- c .currentCoordinatesMutex .Lock ()
389- c .currentCoordinates .LogFile = string (binlogEvent .NextLogName )
390- c .currentCoordinatesMutex .Unlock ()
391- c .migrationContext .Log .Infof ("rotate to next log from %s:%d to %s" , c .currentCoordinates .LogFile , int64 (ev .Header .LogPos ), binlogEvent .NextLogName )
381+ c .migrationContext .Log .Infof ("rotate to next log in %s" , binlogEvent .NextLogName )
392382 continue
393383 default : // ignore all other events
394384 continue
@@ -495,19 +485,17 @@ func (c *Coordinator) HandleChangeLogEvent(event *binlog.BinlogDMLEvent) {
495485
496486func (c * Coordinator ) MarkTransactionCompleted (sequenceNumber , logPos , eventSize int64 ) {
497487 var channelsToNotify []chan struct {}
498- var lastCoords * mysql.BinlogCoordinates
499488
500489 func () {
501490 c .mu .Lock ()
502491 defer c .mu .Unlock ()
503492
504493 // Mark the job as completed
505- c .completedJobs [sequenceNumber ] = & mysql. BinlogCoordinates { LogPos : logPos , EventSize : eventSize }
494+ c .completedJobs [sequenceNumber ] = struct {}{ }
506495
507496 // Then, update the low water mark if possible
508497 for {
509- if coords , ok := c .completedJobs [c .lowWaterMark + 1 ]; ok {
510- lastCoords = coords
498+ if _ , ok := c .completedJobs [c .lowWaterMark + 1 ]; ok {
511499 c .lowWaterMark ++
512500 delete (c .completedJobs , c .lowWaterMark )
513501 } else {
@@ -525,29 +513,11 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
525513 }
526514 }()
527515
528- // update the binlog coords of the low water mark
529- if lastCoords != nil {
530- func () {
531- // c.migrationContext.Log.Infof("Updating binlog coordinates to %s:%d\n", c.currentCoordinates.LogFile, c.currentCoordinates.LogPos)
532- c .currentCoordinatesMutex .Lock ()
533- defer c .currentCoordinatesMutex .Unlock ()
534- c .currentCoordinates .LogPos = lastCoords .LogPos
535- c .currentCoordinates .EventSize = lastCoords .EventSize
536- }()
537- }
538-
539516 for _ , waitChannel := range channelsToNotify {
540517 waitChannel <- struct {}{}
541518 }
542519}
543520
544- func (c * Coordinator ) GetCurrentBinlogCoordinates () * mysql.BinlogCoordinates {
545- c .currentCoordinatesMutex .Lock ()
546- defer c .currentCoordinatesMutex .Unlock ()
547- returnCoordinates := c .currentCoordinates
548- return & returnCoordinates
549- }
550-
551521func (c * Coordinator ) Teardown () {
552522 c .finishedMigrating .Store (true )
553523}
0 commit comments