Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create table if not exists t1(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
Copyright 2025 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package transactiontimeout

import (
"context"
_ "embed"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
uks = "uks"
cell = "test_misc"

//go:embed uschema.sql
uschemaSQL string
)

func createCluster(t *testing.T, vttabletArgs ...string) func() {
clusterInstance = cluster.NewCluster(cell, "localhost")

err := clusterInstance.StartTopo()
require.NoError(t, err)

clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, vttabletArgs...)

ukeyspace := &cluster.Keyspace{
Name: uks,
SchemaSQL: uschemaSQL,
}
err = clusterInstance.StartUnshardedKeyspace(*ukeyspace, 0, false)
require.NoError(t, err)

err = clusterInstance.StartVtgate()
require.NoError(t, err)

vtParams = clusterInstance.GetVTParams(uks)

_, closer, err := utils.NewMySQL(clusterInstance, uks, uschemaSQL)
require.NoError(t, err)

return func() {
clusterInstance.Teardown()
closer()
}
}

func TestTransactionTimeout(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 21, "vttablet")

// Start cluster with no vtgate or vttablet timeouts
teardown := createCluster(t)
defer teardown()

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()

// No timeout set, transaction shouldn't timeout
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(sleep(0.5))")
utils.Exec(t, conn, "commit")

// Set session transaction timeout
utils.Exec(t, conn, "set transaction_timeout=100")

