Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: remove MsgApp flow control management dependence on heartbeats #130532

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,10 @@ func TestAppendPagination(t *testing.T) {

// After the partition recovers, tick the clock to wake everything
// back up and send the messages.
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
p := n.peers[raftpb.PeerID(1)].(*raft)
for ticks := p.heartbeatTimeout; ticks > 0; ticks-- {
n.tick(p)
}
assert.True(t, seenFullMessage, "didn't see any messages more than half the max size; something is wrong with this test")
}

Expand Down
43 changes: 38 additions & 5 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,10 +850,11 @@ func (r *raft) sendFortify(to pb.PeerID) {
func (r *raft) bcastAppend() {
r.trk.Visit(func(id pb.PeerID, _ *tracker.Progress) {
if id == r.id {
// NB: the leader doesn't send MsgAppResp to itself here. This means that
// the leader will not have a chance to update its own
// MatchCommit/SentCommit. That is fine because the leader doesn't use
// MatchCommit/SentCommit for itself. It only uses the followers' values.
// NB: the leader doesn't send MsgApp to itself here nor does it receive
// a self directed MsgAppResp. This means that the leader will not have a
// chance to update its own MatchCommit/SentCommit. That is fine because
// the leader doesn't use MatchCommit/SentCommit for itself. It only uses
// the peers' values.
return
}
r.maybeSendAppend(id)
Expand All @@ -880,6 +881,38 @@ func (r *raft) bcastFortify() {
})
}

// maybeUnpauseAndBcastAppend unpauses and attempts to send an MsgApp to all the
// followers that provide store liveness support. If there is no store liveness
// support, we skip unpausing and sending MsgApp because the message is likely
// to be dropped.
func (r *raft) maybeUnpauseAndBcastAppend() {
if !r.fortificationTracker.FortificationEnabled() {
// The underlying store liveness fabric hasn't been enabled.
return
}

r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) {
if r.id == id {
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
// NB: the leader doesn't send MsgApp to itself here nor does it receive
// a self directed MsgAppResp. This means that the leader will not have a
// chance to update its own MatchCommit/SentCommit. That is fine because
// the leader doesn't use MatchCommit/SentCommit for itself. It only uses
// the peers' values.
return
}

if _, supported := r.fortificationTracker.IsFortifiedBy(id); !supported {
// If the follower's store isn't providing active store liveness support
// to the leader's store, or it is but the leader isn't hearing about it,
// we don't send a MsgApp.
return
}

pr.MsgAppProbesPaused = false
r.maybeSendAppend(id)
})
}

func (r *raft) appliedTo(index uint64, size entryEncodingSize) {
oldApplied := r.raftLog.applied
newApplied := max(index, oldApplied)
Expand Down Expand Up @@ -1108,7 +1141,7 @@ func (r *raft) tickHeartbeat() {

// Try to refortify any followers that don't currently support us.
r.bcastFortify()
// TODO(ibrahim): add/call maybeUnpauseAndBcastAppend() here.
r.maybeUnpauseAndBcastAppend()
}
}

Expand Down
132 changes: 81 additions & 51 deletions pkg/raft/raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"

pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/stretchr/testify/require"
)

// TestMsgAppFlowControlFull ensures:
Expand All @@ -39,9 +41,8 @@ func TestMsgAppFlowControlFull(t *testing.T) {
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp {
t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", i, len(ms))
}
require.Len(t, ms, 1)
require.Equal(t, ms[0].Type, pb.MsgApp)
}

// ensure 1
Expand All @@ -53,9 +54,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
for i := 0; i < 10; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 0 {
t.Fatalf("#%d: len(ms) = %d, want 0", i, len(ms))
}
require.Empty(t, ms)
}
}

Expand Down Expand Up @@ -87,9 +86,8 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
// fill in the inflights window again
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp {
t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", tt, len(ms))
}
require.Len(t, ms, 1)
require.Equal(t, ms[0].Type, pb.MsgApp)

// ensure 1
if !pr2.IsPaused() {
Expand All @@ -106,50 +104,82 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
}
}

// TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
// frees one slot if the window is full.
func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
r.becomeCandidate()
r.becomeLeader()

pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}

for tt := 1; tt < 5; tt++ {
// recv tt msgHeartbeatResp and expect one free slot
for i := 0; i < tt; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}
// Unpauses the progress, sends an empty MsgApp, and pauses it again.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
// TestMsgAppFlowControl ensures that if storeliveness is disabled, a heartbeat
// response frees one slot if the window is full. If storelivess is enabled,
// a similar thing happens but on the next heartbeat timeout.
func TestMsgAppFlowControl(t *testing.T) {
testutils.RunTrueAndFalse(t, "store-liveness-enabled",
func(t *testing.T, storeLivenessEnabled bool) {
testOptions := emptyTestConfigModifierOpt()
if !storeLivenessEnabled {
testOptions = withFortificationDisabled()
}
}

// No more appends are sent if there are no heartbeats.
for i := 0; i < 10; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
r := newTestRaft(1, 5, 1,
newTestMemoryStorage(withPeers(1, 2)), testOptions)
r.becomeCandidate()
r.becomeLeader()

pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp,
Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 0 {
t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms))
}
}

// clear all pending messages.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
r.readMessages()
}
for tt := 1; tt < 5; tt++ {
for i := 0; i < tt; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}

// Unpauses the progress, sends an empty MsgApp, and pauses it again.
// When storeliveness is enabled, we do this on the next heartbeat
// timeout. However, when storeliveness is disabled, we do this on
// the next heartbeat response.
if storeLivenessEnabled {
for ticks := r.heartbeatTimeout; ticks > 0; ticks-- {
r.tick()
}
ms := r.readMessages()
if len(ms) != 3 || ms[0].Type != pb.MsgHeartbeat || ms[1].Type != pb.MsgFortifyLeader ||
ms[2].Type != pb.MsgApp || len(ms[2].Entries) != 0 {
t.Fatalf("#%d.%d: len(ms) == %d, want 3 messages including one empty MsgApp",
tt, i, len(ms))
}
require.Len(t, ms, 3)
require.Equal(t, ms[0].Type, pb.MsgHeartbeat)
require.Equal(t, ms[1].Type, pb.MsgFortifyLeader)
require.Equal(t, ms[2].Type, pb.MsgApp)
require.Empty(t, ms[0].Entries)
} else {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
ms := r.readMessages()
require.Len(t, ms, 1)
require.Equal(t, ms[0].Type, pb.MsgApp)
require.Empty(t, ms[0].Entries)
}
}

// No more appends are sent if there are no heartbeats.
for i := 0; i < 10; i++ {
if !pr2.IsPaused() {
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp,
Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
require.Empty(t, ms)
}

// clear all pending messages.
for ticks := r.heartbeatTimeout; ticks > 0; ticks-- {
r.tick()
}
r.readMessages()
}
})
}
7 changes: 6 additions & 1 deletion pkg/raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ func TestStartAsFollower(t *testing.T) {
// TestLeaderBcastBeat tests that if the leader receives a heartbeat tick,
// it will send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries
// as heartbeat to all followers.
// Note that if store liveness is enabled, the leader will also send a MsgApp
// on every heartbeat interval.
// Reference: section 5.2
func TestLeaderBcastBeat(t *testing.T) {
// heartbeat interval
hi := 1
hi := 3

testutils.RunTrueAndFalse(t, "store-liveness-enabled",
func(t *testing.T, storeLivenessEnabled bool) {
Expand All @@ -126,13 +128,16 @@ func TestLeaderBcastBeat(t *testing.T) {
}

for i := 0; i < hi; i++ {
require.Empty(t, r.readMessages())
r.tick()
}

msgs := r.readMessages()
slices.SortFunc(msgs, cmpMessages)
if storeLivenessEnabled {
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Type: pb.MsgApp, Entries: r.raftLog.allEntries()},
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Entries: r.raftLog.allEntries()},
{From: 1, To: 2, Term: 1, Type: pb.MsgFortifyLeader},
{From: 1, To: 3, Term: 1, Type: pb.MsgFortifyLeader},
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
Expand Down
Loading
Loading