Skip to content

Commit 488ef3d

Browse files
authored
VStream: Prevent buffering entire transactions (OOM risk), instead send chunks to client (#18849)
Signed-off-by: twthorn <[email protected]>
1 parent 8ce3eb2 commit 488ef3d

File tree

7 files changed

+398
-52
lines changed

7 files changed

+398
-52
lines changed

go/test/endtoend/vreplication/initial_data_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ import (
2020
"fmt"
2121
"math/rand/v2"
2222
"os"
23+
"strings"
2324
"testing"
2425

26+
"vitess.io/vitess/go/mysql"
2527
"vitess.io/vitess/go/vt/log"
2628
)
2729

@@ -43,6 +45,12 @@ func insertInitialData(t *testing.T) {
4345
`[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`)
4446

4547
insertJSONValues(t)
48+
49+
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs+":0", 50000)
50+
log.Infof("Inserted large transaction for chunking tests")
51+
52+
execVtgateQuery(t, vtgateConn, defaultSourceKs, "delete from customer where cid >= 50000 and cid < 50100")
53+
log.Infof("Cleaned up chunk testing rows from source keyspace")
4654
})
4755
}
4856

@@ -140,3 +148,15 @@ func insertIntoBlobTable(t *testing.T) {
140148
execVtgateQuery(t, vtgateConn, defaultSourceKs+":0", query)
141149
}
142150
}
151+
152+
// insertLargeTransactionForChunkTesting inserts a transaction large enough to exceed the 1KB chunking threshold.
153+
func insertLargeTransactionForChunkTesting(t *testing.T, vtgateConn *mysql.Conn, keyspace string, startID int) {
154+
execVtgateQuery(t, vtgateConn, keyspace, "BEGIN")
155+
for i := 0; i < 15; i++ {
156+
largeData := strings.Repeat("x", 94) + fmt.Sprintf("_%05d", i)
157+
query := fmt.Sprintf("INSERT INTO customer (cid, name) VALUES (%d, '%s')",
158+
startID+i, largeData)
159+
execVtgateQuery(t, vtgateConn, keyspace, query)
160+
}
161+
execVtgateQuery(t, vtgateConn, keyspace, "COMMIT")
162+
}

go/test/endtoend/vreplication/vstream_test.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
8080
},
8181
}
8282
flags := &vtgatepb.VStreamFlags{
83-
TablesToCopy: []string{"product", "customer"},
83+
TablesToCopy: []string{"product", "customer"},
84+
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
8485
}
8586
id := 0
8687
vtgateConn := vc.GetVTGateConn(t)
@@ -95,6 +96,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
9596
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
9697
}
9798

99+
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 10000)
100+
98101
// Stream events from the VStream API
99102
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
100103
require.NoError(t, err)
@@ -151,6 +154,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
151154
stopInserting.Store(false)
152155
var insertMu sync.Mutex
153156
go func() {
157+
insertCount := 0
154158
for {
155159
if stopInserting.Load() {
156160
return
@@ -160,6 +164,10 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
160164
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
161165
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
162166
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
167+
insertCount++
168+
if insertCount%5 == 0 {
169+
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 20000+insertCount*10)
170+
}
163171
insertMu.Unlock()
164172
}
165173
}()
@@ -239,7 +247,10 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
239247
Filter: "select * from customer",
240248
}},
241249
}
242-
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600}
250+
flags := &vtgatepb.VStreamFlags{
251+
HeartbeatInterval: 3600,
252+
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
253+
}
243254
done := atomic.Bool{}
244255
done.Store(false)
245256

@@ -254,13 +265,18 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
254265

255266
// first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS
256267
go func() {
268+
insertCount := 0
257269
for {
258270
if stopInserting.Load() {
259271
return
260272
}
261273
insertMu.Lock()
262274
id++
263275
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
276+
insertCount++
277+
if insertCount%3 == 0 {
278+
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 40000+insertCount*10)
279+
}
264280
insertMu.Unlock()
265281
}
266282
}()
@@ -441,7 +457,11 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
441457
Filter: "select * from customer",
442458
}},
443459
}
444-
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600, StopOnReshard: stopOnReshard}
460+
flags := &vtgatepb.VStreamFlags{
461+
HeartbeatInterval: 3600,
462+
StopOnReshard: stopOnReshard,
463+
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
464+
}
445465
done := false
446466

447467
id := 1000
@@ -581,7 +601,9 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
581601
Match: "/customer.*/",
582602
}},
583603
}
584-
flags := &vtgatepb.VStreamFlags{}
604+
flags := &vtgatepb.VStreamFlags{
605+
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
606+
}
585607
done := false
586608

587609
id := 1000
@@ -765,6 +787,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
765787
}
766788
flags := &vtgatepb.VStreamFlags{
767789
IncludeReshardJournalEvents: true,
790+
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
768791
}
769792
journalEvents := 0
770793

@@ -962,7 +985,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
962985
}},
963986
}
964987
flags := &vtgatepb.VStreamFlags{
965-
StopOnReshard: true,
988+
StopOnReshard: true,
989+
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
966990
}
967991

968992
// Stream events but stop once we have a VGTID with positions for the old/original shards.
@@ -1233,6 +1257,7 @@ func TestVStreamHeartbeats(t *testing.T) {
12331257
name: "With Keyspace Heartbeats On",
12341258
flags: &vtgatepb.VStreamFlags{
12351259
StreamKeyspaceHeartbeats: true,
1260+
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
12361261
},
12371262
expectedHeartbeats: numExpectedHeartbeats,
12381263
},
@@ -1363,7 +1388,9 @@ func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamCo
13631388
vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) {
13641389
copyPhase := true
13651390
func() {
1366-
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{})
1391+
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{
1392+
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
1393+
})
13671394
require.NoError(t, err)
13681395
for {
13691396
evs, err := reader.Recv()

go/vt/proto/vtgate/vtgate.pb.go

Lines changed: 18 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/vt/proto/vtgate/vtgate_vtproto.pb.go

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)