diff --git a/internal/app/app.go b/internal/app/app.go index 8bf4dbdb..3d13e494 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -33,6 +33,7 @@ type App struct { cluster *mysql.Cluster filelock *flock.Flock nodeFailedAt map[string]time.Time + slaveReadPositions map[string]string streamFromFailedAt map[string]time.Time daemonState *DaemonState daemonMutex sync.Mutex @@ -71,6 +72,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) { nodeFailedAt: make(map[string]time.Time), streamFromFailedAt: make(map[string]time.Time), replRepairState: make(map[string]*ReplicationRepairState), + slaveReadPositions: make(map[string]string), externalReplication: externalReplication, switchHelper: switchHelper, } @@ -880,7 +882,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt if node.PingDubious || clusterStateDcs[host].PingOk { // we can't rely on ping and slave status if ping was dubios if util.ContainsString(oldActiveNodes, host) { - app.logger.Warnf("calc active nodes: %s is dubious or keep heath lock in dcs, keeping active...", host) + app.logger.Warnf("calc active nodes: %s is dubious or keep health lock in dcs, keeping active...", host) activeNodes = append(activeNodes, host) } continue @@ -896,6 +898,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt } } else { app.logger.Errorf("calc active nodes: %s is down, deleting from active...", host) + delete(app.slaveReadPositions, host) } continue } else { @@ -918,7 +921,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt return activeNodes, nil } -func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activeNodes []string, oldActiveNodes []string, master string) (becomeActive, becomeInactive []string, err error) { +func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activeNodes []string, oldActiveNodes []string, master string) (becomeActive, becomeInactive, becomeDataLag []string, err error) { masterNode := app.cluster.Get(master) var syncReplicas []string var deadReplicas []string @@ -945,29 +948,39 @@ func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activ } } + var dataLagging []string if len(becomeActive) > 0 { // Some replicas are going to become semi-sync. // We need to check that they downloaded (not replayed) almost all binary logs, // in order to prevent master freezing. // We can't check all the replicas on each iteration, because SHOW BINARY LOGS is pretty heavy request - var dataLagging []string masterBinlogs, err := masterNode.GetBinlogs() if err != nil { app.logger.Errorf("calc active nodes: failed to list master binlogs on %s: %v", master, err) - return nil, nil, err + return nil, nil, nil, err } for _, host := range becomeActive { slaveState := clusterState[host].SlaveState dataLag := calcLagBytes(masterBinlogs, slaveState.MasterLogFile, slaveState.MasterLogPos) if dataLag > app.config.SemiSyncEnableLag { - app.logger.Warnf("calc active nodes: %v should become active, but it has data lag %d, delaying...", host, dataLag) - dataLagging = append(dataLagging, host) - becomeInactive = append(becomeInactive, host) + newBinLog := fmt.Sprintf("%s%019d", slaveState.MasterLogFile, slaveState.MasterLogPos) + oldBinLog := app.slaveReadPositions[host] + + if newBinLog <= oldBinLog { + app.logger.Warnf("calc active nodes: %v should become active, but it has data lag %d and it's IO is stopped, delaying...", host, dataLag) + becomeInactive = append(becomeInactive, host) + } else { + app.logger.Warnf("calc active nodes: %v has data lag %d, but it's IO is working", host, dataLag) + dataLagging = append(dataLagging, host) + } + + app.slaveReadPositions[host] = newBinLog } } becomeActive = filterOut(becomeActive, dataLagging) + becomeActive = filterOut(becomeActive, becomeInactive) } - return becomeActive, becomeInactive, nil + return becomeActive, becomeInactive, dataLagging, nil } /* @@ -1006,7 +1019,7 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node return nil } - becomeActive, becomeInactive, err := app.calcActiveNodesChanges(clusterState, activeNodes, oldActiveNodes, master) + becomeActive, becomeInactive, becomeDataLag, err := app.calcActiveNodesChanges(clusterState, activeNodes, oldActiveNodes, master) if err != nil { app.logger.Errorf("update active nodes: failed to calc active nodes changes: %v", err) return err @@ -1043,7 +1056,14 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node } } for _, host := range becomeInactive { - err = app.disableSemiSyncOnSlave(host) + err = app.disableSemiSyncOnSlave(host, false) + if err != nil { + app.logger.Warnf("failed to disable semi-sync on slave %s: %v", host, err) + return err + } + } + for _, host := range becomeDataLag { + err = app.disableSemiSyncOnSlave(host, true) if err != nil { app.logger.Warnf("failed to disable semi-sync on slave %s: %v", host, err) return err @@ -1131,18 +1151,22 @@ func (app *App) enableSemiSyncOnSlave(host string, slaveState, masterState *Node return nil } -func (app *App) disableSemiSyncOnSlave(host string) error { +func (app *App) disableSemiSyncOnSlave(host string, isSlaveWorking bool) error { node := app.cluster.Get(host) err := node.SemiSyncDisable() if err != nil { app.logger.Errorf("failed to enable semi_sync_slave on %s: %s", host, err) return err } - err = node.RestartSlaveIOThread() - if err != nil { - app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err) - return err + + if !isSlaveWorking { + err = node.RestartSlaveIOThread() + if err != nil { + app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err) + return err + } } + return nil }