Skip to content

Commit

Permalink
switchover improvement (#28)
Browse files Browse the repository at this point in the history
* feat: retry for ReenableEvents
fix: logs

* fix: issues

* fix: method visibility
  • Loading branch information
Fizic authored Aug 3, 2023
1 parent 8818d6f commit f74fa87
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 113 deletions.
144 changes: 32 additions & 112 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,14 +602,14 @@ func (app *App) stateManager() appState {
err = app.approveSwitchover(switchover, activeNodes, clusterState)
if err != nil {
app.logger.Errorf("cannot perform switchover: %s", err)
err = app.finishSwitchover(switchover, err)
err = app.FinishSwitchover(switchover, err)
if err != nil {
app.logger.Errorf("failed to reject switchover: %s", err)
}
return stateManager
}

err = app.startSwitchover(switchover)
err = app.StartSwitchover(switchover)
if err != nil {
app.logger.Errorf("failed to start switchover: %s", err)
return stateManager
Expand All @@ -619,12 +619,12 @@ func (app *App) stateManager() appState {
app.logger.Errorf("switchover was aborted")
} else {
if err != nil {
err = app.failSwitchover(switchover, err)
err = app.FailSwitchover(switchover, err)
if err != nil {
app.logger.Errorf("failed to report switchover failure: %s", err)
}
} else {
err = app.finishSwitchover(switchover, nil)
err = app.FinishSwitchover(switchover, nil)
if err != nil {
// we failed to update status in DCS, it's highly possible
// that current process lost DCS connection
Expand All @@ -648,7 +648,7 @@ func (app *App) stateManager() appState {
err = app.approveFailover(clusterState, clusterStateDcs, activeNodes, master)
if err == nil {
app.logger.Infof("failover approved")
err = app.issueFailover(master)
err = app.IssueFailover(master)
if err != nil {
app.logger.Error(err.Error())
}
Expand Down Expand Up @@ -676,7 +676,7 @@ func (app *App) stateManager() appState {
err = app.approveFailover(clusterState, clusterStateDcs, activeNodes, master)
if err == nil {
app.logger.Infof("failover approved")
err = app.issueFailover(master)
err = app.IssueFailover(master)
if err != nil {
app.logger.Error(err.Error())
}
Expand Down Expand Up @@ -803,78 +803,6 @@ func (app *App) approveSwitchover(switchover *Switchover, activeNodes []string,
return nil
}

func (app *App) startSwitchover(switchover *Switchover) error {
app.logger.Infof("switchover: %s => %s starting...", switchover.From, switchover.To)
switchover.StartedAt = time.Now()
switchover.StartedBy = app.config.Hostname
return app.dcs.Set(pathCurrentSwitch, switchover)
}

// Fail current switchover, it will be repeated next cycle
func (app *App) failSwitchover(switchover *Switchover, err error) error {
app.logger.Errorf("switchover: %s => %s failed: %s", switchover.From, switchover.To, err)
switchover.RunCount++
switchover.Result = new(SwitchoverResult)
switchover.Result.Ok = false
switchover.Result.Error = err.Error()
switchover.Result.FinishedAt = time.Now()
return app.dcs.Set(pathCurrentSwitch, switchover)
}

// Finish current switchover and write the result
func (app *App) finishSwitchover(switchover *Switchover, switchErr error) error {
result := true
action := "finished"
path := pathLastSwitch
if switchErr != nil {
result = false
action = "rejected"
path = pathLastRejectedSwitch
}

app.logger.Infof("switchover: %s => %s %s", switchover.From, switchover.To, action)
switchover.Result = new(SwitchoverResult)
switchover.Result.Ok = result
switchover.Result.FinishedAt = time.Now()

if switchErr != nil {
switchover.Result.Error = switchErr.Error()
}

err := app.dcs.Delete(pathCurrentSwitch)
if err != nil {
return err
}
return app.dcs.Set(path, switchover)
}

func (app *App) getLastSwitchover() Switchover {
var lastSwitch, lastRejectedSwitch Switchover
err := app.dcs.Get(pathLastSwitch, &lastSwitch)
if err != nil && err != dcs.ErrNotFound {
app.logger.Errorf("%s: %s", pathLastSwitch, err.Error())
}
errRejected := app.dcs.Get(pathLastRejectedSwitch, &lastRejectedSwitch)
if errRejected != nil && errRejected != dcs.ErrNotFound {
app.logger.Errorf("%s: %s", pathLastRejectedSwitch, errRejected.Error())
}

if lastRejectedSwitch.InitiatedAt.After(lastSwitch.InitiatedAt) {
return lastRejectedSwitch
}

return lastSwitch
}

func (app *App) issueFailover(master string) error {
var switchover Switchover
switchover.From = master
switchover.InitiatedBy = app.config.Hostname
switchover.InitiatedAt = time.Now()
switchover.Cause = CauseAuto
return app.dcs.Create(pathCurrentSwitch, switchover)
}

/*
Returns list of hosts that should be active at the moment
Typically it's master + list of alive, replicating, not split-brained replicas
Expand Down Expand Up @@ -1202,15 +1130,15 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
if err, ok := errs[oldMaster]; ok && err != nil && clusterState[oldMaster].PingOk {
err = fmt.Errorf("switchover: failed to set old master %s read-only %s", oldMaster, err)
app.logger.Info(err.Error())
switchErr := app.finishSwitchover(switchover, err)
switchErr := app.FinishSwitchover(switchover, err)
if switchErr != nil {
return fmt.Errorf("switchover: failed to reject switchover %s", switchErr)
}
app.logger.Info("switchover: rejected")
return err
}

app.logger.Info("switchover: phase 1: stop replication")
app.logger.Info("switchover: phase 2: stop replication")
errs2 := util.RunParallel(func(host string) error {
if !clusterState[host].PingOk {
return fmt.Errorf("switchover: failed to ping host %s", host)
Expand Down Expand Up @@ -1242,7 +1170,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
}

// collect active host positions
app.logger.Info("switchover: phase 2: find most up-to-date host")
app.logger.Info("switchover: phase 3: find most up-to-date host")
positions, err := app.getNodePositions(frozenActiveNodes)
if err != nil {
return err
Expand Down Expand Up @@ -1285,7 +1213,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
newMasterNode := app.cluster.Get(newMaster)

// catch up
app.logger.Info("switchover: phase 3: catch up if needed")
app.logger.Info("switchover: phase 4: catch up if needed")
if newMaster != mostRecent {
app.logger.Infof("switchover: new master %s differs from most recent host %s, need to catch up", newMaster, mostRecent)
err := app.cluster.Get(mostRecent).SetOnline()
Expand Down Expand Up @@ -1323,7 +1251,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
}

// turn slaves to the new master
app.logger.Info("switchover: phase 4: turn to the new master")
app.logger.Info("switchover: phase 5: turn to the new master")
errs = util.RunParallel(func(host string) error {
if host == newMaster || !clusterState[host].PingOk {
return nil
Expand Down Expand Up @@ -1362,7 +1290,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
}

// promote new master
app.logger.Info("switchover: phase 5: promote new master")
app.logger.Info("switchover: phase 6: promote new master")
err = newMasterNode.StopSlave()
if err != nil || app.emulateError("promote_stop_slave") {
return fmt.Errorf("failed to stop slave on new master %s: %s", newMaster, err)
Expand All @@ -1388,7 +1316,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
app.logger.Infof("switchover: new master %s set writable", newMaster)

// reenable events
events, err := newMasterNode.ReenableEvents()
events, err := newMasterNode.ReenableEventsRetry()
if err != nil || app.emulateError("promote_reenable_events") {
return fmt.Errorf("failed to reenable slaveside disabled events on %s: %s", newMaster, err)
}
Expand All @@ -1412,13 +1340,9 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
}

func (app *App) getCurrentMaster(clusterState map[string]*NodeState) (string, error) {
var master string
err := app.dcs.Get(pathMasterNode, &master)
if err != nil && err != dcs.ErrNotFound {
return "", fmt.Errorf("failed to get current master from dcs: %s", err)
}
if master != "" {
return master, nil
master, err := app.GetMasterHostFromDcs()
if master != "" && err == nil {
return master, err
}
return app.ensureCurrentMaster(clusterState)
}
Expand All @@ -1431,11 +1355,23 @@ func (app *App) ensureCurrentMaster(clusterState map[string]*NodeState) (string,
if master == "" {
return "", ErrNoMaster
}
err = app.dcs.Set(pathMasterNode, master)
if err != nil {
return "", fmt.Errorf("failed to set current master to dcs: %s", err)
return app.SetMasterHost(master)
}

func (app *App) getMasterHost(clusterState map[string]*NodeState) (string, error) {
masters := make([]string, 0)
for host, state := range clusterState {
if state.PingOk && state.IsMaster {
masters = append(masters, host)
}
}
return master, nil
if len(masters) > 1 {
return "", fmt.Errorf("%w: %s", ErrManyMasters, masters)
}
if len(masters) == 0 {
return "", nil
}
return masters[0], nil
}

func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master string) {
Expand Down Expand Up @@ -2150,22 +2086,6 @@ func (app *App) waitForCatchUp(node *mysql.Node, gtidset gtids.GTIDSet, timeout
return false, nil
}

func (app *App) getMasterHost(clusterState map[string]*NodeState) (string, error) {
masters := make([]string, 0)
for host, state := range clusterState {
if state.PingOk && state.IsMaster {
masters = append(masters, host)
}
}
if len(masters) > 1 {
return "", fmt.Errorf("%w: %s", ErrManyMasters, masters)
}
if len(masters) == 0 {
return "", nil
}
return masters[0], nil
}

// Set master offline and disable semi-sync replication
func (app *App) stopReplicationOnMaster(masterNode *mysql.Node) error {
host := masterNode.Host()
Expand Down
92 changes: 92 additions & 0 deletions internal/app/app_dcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,95 @@ func (app *App) GetLastShutdownNodeTime() (time.Time, error) {
}
return t, err
}

// FinishSwitchover finish current switchover and write the result
func (app *App) FinishSwitchover(switchover *Switchover, switchErr error) error {
result := true
action := "finished"
path := pathLastSwitch
if switchErr != nil {
result = false
action = "rejected"
path = pathLastRejectedSwitch
}

app.logger.Infof("switchover: %s => %s %s", switchover.From, switchover.To, action)
switchover.Result = new(SwitchoverResult)
switchover.Result.Ok = result
switchover.Result.FinishedAt = time.Now()

if switchErr != nil {
switchover.Result.Error = switchErr.Error()
}

err := app.dcs.Delete(pathCurrentSwitch)
if err != nil {
return err
}
return app.dcs.Set(path, switchover)
}

// Fail current switchover, it will be repeated next cycle
func (app *App) FailSwitchover(switchover *Switchover, err error) error {
app.logger.Errorf("switchover: %s => %s failed: %s", switchover.From, switchover.To, err)
switchover.RunCount++
switchover.Result = new(SwitchoverResult)
switchover.Result.Ok = false
switchover.Result.Error = err.Error()
switchover.Result.FinishedAt = time.Now()
return app.dcs.Set(pathCurrentSwitch, switchover)
}

func (app *App) StartSwitchover(switchover *Switchover) error {
app.logger.Infof("switchover: %s => %s starting...", switchover.From, switchover.To)
switchover.StartedAt = time.Now()
switchover.StartedBy = app.config.Hostname
return app.dcs.Set(pathCurrentSwitch, switchover)
}

func (app *App) GetLastSwitchover() Switchover {
var lastSwitch, lastRejectedSwitch Switchover
err := app.dcs.Get(pathLastSwitch, &lastSwitch)
if err != nil && err != dcs.ErrNotFound {
app.logger.Errorf("%s: %s", pathLastSwitch, err.Error())
}
errRejected := app.dcs.Get(pathLastRejectedSwitch, &lastRejectedSwitch)
if errRejected != nil && errRejected != dcs.ErrNotFound {
app.logger.Errorf("%s: %s", pathLastRejectedSwitch, errRejected.Error())
}

if lastRejectedSwitch.InitiatedAt.After(lastSwitch.InitiatedAt) {
return lastRejectedSwitch
}

return lastSwitch
}

func (app *App) IssueFailover(master string) error {
var switchover Switchover
switchover.From = master
switchover.InitiatedBy = app.config.Hostname
switchover.InitiatedAt = time.Now()
switchover.Cause = CauseAuto
return app.dcs.Create(pathCurrentSwitch, switchover)
}

func (app *App) SetMasterHost(master string) (string, error) {
err := app.dcs.Set(pathMasterNode, master)
if err != nil {
return "", fmt.Errorf("failed to set current master to dcs: %s", err)
}
return master, nil
}

func (app *App) GetMasterHostFromDcs() (string, error) {
var master string
err := app.dcs.Get(pathMasterNode, &master)
if err != nil && err != dcs.ErrNotFound {
return "", fmt.Errorf("failed to get current master from dcs: %s", err)
}
if master != "" {
return master, nil
}
return "", nil
}
2 changes: 1 addition & 1 deletion internal/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration
for {
select {
case <-ticker.C:
lastSwitchover = app.getLastSwitchover()
lastSwitchover = app.GetLastSwitchover()
if lastSwitchover.InitiatedBy == switchover.InitiatedBy && lastSwitchover.InitiatedAt.Unix() == switchover.InitiatedAt.Unix() {
break Out
} else {
Expand Down
14 changes: 14 additions & 0 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,20 @@ func (n *Node) ChangeMaster(host string) error {
})
}

const ReenableEventsRetryCount = 3

func (n *Node) ReenableEventsRetry() ([]Event, error) {
var err error
for attempt := 0; attempt < ReenableEventsRetryCount; attempt++ {
events, err := n.ReenableEvents()
if err == nil {
return events, nil
}
n.logger.Errorf("failed to enable events: %s", err)
}
return nil, err
}

func (n *Node) ReenableEvents() ([]Event, error) {
var events []Event
err := n.queryRows(queryListSlavesideDisabledEvents, nil, func(rows *sqlx.Rows) error {
Expand Down

0 comments on commit f74fa87

Please sign in to comment.