Skip to content

Commit

Permalink
raft,tracker: track commit index of each follower
Browse files Browse the repository at this point in the history
This commit closes a gap in commit index tracking.

Previously, the leader did not precisely know what commit index the
follower is at, and always had to send an empty MsgApp to brind it up to
date if it's not.

With this commit, followers now send the commit index of their logs back
to the leader, and the leader tracks each follower's commit index. This
will allow the leader (see other commits) to send an empty MsgApp with a
commit index update only if the tracked index is behind, which will
reduce the number of unnecessary messages in the system.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Mar 4, 2024
1 parent d475d7e commit ec87568
Show file tree
Hide file tree
Showing 23 changed files with 354 additions and 336 deletions.
14 changes: 9 additions & 5 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
// if r.maybeCommit() {
// r.bcastAppend()
// }
r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li})
r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li, Commit: r.raftLog.committed})
return true
}

Expand Down Expand Up @@ -1489,7 +1489,9 @@ func stepLeader(r *raft, m pb.Message) error {
// equals pr.Match we know we don't m.Index+1 in our log, so moving
// back to replicating state is not useful; besides pr.PendingSnapshot
// would prevent it.
if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
updated := pr.MaybeUpdate(m.Index)
pr.UpdateCommit(m.Commit)
if updated || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
Expand Down Expand Up @@ -1520,6 +1522,8 @@ func stepLeader(r *raft, m pb.Message) error {
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
// TODO(pav-kv): remove this branch, and decide on sending the commit
// index update based on pr.Commit.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
Expand Down Expand Up @@ -1759,12 +1763,12 @@ func (r *raft) handleAppendEntries(m pb.Message) {
// message, and validate it before taking any action (e.g. bumping term).
a := logSliceFromMsgApp(&m)

if a.prev.index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
if commit := r.raftLog.committed; a.prev.index < commit {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed, Commit: commit})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(a, m.Commit); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex, Commit: r.raftLog.committed})
return
}
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
Expand Down
Loading

0 comments on commit ec87568

Please sign in to comment.