Skip to content

Commit

Permalink
large binlog replication fix
Browse files Browse the repository at this point in the history
  • Loading branch information
noname0443 committed Aug 27, 2024
1 parent 5e9d969 commit ff47d88
Showing 1 changed file with 39 additions and 15 deletions.
54 changes: 39 additions & 15 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type App struct {
cluster *mysql.Cluster
filelock *flock.Flock
nodeFailedAt map[string]time.Time
slaveReadPositions map[string]string
streamFromFailedAt map[string]time.Time
daemonState *DaemonState
daemonMutex sync.Mutex
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
nodeFailedAt: make(map[string]time.Time),
streamFromFailedAt: make(map[string]time.Time),
replRepairState: make(map[string]*ReplicationRepairState),
slaveReadPositions: make(map[string]string),
externalReplication: externalReplication,
switchHelper: switchHelper,
}
Expand Down Expand Up @@ -880,7 +882,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt
if node.PingDubious || clusterStateDcs[host].PingOk {
// we can't rely on ping and slave status if ping was dubios
if util.ContainsString(oldActiveNodes, host) {
app.logger.Warnf("calc active nodes: %s is dubious or keep heath lock in dcs, keeping active...", host)
app.logger.Warnf("calc active nodes: %s is dubious or keep health lock in dcs, keeping active...", host)
activeNodes = append(activeNodes, host)
}
continue
Expand All @@ -896,6 +898,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt
}
} else {
app.logger.Errorf("calc active nodes: %s is down, deleting from active...", host)
delete(app.slaveReadPositions, host)
}
continue
} else {
Expand All @@ -918,7 +921,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt
return activeNodes, nil
}

func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activeNodes []string, oldActiveNodes []string, master string) (becomeActive, becomeInactive []string, err error) {
func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activeNodes []string, oldActiveNodes []string, master string) (becomeActive, becomeInactive, becomeDataLag []string, err error) {
masterNode := app.cluster.Get(master)
var syncReplicas []string
var deadReplicas []string
Expand All @@ -945,29 +948,39 @@ func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activ
}
}

var dataLagging []string
if len(becomeActive) > 0 {
// Some replicas are going to become semi-sync.
// We need to check that they downloaded (not replayed) almost all binary logs,
// in order to prevent master freezing.
// We can't check all the replicas on each iteration, because SHOW BINARY LOGS is pretty heavy request
var dataLagging []string
masterBinlogs, err := masterNode.GetBinlogs()
if err != nil {
app.logger.Errorf("calc active nodes: failed to list master binlogs on %s: %v", master, err)
return nil, nil, err
return nil, nil, nil, err
}
for _, host := range becomeActive {
slaveState := clusterState[host].SlaveState
dataLag := calcLagBytes(masterBinlogs, slaveState.MasterLogFile, slaveState.MasterLogPos)
if dataLag > app.config.SemiSyncEnableLag {
app.logger.Warnf("calc active nodes: %v should become active, but it has data lag %d, delaying...", host, dataLag)
dataLagging = append(dataLagging, host)
becomeInactive = append(becomeInactive, host)
newBinLog := fmt.Sprintf("%s%019d", slaveState.MasterLogFile, slaveState.MasterLogPos)
oldBinLog := app.slaveReadPositions[host]

if newBinLog <= oldBinLog {
app.logger.Warnf("calc active nodes: %v should become active, but it has data lag %d and it's IO is stopped, delaying...", host, dataLag)
becomeInactive = append(becomeInactive, host)
} else {
app.logger.Warnf("calc active nodes: %v has data lag %d, but it's IO is working", host, dataLag)
dataLagging = append(dataLagging, host)
}

app.slaveReadPositions[host] = newBinLog
}
}
becomeActive = filterOut(becomeActive, dataLagging)
becomeActive = filterOut(becomeActive, becomeInactive)
}
return becomeActive, becomeInactive, nil
return becomeActive, becomeInactive, dataLagging, nil
}

/*
Expand Down Expand Up @@ -1006,7 +1019,7 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node
return nil
}

becomeActive, becomeInactive, err := app.calcActiveNodesChanges(clusterState, activeNodes, oldActiveNodes, master)
becomeActive, becomeInactive, becomeDataLag, err := app.calcActiveNodesChanges(clusterState, activeNodes, oldActiveNodes, master)
if err != nil {
app.logger.Errorf("update active nodes: failed to calc active nodes changes: %v", err)
return err
Expand Down Expand Up @@ -1043,7 +1056,14 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node
}
}
for _, host := range becomeInactive {
err = app.disableSemiSyncOnSlave(host)
err = app.disableSemiSyncOnSlave(host, false)
if err != nil {
app.logger.Warnf("failed to disable semi-sync on slave %s: %v", host, err)
return err
}
}
for _, host := range becomeDataLag {
err = app.disableSemiSyncOnSlave(host, true)
if err != nil {
app.logger.Warnf("failed to disable semi-sync on slave %s: %v", host, err)
return err
Expand Down Expand Up @@ -1131,18 +1151,22 @@ func (app *App) enableSemiSyncOnSlave(host string, slaveState, masterState *Node
return nil
}

func (app *App) disableSemiSyncOnSlave(host string) error {
func (app *App) disableSemiSyncOnSlave(host string, isSlaveWorking bool) error {
node := app.cluster.Get(host)
err := node.SemiSyncDisable()
if err != nil {
app.logger.Errorf("failed to enable semi_sync_slave on %s: %s", host, err)
return err
}
err = node.RestartSlaveIOThread()
if err != nil {
app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err)
return err

if !isSlaveWorking {
err = node.RestartSlaveIOThread()
if err != nil {
app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err)
return err
}
}

return nil
}

Expand Down

0 comments on commit ff47d88

Please sign in to comment.