Skip to content

Commit

Permalink
raft: remove MsgApp flow control management dependence on heartbeats
Browse files Browse the repository at this point in the history
This commit does the following:

Instead of unpausing the follower and possibly sending a MsgApp when a
heartbeat response is received, the leader now does this on heartbeat
timeout, and only for followers that are supporting the leader.

Fixes: #130493

Release note: None
  • Loading branch information
iskettaneh committed Sep 27, 2024
1 parent 7641988 commit 3ee9e30
Show file tree
Hide file tree
Showing 14 changed files with 723 additions and 277 deletions.
5 changes: 4 additions & 1 deletion pkg/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,10 @@ func TestAppendPagination(t *testing.T) {

// After the partition recovers, tick the clock to wake everything
// back up and send the messages.
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
p := n.peers[raftpb.PeerID(1)].(*raft)
for ticks := p.heartbeatTimeout; ticks > 0; ticks-- {
n.tickRaftHeartbeat(p)
}
assert.True(t, seenFullMessage, "didn't see any messages more than half the max size; something is wrong with this test")
}

Expand Down
35 changes: 34 additions & 1 deletion pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,39 @@ func (r *raft) bcastFortify() {
})
}

// maybeUnpauseAndBcastAppend walks every tracked peer and, for each follower
// that the fortification tracker reports as supporting this leader, clears
// the follower's MsgAppProbesPaused flag and attempts to send it a MsgApp.
//
// Nothing happens when store liveness SupportFrom is not enabled. Followers
// without store liveness support are left paused, because a MsgApp sent to
// them is likely to be dropped.
func (r *raft) maybeUnpauseAndBcastAppend() {
	// Without the store liveness fabric there is no support signal to act on.
	if enabled := r.storeLiveness.SupportFromEnabled(); !enabled {
		return
	}

	unpauseAndAppend := func(peer pb.PeerID, progress *tracker.Progress) {
		// Skip peers whose store is not actively supporting the leader's store
		// (or whose support the leader has not heard about): no MsgApp needed.
		if _, supported := r.fortificationTracker.IsFortifiedBy(peer); !supported {
			return
		}

		// NB: the leader never sends a MsgApp (and so never gets a MsgAppResp)
		// to itself here, which means its own MatchCommit/SentCommit are not
		// updated. That is fine: the leader only uses the followers' values,
		// not its own.
		if peer == r.id {
			return
		}

		// Lift the pause first so that maybeSendAppend is allowed to send.
		progress.MsgAppProbesPaused = false
		r.maybeSendAppend(peer)
	}

	r.trk.Visit(unpauseAndAppend)
}

func (r *raft) appliedTo(index uint64, size entryEncodingSize) {
oldApplied := r.raftLog.applied
newApplied := max(index, oldApplied)
Expand Down Expand Up @@ -1041,7 +1074,7 @@ func (r *raft) tickHeartbeat() {

// Try to refortify any followers that don't currently support us.
r.bcastFortify()
// TODO(ibrahim): add/call maybeUnpauseAndBcastAppend() here.
r.maybeUnpauseAndBcastAppend()
}
}

Expand Down
114 changes: 73 additions & 41 deletions pkg/raft/raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
)

// TestMsgAppFlowControlFull ensures:
Expand Down Expand Up @@ -106,50 +107,81 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
}
}

// TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
// frees one slot if the window is full.
func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
r.becomeCandidate()
r.becomeLeader()
// TestMsgAppFlowControl ensures that if storeliveness is disabled, a heartbeat
// response frees one slot if the window is full. If storeliveness is enabled,
// a similar thing happens but on the next heartbeat timeout.
func TestMsgAppFlowControl(t *testing.T) {
testutils.RunTrueAndFalse(t, "store-liveness-enabled",
func(t *testing.T, storeLivenessEnabled bool) {

pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}

for tt := 1; tt < 5; tt++ {
// recv tt msgHeartbeatResp and expect one free slot
for i := 0; i < tt; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
testOptions := emptyTestConfigModifierOpt()
if !storeLivenessEnabled {
testOptions = withFortificationDisabled()
}
// Unpauses the progress, sends an empty MsgApp, and pauses it again.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
}
}

// No more appends are sent if there are no heartbeats.
for i := 0; i < 10; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 0 {
t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms))
r := newTestRaft(1, 5, 1,
newTestMemoryStorage(withPeers(1, 2)), testOptions)

r.becomeCandidate()
r.becomeLeader()

pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp,
Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}
}

// clear all pending messages.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
r.readMessages()
}
for tt := 1; tt < 5; tt++ {
for i := 0; i < tt; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}

// Unpauses the progress, sends an empty MsgApp, and pauses it again.
// When storeliveness is enabled, we do this on the next heartbeat
// timeout. However, when storeliveness is disabled, we do this on
// the next heartbeat response.
if storeLivenessEnabled {
for ticks := r.heartbeatTimeout; ticks > 0; ticks-- {
r.tickHeartbeat()
}
ms := r.readMessages()
if len(ms) != 3 || ms[0].Type != pb.MsgHeartbeat || ms[1].Type != pb.MsgFortifyLeader ||
ms[2].Type != pb.MsgApp || len(ms[2].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 3 messages including one empty MsgApp",
tt, i, len(ms))
}
} else {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
}
}
}

// No more appends are sent if there are no heartbeats.
for i := 0; i < 10; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp,
Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 0 {
t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms))
}
}

// clear all pending messages.
for ticks := r.heartbeatTimeout; ticks > 0; ticks-- {
r.tickHeartbeat()
}
r.readMessages()
}
})
}
2 changes: 2 additions & 0 deletions pkg/raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func TestLeaderBcastBeat(t *testing.T) {
sort.Sort(messageSlice(msgs))
if storeLivenessEnabled {
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Type: pb.MsgApp, Entries: r.raftLog.allEntries()},
{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Entries: r.raftLog.allEntries()},
{From: 1, To: 2, Term: 1, Type: pb.MsgFortifyLeader},
{From: 1, To: 3, Term: 1, Type: pb.MsgFortifyLeader},
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
Expand Down
Loading

0 comments on commit 3ee9e30

Please sign in to comment.