Skip to content

Commit

Permalink
Merge #130628
Browse files Browse the repository at this point in the history
130628: raft: refortify followers if they are not fortified r=iskettaneh a=iskettaneh

This commit does the following:

1) The leader now skips sending fortification messages to followers whose stores don't provide support in the store-liveness-fabric.
2) The leader now refortifies followers that need fortification. It keeps checking on every heartbeat timeout.

Fixes: #125349

Release note: None

Co-authored-by: Ibrahim Kettaneh <[email protected]>
  • Loading branch information
craig[bot] and iskettaneh committed Sep 27, 2024
2 parents fcf3009 + edc3277 commit 727fc55
Show file tree
Hide file tree
Showing 13 changed files with 681 additions and 78 deletions.
1 change: 1 addition & 0 deletions pkg/raft/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_test(
"//pkg/raft/rafttest",
"//pkg/raft/tracker",
"//pkg/settings/cluster",
"//pkg/testutils",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
50 changes: 40 additions & 10 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,13 +731,43 @@ func (r *raft) sendHeartbeat(to pb.PeerID) {
pr.MaybeUpdateSentCommit(commit)
}

// sendFortify sends a fortification RPC to the given peer.
func (r *raft) sendFortify(to pb.PeerID) {
// maybeSendFortify conditionally asks the given peer to fortify the leader.
// A fortification message is sent only when the fortification tracker reports
// that the peer has not fortified the leader yet while the peer's store is
// supporting the leader's store in StoreLiveness. In every other case the
// call is a no-op (apart from a log line when the leader lacks self-support).
func (r *raft) maybeSendFortify(id pb.PeerID) {
	if !r.storeLiveness.SupportFromEnabled() {
		// The store liveness fabric hasn't been enabled to let the leader
		// request support from its peers, so there is nothing to do.
		return
	}

	fortified, supported := r.fortificationTracker.IsFortifiedBy(id)
	switch {
	case fortified:
		// The peer already fortifies the leader; no message is needed.
	case supported:
		// The peer supports the leader in store liveness but hasn't fortified
		// it yet, so (re)send the fortification request.
		r.sendFortify(id)
	case id == r.id:
		// Neither fortified nor supported, and the peer is the leader itself.
		// Log that the leader doesn't support itself in the liveness fabric;
		// this is possible if the leader is affected by disk stalls.
		r.logger.Infof(
			"%x leader at term %d does not support itself in the liveness fabric", r.id, r.Term,
		)
	}
	// For any other unsupported peer (one that isn't providing active store
	// liveness support, or is but the leader isn't hearing about it) no fortify
	// message is sent. Fortification is attempted again once store liveness
	// support is established.
}

// sendFortify sends a fortification RPC to the given peer.
func (r *raft) sendFortify(to pb.PeerID) {
if to == r.id {
// We handle the case where the leader is trying to fortify itself specially.
// Doing so avoids a self-addressed message.
Expand All @@ -752,10 +782,6 @@ func (r *raft) sendFortify(to pb.PeerID) {
// discrimination for who is providing support (itself vs. other
// follower).
r.send(pb.Message{To: r.id, Type: pb.MsgFortifyLeaderResp, LeadEpoch: epoch})
} else {
r.logger.Infof(
"%x leader at term %d does not support itself in the liveness fabric", r.id, r.Term,
)
}
return
}
Expand Down Expand Up @@ -787,13 +813,13 @@ func (r *raft) bcastHeartbeat() {
})
}

// bcastFortify sends an RPC to fortify the leader to all peers (including the
// leader itself).
// bcastFortify attempts to send an RPC to fortify the leader to all the peers
// (including the leader itself) whose stores are currently providing store
// liveness support to the leader's store but who have not fortified the leader.
func (r *raft) bcastFortify() {
assertTrue(r.state == StateLeader, "only leaders can fortify")

r.trk.Visit(func(id pb.PeerID, _ *tracker.Progress) {
r.sendFortify(id)
r.maybeSendFortify(id)
})
}

Expand Down Expand Up @@ -1012,6 +1038,10 @@ func (r *raft) tickHeartbeat() {
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
}

// Try to refortify any followers that don't currently support us.
r.bcastFortify()
// TODO(ibrahim): add/call maybeUnpauseAndBcastAppend() here.
}
}

Expand Down
52 changes: 37 additions & 15 deletions pkg/raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"testing"

pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -105,23 +106,44 @@ func TestStartAsFollower(t *testing.T) {
func TestLeaderBcastBeat(t *testing.T) {
// heartbeat interval
hi := 1
r := newTestRaft(1, 10, hi, newTestMemoryStorage(withPeers(1, 2, 3)))
r.becomeCandidate()
r.becomeLeader()
for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
}

for i := 0; i < hi; i++ {
r.tick()
}
testutils.RunTrueAndFalse(t, "store-liveness-enabled",
func(t *testing.T, storeLivenessEnabled bool) {
testOptions := emptyTestConfigModifierOpt()
if !storeLivenessEnabled {
testOptions = withFortificationDisabled()
}

msgs := r.readMessages()
sort.Sort(messageSlice(msgs))
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
}, msgs)
r := newTestRaft(1, 10, hi,
newTestMemoryStorage(withPeers(1, 2, 3)), testOptions)

r.becomeCandidate()
r.becomeLeader()

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
}

for i := 0; i < hi; i++ {
r.tick()
}

