diff --git a/internal/app/active_nodes.go b/internal/app/active_nodes.go index 227507b..4e1c05b 100644 --- a/internal/app/active_nodes.go +++ b/internal/app/active_nodes.go @@ -140,7 +140,7 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive app.logger.Warn(fmt.Sprintf("Calc active nodes: lost master %s", host)) continue } - if (masterState.PingOk && masterState.PingStable) && !(replicaState.MasterLinkState && masterNode.MatchHost(replicaState.MasterHost)) { + if (masterState.PingOk && masterState.PingStable) && !replicates(&masterState, replicaState, host, masterNode, false) { app.logger.Error(fmt.Sprintf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host)) continue } diff --git a/internal/app/checks.go b/internal/app/checks.go index eb9b147..4784b2d 100644 --- a/internal/app/checks.go +++ b/internal/app/checks.go @@ -32,9 +32,8 @@ func (app *App) checkHAReplicasRunning() bool { app.logger.Warn("Host is ahead in replication history", "fqdn", host) aheadHosts++ } - if hostState.PingOk && !hostState.IsMaster && hostState.ReplicaState != nil { - rs := hostState.ReplicaState - if rs.MasterLinkState && local.MatchHost(rs.MasterHost) { + if hostState.PingOk && !hostState.IsMaster { + if replicates(localState, hostState.ReplicaState, host, local, false) { availableReplicas++ } } diff --git a/internal/app/master.go b/internal/app/master.go index ceaf0e2..36db14d 100644 --- a/internal/app/master.go +++ b/internal/app/master.go @@ -83,6 +83,7 @@ func (app *App) changeMaster(host, master string) error { } node := app.shard.Get(host) + masterState := app.getHostState(master) masterNode := app.shard.Get(master) state := app.getHostState(host) @@ -90,28 +91,28 @@ func (app *App) changeMaster(host, master string) error { return fmt.Errorf("changeMaster: replica %s is dead - unable to init repair", host) } - app.repairReplica(node, state, master) + app.repairReplica(node, masterState, state, master, host) deadline := time.Now().Add(app.config.Redis.WaitReplicationTimeout) for time.Now().Before(deadline) { state = app.getHostState(host) rs := state.ReplicaState - if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) { + if rs != nil && replicates(masterState, rs, host, masterNode, false) { break } if !state.PingOk { return fmt.Errorf("changeMaster: replica %s died while waiting to start replication from %s", host, master) } - masterState := app.getHostState(master) + masterState = app.getHostState(master) if !masterState.PingOk { return fmt.Errorf("changeMaster: %s died while waiting to start replication to %s", master, host) } app.logger.Info(fmt.Sprintf("ChangeMaster: waiting for %s to start replication from %s", host, master)) - app.repairReplica(node, state, master) + app.repairReplica(node, masterState, state, master, host) time.Sleep(time.Second) } rs := state.ReplicaState - if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) { + if rs != nil && replicates(masterState, rs, host, masterNode, false) { app.logger.Info(fmt.Sprintf("ChangeMaster: %s started replication from %s", host, master)) } else { return fmt.Errorf("%s was unable to start replication from %s", host, master) diff --git a/internal/app/repair.go b/internal/app/repair.go index 3be3cb7..bfcbf4c 100644 --- a/internal/app/repair.go +++ b/internal/app/repair.go @@ -11,6 +11,7 @@ import ( func (app *App) repairShard(shardState map[string]*HostState, activeNodes []string, master string) { replicas := make([]string, 0) syncing := 0 + masterState := shardState[master] masterNode := app.shard.Get(master) for host, state := range shardState { if !state.PingOk { @@ -20,7 +21,7 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri app.repairMaster(masterNode, activeNodes, state) } else { rs := state.ReplicaState - if rs != nil && rs.MasterSyncInProgress && masterNode.MatchHost(rs.MasterHost) { + if rs != nil && rs.MasterSyncInProgress && replicates(masterState, rs, host, masterNode, true) { syncing++ } replicas = append(replicas, host) @@ -39,9 +40,9 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri } } rs := state.ReplicaState - if rs == nil || state.IsReplPaused || !(rs.MasterLinkState || rs.MasterSyncInProgress) || !masterNode.MatchHost(rs.MasterHost) { + if rs == nil || state.IsReplPaused || !replicates(masterState, rs, host, masterNode, true) { if syncing < app.config.Redis.MaxParallelSyncs { - app.repairReplica(node, state, master) + app.repairReplica(node, masterState, state, master, host) syncing++ } else { app.logger.Error(fmt.Sprintf("Leaving replica %s broken: currently syncing %d/%d", host, syncing, app.config.Redis.MaxParallelSyncs)) @@ -75,10 +76,10 @@ func (app *App) repairMaster(node *redis.Node, activeNodes []string, state *Host } } -func (app *App) repairReplica(node *redis.Node, state *HostState, master string) { +func (app *App) repairReplica(node *redis.Node, masterState, state *HostState, master, replicaFQDN string) { masterNode := app.shard.Get(master) rs := state.ReplicaState - if rs == nil || !masterNode.MatchHost(rs.MasterHost) { + if !replicates(masterState, rs, replicaFQDN, masterNode, true) { switch app.mode { case modeSentinel: err := node.SentinelMakeReplica(app.ctx, master) diff --git a/internal/app/replication.go b/internal/app/replication.go new file mode 100644 index 0000000..6e3a5db --- /dev/null +++ b/internal/app/replication.go @@ -0,0 +1,17 @@ +package app + +import ( + "slices" + + "github.com/yandex/rdsync/internal/redis" +) + +func replicates(masterState *HostState, replicaState *ReplicaState, replicaFQDN string, masterNode *redis.Node, allowSync bool) bool { + if replicaState == nil || !(replicaState.MasterLinkState || !(allowSync && replicaState.MasterSyncInProgress)) { + return false + } + if slices.Contains(masterState.ConnectedReplicas, replicaFQDN) { + return true + } + return masterNode.MatchHost(replicaState.MasterHost) +} diff --git a/internal/app/state.go b/internal/app/state.go index 9b41fcb..2b52080 100644 --- a/internal/app/state.go +++ b/internal/app/state.go @@ -1,12 +1,19 @@ package app import ( + "fmt" "strconv" + "strings" "time" "github.com/yandex/rdsync/internal/dcs" ) +func (app *App) setStateError(state *HostState, fqdn, error string) { + app.logger.Error("GetHostState error", "fqdn", fqdn, "error", error) + state.Error = error +} + func (app *App) getHostState(fqdn string) *HostState { node := app.shard.Get(fqdn) var state HostState @@ -20,7 +27,7 @@ func (app *App) getHostState(fqdn string) *HostState { } info, err := node.GetInfo(app.ctx) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) if len(info) == 0 { state.PingOk = false state.PingStable = false @@ -31,134 +38,158 @@ func (app *App) getHostState(fqdn string) *HostState { var ok bool state.RunID, ok = info["run_id"] if !ok { - state.Error = "No run_id in info" + app.setStateError(&state, fqdn, "No run_id in info") return &state } state.ReplicationID, ok = info["master_replid"] if !ok { - state.Error = "No master_replid in info" + app.setStateError(&state, fqdn, "No master_replid in info") return &state } state.ReplicationID2, ok = info["master_replid2"] if !ok { - state.Error = "No master_replid2 in info" + app.setStateError(&state, fqdn, "No master_replid2 in info") return &state } masterOffset, ok := info["master_repl_offset"] if !ok { - state.Error = "No master_repl_offset in info" + app.setStateError(&state, fqdn, "No master_repl_offset in info") return &state } state.MasterReplicationOffset, err = strconv.ParseInt(masterOffset, 10, 64) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } secondOffset, ok := info["second_repl_offset"] if !ok { - state.Error = "No second_repl_offset in info" + app.setStateError(&state, fqdn, "No second_repl_offset in info") return &state } state.SecondReplicationOffset, err = strconv.ParseInt(secondOffset, 10, 64) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } replBacklogFirstByte, ok := info["repl_backlog_first_byte_offset"] if !ok { - state.Error = "No repl_backlog_first_byte_offset in info" + app.setStateError(&state, fqdn, "No repl_backlog_first_byte_offset in info") return &state } state.ReplicationBacklogStart, err = strconv.ParseInt(replBacklogFirstByte, 10, 64) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } replBacklogHistlen, ok := info["repl_backlog_histlen"] if !ok { - state.Error = "No repl_backlog_histlen in info" + app.setStateError(&state, fqdn, "No repl_backlog_histlen in info") return &state } state.ReplicationBacklogSize, err = strconv.ParseInt(replBacklogHistlen, 10, 64) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } role, ok := info["role"] if !ok { - state.Error = "No role in info" + app.setStateError(&state, fqdn, "No role in info") return &state } if role == "master" { state.IsMaster = true + numReplicasStr, ok := info["connected_slaves"] + if !ok { + app.setStateError(&state, fqdn, "Master has no connected_slaves in info") + return &state + } + numReplicas, err := strconv.ParseInt(numReplicasStr, 10, 64) + if err != nil { + app.setStateError(&state, fqdn, err.Error()) + return &state + } + var i int64 + for i < numReplicas { + replicaID := fmt.Sprintf("slave%d", i) + replicaValue, ok := info[replicaID] + if !ok { + app.setStateError(&state, fqdn, fmt.Sprintf("Master has no %s in info", replicaID)) + return &state + } + // ip is first value in slaveN info + start := strings.Index(replicaValue, "=") + end := strings.Index(replicaValue, ",") + state.ConnectedReplicas = append(state.ConnectedReplicas, replicaValue[start+1:end]) + i++ + } } else { state.IsMaster = false rs := ReplicaState{} rs.MasterHost, ok = info["master_host"] if !ok { - state.Error = "Replica but no master_host in info" + app.setStateError(&state, fqdn, "Replica but no master_host in info") return &state } linkState, ok := info["master_link_status"] if !ok { - state.Error = "Replica but no master_link_status in info" + app.setStateError(&state, fqdn, "Replica but no master_link_status in info") return &state } rs.MasterLinkState = (linkState == "up") syncInProgress, ok := info["master_sync_in_progress"] if !ok { - state.Error = "Replica but no master_sync_in_progress in info" + app.setStateError(&state, fqdn, "Replica but no master_sync_in_progress in info") return &state } rs.MasterSyncInProgress = (syncInProgress != "0") if !rs.MasterLinkState && !rs.MasterSyncInProgress { downSeconds, ok := info["master_link_down_since_seconds"] if !ok { - state.Error = "Replica with link down but no master_link_down_since_seconds in info" + app.setStateError(&state, fqdn, "Replica with link down but no master_link_down_since_seconds in info") return &state } rs.MasterLinkDownTime, err = strconv.ParseInt(downSeconds, 10, 64) rs.MasterLinkDownTime *= 1000 if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } } replicaOffset, ok := info["slave_repl_offset"] if !ok { - state.Error = "Replica but no slave_repl_offset in info" + app.setStateError(&state, fqdn, "Replica but no slave_repl_offset in info") return &state } rs.ReplicationOffset, err = strconv.ParseInt(replicaOffset, 10, 64) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } state.ReplicaState = &rs } state.IsReadOnly, err = node.IsReadOnly(app.ctx) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } state.IsOffline, err = node.IsOffline(app.ctx) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } state.IsReplPaused, err = node.IsReplPaused(app.ctx) if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } err = node.RefreshAddrs() if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } state.IP, err = node.GetIP() if err != nil { - state.Error = err.Error() + app.setStateError(&state, fqdn, err.Error()) return &state } return &state diff --git a/internal/app/types.go b/internal/app/types.go index f214ba8..b032f6d 100644 --- a/internal/app/types.go +++ b/internal/app/types.go @@ -111,6 +111,7 @@ type HostState struct { ReplicationID string `json:"replication_id"` ReplicationID2 string `json:"replication_id2"` Error string `json:"error"` + ConnectedReplicas []string `json:"connected_replicas"` ReplicaState *ReplicaState `json:"replica_state"` SentiCacheState *SentiCacheState `json:"senticache_state"` }