@@ -43,7 +43,8 @@ type Coordinator struct {
4343 // list of workers
4444 workers []* Worker
4545
46- // The low water mark. This is the sequence number of the last job that has been committed.
46+ // The low water mark. We maintain that all transactions with
47+ // sequence number <= lowWaterMark have been completed.
4748 lowWaterMark int64
4849
4950 // This is a map of completed jobs by their sequence numbers.
@@ -194,6 +195,7 @@ func (w *Worker) ProcessEvents() error {
194195 if len (dmlEvents ) == cap (dmlEvents ) {
195196 if err := w .applyDMLEvents (dmlEvents ); err != nil {
196197 w .coordinator .migrationContext .Log .Errore (err )
198+ // TODO do something with the err
197199 }
198200 dmlEvents = dmlEvents [:0 ]
199201 }
@@ -479,10 +481,6 @@ func (c *Coordinator) WaitForTransaction(lastCommitted int64) chan struct{} {
479481 return nil
480482 }
481483
482- if _ , ok := c .completedJobs [lastCommitted ]; ok {
483- return nil
484- }
485-
486484 waitChannel := make (chan struct {})
487485 c .waitingJobs [lastCommitted ] = append (c .waitingJobs [lastCommitted ], waitChannel )
488486
@@ -503,8 +501,6 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
503501 c .mu .Lock ()
504502 defer c .mu .Unlock ()
505503
506- //c.migrationContext.Log.Infof("Coordinator: Marking job as completed: %d\n", sequenceNumber)
507-
508504 // Mark the job as completed
509505 c .completedJobs [sequenceNumber ] = & mysql.BinlogCoordinates {LogPos : logPos , EventSize : eventSize }
510506
@@ -522,7 +518,7 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
522518
523519 // Schedule any jobs that were waiting for this job to complete or for the low watermark
524520 for waitingForSequenceNumber , channels := range c .waitingJobs {
525- if waitingForSequenceNumber <= c .lowWaterMark || waitingForSequenceNumber == sequenceNumber {
521+ if waitingForSequenceNumber <= c .lowWaterMark {
526522 channelsToNotify = append (channelsToNotify , channels ... )
527523 delete (c .waitingJobs , waitingForSequenceNumber )
528524 }
0 commit comments