Skip to content

Commit

Permalink
tracker: track in-flight commit index
Browse files Browse the repository at this point in the history
This commit adds a Progress.pendingCommit field tracking the highest
commit index <= Next-1 which the leader sent to the follower. It is used
to distinguish cases when a commit index update needs or doesn't need to
be sent to a follower.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 27, 2024
1 parent d475d7e commit 8b5fb3a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 25 deletions.
24 changes: 15 additions & 9 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) {
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
//
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
// struct contains all the state necessary for deciding whether to send a
// message.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {
Expand Down Expand Up @@ -641,6 +645,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
Commit: r.raftLog.committed,
})
pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)))
pr.SentCommit(r.raftLog.committed)
return true
}

Expand Down Expand Up @@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
pr := r.trk.Progress[to]
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.trk.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
commit := min(pr.Match, r.raftLog.committed)
r.send(pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}

r.send(m)
})
pr.SentCommit(commit)
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
Expand Down Expand Up @@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
Expand Down Expand Up @@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error {
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
// This node may be missing the latest commit index, so send it.
// NB: this is not strictly necessary because the periodic heartbeat
// messages deliver commit indices too. However, a message sent now
// may arrive earlier than the next heartbeat fires.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
Expand Down
4 changes: 0 additions & 4 deletions testdata/confchange_v2_replace_leader.txt
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,10 @@ stabilize
CommittedEntries:
2/5 EntryNormal ""
Messages:
4->1 MsgApp Term:2 Log:2/5 Commit:4
4->1 MsgApp Term:2 Log:2/5 Commit:5
4->2 MsgApp Term:2 Log:2/5 Commit:5
4->3 MsgApp Term:2 Log:2/5 Commit:5
> 1 receiving messages
4->1 MsgApp Term:2 Log:2/5 Commit:4
4->1 MsgApp Term:2 Log:2/5 Commit:5
> 2 receiving messages
4->2 MsgApp Term:2 Log:2/5 Commit:5
Expand All @@ -302,7 +300,6 @@ stabilize
2/5 EntryNormal ""
Messages:
1->4 MsgAppResp Term:2 Log:0/5
1->4 MsgAppResp Term:2 Log:0/5
> 2 handling Ready
Ready MustSync=false:
HardState Term:2 Vote:4 Commit:5
Expand All @@ -318,7 +315,6 @@ stabilize
Messages:
3->4 MsgAppResp Term:2 Log:0/5
> 4 receiving messages
1->4 MsgAppResp Term:2 Log:0/5
1->4 MsgAppResp Term:2 Log:0/5
2->4 MsgAppResp Term:2 Log:0/5
3->4 MsgAppResp Term:2 Log:0/5
Expand Down
12 changes: 0 additions & 12 deletions testdata/probe_and_replicate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -513,18 +513,6 @@ stabilize 1 2
2->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
2->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:8 Log:8/21 Commit:18
> 2 receiving messages
1->2 MsgApp Term:8 Log:8/21 Commit:18
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
2->1 MsgAppResp Term:8 Log:0/21

stabilize 1 3
----
Expand Down
30 changes: 30 additions & 0 deletions tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,20 @@ type Progress struct {
// entries with indices in (Match, Next) interval are already in flight.
//
// Invariant: 0 <= Match < Next.
// NB: it follows that Next >= 1.
Next uint64

// pendingCommit is the highest commit index <= Next-1 in flight to the
// follower.
//
// The actual in-flight commit index can be higher, but we track only up to
// Next-1, because higher indices can be ignored by the peer or reduced to
// Next-1 when all the in-flight entries are appended. When Next regresses,
// pendingCommit regresses too.
//
// Invariant: 0 <= pendingCommit < Next.
pendingCommit uint64

// State defines how the leader should interact with the follower.
//
// When in StateProbe, leader sends at most one replication message
Expand Down Expand Up @@ -128,12 +140,14 @@ func (pr *Progress) BecomeProbe() {
pr.ResetState(StateProbe)
pr.Next = pr.Match + 1
}
pr.pendingCommit = min(pr.pendingCommit, pr.Next-1)
}

// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
func (pr *Progress) BecomeReplicate() {
pr.ResetState(StateReplicate)
pr.Next = pr.Match + 1
pr.pendingCommit = min(pr.pendingCommit, pr.Next-1)
}

// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
Expand Down Expand Up @@ -170,6 +184,20 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) {
}
}

// CanBumpCommit returns true if sending the given commit index can potentially
// advance the follower's commit index.
func (pr *Progress) CanBumpCommit(index uint64) bool {
return pr.pendingCommit < min(index, pr.Next-1)
}

// SentCommit updates the pendingCommit.
func (pr *Progress) SentCommit(commit uint64) {
// Sending the given commit index may bump the follower's commit index up to
// Next-1, or even higher. We track only up to Next-1, and maintain the
// invariant: pendingCommit < Next.
pr.pendingCommit = min(max(pr.pendingCommit, commit), pr.Next-1)
}

// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
// index acked by it. The method returns false if the given n index comes from
// an outdated message. Otherwise it updates the progress and returns true.
Expand Down Expand Up @@ -205,6 +233,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
//
// TODO(tbg): why not use matchHint if it's larger?
pr.Next = pr.Match + 1
pr.pendingCommit = min(pr.pendingCommit, pr.Next-1)
return true
}

Expand All @@ -216,6 +245,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
}

pr.Next = max(min(rejected, matchHint+1), pr.Match+1)
pr.pendingCommit = min(pr.pendingCommit, pr.Next-1)
pr.MsgAppFlowPaused = false
return true
}
Expand Down

0 comments on commit 8b5fb3a

Please sign in to comment.