Skip to content

Commit 8f48378

Browse files
authored
Add forward compatibility for caching_sha2_password and replication (#18033)
1 parent f9643b6 commit 8f48378

File tree

10 files changed

+8088
-7993
lines changed

10 files changed

+8088
-7993
lines changed

go/mysql/flavor_mysql.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,8 @@ func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string,
562562
}
563563
if params.SslEnabled() {
564564
args = append(args, "SOURCE_SSL = 1")
565+
} else {
566+
args = append(args, "GET_SOURCE_PUBLIC_KEY = 1")
565567
}
566568
if params.SslCa != "" {
567569
args = append(args, fmt.Sprintf("SOURCE_SSL_CA = '%s'", params.SslCa))
@@ -603,7 +605,7 @@ func (mysqlFlavor) catchupToGTIDCommands(params *ConnParams, replPos replication
603605
cmds = append(cmds, cmd+";")
604606
} else {
605607
// No TLS
606-
cmds = append(cmds, fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='%s', SOURCE_PASSWORD='%s', SOURCE_AUTO_POSITION=1;", params.Host, params.Port, params.Uname, params.Pass))
608+
cmds = append(cmds, fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='%s', SOURCE_PASSWORD='%s', GET_SOURCE_PUBLIC_KEY=1, SOURCE_AUTO_POSITION=1;", params.Host, params.Port, params.Uname, params.Pass))
607609
}
608610

609611
if replPos.IsZero() { // when the there is no afterPos, that means need to replicate completely

go/mysql/flavor_mysql_legacy.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func (mysqlFlavorLegacy) catchupToGTIDCommands(params *ConnParams, replPos repli
212212
cmds = append(cmds, cmd+";")
213213
} else {
214214
// No TLS
215-
cmds = append(cmds, fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1;", params.Host, params.Port, params.Uname, params.Pass))
215+
cmds = append(cmds, fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', GET_MASTER_PUBLIC_KEY=1, MASTER_AUTO_POSITION=1;", params.Host, params.Port, params.Uname, params.Pass))
216216
}
217217

218218
if replPos.IsZero() { // when the there is no afterPos, that means need to replicate completely
@@ -233,6 +233,8 @@ func (mysqlFlavorLegacy) setReplicationSourceCommand(params *ConnParams, host st
233233
}
234234
if params.SslEnabled() {
235235
args = append(args, "MASTER_SSL = 1")
236+
} else {
237+
args = append(args, "GET_MASTER_PUBLIC_KEY = 1")
236238
}
237239
if params.SslCa != "" {
238240
args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa))

go/mysql/flavor_mysql_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func TestMysql8SetReplicationSourceCommand(t *testing.T) {
3838
SOURCE_USER = 'username',
3939
SOURCE_PASSWORD = 'password',
4040
SOURCE_CONNECT_RETRY = 1234,
41+
GET_SOURCE_PUBLIC_KEY = 1,
4142
SOURCE_AUTO_POSITION = 1`
4243

4344
conn := &Conn{flavor: mysqlFlavor8{}}
@@ -51,6 +52,7 @@ func TestMysql8SetReplicationSourceCommand(t *testing.T) {
5152
SOURCE_USER = 'username',
5253
SOURCE_PASSWORD = 'password',
5354
SOURCE_CONNECT_RETRY = 1234,
55+
GET_SOURCE_PUBLIC_KEY = 1,
5456
SOURCE_HEARTBEAT_PERIOD = 5.4,
5557
SOURCE_AUTO_POSITION = 1`
5658

go/test/endtoend/backup/vtctlbackup/backup_utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string {
11531153
}
11541154

11551155
func ReconnectReplicaToPrimary(t *testing.T, replicaIndex int) {
1156-
query := fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='localhost', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", primary.MySQLPort)
1156+
query := fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='localhost', SOURCE_PORT=%d, SOURCE_USER='vt_repl', GET_SOURCE_PUBLIC_KEY = 1, SOURCE_AUTO_POSITION = 1", primary.MySQLPort)
11571157
replica := getReplica(t, replicaIndex)
11581158
_, err := replica.VttabletProcess.QueryTablet("stop replica", keyspaceName, true)
11591159
require.NoError(t, err)

go/test/endtoend/reparent/plannedreparent/reparent_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus
236236
resetCmd,
237237
"RESET REPLICA",
238238
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
239-
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
239+
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', GET_SOURCE_PUBLIC_KEY = 1, SOURCE_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
240240
}
241241
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[0])
242242

@@ -250,7 +250,7 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus
250250
"STOP REPLICA",
251251
resetCmd,
252252
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
253-
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
253+
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', GET_SOURCE_PUBLIC_KEY = 1, SOURCE_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
254254
"START REPLICA",
255255
}
256256
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[2])

go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro
7878
"STOP REPLICA",
7979
resetCmd,
8080
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
81-
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", "localhost", newPrimary.MySQLPort),
81+
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', GET_SOURCE_PUBLIC_KEY = 1, SOURCE_AUTO_POSITION = 1", "localhost", newPrimary.MySQLPort),
8282
"START REPLICA",
8383
}
8484
err = oldPrimary.VttabletProcess.QueryTabletMultiple(changeSourceCommands, keyspaceUnshardedName, true)

go/test/endtoend/tabletmanager/tablet_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestResetReplicationParameters(t *testing.T) {
8181
require.NoError(t, err)
8282

8383
// Set a replication source on the tablet and start replication
84-
err = tablet.VttabletProcess.QueryTabletMultiple([]string{"stop replica", "change replication source to source_host = 'localhost', source_port = 123", "start replica"}, keyspaceName, false)
84+
err = tablet.VttabletProcess.QueryTabletMultiple([]string{"stop replica", "change replication source to source_host = 'localhost', source_port = 123, get_source_public_key = 1", "start replica"}, keyspaceName, false)
8585
require.NoError(t, err)
8686

8787
// Check the replica status.

go/test/endtoend/vtorc/general/vtorc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func TestVTOrcRepairs(t *testing.T) {
248248
changeReplicationSourceCommands := []string{
249249
"STOP REPLICA",
250250
"RESET REPLICA ALL",
251-
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", utils.Hostname, otherReplica.MySQLPort),
251+
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', GET_SOURCE_PUBLIC_KEY = 1, SOURCE_AUTO_POSITION = 1", utils.Hostname, otherReplica.MySQLPort),
252252
"START REPLICA",
253253
}
254254
err := utils.RunSQLs(t, changeReplicationSourceCommands, replica, "")
@@ -279,7 +279,7 @@ func TestVTOrcRepairs(t *testing.T) {
279279
changeReplicationSourceCommands := []string{
280280
"STOP REPLICA",
281281
"RESET REPLICA ALL",
282-
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", replica.VttabletProcess.TabletHostname, replica.MySQLPort),
282+
fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', GET_SOURCE_PUBLIC_KEY = 1, SOURCE_AUTO_POSITION = 1", replica.VttabletProcess.TabletHostname, replica.MySQLPort),
283283
"START REPLICA",
284284
}
285285
err := utils.RunSQLs(t, changeReplicationSourceCommands, curPrimary, "")

0 commit comments

Comments
 (0)