Skip to content

Commit

Permalink
raft: resume probe at heartbeat intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
iskettaneh committed Sep 11, 2024
1 parent 50ab1fe commit cb77779
Show file tree
Hide file tree
Showing 21 changed files with 874 additions and 304 deletions.
1 change: 1 addition & 0 deletions pkg/raft/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_test(
"//pkg/raft/rafttest",
"//pkg/raft/tracker",
"//pkg/settings/cluster",
"//pkg/testutils",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
5 changes: 4 additions & 1 deletion pkg/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,10 @@ func TestAppendPagination(t *testing.T) {

// After the partition recovers, tick the clock to wake everything
// back up and send the messages.
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
p := n.peers[raftpb.PeerID(1)].(*raft)
for ticks := p.heartbeatTimeout; ticks > 0; ticks-- {
n.tickRaftHeartbeat(p)
}
assert.True(t, seenFullMessage, "didn't see any messages more than half the max size; something is wrong with this test")
}

Expand Down
69 changes: 67 additions & 2 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,25 +625,33 @@ func (r *raft) maybeSendAppend(to pb.PeerID) bool {
pr := r.trk.Progress(to)

last, commit := r.raftLog.lastIndex(), r.raftLog.committed
//fmt.Printf("!!! IBRAHIM 0 !!! \n")

if !pr.ShouldSendMsgApp(last, commit, r.advanceCommitViaMsgAppOnly()) {
return false
}

//fmt.Printf("!!! IBRAHIM 1 !!! \n")
prevIndex := pr.Next - 1
prevTerm, err := r.raftLog.term(prevIndex)
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
//fmt.Printf("!!! IBRAHIM 2 !!! \n")

return r.maybeSendSnapshot(to, pr)
}

var entries []pb.Entry
if pr.CanSendEntries(last) {
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
// Send a snapshot if we failed to get the entries.
//fmt.Printf("!!! IBRAHIM 3 !!! \n")

return r.maybeSendSnapshot(to, pr)
}
}
//fmt.Printf("!!! IBRAHIM 4 !!! \n")

// Send the MsgApp, and update the progress accordingly.
r.send(pb.Message{
Expand Down Expand Up @@ -761,6 +769,36 @@ func (r *raft) bcastAppend() {
})
}

// maybeSendAppendOrFortify visits every peer tracked in r.trk and, for each
// peer other than the leader itself, decides per follower what to send:
//
//   - If the support tracker reports that the follower supports the leader,
//     the follower's MsgAppProbesPaused flag is cleared and maybeSendAppend is
//     invoked for it, which sends a MsgApp if the progress calls for one.
//   - Otherwise, a fortification request is sent to that follower via
//     sendFortify.
func (r *raft) maybeSendAppendOrFortify() {
	r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) {
		// NB: the leader doesn't send MsgAppResp to itself here. This means that
		// the leader will not have a chance to update its own
		// MatchCommit/SentCommit. That is fine because the leader doesn't use
		// MatchCommit/SentCommit for itself. It only uses the followers' values.
		if id == r.id {
			return
		}

		// A follower that does not currently support the leader is asked to
		// fortify instead of being sent appends.
		if !r.supportTracker.IsLeadSupportedByFollower(id) {
			r.sendFortify(id)
			return
		}

		// The follower supports the leader: resume a paused probe and let
		// maybeSendAppend decide whether a MsgApp actually needs to go out.
		pr.MsgAppProbesPaused = false
		r.maybeSendAppend(id)
	})
}

// bcastHeartbeat sends RPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
r.trk.Visit(func(id pb.PeerID, _ *tracker.Progress) {
Expand Down Expand Up @@ -996,6 +1034,18 @@ func (r *raft) tickHeartbeat() {
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
}

// If storeliveness is NOT enabled, heartbeat responses unpause followers
// and trigger the leader to send MsgApp if the follower is not up-to-date.
// If storeliveness is enabled, we don't depend on heartbeat responses for
// that, and we do it here on heartbeat timeout if the follower is
// supporting the leader. If the follower doesn't support the leader, the
// leader tries to fortify the follower by sending a MsgFortifyLeader to
// that follower.
//fmt.Printf("!!! IBRAHIM !!! maybeSendAppendOrFortify()\n")
if r.storeLiveness.SupportFromEnabled() {
r.maybeSendAppendOrFortify()
}
}
}

Expand Down Expand Up @@ -1432,6 +1482,15 @@ func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()

//// If storeLiveness is enabled, unpause followers on heartbeat intervals
//// and broadcast append entries if they need it.
//// If storeliveness is not enabled, this will happen when the leader
//// receives a heartbeat response from a follower.
//if r.storeLiveness.SupportFromEnabled() {
// r.maybeSendAppendOrFortify()
//}

