Skip to content

Commit 8ee5109

Browse files
authored
Merge pull request #317 from dolthub/fulghum/com_binlog_prototype
Port: Support for Vitess server to send binlog events
2 parents 96a83d3 + 6621ed3 commit 8ee5109

File tree

10 files changed

+259
-29
lines changed

10 files changed

+259
-29
lines changed

go/mysql/binlog_dump.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright 2022 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package mysql
18+
19+
import (
20+
"encoding/binary"
21+
vtrpcpb "github.com/dolthub/vitess/go/vt/proto/vtrpc"
22+
"github.com/dolthub/vitess/go/vt/vterrors"
23+
"io"
24+
)
25+
26+
var (
27+
// BinglogMagicNumber is 4-byte number at the beginning of every binary log
28+
BinglogMagicNumber = []byte{0xfe, 0x62, 0x69, 0x6e}
29+
readPacketErr = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error reading BinlogDumpGTID packet")
30+
)
31+
32+
const (
33+
BinlogDumpNonBlock = 0x01
34+
BinlogThroughPosition = 0x02
35+
BinlogThroughGTID = 0x04
36+
)
37+
38+
func (c *Conn) parseComBinlogDump(data []byte) (logFile string, binlogPos uint32, err error) {
39+
pos := 1
40+
41+
binlogPos, pos, ok := readUint32(data, pos)
42+
if !ok {
43+
return logFile, binlogPos, readPacketErr
44+
}
45+
46+
pos += 2 // flags
47+
pos += 4 // server-id
48+
49+
logFile = string(data[pos:])
50+
return logFile, binlogPos, nil
51+
}
52+
53+
func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint64, position Position, err error) {
54+
// see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
55+
pos := 1
56+
57+
flags := binary.LittleEndian.Uint16(data[pos : pos+2])
58+
pos += 2 // flags
59+
pos += 4 // server-id
60+
61+
fileNameLen, pos, ok := readUint32(data, pos)
62+
if !ok {
63+
return logFile, logPos, position, readPacketErr
64+
}
65+
logFile = string(data[pos : pos+int(fileNameLen)])
66+
pos += int(fileNameLen)
67+
68+
logPos, pos, ok = readUint64(data, pos)
69+
if !ok {
70+
return logFile, logPos, position, readPacketErr
71+
}
72+
73+
if flags&BinlogDumpNonBlock != 0 {
74+
return logFile, logPos, position, io.EOF
75+
}
76+
if flags&BinlogThroughGTID != 0 {
77+
dataSize, pos, ok := readUint32(data, pos)
78+
if !ok {
79+
return logFile, logPos, position, readPacketErr
80+
}
81+
if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" {
82+
position, err = DecodePosition(gtid)
83+
if err != nil {
84+
return logFile, logPos, position, err
85+
}
86+
}
87+
}
88+
89+
return logFile, logPos, position, nil
90+
}

go/mysql/binlog_event.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ type BinlogEvent interface {
122122

123123
// IsPseudo is for custom implementations of GTID.
124124
IsPseudo() bool
125+
126+
// Bytes returns the binary representation of the event
127+
Bytes() []byte
125128
}
126129

127130
// BinlogFormat contains relevant data from the FORMAT_DESCRIPTION_EVENT.

go/mysql/binlog_event_filepos.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ type filePosQueryEvent struct {
8787
filePosFakeEvent
8888
}
8989

