Skip to content

Commit 489fd05

Browse files
jwangaceJun Wang
andauthored
--consolidator-query-waiter-cap to set the max number of waiter for consolidated query (#17244)
Signed-off-by: Jun Wang <[email protected]> Co-authored-by: Jun Wang <[email protected]>
1 parent de33a39 commit 489fd05

File tree

8 files changed

+59
-5
lines changed

8 files changed

+59
-5
lines changed

changelog/22.0/22.0.0/summary.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ The `querylog-mode` setting can be configured to `error` to log only queries tha
147147
While the flag will continue to accept float values (interpreted as seconds) for backward compatibility,
148148
**float inputs are deprecated** and will be removed in a future release.
149149

150+
- `--consolidator-query-waiter-cap` flag to set the maximum number of clients allowed to wait on the consolidator. The default value is set to 0 for unlimited wait. Users can adjust this value based on the performance of VTTablet to avoid excessive memory usage and the risk of being OOMKilled, particularly in Kubernetes deployments.
151+
150152
### <a id="topo-read-concurrency-changes"/>`--topo_read_concurrency` behaviour changes
151153

152154
The `--topo_read_concurrency` flag was added to all components that access the topology and the provided limit is now applied separately for each global or local cell _(default `32`)_.

go/flags/endtoend/vtcombo.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ Flags:
4444
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
4545
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
4646
--config-type string Config file type (omit to infer config type from file extension).
47+
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
4748
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
4849
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
4950
--consul_auth_static_file string JSON File to read the topos/tokens from.

go/flags/endtoend/vttablet.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ Flags:
7878
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
7979
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
8080
--config-type string Config file type (omit to infer config type from file extension).
81+
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
8182
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
8283
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
8384
--consul_auth_static_file string JSON File to read the topos/tokens from.

go/sync2/consolidator.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type PendingResult interface {
4040
SetResult(*sqltypes.Result)
4141
Result() *sqltypes.Result
4242
Wait()
43+
AddWaiterCounter(int64) *int64
4344
}
4445

4546
type consolidator struct {
@@ -77,6 +78,7 @@ func (co *consolidator) Create(query string) (PendingResult, bool) {
7778
defer co.mu.Unlock()
7879
var r *pendingResult
7980
if r, ok := co.queries[query]; ok {
81+
r.AddWaiterCounter(1)
8082
return r, false
8183
}
8284
r = &pendingResult{consolidator: co, query: query}
@@ -122,17 +124,23 @@ func (rs *pendingResult) Wait() {
122124
rs.executing.RLock()
123125
}
124126

127+
func (rs *pendingResult) AddWaiterCounter(c int64) *int64 {
128+
atomic.AddInt64(rs.consolidator.totalWaiterCount, c)
129+
return rs.consolidator.totalWaiterCount
130+
}
131+
125132
// ConsolidatorCache is a thread-safe object used for counting how often recent
126133
// queries have been consolidated.
127134
// It is also used by the txserializer package to count how often transactions
128135
// have been queued and had to wait because they targeted the same row (range).
129136
type ConsolidatorCache struct {
130137
*cache.LRUCache[*ccount]
138+
totalWaiterCount *int64
131139
}
132140

133141
// NewConsolidatorCache creates a new cache with the given capacity.
134142
func NewConsolidatorCache(capacity int64) *ConsolidatorCache {
135-
return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity)}
143+
return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity), new(int64)}
136144
}
137145

138146
// Record increments the count for "query" by 1.

go/sync2/consolidator_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,42 @@ package sync2
1818

1919
import (
2020
"reflect"
21+
"sync"
2122
"testing"
2223

2324
"vitess.io/vitess/go/sqltypes"
2425
)
2526

27+
func TestAddWaiterCount(t *testing.T) {
28+
con := NewConsolidator()
29+
sql := "select * from SomeTable"
30+
pr, _ := con.Create(sql)
31+
var wgAdd sync.WaitGroup
32+
var wgSub sync.WaitGroup
33+
34+
var concurrent = 1000
35+
36+
for i := 0; i < concurrent; i++ {
37+
wgAdd.Add(1)
38+
wgSub.Add(1)
39+
go func() {
40+
defer wgAdd.Done()
41+
pr.AddWaiterCounter(1)
42+
}()
43+
go func() {
44+
defer wgSub.Done()
45+
pr.AddWaiterCounter(-1)
46+
}()
47+
}
48+
49+
wgAdd.Wait()
50+
wgSub.Wait()
51+
52+
if *pr.AddWaiterCounter(0) != 0 {
53+
t.Fatalf("Expect 0 totalWaiterCount but got: %v", *pr.AddWaiterCounter(0))
54+
}
55+
}
56+
2657
func TestConsolidator(t *testing.T) {
2758
con := NewConsolidator()
2859
sql := "select * from SomeTable"

go/sync2/fake_consolidator.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,8 @@ func (fr *FakePendingResult) SetResult(result *sqltypes.Result) {
112112
func (fr *FakePendingResult) Wait() {
113113
fr.WaitCalls++
114114
}
115+
116+
// AddWaiterCounter is currently a no-op.
117+
func (fr *FakePendingResult) AddWaiterCounter(int64) *int64 {
118+
return new(int64)
119+
}

go/vt/vttablet/tabletserver/query_executor.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -718,10 +718,14 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
718718
q.SetErr(err)
719719
}
720720
} else {
721-
qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
722-
startTime := time.Now()
723-
q.Wait()
724-
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
721+
waiterCap := qre.tsv.config.ConsolidatorQueryWaiterCap
722+
if waiterCap == 0 || *q.AddWaiterCounter(0) <= waiterCap {
723+
qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
724+
startTime := time.Now()
725+
q.Wait()
726+
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
727+
}
728+
q.AddWaiterCounter(-1)
725729
}
726730
if q.Err() != nil {
727731
return nil, q.Err()

go/vt/vttablet/tabletserver/tabletenv/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
198198
fs.Int64Var(&currentConfig.ConsolidatorStreamQuerySize, "consolidator-stream-query-size", defaultConfig.ConsolidatorStreamQuerySize, "Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator.")
199199
fs.Int64Var(&currentConfig.ConsolidatorStreamTotalSize, "consolidator-stream-total-size", defaultConfig.ConsolidatorStreamTotalSize, "Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator.")
200200

201+
fs.Int64Var(&currentConfig.ConsolidatorQueryWaiterCap, "consolidator-query-waiter-cap", 0, "Configure the maximum number of clients allowed to wait on the consolidator.")
201202
fs.DurationVar(&healthCheckInterval, "health_check_interval", defaultConfig.Healthcheck.Interval, "Interval between health checks")
202203
fs.DurationVar(&degradedThreshold, "degraded_threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded")
203204
fs.DurationVar(&unhealthyThreshold, "unhealthy_threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy")
@@ -324,6 +325,7 @@ type TabletConfig struct {
324325
StreamBufferSize int `json:"streamBufferSize,omitempty"`
325326
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
326327
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
328+
ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"`
327329
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
328330
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`
329331
SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"`

0 commit comments

Comments
 (0)