Skip to content

Commit

Permalink
raft: advance commit index safely
Browse files Browse the repository at this point in the history
This change makes the commit index advancement in handleHeartbeat safe.
Previously, a follower would attempt to update the commit index to
whatever value was sent in the MsgHeartbeat message. Out-of-bounds
indices would crash the node.

It is always safe to advance a commit index if the follower's log is "in
sync" with the leader, i.e. when its log is guaranteed to be a prefix of
the leader's log. This is always true if the term of the last entry in
the log matches the leader term; otherwise, this guarantee is established
when the first MsgApp append message from the leader succeeds.

At the moment, the leader will never send a commit index that exceeds
the follower's log size. However, this may change in the future. This
change is a defence-in-depth measure.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 26, 2024
1 parent 026484c commit ee8de1f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
19 changes: 18 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ type raft struct {

// the leader id
lead uint64
// logSynced is true if this node's log is guaranteed to be a prefix of the
// leader's log at this term. Always true for the leader. Always false for a
// candidate. For a follower, this is true if the last entry term matches the
// leader term, otherwise becomes true when the first MsgApp append from the
// leader succeeds.
logSynced bool
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
Expand Down Expand Up @@ -763,6 +769,7 @@ func (r *raft) reset(term uint64) {
r.Vote = None
}
r.lead = None
r.logSynced = false

r.electionElapsed = 0
r.heartbeatElapsed = 0
Expand Down Expand Up @@ -866,6 +873,10 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
r.reset(term)
r.tick = r.tickElection
r.lead = lead
// If the last entry term matches the leader term, the log is guaranteed to be
// a prefix of the leader's log. Otherwise, we will establish this guarantee
// later, on the first successful MsgApp.
r.logSynced = r.raftLog.lastTerm() == term
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
Expand Down Expand Up @@ -908,6 +919,7 @@ func (r *raft) becomeLeader() {
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.logSynced = true // the leader's log is in sync with itself
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
Expand Down Expand Up @@ -1735,6 +1747,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.logSynced = true // from now on, the log is a prefix of the leader's log
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
return
}
Expand Down Expand Up @@ -1770,7 +1783,11 @@ func (r *raft) handleAppendEntries(m pb.Message) {
}

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
// It is only safe to advance the commit index if our log is a prefix of the
// leader's log. Otherwise, entries at this index may mismatch.
if r.logSynced {
r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
}
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Expand Down
19 changes: 14 additions & 5 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,19 +1332,28 @@ func TestHandleMsgApp(t *testing.T) {
func TestHandleHeartbeat(t *testing.T) {
commit := uint64(2)
tests := []struct {
m pb.Message
wCommit uint64
m pb.Message
logSynced bool
wCommit uint64
}{
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 3, Commit: commit + 1}, true, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 3, Commit: commit - 1}, true, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 3, Commit: commit - 1}, false, commit},

// Increase the commit index only if the log is in sync with the leader.
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 3, Commit: commit + 1}, false, commit},
// Do not increase the commit index beyond our log size.
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 3, Commit: commit + 10}, true, commit + 1},
}

for i, tt := range tests {
storage := newTestMemoryStorage(withPeers(1, 2))
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
sm := newTestRaft(1, 5, 1, storage)
sm.becomeFollower(2, 2)
sm.becomeFollower(3, 2)
sm.raftLog.commitTo(commit)
sm.logSynced = tt.logSynced

sm.handleHeartbeat(tt.m)
if sm.raftLog.committed != tt.wCommit {
t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
Expand Down
1 change: 1 addition & 0 deletions rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rawNode.raft.logSynced = true // needed to be able to advance the commit index

for highestApplied := uint64(0); highestApplied != 11; {
rd := rawNode.Ready()
Expand Down

0 comments on commit ee8de1f

Please sign in to comment.