90+
func (ev filePosQueryEvent) Bytes() []byte {
91+
panic("not implemented")
92+
}
93+
9094
func newFilePosQueryEvent(query string, ts uint32) filePosQueryEvent {
9195
return filePosQueryEvent{
9296
query: query,
@@ -221,6 +225,10 @@ type filePosGTIDEvent struct {
221225
gtid filePosGTID
222226
}
223227

228+
func (ev filePosGTIDEvent) Bytes() []byte {
229+
panic("not implemented")
230+
}
231+
224232
func newFilePosGTIDEvent(file string, pos int, timestamp uint32) filePosGTIDEvent {
225233
return filePosGTIDEvent{
226234
filePosFakeEvent: filePosFakeEvent{

go/mysql/binlog_event_make.go

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ package mysql
1818

1919
import (
2020
"encoding/binary"
21+
"hash/crc32"
22+
)
23+
24+
const (
25+
FlagLogEventArtificial = 0x20
2126
)
2227

2328
// This file contains utility methods to create binlog replication
@@ -100,7 +105,12 @@ func (s *FakeBinlogStream) Packetize(f BinlogFormat, typ byte, flags uint16, dat
100105
}
101106

102107
result := make([]byte, length)
103-
binary.LittleEndian.PutUint32(result[0:4], s.Timestamp)
108+
switch typ {
109+
case eRotateEvent, eHeartbeatEvent:
110+
// timestamp remains zero
111+
default:
112+
binary.LittleEndian.PutUint32(result[0:4], s.Timestamp)
113+
}
104114
result[4] = typ
105115
binary.LittleEndian.PutUint32(result[5:9], s.ServerID)
106116
binary.LittleEndian.PutUint32(result[9:13], uint32(length))
@@ -109,6 +119,13 @@ func (s *FakeBinlogStream) Packetize(f BinlogFormat, typ byte, flags uint16, dat
109119
binary.LittleEndian.PutUint16(result[17:19], flags)
110120
}
111121
copy(result[f.HeaderLength:], data)
122+
123+
switch f.ChecksumAlgorithm {
124+
case BinlogChecksumAlgCRC32:
125+
checksum := crc32.ChecksumIEEE(result[0 : length-4])
126+
binary.LittleEndian.PutUint32(result[length-4:], checksum)
127+
}
128+
112129
return result
113130
}
114131

@@ -157,12 +174,38 @@ func NewRotateEvent(f BinlogFormat, s *FakeBinlogStream, position uint64, filena
157174
len(filename)
158175
data := make([]byte, length)
159176
binary.LittleEndian.PutUint64(data[0:8], position)
177+
copy(data[8:], filename)
160178

161179
ev := s.Packetize(f, eRotateEvent, 0, data)
162-
ev[0] = 0
163-
ev[1] = 0
164-
ev[2] = 0
165-
ev[3] = 0
180+
return NewMysql56BinlogEvent(ev)
181+
}
182+
183+
func NewFakeRotateEvent(f BinlogFormat, s *FakeBinlogStream, filename string) BinlogEvent {
184+
length := 8 + // position
185+
len(filename)
186+
data := make([]byte, length)
187+
binary.LittleEndian.PutUint64(data[0:8], 4)
188+
copy(data[8:], filename)
189+
190+
ev := s.Packetize(f, eRotateEvent, FlagLogEventArtificial, data)
191+
return NewMysql56BinlogEvent(ev)
192+
}
193+
194+
// NewHeartbeatEvent returns a HeartbeatEvent.
195+
// see https://dev.mysql.com/doc/internals/en/heartbeat-event.html
196+
func NewHeartbeatEvent(f BinlogFormat, s *FakeBinlogStream) BinlogEvent {
197+
ev := s.Packetize(f, eHeartbeatEvent, 0, []byte{})
198+
return NewMysql56BinlogEvent(ev)
199+
}
200+
201+
// NewHeartbeatEvent returns a HeartbeatEvent.
202+
// see https://dev.mysql.com/doc/internals/en/heartbeat-event.html
203+
func NewHeartbeatEventWithLogFile(f BinlogFormat, s *FakeBinlogStream, filename string) BinlogEvent {
204+
length := len(filename)
205+
data := make([]byte, length)
206+
copy(data, filename)
207+
208+
ev := s.Packetize(f, eHeartbeatEvent, 0, data)
166209
return NewMysql56BinlogEvent(ev)
167210
}
168211

@@ -172,7 +215,7 @@ func NewQueryEvent(f BinlogFormat, s *FakeBinlogStream, q Query) BinlogEvent {
172215
if q.Charset != nil {
173216
statusVarLength += 1 + 2 + 2 + 2
174217
}
175-
length := 4 + // slave proxy id
218+
length := 4 + // proxy id
176219
4 + // execution time
177220
1 + // schema length
178221
2 + // error code
@@ -296,9 +339,9 @@ func NewTableMapEvent(f BinlogFormat, s *FakeBinlogStream, tableID uint64, tm *T
296339
1 + // table name length
297340
len(tm.Name) +
298341
1 + // [00]
299-
1 + // column-count FIXME(alainjobart) len enc
342+
lenEncIntSize(uint64(len(tm.Types))) + // column-count len enc
300343
len(tm.Types) +
301-
1 + // lenenc-str column-meta-def FIXME(alainjobart) len enc
344+
lenEncIntSize(uint64(metadataLength)) + // lenenc-str column-meta-def
302345
metadataLength +
303346
len(tm.CanBeNull.data)
304347
data := make([]byte, length)
@@ -320,15 +363,10 @@ func NewTableMapEvent(f BinlogFormat, s *FakeBinlogStream, tableID uint64, tm *T
320363
data[pos] = 0
321364
pos++
322365

323-
data[pos] = byte(len(tm.Types)) // FIXME(alainjobart) lenenc
324-
pos++
325-
366+
pos = writeLenEncInt(data, pos, uint64(len(tm.Types)))
326367
pos += copy(data[pos:], tm.Types)
327368

328-
// Per-column meta data. Starting with len-enc length.
329-
// FIXME(alainjobart) lenenc
330-
data[pos] = byte(metadataLength)
331-
pos++
369+
pos = writeLenEncInt(data, pos, uint64(metadataLength))
332370
for c, typ := range tm.Types {
333371
pos = metadataWrite(data, pos, typ, tm.Metadata[c])
334372
}
@@ -366,10 +404,20 @@ func newRowsEvent(f BinlogFormat, s *FakeBinlogStream, typ byte, tableID uint64,
366404
panic("Not implemented, post_header_length==6")
367405
}
368406

407+
hasIdentify := typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2 ||
408+
typ == eDeleteRowsEventV1 || typ == eDeleteRowsEventV2
409+
hasData := typ == eWriteRowsEventV1 || typ == eWriteRowsEventV2 ||
410+
typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2
411+
412+
rowLen := rows.DataColumns.Count()
413+
if hasIdentify {
414+
rowLen = rows.IdentifyColumns.Count()
415+
}
416+
369417
length := 6 + // table id
370418
2 + // flags
371419
2 + // extra data length, no extra data.
372-
1 + // num columns FIXME(alainjobart) len enc
420+
lenEncIntSize(uint64(rowLen)) + // num columns
373421
len(rows.IdentifyColumns.data) + // only > 0 for Update & Delete
374422
len(rows.DataColumns.data) // only > 0 for Write & Update
375423
for _, row := range rows.Rows {
@@ -380,11 +428,6 @@ func newRowsEvent(f BinlogFormat, s *FakeBinlogStream, typ byte, tableID uint64,
380428
}
381429
data := make([]byte, length)
382430

383-
hasIdentify := typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2 ||
384-
typ == eDeleteRowsEventV1 || typ == eDeleteRowsEventV2
385-
hasData := typ == eWriteRowsEventV1 || typ == eWriteRowsEventV2 ||
386-
typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2
387-
388431
data[0] = byte(tableID)
389432
data[1] = byte(tableID >> 8)
390433
data[2] = byte(tableID >> 16)
@@ -396,12 +439,7 @@ func newRowsEvent(f BinlogFormat, s *FakeBinlogStream, typ byte, tableID uint64,
396439
data[8] = 0x02
397440
data[9] = 0x00
398441

399-
if hasIdentify {
400-
data[10] = byte(rows.IdentifyColumns.Count()) // FIXME(alainjobart) len
401-
} else {
402-
data[10] = byte(rows.DataColumns.Count()) // FIXME(alainjobart) len
403-
}
404-
pos := 11
442+
pos := writeLenEncInt(data, 10, uint64(rowLen))
405443

406444
if hasIdentify {
407445
pos += copy(data[pos:], rows.IdentifyColumns.data)

go/mysql/binlog_event_mysql56_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,4 @@ func TestMysql56ParsePosition(t *testing.T) {
126126
if !got.Equal(want) {
127127
t.Errorf("(&mysql56{}).ParsePosition(%#v) = %#v, want %#v", input, got, want)
128128
}
129-
}
129+
}

go/mysql/conn.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,6 +1359,19 @@ func (c *Conn) handleNextCommand(handler Handler) error {
13591359
c.writeErrorPacketFromError(err)
13601360
}
13611361

1362+
case ComBinlogDumpGTID:
1363+
ok := c.handleComBinlogDumpGTID(handler, data)
1364+
if !ok {
1365+
return fmt.Errorf("error handling ComBinlogDumpGTID packet: %v", data)
1366+
}
1367+
return nil
1368+
1369+
case ComRegisterReplica:
1370+
// TODO: Seems like we probably need this command implemented, too, but it hasn't been needed
1371+
// yet in a simple Vitess <-> Vitess replication test, so skipping for now.
1372+
//return c.handleComRegisterReplica(handler, data)
1373+
return fmt.Errorf("ComRegisterReplica not implemented")
1374+
13621375
default:
13631376
log.Errorf("Got unhandled packet (default) from %s, returning error: %v", c, data)
13641377
c.recycleReadPacket()
@@ -1371,6 +1384,36 @@ func (c *Conn) handleNextCommand(handler Handler) error {
13711384
return nil
13721385
}
13731386

1387+
func (c *Conn) handleComBinlogDumpGTID(handler Handler, data []byte) (kontinue bool) {
1388+
binlogReplicaHandler, ok := handler.(BinlogReplicaHandler)
1389+
if !ok {
1390+
log.Warningf("received BINLOG_DUMP_GTID command, but handler does not implement BinlogReplicaHandler")
1391+
return true
1392+
}
1393+
1394+
c.recycleReadPacket()
1395+
kontinue = true
1396+
1397+
c.startWriterBuffering()
1398+
defer func() {
1399+
if err := c.flush(); err != nil {
1400+
log.Errorf("conn %v: flush() failed: %v", c.ID(), err)
1401+
kontinue = false
1402+
}
1403+
}()
1404+
1405+
logFile, logPos, position, err := c.parseComBinlogDumpGTID(data)
1406+
if err != nil {
1407+
log.Errorf("conn %v: parseComBinlogDumpGTID failed: %v", c.ID(), err)
1408+
return false
1409+
}
1410+
if err := binlogReplicaHandler.ComBinlogDumpGTID(c, logFile, logPos, position.GTIDSet); err != nil {
1411+
log.Error(err.Error())
1412+
return false
1413+
}
1414+
return kontinue
1415+
}
1416+
13741417
// writeNumRows writes the specified number of rows to the handler, the end result, and flushes
13751418
func (c *Conn) writeNumRows(numRows int) (err error) {
13761419
origRows := c.cs.pending.Rows

go/mysql/constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,9 @@ const (
232232
// ComBinlogDumpGTID is COM_BINLOG_DUMP_GTID.
233233
ComBinlogDumpGTID = 0x1e
234234

235+
// ComRegisterReplica is COM_REGISTER_REPLICA
236+
ComRegisterReplica = 0x15
237+
235238
// OKPacket is the header of the OK packet.
236239
OKPacket = 0x00
237240

go/mysql/replication.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,31 @@ func (c *Conn) SemiSyncExtensionLoaded() bool {
7777
}
7878
return len(qr.Rows) >= 1
7979
}
80+
81+
// WriteBinlogEvent writes a binlog event as part of a replication stream
82+
// https://dev.mysql.com/doc/internals/en/binlog-network-stream.html
83+
// https://dev.mysql.com/doc/internals/en/binlog-event.html
84+
func (c *Conn) WriteBinlogEvent(ev BinlogEvent, semiSyncEnabled bool) error {
85+
extraBytes := 1 // OK packet
86+
if semiSyncEnabled {
87+
extraBytes += 2
88+
}
89+
90+
// NOTE: The latest Vitess code has changed startEphemeralPacket to startEphemeralPacketWithHeader,
91+
// but we haven't ported that over yet, so instead, we use startEphemeralPacket and assign
92+
// 0 to pos to indicate no header was included.
93+
//data, pos := c.startEphemeralPacketWithHeader(len(ev.Bytes()) + extraBytes)
94+
95+
data, pos := c.startEphemeralPacket(len(ev.Bytes()) + extraBytes), 0
96+
pos = writeByte(data, pos, 0) // "OK" prefix
97+
if semiSyncEnabled {
98+
pos = writeByte(data, pos, 0xef) // semi sync indicator
99+
pos = writeByte(data, pos, 0) // no ack expected
100+
}
101+
_ = writeEOFString(data, pos, string(ev.Bytes()))
102+
if err := c.writeEphemeralPacket(); err != nil {
103+
return NewSQLError(CRServerGone, SSUnknownSQLState, "%v", err)
104+
}
105+
return nil
106+
}
107+

0 commit comments

Comments
 (0)