Cherry-pick 70114ad with conflicts
vitess-bot[bot] authored and vitess-bot committed Feb 14, 2025
1 parent 57a3391 commit a964901
Showing 5 changed files with 119 additions and 8 deletions.
72 changes: 72 additions & 0 deletions go/vt/topotools/mirror_rules.go
@@ -0,0 +1,72 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

// GetMirrorRulesMap converts a vschemapb.MirrorRules protobuf message into a
// mapping of fromTable=>toTable=>percent.
func GetMirrorRulesMap(rules *vschemapb.MirrorRules) map[string]map[string]float32 {

Check failure on line 28 in go/vt/topotools/mirror_rules.go (GitHub Actions / Code Coverage): undefined: vschemapb.MirrorRules
if rules == nil {
return nil
}
rulesMap := make(map[string]map[string]float32)
for _, mr := range rules.Rules {
if _, ok := rulesMap[mr.FromTable]; !ok {
rulesMap[mr.FromTable] = make(map[string]float32)
}
rulesMap[mr.FromTable][mr.ToTable] = mr.Percent
}
return rulesMap
}

// GetMirrorRules fetches mirror rules from the topology server and returns a
// mapping of fromTable=>toTable=>percent.
func GetMirrorRules(ctx context.Context, ts *topo.Server) (map[string]map[string]float32, error) {
mrs, err := ts.GetMirrorRules(ctx)

Check failure on line 45 in go/vt/topotools/mirror_rules.go (GitHub Actions / Code Coverage): ts.GetMirrorRules undefined (type *topo.Server has no field or method GetMirrorRules)
if err != nil {
return nil, err
}

rules := GetMirrorRulesMap(mrs)

return rules, nil
}

// SaveMirrorRules converts a mapping of fromTable=>toTable=>percent into a
// vschemapb.MirrorRules protobuf message and saves it in the topology.
func SaveMirrorRules(ctx context.Context, ts *topo.Server, rules map[string]map[string]float32) error {
log.V(2).Infof("Saving mirror rules %v\n", rules)

rrs := &vschemapb.MirrorRules{Rules: make([]*vschemapb.MirrorRule, 0)}

Check failure on line 60 in go/vt/topotools/mirror_rules.go (GitHub Actions / Code Coverage): undefined: vschemapb.MirrorRules
Check failure on line 60 in go/vt/topotools/mirror_rules.go (GitHub Actions / Code Coverage): undefined: vschemapb.MirrorRule
for fromTable, mrs := range rules {
for toTable, percent := range mrs {
rrs.Rules = append(rrs.Rules, &vschemapb.MirrorRule{

Check failure on line 63 in go/vt/topotools/mirror_rules.go (GitHub Actions / Code Coverage): undefined: vschemapb.MirrorRule
FromTable: fromTable,
Percent: percent,
ToTable: toTable,
})
}
}

return ts.SaveMirrorRules(ctx, rrs)

Check failure on line 71 in go/vt/topotools/mirror_rules.go (GitHub Actions / Code Coverage): ts.SaveMirrorRules undefined (type *topo.Server has no field or method SaveMirrorRules)
}
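
For context, a minimal usage sketch of the new helpers and the fromTable=>toTable=>percent mapping they produce. It assumes the vschemapb.MirrorRules/MirrorRule messages exist (the check failures above show they are not yet defined on this branch, which is why this cherry-picked file does not build here), and the keyspace-qualified table names are purely illustrative:

package main

import (
	"fmt"

	vschemapb "vitess.io/vitess/go/vt/proto/vschema"
	"vitess.io/vitess/go/vt/topotools"
)

func main() {
	// Example rule: mirror 5% of the traffic for ks1.t1 to ks2.t1 (names are illustrative).
	rules := &vschemapb.MirrorRules{
		Rules: []*vschemapb.MirrorRule{
			{FromTable: "ks1.t1", ToTable: "ks2.t1", Percent: 5},
		},
	}
	// GetMirrorRulesMap returns map[fromTable]map[toTable]percent.
	m := topotools.GetMirrorRulesMap(rules)
	fmt.Println(m["ks1.t1"]["ks2.t1"]) // 5
}
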
13 changes: 13 additions & 0 deletions go/vt/vtctl/workflow/server.go
@@ -3271,7 +3271,20 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
time.Sleep(lockTablesCycleDelay)
}
}
<<<<<<< HEAD

=======
// Get the source positions now that writes are stopped, the streams were stopped (e.g.
// intra-keyspace materializations that write on the source), and we know for certain
// that any in progress writes are done.
if err := ts.gatherSourcePositions(ctx); err != nil {
return handleError("failed to gather replication positions on migration sources", err)
}

if err := confirmKeyspaceLocksHeld(); err != nil {
return handleError("locks were lost", err)
}
>>>>>>> 70114ad687 (Multi-tenant workflow SwitchWrites: Don't add denied tables on cancelMigration() (#17782))
ts.Logger().Infof("Waiting for streams to catchup")
if err := sw.waitForCatchup(ctx, timeout); err != nil {
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
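
The incoming side of this conflict captures the ordering the comment describes: source replication positions are gathered only after writes and the source streams have stopped, and the keyspace locks are re-checked before waiting for catchup. A minimal, self-contained sketch of that ordering, where every type and function below is a hypothetical stand-in rather than the actual Vitess API:

package main

import (
	"context"
	"errors"
	"fmt"
)

// switcher is a hypothetical stand-in for the real traffic switcher.
type switcher struct {
	locksHeld bool
	positions map[string]string // shard -> GTID position
}

func (s *switcher) stopWritesAndSourceStreams(ctx context.Context) error { return nil }

func (s *switcher) gatherSourcePositions(ctx context.Context) error {
	// Positions are only meaningful once no further writes can land on the sources.
	s.positions = map[string]string{"-80": "example-gtid-set"}
	return nil
}

func (s *switcher) confirmKeyspaceLocksHeld() error {
	if !s.locksHeld {
		return errors.New("keyspace locks were lost")
	}
	return nil
}

func (s *switcher) waitForCatchup(ctx context.Context) error { return nil }

func switchWrites(ctx context.Context, s *switcher) error {
	if err := s.stopWritesAndSourceStreams(ctx); err != nil {
		return fmt.Errorf("failed to stop writes: %w", err)
	}
	if err := s.gatherSourcePositions(ctx); err != nil {
		return fmt.Errorf("failed to gather replication positions on migration sources: %w", err)
	}
	if err := s.confirmKeyspaceLocksHeld(); err != nil {
		return fmt.Errorf("locks were lost: %w", err)
	}
	return s.waitForCatchup(ctx)
}

func main() {
	fmt.Println(switchWrites(context.Background(), &switcher{locksHeld: true}))
}
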
27 changes: 21 additions & 6 deletions go/vt/vtctl/workflow/traffic_switcher.go
@@ -1003,9 +1003,9 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
ts.Logger().Infof("cancelMigration (%v): original context invalid: %s", ts.WorkflowName(), ctx.Err())
}

ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName())
// We create a new context while canceling the migration, so that we are independent of the original
// context being canceled prior to or during the cancel operation itself.
// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
@@ -1017,20 +1017,31 @@
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
<<<<<<< HEAD
err = ts.changeTableSourceWrites(cmCtx, allowWrites)
=======
if !ts.IsMultiTenantMigration() {
ts.Logger().Infof("cancelMigration (%v): adding denied tables to target", ts.WorkflowName())
err = ts.switchDeniedTables(cmCtx, true /* revert */)
} else {
ts.Logger().Infof("cancelMigration (%v): multi-tenant, not adding denied tables to target", ts.WorkflowName())
}
>>>>>>> 70114ad687 (Multi-tenant workflow SwitchWrites: Don't add denied tables on cancelMigration() (#17782))
} else {
ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not revert denied tables / shard access: %v", ts.WorkflowName(), err)
}

if err := sm.CancelStreamMigrations(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not cancel stream migrations: %v", ts.WorkflowName(), err)
}

ts.Logger().Infof("cancelMigration (%v): restarting vreplication workflows", ts.WorkflowName())
err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
@@ -1039,17 +1050,21 @@
})
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not restart vreplication: %v", ts.WorkflowName(), err)
}

ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())
if err := ts.deleteReverseVReplication(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not delete reverse vreplication streams: %v", ts.WorkflowName(), err)
}

if cancelErrs.HasErrors() {
ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))
return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
}

ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName())
return nil
}

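
The comments at the top of cancelMigration describe a detached-context pattern: the cleanup must keep running even if the caller's context has already been canceled, while still getting its own timeout. A minimal sketch of that pattern using the standard library's context.WithoutCancel (Go 1.21+); this only illustrates the idea and is not the trafficSwitcher implementation, whose context-copying line is elided in the hunk above:

package main

import (
	"context"
	"fmt"
	"time"
)

func cancelWork(callerCtx context.Context) error {
	// Detach from the caller's cancellation while keeping its values
	// (for example, lock information carried in the context), and bound
	// the cleanup with its own timeout.
	cmCtx, cmCancel := context.WithTimeout(context.WithoutCancel(callerCtx), 30*time.Second)
	defer cmCancel()

	// Cleanup proceeds even if callerCtx has already been canceled.
	select {
	case <-time.After(10 * time.Millisecond): // stand-in for the actual cleanup steps
		return nil
	case <-cmCtx.Done():
		return cmCtx.Err()
	}
}

func main() {
	callerCtx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the original context being canceled mid-operation
	fmt.Println(cancelWork(callerCtx)) // cleanup still runs and prints <nil>
}
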
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
@@ -88,15 +88,20 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
source: &binlogdatapb.BinlogSource{},
}
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

id, err := strconv.ParseInt(params["id"], 10, 32)
if err != nil {
return nil, err
}
ct.id = int32(id)
ct.workflow = params["workflow"]
<<<<<<< HEAD
ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError)
=======
log.Infof("creating controller with id: %v, name: %v, cell: %v, tabletTypes: %v", ct.id, ct.workflow, cell, tabletTypesStr)

ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), workflowConfig.MaxTimeToRetryError)
>>>>>>> 70114ad687 (Multi-tenant workflow SwitchWrites: Don't add denied tables on cancelMigration() (#17782))

state := params["state"]
blpStats.State.Store(state)
8 changes: 7 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -112,7 +112,13 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
settings.StopPos = pausePos
saveStop = false
}
<<<<<<< HEAD

=======
log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vr.id, vr.WorkflowName, settings.StartPos, settings.StopPos)
log.V(2).Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v",
vr.id, settings.StartPos, settings.StopPos, vr.source.Filter)
>>>>>>> 70114ad687 (Multi-tenant workflow SwitchWrites: Don't add denied tables on cancelMigration() (#17782))
queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return vr.dbClient.ExecuteWithRetry(ctx, sql)
}
@@ -251,7 +257,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error {
// one. This allows for the apply thread to catch up more quickly if
// a backlog builds up.
func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source)
log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vp.vr.id, vp.vr.WorkflowName, vp.startPos, vp.stopPos)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
