Skip to content

Commit

Permalink
raft: advance commit index safely
Browse files Browse the repository at this point in the history
This change makes the commit index advancement in handleHeartbeat safe.
Previously, a follower would attempt to update the commit index to
whichever was sent in the MsgHeartbeat message. Out-of-bound indices
would crash the node.

It is always safe to advance a commit index if the follower's log is "in
sync" with the leader, i.e. when its log is guaranteed to be a prefix of
the leader's log. This becomes true when the first MsgApp append message
succeeds.

At the moment, the leader will never send a commit index that exceeds
the follower's log size. However, this may change in future. This change
is a defence-in-depth.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 26, 2024
1 parent 026484c commit 2584c41
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ type raft struct {

// the leader id
lead uint64
// logSynced is true if this node's log is guaranteed to be a prefix of the
// leader's log at this term. Always true for the leader. Always false for a
// candidate. For a follower, becomes true the first time a MsgApp append to
// the log succeeds.
logSynced bool
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
Expand Down Expand Up @@ -763,6 +768,7 @@ func (r *raft) reset(term uint64) {
r.Vote = None
}
r.lead = None
r.logSynced = false

r.electionElapsed = 0
r.heartbeatElapsed = 0
Expand Down Expand Up @@ -908,6 +914,7 @@ func (r *raft) becomeLeader() {
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.logSynced = true // the leader's log is in sync with itself
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
Expand Down Expand Up @@ -1735,6 +1742,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.logSynced = true // from now on, the log is a prefix of the leader's log
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
return
}
Expand Down Expand Up @@ -1770,7 +1778,11 @@ func (r *raft) handleAppendEntries(m pb.Message) {
}

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
// If our log is not a prefix of the leader's log, it is unsafe to advance the
// commit index, because the entries at this index may mismatch.
if r.logSynced {
r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
}
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Expand Down

0 comments on commit 2584c41

Please sign in to comment.