msgs := r.readMessages()
sort.Sort(messageSlice(msgs))
if storeLivenessEnabled {
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Type: pb.MsgFortifyLeader},
{From: 1, To: 3, Term: 1, Type: pb.MsgFortifyLeader},
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
}, msgs)
} else {
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
}, msgs)
}
})
}

func TestFollowerStartElection(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4112,6 +4112,11 @@ type testConfigModifiers struct {
// that may be used to modify the config.
type testConfigModifierOpt func(*testConfigModifiers)

// emptyTestConfigModifierOpt returns a testConfigModifierOpt that leaves the
// testConfigModifiers it is applied to untouched. It is a no-op placeholder
// for call sites that need to pass an option without changing anything.
func emptyTestConfigModifierOpt() testConfigModifierOpt {
	noop := func(*testConfigModifiers) {}
	return noop
}

// withFortificationDisabled disables raft fortification.
func withFortificationDisabled() testConfigModifierOpt {
return func(modifier *testConfigModifiers) {
Expand Down
11 changes: 10 additions & 1 deletion pkg/raft/rafttest/interaction_env_handler_add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
cfg := cfg // fork the config stub
cfg.ID, cfg.Storage = id, s

env.Fabric.addNode()
cfg.StoreLiveness = newStoreLiveness(env.Fabric, id)

// If the node creating command hasn't specified the CRDBVersion, use the
Expand Down Expand Up @@ -173,5 +172,15 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
}
env.Nodes = append(env.Nodes, node)
}

// The number of potential store nodes is the max of the number of nodes in
// the env and the sum of voters and learners. Add store nodes to the fabric
// to cover the difference between that number and the current store count.
allPotential := max(len(env.Nodes),
len(snap.Metadata.ConfState.Voters)+len(snap.Metadata.ConfState.Learners))
curNodesCount := len(env.Fabric.state) - 1 // 1-indexed stores
for rem := allPotential - curNodesCount; rem > 0; rem-- {
env.Fabric.addNode()
}
return nil
}
19 changes: 16 additions & 3 deletions pkg/raft/testdata/async_storage_writes_append_aba_race.txt
Original file line number Diff line number Diff line change
Expand Up @@ -414,20 +414,32 @@ Messages:
4->5 MsgHeartbeat Term:3 Log:0/0
4->6 MsgHeartbeat Term:3 Log:0/0
4->7 MsgHeartbeat Term:3 Log:0/0
4->1 MsgFortifyLeader Term:3 Log:0/0
4->2 MsgFortifyLeader Term:3 Log:0/0
4->3 MsgFortifyLeader Term:3 Log:0/0
4->5 MsgFortifyLeader Term:3 Log:0/0
4->6 MsgFortifyLeader Term:3 Log:0/0
4->7 MsgFortifyLeader Term:3 Log:0/0
4->AppendThread MsgStorageAppend Term:0 Log:0/0 Responses:[
4->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1
]

deliver-msgs 1
----
4->1 MsgHeartbeat Term:3 Log:0/0
INFO 1 [term: 2] received a MsgHeartbeat message with higher term from 4 [term: 3]
INFO 1 became follower at term 3
4->1 MsgFortifyLeader Term:3 Log:0/0

process-ready 1
----
Ready MustSync=true:
HardState Term:3 Commit:11 Lead:4 LeadEpoch:0
HardState Term:3 Commit:11 Lead:4 LeadEpoch:1
Messages:
1->4 MsgHeartbeatResp Term:3 Log:0/0
1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4
1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4 LeadEpoch:1 Responses:[
1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1
]

deliver-msgs 4
----
Expand Down Expand Up @@ -513,8 +525,9 @@ INFO mark (term,index)=(2,12) mismatched the last accepted term 3 in unstable lo
process-append-thread 1
----
Processing:
1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4
1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4 LeadEpoch:1
Responses:
1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1

raft-log 1
----
Expand Down
48 changes: 45 additions & 3 deletions pkg/raft/testdata/checkquorum.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,31 +73,57 @@ INFO 1 became follower at term 1
stabilize
----
> 1 handling Ready
Ready MustSync=false:
Ready MustSync=true:
State:StateFollower
HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:2
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1]
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1]
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1]
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1]
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1]
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
> 2 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -107,13 +133,19 @@ stabilize
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
> 3 handling Ready
Ready MustSync=false:
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:2
Messages:
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
> 1 receiving messages
2->1 MsgAppResp Term:2 Log:0/0
INFO 1 [term: 1] received a MsgAppResp message with higher term from 2 [term: 2]
Expand All @@ -132,6 +164,16 @@ stabilize
INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1]
3->1 MsgHeartbeatResp Term:1 Log:0/0
INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1]
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1]
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1]
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1]
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1]
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1]
> 1 handling Ready
Ready MustSync=true:
HardState Term:2 Commit:11 Lead:0 LeadEpoch:0
Expand Down Expand Up @@ -165,7 +207,7 @@ INFO 1 [logterm: 1, index: 11, vote: 0] cast MsgVote for 2 [logterm: 1, index: 1
deliver-msgs 3
----
2->3 MsgVote Term:3 Log:1/11
INFO 3 [logterm: 1, index: 11, vote: 1] ignored MsgVote from 2 [logterm: 1, index: 11] at term 1: recently received communication from leader (remaining ticks: 3)
INFO 3 [logterm: 1, index: 11, vote: 1] ignored MsgVote from 2 [logterm: 1, index: 11] at term 1: recently received communication from leader (remaining ticks: 3) and supporting fortified leader 1 at epoch 2

stabilize
----
Expand Down
Loading

0 comments on commit 727fc55

Please sign in to comment.