Skip to content

Commit

Permalink
Use advertised replica names for replication check
Browse files Browse the repository at this point in the history
  • Loading branch information
secwall committed Dec 8, 2023
1 parent b3c1d91 commit ac10c09
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 39 deletions.
2 changes: 1 addition & 1 deletion internal/app/active_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive
app.logger.Warn(fmt.Sprintf("Calc active nodes: lost master %s", host))
continue
}
if (masterState.PingOk && masterState.PingStable) && !(replicaState.MasterLinkState && masterNode.MatchHost(replicaState.MasterHost)) {
if (masterState.PingOk && masterState.PingStable) && !replicates(&masterState, replicaState, host, masterNode, false) {
app.logger.Error(fmt.Sprintf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host))
continue
}
Expand Down
5 changes: 2 additions & 3 deletions internal/app/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ func (app *App) checkHAReplicasRunning() bool {
app.logger.Warn("Host is ahead in replication history", "fqdn", host)
aheadHosts++
}
if hostState.PingOk && !hostState.IsMaster && hostState.ReplicaState != nil {
rs := hostState.ReplicaState
if rs.MasterLinkState && local.MatchHost(rs.MasterHost) {
if hostState.PingOk && !hostState.IsMaster {
if replicates(localState, hostState.ReplicaState, host, local, false) {
availableReplicas++
}
}
Expand Down
11 changes: 6 additions & 5 deletions internal/app/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,35 +83,36 @@ func (app *App) changeMaster(host, master string) error {
}

node := app.shard.Get(host)
masterState := app.getHostState(master)
masterNode := app.shard.Get(master)
state := app.getHostState(host)

if !state.PingOk {
return fmt.Errorf("changeMaster: replica %s is dead - unable to init repair", host)
}

app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)

deadline := time.Now().Add(app.config.Redis.WaitReplicationTimeout)
for time.Now().Before(deadline) {
state = app.getHostState(host)
rs := state.ReplicaState
if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && replicates(masterState, rs, host, masterNode, false) {
break
}
if !state.PingOk {
return fmt.Errorf("changeMaster: replica %s died while waiting to start replication from %s", host, master)
}
masterState := app.getHostState(master)
masterState = app.getHostState(master)
if !masterState.PingOk {
return fmt.Errorf("changeMaster: %s died while waiting to start replication to %s", master, host)
}
app.logger.Info(fmt.Sprintf("ChangeMaster: waiting for %s to start replication from %s", host, master))
app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)
time.Sleep(time.Second)
}
rs := state.ReplicaState
if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && replicates(masterState, rs, host, masterNode, false) {
app.logger.Info(fmt.Sprintf("ChangeMaster: %s started replication from %s", host, master))
} else {
return fmt.Errorf("%s was unable to start replication from %s", host, master)
Expand Down
11 changes: 6 additions & 5 deletions internal/app/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
func (app *App) repairShard(shardState map[string]*HostState, activeNodes []string, master string) {
replicas := make([]string, 0)
syncing := 0
masterState := shardState[master]
masterNode := app.shard.Get(master)
for host, state := range shardState {
if !state.PingOk {
Expand All @@ -20,7 +21,7 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri
app.repairMaster(masterNode, activeNodes, state)
} else {
rs := state.ReplicaState
if rs != nil && rs.MasterSyncInProgress && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && rs.MasterSyncInProgress && replicates(masterState, rs, host, masterNode, true) {
syncing++
}
replicas = append(replicas, host)
Expand All @@ -39,9 +40,9 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri
}
}
rs := state.ReplicaState
if rs == nil || state.IsReplPaused || !(rs.MasterLinkState || rs.MasterSyncInProgress) || !masterNode.MatchHost(rs.MasterHost) {
if rs == nil || state.IsReplPaused || !replicates(masterState, rs, host, masterNode, true) {
if syncing < app.config.Redis.MaxParallelSyncs {
app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)
syncing++
} else {
app.logger.Error(fmt.Sprintf("Leaving replica %s broken: currently syncing %d/%d", host, syncing, app.config.Redis.MaxParallelSyncs))
Expand Down Expand Up @@ -75,10 +76,10 @@ func (app *App) repairMaster(node *redis.Node, activeNodes []string, state *Host
}
}

func (app *App) repairReplica(node *redis.Node, state *HostState, master string) {
func (app *App) repairReplica(node *redis.Node, masterState, state *HostState, master, replicaFQDN string) {
masterNode := app.shard.Get(master)
rs := state.ReplicaState
if rs == nil || !masterNode.MatchHost(rs.MasterHost) {
if !replicates(masterState, rs, replicaFQDN, masterNode, true) {
switch app.mode {
case modeSentinel:
err := node.SentinelMakeReplica(app.ctx, master)
Expand Down
17 changes: 17 additions & 0 deletions internal/app/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package app

import (
"slices"

"github.com/yandex/rdsync/internal/redis"
)

func replicates(masterState *HostState, replicaState *ReplicaState, replicaFQDN string, masterNode *redis.Node, allowSync bool) bool {
if replicaState == nil || !(replicaState.MasterLinkState || !(allowSync && replicaState.MasterSyncInProgress)) {
return false
}
if slices.Contains(masterState.ConnectedReplicas, replicaFQDN) {
return true
}
return masterNode.MatchHost(replicaState.MasterHost)
}
81 changes: 56 additions & 25 deletions internal/app/state.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package app

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/yandex/rdsync/internal/dcs"
)

func (app *App) setStateError(state *HostState, fqdn, error string) {
app.logger.Error("GetHostState error", "fqdn", fqdn, "error", error)
state.Error = error
}

func (app *App) getHostState(fqdn string) *HostState {
node := app.shard.Get(fqdn)
var state HostState
Expand All @@ -20,7 +27,7 @@ func (app *App) getHostState(fqdn string) *HostState {
}
info, err := node.GetInfo(app.ctx)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
if len(info) == 0 {
state.PingOk = false
state.PingStable = false
Expand All @@ -31,134 +38,158 @@ func (app *App) getHostState(fqdn string) *HostState {
var ok bool
state.RunID, ok = info["run_id"]
if !ok {
state.Error = "No run_id in info"
app.setStateError(&state, fqdn, "No run_id in info")
return &state
}
state.ReplicationID, ok = info["master_replid"]
if !ok {
state.Error = "No master_replid in info"
app.setStateError(&state, fqdn, "No master_replid in info")
return &state
}
state.ReplicationID2, ok = info["master_replid2"]
if !ok {
state.Error = "No master_replid2 in info"
app.setStateError(&state, fqdn, "No master_replid2 in info")
return &state
}
masterOffset, ok := info["master_repl_offset"]
if !ok {
state.Error = "No master_repl_offset in info"
app.setStateError(&state, fqdn, "No master_repl_offset in info")
return &state
}
state.MasterReplicationOffset, err = strconv.ParseInt(masterOffset, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
secondOffset, ok := info["second_repl_offset"]
if !ok {
state.Error = "No second_repl_offset in info"
app.setStateError(&state, fqdn, "No second_repl_offset in info")
return &state
}
state.SecondReplicationOffset, err = strconv.ParseInt(secondOffset, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
replBacklogFirstByte, ok := info["repl_backlog_first_byte_offset"]
if !ok {
state.Error = "No repl_backlog_first_byte_offset in info"
app.setStateError(&state, fqdn, "No repl_backlog_first_byte_offset in info")
return &state
}
state.ReplicationBacklogStart, err = strconv.ParseInt(replBacklogFirstByte, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
replBacklogHistlen, ok := info["repl_backlog_histlen"]
if !ok {
state.Error = "No repl_backlog_histlen in info"
app.setStateError(&state, fqdn, "No repl_backlog_histlen in info")
return &state
}
state.ReplicationBacklogSize, err = strconv.ParseInt(replBacklogHistlen, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
role, ok := info["role"]
if !ok {
state.Error = "No role in info"
app.setStateError(&state, fqdn, "No role in info")
return &state
}
if role == "master" {
state.IsMaster = true
numReplicasStr, ok := info["connected_slaves"]
if !ok {
app.setStateError(&state, fqdn, "Master has no connected_slaves in info")
return &state
}
numReplicas, err := strconv.ParseInt(numReplicasStr, 10, 64)
if err != nil {
app.setStateError(&state, fqdn, err.Error())
return &state
}
var i int64
for i < numReplicas {
replicaID := fmt.Sprintf("slave%d", i)
replicaValue, ok := info[replicaID]
if !ok {
app.setStateError(&state, fqdn, fmt.Sprintf("Master has no %s in info", replicaID))
return &state
}
// ip is first value in slaveN info
start := strings.Index(replicaValue, "=")
end := strings.Index(replicaValue, ",")
state.ConnectedReplicas = append(state.ConnectedReplicas, replicaValue[start+1:end])
i++
}
} else {
state.IsMaster = false
rs := ReplicaState{}
rs.MasterHost, ok = info["master_host"]
if !ok {
state.Error = "Replica but no master_host in info"
app.setStateError(&state, fqdn, "Replica but no master_host in info")
return &state
}
linkState, ok := info["master_link_status"]
if !ok {
state.Error = "Replica but no master_link_status in info"
app.setStateError(&state, fqdn, "Replica but no master_link_status in info")
return &state
}
rs.MasterLinkState = (linkState == "up")
syncInProgress, ok := info["master_sync_in_progress"]
if !ok {
state.Error = "Replica but no master_sync_in_progress in info"
app.setStateError(&state, fqdn, "Replica but no master_sync_in_progress in info")
return &state
}
rs.MasterSyncInProgress = (syncInProgress != "0")
if !rs.MasterLinkState && !rs.MasterSyncInProgress {
downSeconds, ok := info["master_link_down_since_seconds"]
if !ok {
state.Error = "Replica with link down but no master_link_down_since_seconds in info"
app.setStateError(&state, fqdn, "Replica with link down but no master_link_down_since_seconds in info")
return &state
}
rs.MasterLinkDownTime, err = strconv.ParseInt(downSeconds, 10, 64)
rs.MasterLinkDownTime *= 1000
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
}
replicaOffset, ok := info["slave_repl_offset"]
if !ok {
state.Error = "Replica but no slave_repl_offset in info"
app.setStateError(&state, fqdn, "Replica but no slave_repl_offset in info")
return &state
}
rs.ReplicationOffset, err = strconv.ParseInt(replicaOffset, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
state.ReplicaState = &rs
}
state.IsReadOnly, err = node.IsReadOnly(app.ctx)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
state.IsOffline, err = node.IsOffline(app.ctx)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
state.IsReplPaused, err = node.IsReplPaused(app.ctx)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
err = node.RefreshAddrs()
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
state.IP, err = node.GetIP()
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
return &state
Expand Down
1 change: 1 addition & 0 deletions internal/app/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type HostState struct {
ReplicationID string `json:"replication_id"`
ReplicationID2 string `json:"replication_id2"`
Error string `json:"error"`
ConnectedReplicas []string `json:"connected_replicas"`
ReplicaState *ReplicaState `json:"replica_state"`
SentiCacheState *SentiCacheState `json:"senticache_state"`
}
Expand Down

0 comments on commit ac10c09

Please sign in to comment.