Skip to content

Commit

Permalink
raft,tracker: track commit index of each follower
Browse files Browse the repository at this point in the history
This commit closes a gap in commit index tracking.

Previously, the leader did not precisely know what commit index the
follower is at, and always had to send an empty MsgApp to brind it up to
date if it's not.

With this commit, followers now send the commit index of their logs back
to the leader, and the leader tracks each follower's commit index. This
will allow the leader (see other commits) to send an empty MsgApp with a
commit index update only if the tracked index is behind, which will
reduce the number of unnecessary messages in the system.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Mar 6, 2024
1 parent 8b05a9d commit c916c15
Show file tree
Hide file tree
Showing 21 changed files with 340 additions and 323 deletions.
12 changes: 7 additions & 5 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
// if r.maybeCommit() {
// r.bcastAppend()
// }
r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li})
r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li, Commit: r.raftLog.committed})
return true
}

Expand Down Expand Up @@ -1493,7 +1493,9 @@ func stepLeader(r *raft, m pb.Message) error {
// equals pr.Match we know we don't m.Index+1 in our log, so moving
// back to replicating state is not useful; besides pr.PendingSnapshot
// would prevent it.
if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
updated := pr.MaybeUpdate(m.Index)
pr.UpdateCommit(m.Commit)
if updated || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
Expand Down Expand Up @@ -1765,12 +1767,12 @@ func (r *raft) handleAppendEntries(m pb.Message) {
// message, and validate it before taking any action (e.g. bumping term).
a := logSliceFromMsgApp(&m)

if a.prev.index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
if commit := r.raftLog.committed; a.prev.index < commit {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed, Commit: commit})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(a, m.Commit); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex, Commit: r.raftLog.committed})
return
}
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
Expand Down
22 changes: 11 additions & 11 deletions raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package raft

import (
"fmt"
"github.com/stretchr/testify/require"
"reflect"
"sort"
"testing"
Expand Down Expand Up @@ -606,20 +607,21 @@ func TestFollowerCheckMsgApp(t *testing.T) {
term uint64
index uint64
windex uint64
wcommit uint64
wreject bool
wrejectHint uint64
wlogterm uint64
}{
// match with committed entries
{0, 0, 1, false, 0, 0},
{ents[0].Term, ents[0].Index, 1, false, 0, 0},
{0, 0, 1, 1, false, 0, 0},
{ents[0].Term, ents[0].Index, 1, 1, false, 0, 0},
// match with uncommitted entries
{ents[1].Term, ents[1].Index, 2, false, 0, 0},
{ents[1].Term, ents[1].Index, 2, 1, false, 0, 0},

// unmatch with existing entry
{ents[0].Term, ents[1].Index, ents[1].Index, true, 1, 1},
{ents[0].Term, ents[1].Index, ents[1].Index, 0, true, 1, 1},
// unexisting entry
{ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2, 2},
{ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, 0, true, 2, 2},
}
for i, tt := range tests {
storage := newTestMemoryStorage(withPeers(1, 2, 3))
Expand All @@ -630,13 +632,11 @@ func TestFollowerCheckMsgApp(t *testing.T) {

r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index})

msgs := r.readMessages()
wmsgs := []pb.Message{
{From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint, LogTerm: tt.wlogterm},
}
if !reflect.DeepEqual(msgs, wmsgs) {
t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs)
wmsg := pb.Message{
From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Commit: tt.wcommit,
Reject: tt.wreject, RejectHint: tt.wrejectHint, LogTerm: tt.wlogterm,
}
require.Equal(t, []pb.Message{wmsg}, r.readMessages(), "#%d", i)
}
}

Expand Down
Loading

0 comments on commit c916c15

Please sign in to comment.