Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@ package materialize

import (
"fmt"
"strings"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/mysql/config"
"vitess.io/vitess/go/vt/topo/topoproto"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var (
updateOptions = struct {
AddReferenceTables []string
}{}

// base is the base command for all actions related to Materialize.
base = &cobra.Command{
Use: "Materialize --workflow <workflow> --target-keyspace <keyspace> [command] [command-flags]",
Expand All @@ -35,8 +42,42 @@ var (
Aliases: []string{"materialize"},
Args: cobra.ExactArgs(1),
}

// update is the command for updating existing materialize workflow.
// This can be helpful if we plan to add other actions as well such as
// removing tables from workflow.
update = &cobra.Command{
Use: "update --add-tables='table1,table2'",
Short: "Update existing materialize workflow.",
Aliases: []string{"Update"},
Args: cobra.NoArgs,
RunE: commandUpdate,
}
)

func commandUpdate(cmd *cobra.Command, args []string) error {
tableSettings := []*vtctldatapb.TableMaterializeSettings{}
for _, table := range updateOptions.AddReferenceTables {
tableSettings = append(tableSettings, &vtctldatapb.TableMaterializeSettings{
TargetTable: table,
})
}

_, err := common.GetClient().WorkflowAddTables(common.GetCommandCtx(), &vtctldatapb.WorkflowAddTablesRequest{
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,
TableSettings: tableSettings,
MaterializationIntent: vtctldatapb.MaterializationIntent_REFERENCE,
})

if err != nil {
return err
}
fmt.Printf("Table(s) %s added to the workflow %s. Use show to view the status.\n",
strings.Join(updateOptions.AddReferenceTables, ", "), common.BaseOptions.Workflow)
return nil
}

func registerCommands(root *cobra.Command) {
common.AddCommonFlags(base)
root.AddCommand(base)
Expand All @@ -54,6 +95,10 @@ func registerCommands(root *cobra.Command) {
create.Flags().StringSliceVarP(&common.CreateOptions.ReferenceTables, "reference-tables", "r", nil, "Used to specify the reference tables to materialize on every target shard.")
base.AddCommand(create)

update.Flags().StringSliceVar(&updateOptions.AddReferenceTables, "add-reference-tables", nil, "Used to specify the reference tables to be added to the existing workflow")
update.MarkFlagRequired("add-reference-tables")
base.AddCommand(update)

// Generic workflow commands.
opts := &common.SubCommandsOpts{
SubCommand: "Materialize",
Expand Down
67 changes: 67 additions & 0 deletions go/test/endtoend/vreplication/materialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,16 @@ const (
id2 bigint not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
create table ref3 (
id bigint not null,
id2 bigint not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
create table ref4 (
id bigint not null,
id2 bigint not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
`
refSourceVSchema = `
{
Expand All @@ -246,6 +256,12 @@ const (
"type": "reference"
},
"ref2": {
"type": "reference"
},
"ref3": {
"type": "reference"
},
"ref4": {
"type": "reference"
}
}
Expand All @@ -261,12 +277,22 @@ const (
"ref2": {
"type": "reference",
"source": "ks1.ref2"
},
"ref3": {
"type": "reference",
"source": "ks1.ref3"
},
"ref4": {
"type": "reference",
"source": "ks1.ref4"
}
}
}
`
initRef1DataQuery = `insert into ks1.ref1(id, val) values (1, 'abc'), (2, 'def'), (3, 'ghi')`
initRef2DataQuery = `insert into ks1.ref2(id, id2) values (1, 1), (2, 2), (3, 3)`
initRef3DataQuery = `insert into ks1.ref3(id, id2) values (1, 1), (2, 2), (3, 3), (4, 4)`
initRef4DataQuery = `insert into ks1.ref4(id, id2) values (1, 1), (2, 2), (3, 3)`
)

// TestReferenceTableMaterialize tests materializing reference tables.
Expand All @@ -287,6 +313,10 @@ func TestReferenceTableMaterialize(t *testing.T) {
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(initRef2DataQuery, 0, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(initRef3DataQuery, 0, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(initRef4DataQuery, 0, false)
require.NoError(t, err)

err = vc.VtctldClient.ExecuteCommand("Materialize", "--target-keyspace", "ks2", "--workflow", "wf1", "create",
"--source-keyspace", "ks1", "--reference-tables", "ref1,ref2")
Expand Down Expand Up @@ -322,4 +352,41 @@ func TestReferenceTableMaterialize(t *testing.T) {
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref2", 4)
}
vdiff(t, "ks2", "wf1", defaultCellName, nil)

// Testing update with --add-reference-tables.
err = vc.VtctldClient.ExecuteCommand("Materialize", "--target-keyspace", "ks2", "--workflow", "wf1", "update",
"--add-reference-tables", "ref3,ref4")
require.NoError(t, err, "MaterializeAddTables")

for _, shard := range shards {
tab := vc.getPrimaryTablet(t, "ks2", shard)
catchup(t, tab, "wf1", "Materialize")
}

for _, shard := range shards {
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref3", 4)
waitForQueryResult(t, vtgateConn, "ks2:"+shard, "select id, id2 from ref3",
`[[INT64(1) INT64(1)] [INT64(2) INT64(2)] [INT64(3) INT64(3)] [INT64(4) INT64(4)]]`)
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref4", 3)
waitForQueryResult(t, vtgateConn, "ks2:"+shard, "select id, id2 from ref4",
`[[INT64(1) INT64(1)] [INT64(2) INT64(2)] [INT64(3) INT64(3)]]`)
}
vdiff(t, "ks2", "wf1", defaultCellName, nil)

queries = []string{
"update ks1.ref3 set id2=3 where id=2",
"update ks1.ref4 set id2=3 where id=2",
"delete from ks1.ref3 where id2=3",
"delete from ks1.ref4 where id2=3",
"insert into ks1.ref3(id, id2) values (3, 3)",
"insert into ks1.ref4(id, id2) values (3, 3), (4, 4)",
}
for _, query := range queries {
execVtgateQuery(t, vtgateConn, "ks1", query)
}
for _, shard := range shards {
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref3", 3)
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref4", 3)
}
vdiff(t, "ks2", "wf1", defaultCellName, nil)
}
Loading
Loading