// Sleeping outside of query will allow the transaction killer to kill the transaction
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
time.Sleep(3 * time.Second)
_, err = utils.ExecAllowError(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
require.ErrorContains(t, err, "Aborted")

// Sleeping in MySQL will cause a context timeout instead (different error)
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
_, err = utils.ExecAllowError(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(sleep(0.5))")
require.ErrorContains(t, err, "Query execution was interrupted")

// Get new connection
conn, err = mysql.Connect(context.Background(), &vtParams)

Check failure on line 104 in go/test/endtoend/vtgate/queries/transaction_timeout/transaction_timeout_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

ineffectual assignment to err (ineffassign)

// Set session transaction timeout to 0
utils.Exec(t, conn, "set transaction_timeout=0")

// Should time out using tablet transaction timeout
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
utils.Exec(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(sleep(2))")
utils.Exec(t, conn, "commit")
}

func TestSmallerTimeout(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 21, "vttablet")

// Start vttablet with a transaction timeout
teardown := createCluster(t, "--queryserver-config-transaction-timeout", "1s")
defer teardown()

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

// Set session transaction timeout larger than tablet transaction timeout
utils.Exec(t, conn, "set transaction_timeout=2000")

// Transaction should get killed with lower timeout
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
time.Sleep(1500 * time.Millisecond)
_, err = utils.ExecAllowError(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
require.ErrorContains(t, err, "Aborted")

// Set session transaction timeout smaller than tablet transaction timeout
utils.Exec(t, conn, "set transaction_timeout=250")

// Session timeout should be used this time
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
time.Sleep(500 * time.Millisecond)
_, err = utils.ExecAllowError(t, conn, "insert into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
require.ErrorContains(t, err, "Aborted")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create table unsharded(
id1 bigint,
id2 bigint,
key(id1)
) Engine=InnoDB;
21 changes: 16 additions & 5 deletions go/vt/proto/query/query.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/vt/sqlparser/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@ func (nz *normalizer) sysVarRewrite(cursor *Cursor, node *Variable) {
sysvars.Version.Name,
sysvars.VersionComment.Name,
sysvars.QueryTimeout.Name,
sysvars.TransactionTimeout.Name,
sysvars.Workload.Name:
found = true
}
Expand Down
9 changes: 8 additions & 1 deletion go/vt/sqlparser/normalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ type myTestCase struct {
ddlStrategy, migrationContext, sessionUUID, sessionEnableSystemSettings bool
udv int
autocommit, foreignKeyChecks, clientFoundRows, skipQueryPlanCache, socket, queryTimeout bool
sqlSelectLimit, transactionMode, workload, version, versionComment bool
sqlSelectLimit, transactionMode, workload, version, versionComment, transactionTimeout bool
}

func TestRewrites(in *testing.T) {
Expand All @@ -603,6 +603,10 @@ func TestRewrites(in *testing.T) {
in: "SELECT @@query_timeout",
expected: "SELECT :__vtquery_timeout as `@@query_timeout`",
queryTimeout: true,
}, {
in: "SELECT @@transaction_timeout",
expected: "SELECT :__vttransaction_timeout as `@@transaction_timeout`",
transactionTimeout: true,
}, {
in: "SELECT @@version_comment",
expected: "SELECT :__vtversion_comment as `@@version_comment`",
Expand Down Expand Up @@ -862,6 +866,7 @@ func TestRewrites(in *testing.T) {
sessTrackGTID: true,
socket: true,
queryTimeout: true,
transactionTimeout: true,
}, {
in: "SHOW GLOBAL VARIABLES",
expected: "SHOW GLOBAL VARIABLES",
Expand All @@ -883,6 +888,7 @@ func TestRewrites(in *testing.T) {
sessTrackGTID: true,
socket: true,
queryTimeout: true,
transactionTimeout: true,
}}
parser := NewTestParser()
for _, tc := range tests {
Expand Down Expand Up @@ -924,6 +930,7 @@ func TestRewrites(in *testing.T) {
assert.Equal(tc.transactionMode, result.NeedsSysVar(sysvars.TransactionMode.Name), "should need :__vttransactionMode")
assert.Equal(tc.workload, result.NeedsSysVar(sysvars.Workload.Name), "should need :__vtworkload")
assert.Equal(tc.queryTimeout, result.NeedsSysVar(sysvars.QueryTimeout.Name), "should need :__vtquery_timeout")
assert.Equal(tc.transactionTimeout, result.NeedsSysVar(sysvars.TransactionTimeout.Name), "should need :__vttransaction_timeout")
assert.Equal(tc.ddlStrategy, result.NeedsSysVar(sysvars.DDLStrategy.Name), "should need ddlStrategy")
assert.Equal(tc.migrationContext, result.NeedsSysVar(sysvars.MigrationContext.Name), "should need migrationContext")
assert.Equal(tc.sessionUUID, result.NeedsSysVar(sysvars.SessionUUID.Name), "should need sessionUUID")
Expand Down
2 changes: 2 additions & 0 deletions go/vt/sysvars/sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
TxReadOnly = SystemVariable{Name: "tx_read_only", IsBoolean: true, Default: off}
Workload = SystemVariable{Name: "workload", IdentifierAsString: true}
QueryTimeout = SystemVariable{Name: "query_timeout"}
TransactionTimeout = SystemVariable{Name: "transaction_timeout"}

// Online DDL
DDLStrategy = SystemVariable{Name: "ddl_strategy", IdentifierAsString: true}
Expand Down Expand Up @@ -109,6 +110,7 @@ var (
ReadAfterWriteTimeOut,
SessionTrackGTIDs,
QueryTimeout,
TransactionTimeout,
}

ReadOnly = []SystemVariable{
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ func (t *noopVCursor) SetClientFoundRows(context.Context, bool) error {
func (t *noopVCursor) SetQueryTimeout(maxExecutionTime int64) {
}

func (t *noopVCursor) SetTransactionTimeout(timeout int64) {}

func (t *noopVCursor) SetSkipQueryPlanCache(context.Context, bool) error {
panic("implement me")
}
Expand Down Expand Up @@ -414,6 +416,7 @@ func (t *noopVCursor) DisableLogging() {}
func (t *noopVCursor) GetVExplainLogs() []ExecuteEntry {
return nil
}

func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) {
return nil, nil
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ type (
// SetQueryTimeout sets the query timeout
SetQueryTimeout(queryTimeout int64)

// SetTransactionTimeout sets the transaction timeout.
SetTransactionTimeout(transactionTimeout int64)

// InTransaction returns true if the session has already opened transaction or
// will start a transaction on the query execution.
InTransaction() bool
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/engine/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,12 @@ func (svss *SysVarSetAware) Execute(ctx context.Context, vcursor VCursor, env *e
return err
}
vcursor.Session().SetQueryTimeout(queryTimeout)
case sysvars.TransactionTimeout.Name:
transactionTimeout, err := svss.evalAsInt64(env, vcursor)
if err != nil {
return err
}
vcursor.Session().SetTransactionTimeout(transactionTimeout)
case sysvars.SessionEnableSystemSettings.Name:
err = svss.setBoolSysVar(ctx, env, vcursor.Session().SetSessionEnableSystemSettings)
case sysvars.Charset.Name, sysvars.Names.Name:
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,12 @@ func (e *Executor) addNeededBindVars(vcursor *econtext.VCursorImpl, bindVarNeeds
bindVars[key] = sqltypes.BoolBindVariable(session.Autocommit)
case sysvars.QueryTimeout.Name:
bindVars[key] = sqltypes.Int64BindVariable(session.GetQueryTimeout())
case sysvars.TransactionTimeout.Name:
var v int64
ifOptionsExist(session, func(options *querypb.ExecuteOptions) {
v = options.GetTransactionTimeout()
})
bindVars[key] = sqltypes.Int64BindVariable(v)
case sysvars.ClientFoundRows.Name:
var v bool
ifOptionsExist(session, func(options *querypb.ExecuteOptions) {
Expand Down
Loading
Loading