Skip to content

Commit 4c27ea8

Browse files
authored
VReplication: Support passing VStream filters down to MySQL (#17677)
Signed-off-by: Matt Lord <[email protected]>
1 parent 8fc9801 commit 4c27ea8

File tree

8 files changed

+302
-73
lines changed

8 files changed

+302
-73
lines changed

go/test/endtoend/vreplication/config_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ import (
4545
// default collation as it has to work across versions and the 8.0 default does not exist in 5.7.
4646
var (
4747
// All standard user tables should have a primary key and at least one secondary key.
48-
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
49-
customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null,
48+
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
49+
// We use utf8mb4_general_ci so that we can test with 5.7 and 8.0+.
50+
customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_general_ci, meta json default null,
5051
industryCategory varchar(100) generated always as (json_extract(meta, _utf8mb4'$.industry')) virtual, typ enum(%s),
5152
sport set('football','cricket','baseball'), ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
5253
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(%s), key(name)) CHARSET=utf8mb4`

go/test/endtoend/vreplication/vstream_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,3 +1090,137 @@ func TestVStreamHeartbeats(t *testing.T) {
10901090
})
10911091
}
10921092
}
1093+
1094+
// TestVStreamPushdownFilters confirms that pushdown filters are applied correctly
1095+
// when they are specified in the VStream API via the rule.Filter.
1096+
// It also confirms that we use the proper collation for the VStream filter when
1097+
// using VARCHAR fields.
1098+
func TestVStreamPushdownFilters(t *testing.T) {
1099+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1100+
defer cancel()
1101+
setSidecarDBName("_vt")
1102+
config := *mainClusterConfig
1103+
vc = NewVitessCluster(t, &clusterOptions{
1104+
clusterConfig: &config,
1105+
})
1106+
defer vc.TearDown()
1107+
require.NotNil(t, vc)
1108+
ks := "product"
1109+
shard := "0"
1110+
defaultCell := vc.Cells[vc.CellNames[0]]
1111+
1112+
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
1113+
require.NoError(t, err)
1114+
verifyClusterHealth(t, vc)
1115+
insertInitialData(t)
1116+
1117+
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
1118+
defer vtgateConn.Close()
1119+
1120+
// Make sure that we get at least one paul row event in the copy phase.
1121+
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false)
1122+
require.NoError(t, err)
1123+
res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name = 'pauĺ'", ks), 1, false)
1124+
require.NoError(t, err)
1125+
require.Len(t, res.Rows, 1)
1126+
startingPauls, err := res.Rows[0][0].ToInt()
1127+
require.NoError(t, err)
1128+
1129+
// Coordinate go-routines.
1130+
streamCtx, streamCancel := context.WithTimeout(context.Background(), 1*time.Minute)
1131+
defer streamCancel()
1132+
done := make(chan struct{})
1133+
1134+
// First goroutine that keeps inserting rows into the table being streamed until the
1135+
// stream context is cancelled.
1136+
createdPauls := startingPauls
1137+
createdNonPauls := 0
1138+
go func() {
1139+
id := 1
1140+
for {
1141+
select {
1142+
case <-streamCtx.Done():
1143+
// Give the VStream a little catch-up time before telling it to stop
1144+
// via the done channel.
1145+
time.Sleep(10 * time.Second)
1146+
close(done)
1147+
return
1148+
default:
1149+
if id%10 == 0 {
1150+
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('paÜl')", ks), 1, false)
1151+
require.NoError(t, err)
1152+
createdPauls++
1153+
} else {
1154+
insertRow(ks, "customer", id)
1155+
createdNonPauls++
1156+
}
1157+
time.Sleep(10 * time.Millisecond)
1158+
id++
1159+
}
1160+
}
1161+
}()
1162+
1163+
vgtid := &binlogdatapb.VGtid{
1164+
ShardGtids: []*binlogdatapb.ShardGtid{{
1165+
Keyspace: ks,
1166+
Shard: shard,
1167+
Gtid: "",
1168+
}}}
1169+
1170+
filter := &binlogdatapb.Filter{
1171+
Rules: []*binlogdatapb.Rule{{
1172+
Match: "customer",
1173+
Filter: "select * from customer where name = 'påul'",
1174+
}},
1175+
}
1176+
flags := &vtgatepb.VStreamFlags{}
1177+
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
1178+
require.NoError(t, err)
1179+
defer vstreamConn.Close()
1180+
1181+
// So we should have at least one paul row event in the copy phase.
1182+
copyPhaseRowEvents := 0
1183+
// And we should have many paul row events in the running phase.
1184+
runningPhaseRowEvents := 0
1185+
copyPhase := true
1186+
1187+
func() {
1188+
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
1189+
require.NoError(t, err)
1190+
for {
1191+
evs, err := reader.Recv()
1192+
1193+
switch err {
1194+
case nil:
1195+
for _, ev := range evs {
1196+
switch ev.Type {
1197+
case binlogdatapb.VEventType_COPY_COMPLETED:
1198+
copyPhase = false
1199+
case binlogdatapb.VEventType_ROW:
1200+
if copyPhase {
1201+
copyPhaseRowEvents++
1202+
} else {
1203+
runningPhaseRowEvents++
1204+
}
1205+
}
1206+
}
1207+
default:
1208+
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
1209+
}
1210+
select {
1211+
case <-done:
1212+
return
1213+
default:
1214+
}
1215+
}
1216+
}()
1217+
1218+
require.NotZero(t, createdPauls)
1219+
require.NotZero(t, createdNonPauls)
1220+
require.Greater(t, createdNonPauls, createdPauls)
1221+
require.NotZero(t, copyPhaseRowEvents)
1222+
require.NotZero(t, runningPhaseRowEvents)
1223+
1224+
t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
1225+
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
1226+
}

go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
213213
defer func() { waitRetryTime = savedWaitRetryTime }()
214214

215215
execStatements(t, []string{
216-
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent default collation across MySQL versions
216+
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb4 collate utf8mb4_general_ci", // Use general_ci so that we have the same behavior across 5.7 and 8.0
217217
"insert into src values('a', 1), ('c', 2)",
218218
fmt.Sprintf("create table %s.dst(idc varchar(20), val int, primary key(idc))", vrepldb),
219219
})
@@ -282,7 +282,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
282282
"/update _vt.vreplication set state='Copying'",
283283
// Copy mode.
284284
"insert into dst(idc,val) values ('a',1)",
285-
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"a"}'.*`,
285+
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"a"}'.*`,
286286
// Copy-catchup mode.
287287
`/insert into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`,
288288
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
@@ -292,11 +292,11 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
292292
// upd1 := expect.
293293
upd1 := expect.Then(qh.Eventually(
294294
"insert into dst(idc,val) values ('B',3)",
295-
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"B"}'.*`,
295+
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"B"}'.*`,
296296
))
297297
upd2 := expect.Then(qh.Eventually(
298298
"insert into dst(idc,val) values ('c',2)",
299-
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"c"}'.*`,
299+
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"c"}'.*`,
300300
))
301301
upd1.Then(upd2.Eventually())
302302
return upd2

