Skip to content

Commit

Permalink
Use advertised replica names for replication check
Browse files Browse the repository at this point in the history
  • Loading branch information
secwall committed Dec 8, 2023
1 parent b3c1d91 commit fb43746
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 14 deletions.
2 changes: 1 addition & 1 deletion internal/app/active_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive
app.logger.Warn(fmt.Sprintf("Calc active nodes: lost master %s", host))
continue
}
if (masterState.PingOk && masterState.PingStable) && !(replicaState.MasterLinkState && masterNode.MatchHost(replicaState.MasterHost)) {
if (masterState.PingOk && masterState.PingStable) && !replicates(&masterState, replicaState, host, masterNode, false) {
app.logger.Error(fmt.Sprintf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host))
continue
}
Expand Down
5 changes: 2 additions & 3 deletions internal/app/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ func (app *App) checkHAReplicasRunning() bool {
app.logger.Warn("Host is ahead in replication history", "fqdn", host)
aheadHosts++
}
if hostState.PingOk && !hostState.IsMaster && hostState.ReplicaState != nil {
rs := hostState.ReplicaState
if rs.MasterLinkState && local.MatchHost(rs.MasterHost) {
if hostState.PingOk && !hostState.IsMaster {
if replicates(localState, hostState.ReplicaState, host, local, false) {
availableReplicas++
}
}
Expand Down
11 changes: 6 additions & 5 deletions internal/app/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,35 +83,36 @@ func (app *App) changeMaster(host, master string) error {
}

node := app.shard.Get(host)
masterState := app.getHostState(master)
masterNode := app.shard.Get(master)
state := app.getHostState(host)

if !state.PingOk {
return fmt.Errorf("changeMaster: replica %s is dead - unable to init repair", host)
}

app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)

deadline := time.Now().Add(app.config.Redis.WaitReplicationTimeout)
for time.Now().Before(deadline) {
state = app.getHostState(host)
rs := state.ReplicaState
if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && replicates(masterState, rs, host, masterNode, false) {
break
}
if !state.PingOk {
return fmt.Errorf("changeMaster: replica %s died while waiting to start replication from %s", host, master)
}
masterState := app.getHostState(master)
masterState = app.getHostState(master)
if !masterState.PingOk {
return fmt.Errorf("changeMaster: %s died while waiting to start replication to %s", master, host)
}
app.logger.Info(fmt.Sprintf("ChangeMaster: waiting for %s to start replication from %s", host, master))
app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)
time.Sleep(time.Second)
}
rs := state.ReplicaState
if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && replicates(masterState, rs, host, masterNode, false) {
app.logger.Info(fmt.Sprintf("ChangeMaster: %s started replication from %s", host, master))
} else {
return fmt.Errorf("%s was unable to start replication from %s", host, master)
Expand Down
11 changes: 6 additions & 5 deletions internal/app/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
func (app *App) repairShard(shardState map[string]*HostState, activeNodes []string, master string) {
replicas := make([]string, 0)
syncing := 0
masterState := shardState[master]
masterNode := app.shard.Get(master)
for host, state := range shardState {
if !state.PingOk {
Expand All @@ -20,7 +21,7 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri
app.repairMaster(masterNode, activeNodes, state)
} else {
rs := state.ReplicaState
if rs != nil && rs.MasterSyncInProgress && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && rs.MasterSyncInProgress && replicates(masterState, rs, host, masterNode, true) {
syncing++
}
replicas = append(replicas, host)
Expand All @@ -39,9 +40,9 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri
}
}
rs := state.ReplicaState
if rs == nil || state.IsReplPaused || !(rs.MasterLinkState || rs.MasterSyncInProgress) || !masterNode.MatchHost(rs.MasterHost) {
if rs == nil || state.IsReplPaused || !replicates(masterState, rs, host, masterNode, true) {
if syncing < app.config.Redis.MaxParallelSyncs {
app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)
syncing++
} else {
app.logger.Error(fmt.Sprintf("Leaving replica %s broken: currently syncing %d/%d", host, syncing, app.config.Redis.MaxParallelSyncs))
Expand Down Expand Up @@ -75,10 +76,10 @@ func (app *App) repairMaster(node *redis.Node, activeNodes []string, state *Host
}
}

func (app *App) repairReplica(node *redis.Node, state *HostState, master string) {
func (app *App) repairReplica(node *redis.Node, masterState, state *HostState, master, replicaFQDN string) {
masterNode := app.shard.Get(master)
rs := state.ReplicaState
if rs == nil || !masterNode.MatchHost(rs.MasterHost) {
if !replicates(masterState, rs, replicaFQDN, masterNode, true) {
switch app.mode {
case modeSentinel:
err := node.SentinelMakeReplica(app.ctx, master)
Expand Down
17 changes: 17 additions & 0 deletions internal/app/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package app

import (
"slices"

"github.com/yandex/rdsync/internal/redis"
)

func replicates(masterState *HostState, replicaState *ReplicaState, replicaFQDN string, masterNode *redis.Node, allowSync bool) bool {
if replicaState == nil || !(replicaState.MasterLinkState || (allowSync && replicaState.MasterSyncInProgress)) {
return false
}
if slices.Contains(masterState.ConnectedReplicas, replicaFQDN) {
return true
}
return masterNode.MatchHost(replicaState.MasterHost)
}
26 changes: 26 additions & 0 deletions internal/app/state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package app

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/yandex/rdsync/internal/dcs"
Expand Down Expand Up @@ -91,6 +93,30 @@ func (app *App) getHostState(fqdn string) *HostState {
}
if role == "master" {
state.IsMaster = true
numReplicasStr, ok := info["connected_slaves"]
if !ok {
state.Error = "Master has no connected_slaves in info"
return &state
}
numReplicas, err := strconv.ParseInt(numReplicasStr, 10, 64)
if err != nil {
state.Error = err.Error()
return &state
}
var i int64
for i < numReplicas {
replicaID := fmt.Sprintf("slave%d", i)
replicaValue, ok := info[replicaID]
if !ok {
state.Error = fmt.Sprintf("Master has no %s in info", replicaID)
return &state
}
// ip is first value in slaveN info
start := strings.Index(replicaValue, "=")
end := strings.Index(replicaValue, ",")
state.ConnectedReplicas = append(state.ConnectedReplicas, replicaValue[start+1:end])
i++
}
} else {
state.IsMaster = false
rs := ReplicaState{}
Expand Down
1 change: 1 addition & 0 deletions internal/app/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type HostState struct {
ReplicationID string `json:"replication_id"`
ReplicationID2 string `json:"replication_id2"`
Error string `json:"error"`
ConnectedReplicas []string `json:"connected_replicas"`
ReplicaState *ReplicaState `json:"replica_state"`
SentiCacheState *SentiCacheState `json:"senticache_state"`
}
Expand Down

0 comments on commit fb43746

Please sign in to comment.