Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusCancelled)
})

// now, we submit the exact same migratoin again: same UUID, same migration context.
// now, we submit the exact same migration again: same UUID, same migration context.
t.Run("resubmit migration", func(t *testing.T) {
executedUUID := testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtctl", "", "", true)) // skip wait
require.Equal(t, uuid, executedUUID)
Expand Down
37 changes: 32 additions & 5 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func (e *Executor) primaryPosition(ctx context.Context) (pos replication.Positio
}

// terminateVReplMigration stops vreplication and, when deleteEntry is true, also removes the _vt.vreplication entry for the given migration
func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error {
func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string, deleteEntry bool) error {
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
return err
Expand All @@ -621,10 +621,26 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err
if _, err := e.vreplicationExec(ctx, tablet.Tablet, query); err != nil {
log.Errorf("FAIL vreplicationExec: uuid=%s, query=%v, error=%v", uuid, query, err)
}
if deleteEntry {
if err := e.deleteVReplicationEntry(ctx, uuid); err != nil {
return err
}
}

if err := e.deleteVReplicationEntry(ctx, uuid); err != nil {
return nil
}

// startVreplication binds the executor's database name and the given workflow name
// into the sqlStartVReplStream statement and executes the resulting query on the
// given tablet through vreplicationExec.
//
// A failure to parse/bind the statement is returned unchanged. A failure to execute
// the query is returned wrapped with the workflow name and the query text.
func (e *Executor) startVreplication(ctx context.Context, tablet *topodatapb.Tablet, workflow string) (err error) {
	dbNameVar := sqltypes.StringBindVariable(e.dbName)
	workflowVar := sqltypes.StringBindVariable(workflow)

	query, err := sqlparser.ParseAndBind(sqlStartVReplStream, dbNameVar, workflowVar)
	if err != nil {
		return err
	}

	// The query result itself is of no interest here; only success or failure matters.
	_, err = e.vreplicationExec(ctx, tablet, query)
	if err != nil {
		return vterrors.Wrapf(err, "FAIL vreplicationExec: uuid=%s, query=%v", workflow, query)
	}
	return nil
}

Expand Down Expand Up @@ -1071,6 +1087,16 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
}
go log.Infof("cutOverVReplMigration %v: stopped vreplication", s.workflow)

defer func() {
if !renameWasSuccessful {
// Restarting vreplication
if err := e.startVreplication(ctx, tablet.Tablet, s.workflow); err != nil {
log.Errorf("cutOverVReplMigration %v: failed restarting vreplication after cutover failure: %v", s.workflow, err)
}
go log.Infof("cutOverVReplMigration %v: started vreplication after cutover failure", s.workflow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why start a goroutine just to call a log function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because in this critical section I don't want to block on writing a log line, as silly as it may be.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-noach Is it then essential to log at all? Starting a goroutine also has some cost 😄.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbussink, yeah, I'd like this entry to appear in the logs. If it appears, it means vreplication has definitely started, which is important information.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not be the only go log.Infof call in the cutOverVReplMigration function. There's a bunch of other such calls.

}
}()

// rename tables atomically (remember, writes on source tables are stopped)
{
if isVreplicationTestSuite {
Expand Down Expand Up @@ -1345,7 +1371,7 @@ func (e *Executor) initVreplicationRevertMigration(ctx context.Context, onlineDD
// ExecuteWithVReplication sets up the grounds for a vreplication schema migration
func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schema.OnlineDDL, revertMigration *schema.OnlineDDL) error {
// make sure there's no vreplication workflow running under same name
_ = e.terminateVReplMigration(ctx, onlineDDL.UUID)
_ = e.terminateVReplMigration(ctx, onlineDDL.UUID, true)

if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY {
return ErrExecutorNotWritableTablet
Expand Down Expand Up @@ -1506,7 +1532,7 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl
// migration could have started by a different tablet. We need to actively verify if it is running
s, _ := e.readVReplStream(ctx, onlineDDL.UUID, true)
foundRunning = (s != nil && s.isRunning())
if err := e.terminateVReplMigration(ctx, onlineDDL.UUID); err != nil {
if err := e.terminateVReplMigration(ctx, onlineDDL.UUID, false); err != nil {
return foundRunning, fmt.Errorf("Error terminating migration, vreplication exec error: %+v", err)
}
}
Expand Down Expand Up @@ -3137,6 +3163,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
cancellable = append(cancellable, newCancellableMigration(uuid, s.message))
}
if !s.isRunning() {
log.Infof("migration %s in 'running' state but vreplication state is '%s'", uuid, s.state.String())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some added visibility.

return nil
}
// This VRepl migration may have started from outside this tablet, so
Expand Down Expand Up @@ -4128,7 +4155,7 @@ func (e *Executor) ForceCutOverPendingMigrations(ctx context.Context) (result *s
if err != nil {
return result, err
}
log.Infof("ForceCutOverPendingMigrations: iterating %v migrations %s", len(uuids))
log.Infof("ForceCutOverPendingMigrations: iterating %v migrations", len(uuids))

result = &sqltypes.Result{}
for _, uuid := range uuids {
Expand Down
Loading