go/vt/vttablet/tabletserver/vstreamer/copy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
212212
lastPK := getLastPKFromQR(uvs.plans[tableName].tablePK.Lastpk)
213213
filter := uvs.plans[tableName].rule.Filter
214214

215-
log.Infof("Starting copyTable for %s, PK %v", tableName, lastPK)
215+
log.Infof("Starting copyTable for %s, Filter: %s, LastPK: %v", tableName, filter, lastPK)
216216
uvs.sendTestEvent(fmt.Sprintf("Copy Start %s", tableName))
217217

218218
err := uvs.vse.StreamRows(ctx, filter, lastPK, func(rows *binlogdatapb.VStreamRowsResponse) error {

go/vt/vttablet/tabletserver/vstreamer/planbuilder.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ type Plan struct {
5858
// of the table.
5959
Filters []Filter
6060

61+
// Predicates in the Filter query that we can push down to MySQL
62+
// to reduce the returned rows we need to filter in the VStreamer
63+
// during the copy phase. This will contain any valid expressions
64+
// in the Filter's WHERE clause with the exception of the
65+
// in_keyrange() function which is a filter that must be applied
66+
// by the VStreamer (it's not a valid MySQL function). Note that
67+
// the Filter cannot contain any MySQL functions because the
68+
// VStreamer cannot filter binlog events using them.
69+
whereExprsToPushDown []sqlparser.Expr
70+
6171
// Convert any integer values seen in the binlog events for ENUM or SET
6272
// columns to the string values. The map is keyed on the column number, with
6373
// the value being the map of ordinal values to string values.
@@ -564,6 +574,7 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
564574
if where == nil {
565575
return nil
566576
}
577+
// Only a series of AND expressions are supported.
567578
exprs := splitAndExpression(nil, where.Expr)
568579
for _, expr := range exprs {
569580
switch expr := expr.(type) {
@@ -601,10 +612,6 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
601612
if !ok {
602613
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
603614
}
604-
// StrVal is varbinary, we do not support varchar since we would have to implement all collation types
605-
if val.Type != sqlparser.IntVal && val.Type != sqlparser.StrVal {
606-
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
607-
}
608615
pv, err := evalengine.Translate(val, &evalengine.Config{
609616
Collation: plan.env.CollationEnv().DefaultConnectionCharset(),
610617
Environment: plan.env,
@@ -622,7 +629,11 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
622629
ColNum: colnum,
623630
Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
624631
})
632+
// Add it to the expressions that get pushed down to mysqld.
633+
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
625634
case *sqlparser.FuncExpr:
635+
// We cannot filter binlog events in VStreamer using MySQL functions so
636+
// we only allow the in_keyrange() function, which is VStreamer specific.
626637
if !expr.Name.EqualString("in_keyrange") {
627638
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
628639
}
@@ -648,6 +659,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
648659
Opcode: IsNotNull,
649660
ColNum: colnum,
650661
})
662+
// Add it to the expressions that get pushed down to mysqld.
663+
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
651664
default:
652665
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
653666
}

0 commit comments

Comments
 (0)