@@ -28,31 +28,24 @@ type GoMySQLReader struct {
2828 LastAppliedRowsEventHint mysql.BinlogCoordinates
2929}
3030
31- func NewGoMySQLReader (migrationContext * base.MigrationContext ) (binlogReader * GoMySQLReader , err error ) {
32- binlogReader = & GoMySQLReader {
31+ func NewGoMySQLReader (migrationContext * base.MigrationContext ) * GoMySQLReader {
32+ connectionConfig := migrationContext .InspectorConnectionConfig
33+ return & GoMySQLReader {
3334 migrationContext : migrationContext ,
34- connectionConfig : migrationContext . InspectorConnectionConfig ,
35+ connectionConfig : connectionConfig ,
3536 currentCoordinates : mysql.BinlogCoordinates {},
3637 currentCoordinatesMutex : & sync.Mutex {},
37- binlogSyncer : nil ,
38- binlogStreamer : nil ,
38+ binlogSyncer : replication .NewBinlogSyncer (replication.BinlogSyncerConfig {
39+ ServerID : uint32 (migrationContext .ReplicaServerId ),
40+ Flavor : gomysql .MySQLFlavor ,
41+ Host : connectionConfig .Key .Hostname ,
42+ Port : uint16 (connectionConfig .Key .Port ),
43+ User : connectionConfig .User ,
44+ Password : connectionConfig .Password ,
45+ TLSConfig : connectionConfig .TLSConfig (),
46+ UseDecimal : true ,
47+ }),
3948 }
40-
41- serverId := uint32 (migrationContext .ReplicaServerId )
42-
43- binlogSyncerConfig := replication.BinlogSyncerConfig {
44- ServerID : serverId ,
45- Flavor : "mysql" ,
46- Host : binlogReader .connectionConfig .Key .Hostname ,
47- Port : uint16 (binlogReader .connectionConfig .Key .Port ),
48- User : binlogReader .connectionConfig .User ,
49- Password : binlogReader .connectionConfig .Password ,
50- TLSConfig : binlogReader .connectionConfig .TLSConfig (),
51- UseDecimal : true ,
52- }
53- binlogReader .binlogSyncer = replication .NewBinlogSyncer (binlogSyncerConfig )
54-
55- return binlogReader , err
5649}
5750
5851// ConnectBinlogStreamer
@@ -145,15 +138,17 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
145138 defer this .currentCoordinatesMutex .Unlock ()
146139 this .currentCoordinates .LogPos = int64 (ev .Header .LogPos )
147140 }()
148- if rotateEvent , ok := ev .Event .(* replication.RotateEvent ); ok {
141+
142+ switch binlogEvent := ev .Event .(type ) {
143+ case * replication.RotateEvent :
149144 func () {
150145 this .currentCoordinatesMutex .Lock ()
151146 defer this .currentCoordinatesMutex .Unlock ()
152- this .currentCoordinates .LogFile = string (rotateEvent .NextLogName )
147+ this .currentCoordinates .LogFile = string (binlogEvent .NextLogName )
153148 }()
154- this .migrationContext .Log .Infof ("rotate to next log from %s:%d to %s" , this .currentCoordinates .LogFile , int64 (ev .Header .LogPos ), rotateEvent .NextLogName )
155- } else if rowsEvent , ok := ev . Event .( * replication.RowsEvent ); ok {
156- if err := this .handleRowsEvent (ev , rowsEvent , entriesChannel ); err != nil {
149+ this .migrationContext .Log .Infof ("rotate to next log from %s:%d to %s" , this .currentCoordinates .LogFile , int64 (ev .Header .LogPos ), binlogEvent .NextLogName )
150+ case * replication.RowsEvent :
151+ if err := this .handleRowsEvent (ev , binlogEvent , entriesChannel ); err != nil {
157152 return err
158153 }
159154 }
0 commit comments