return nil
case pb.MsgCheckQuorum:
if !r.trk.QuorumActive() {
Expand Down Expand Up @@ -1721,8 +1780,13 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.MsgAppProbesPaused = false
r.maybeSendAppend(m.From)
//fmt.Printf("!!! IBRAHIM !!! received heartbeat response from node: %+v \n", m.From)
// If storeLiveness is enabled, we don't necessarily heartbeat followers,
// therefore, we unpause & send MsgApp to followers on every MsgBeat.
if !r.storeLiveness.SupportFromEnabled() {
pr.MsgAppProbesPaused = false
r.maybeSendAppend(m.From)
}

case pb.MsgSnapStatus:
if pr.State != tracker.StateSnapshot {
Expand Down Expand Up @@ -2081,6 +2145,7 @@ func (r *raft) handleSnapshot(m pb.Message) {
}

func (r *raft) handleFortify(m pb.Message) {
//fmt.Printf("!!! IBRAHIM !!! handleFortify \n")
assertTrue(r.state == StateFollower, "leaders should locally fortify without sending a message")
assertTrue(r.lead == m.From, "only the leader should send fortification requests")

Expand Down
116 changes: 75 additions & 41 deletions pkg/raft/raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
)

// TestMsgAppFlowControlFull ensures:
Expand Down Expand Up @@ -106,50 +107,83 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
}
}

// TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
// frees one slot if the window is full.
func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
r.becomeCandidate()
r.becomeLeader()

pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}

for tt := 1; tt < 5; tt++ {
// recv tt msgHeartbeatResp and expect one free slot
for i := 0; i < tt; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
// TestMsgAppFlowControl ensures that if store-liveness is disabled, a heartbeat
// response frees one slot if the window is full. If store-liveness is enabled,
// a similar thing happens, but on the next heartbeat interval (not on the
// heartbeat response).
func TestMsgAppFlowControl(t *testing.T) {
var r *raft
testutils.RunTrueAndFalse(t, "store-liveness",
func(t *testing.T, storeLivenessEnabled bool) {
if storeLivenessEnabled {
r = newTestRaft(1, 5, 1,
newTestMemoryStorage(withPeers(1, 2)))
} else {
r = newTestRaft(1, 5, 1,
newTestMemoryStorage(withPeers(1, 2)), withFortificationDisabled())
}
// Unpauses the progress, sends an empty MsgApp, and pauses it again.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
}
}

// No more appends are sent if there are no heartbeats.
for i := 0; i < 10; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
r.becomeCandidate()
r.becomeLeader()
if storeLivenessEnabled {
r.supportTracker.RecordSupport(pb.PeerID(2), 1)
r.supportTracker.RecordSupport(pb.PeerID(3), 1)
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 0 {
t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms))

pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp,
Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}
}

// clear all pending messages.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
r.readMessages()
}
for tt := 1; tt < 5; tt++ {
// recv tt msgHeartbeatResp and expect one free slot
for i := 0; i < tt; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}

// Unpauses the progress, sends an empty MsgApp, and pauses it again.
// When store-liveness is enabled, we do this on the next heartbeat
// interval. However, when store-liveness is disabled, we do this on
// the next heartbeat response.
if storeLivenessEnabled {
for ticks := r.heartbeatTimeout; ticks > 0; ticks-- {
r.tickHeartbeat()
}
ms := r.readMessages()
if len(ms) != 2 || ms[1].Type != pb.MsgApp || len(ms[1].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
}
} else {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
}
}
}

// No more appends are sent if there are no heartbeats.
for i := 0; i < 10; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp,
Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 0 {
t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms))
}
}

// clear all pending messages.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
r.readMessages()
}
})
}
59 changes: 44 additions & 15 deletions pkg/raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"testing"

pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -101,27 +102,55 @@ func TestStartAsFollower(t *testing.T) {
// TestLeaderBcastBeat tests that if the leader receives a heartbeat tick,
// it will send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries
// as heartbeat to all followers.
// If store-liveness is enabled, the leader will also send MsgApp messages, as
// we will eventually rip out MsgHeartbeat altogether.
// Reference: section 5.2
func TestLeaderBcastBeat(t *testing.T) {
// heartbeat interval
hi := 1
r := newTestRaft(1, 10, hi, newTestMemoryStorage(withPeers(1, 2, 3)))
r.becomeCandidate()
r.becomeLeader()
for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
}

for i := 0; i < hi; i++ {
r.tick()
}
testutils.RunTrueAndFalse(t, "store-liveness", func(t *testing.T, storeLivenessEnabled bool) {
var r *raft
if storeLivenessEnabled {
r = newTestRaft(1, 10, hi,
newTestMemoryStorage(withPeers(1, 2, 3)))
} else {
r = newTestRaft(1, 10, hi,
newTestMemoryStorage(withPeers(1, 2, 3)), withFortificationDisabled())
}

msgs := r.readMessages()
sort.Sort(messageSlice(msgs))
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
}, msgs)
r.becomeCandidate()
r.becomeLeader()
if storeLivenessEnabled {
r.supportTracker.RecordSupport(pb.PeerID(2), 1)
r.supportTracker.RecordSupport(pb.PeerID(3), 1)
}

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
}

for i := 0; i < hi; i++ {
r.tick()
}

msgs := r.readMessages()
sort.Sort(messageSlice(msgs))

if storeLivenessEnabled {
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Entries: r.raftLog.allEntries(), Type: pb.MsgApp},
{From: 1, To: 3, Term: 1, Entries: r.raftLog.allEntries(), Type: pb.MsgApp},
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
}, msgs)
} else {
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
}, msgs)
}
})
}

func TestFollowerStartElection(t *testing.T) {
Expand Down
Loading

0 comments on commit cb77779

Please sign in to comment.