Skip to content

Commit

Permalink
tracker: track in-flight commit index
Browse files Browse the repository at this point in the history
This commit adds a Progress.pendingCommit field tracking the highest
commit index <= Next-1 which the leader sent to the follower. It is used
to distinguish cases when a commit index update needs or doesn't need to
be sent to a follower.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Mar 6, 2024
1 parent cf5cadf commit 4399f8f
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 144 deletions.
24 changes: 15 additions & 9 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) {
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
//
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
// struct contains all the state necessary for deciding whether to send a
// message.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {
Expand Down Expand Up @@ -641,6 +645,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
Commit: r.raftLog.committed,
})
pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)))
pr.SentCommit(r.raftLog.committed)
return true
}

Expand Down Expand Up @@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
pr := r.trk.Progress[to]
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.trk.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
commit := min(pr.Match, r.raftLog.committed)
r.send(pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}

r.send(m)
})
pr.SentCommit(commit)
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
Expand Down Expand Up @@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
Expand Down Expand Up @@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error {
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
// This node may be missing the latest commit index, so send it.
// NB: this is not strictly necessary because the periodic heartbeat
// messages deliver commit indices too. However, a message sent now
// may arrive earlier than the next heartbeat fires.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
Expand Down
12 changes: 0 additions & 12 deletions testdata/confchange_v1_add_single.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,3 @@ stabilize
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
12 changes: 0 additions & 12 deletions testdata/confchange_v2_add_double_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,6 @@ stabilize 1 3
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/5
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=5 next=6 paused pendingSnap=5]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/5 Commit:5
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/5 Commit:5
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:1 Log:0/5
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/5

# Nothing else happens.
stabilize
Expand Down
12 changes: 0 additions & 12 deletions testdata/confchange_v2_add_single_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,3 @@ stabilize
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
12 changes: 0 additions & 12 deletions testdata/confchange_v2_add_single_explicit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,6 @@ stabilize 1 2
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4

# Check that we're not allowed to change membership again while in the joint state.
# This leads to an empty entry being proposed instead (index 5 in the stabilize block
Expand Down
16 changes: 0 additions & 16 deletions testdata/confchange_v2_replace_leader.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,6 @@ stabilize
4->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
4->1 MsgAppResp Term:1 Log:0/4
> 1 handling Ready
Ready MustSync=false:
Messages:
1->4 MsgApp Term:1 Log:1/4 Commit:4
> 4 receiving messages
1->4 MsgApp Term:1 Log:1/4 Commit:4
> 4 handling Ready
Ready MustSync=false:
Messages:
4->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
4->1 MsgAppResp Term:1 Log:0/4


# Transfer leadership while in the joint config.
Expand Down Expand Up @@ -284,12 +272,10 @@ stabilize
CommittedEntries:
2/5 EntryNormal ""
Messages:
4->1 MsgApp Term:2 Log:2/5 Commit:4
4->1 MsgApp Term:2 Log:2/5 Commit:5
4->2 MsgApp Term:2 Log:2/5 Commit:5
4->3 MsgApp Term:2 Log:2/5 Commit:5
> 1 receiving messages
4->1 MsgApp Term:2 Log:2/5 Commit:4
4->1 MsgApp Term:2 Log:2/5 Commit:5
> 2 receiving messages
4->2 MsgApp Term:2 Log:2/5 Commit:5
Expand All @@ -302,7 +288,6 @@ stabilize
2/5 EntryNormal ""
Messages:
1->4 MsgAppResp Term:2 Log:0/5
1->4 MsgAppResp Term:2 Log:0/5
> 2 handling Ready
Ready MustSync=false:
HardState Term:2 Vote:4 Commit:5
Expand All @@ -318,7 +303,6 @@ stabilize
Messages:
3->4 MsgAppResp Term:2 Log:0/5
> 4 receiving messages
1->4 MsgAppResp Term:2 Log:0/5
1->4 MsgAppResp Term:2 Log:0/5
2->4 MsgAppResp Term:2 Log:0/5
3->4 MsgAppResp Term:2 Log:0/5
Expand Down
60 changes: 0 additions & 60 deletions testdata/probe_and_replicate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -513,18 +513,6 @@ stabilize 1 2
2->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
2->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:8 Log:8/21 Commit:18
> 2 receiving messages
1->2 MsgApp Term:8 Log:8/21 Commit:18
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
2->1 MsgAppResp Term:8 Log:0/21

stabilize 1 3
----
Expand Down Expand Up @@ -579,18 +567,6 @@ stabilize 1 3
3->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
3->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:8 Log:8/21 Commit:18
> 3 receiving messages
1->3 MsgApp Term:8 Log:8/21 Commit:18
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
3->1 MsgAppResp Term:8 Log:0/21

stabilize 1 4
----
Expand Down Expand Up @@ -674,18 +650,6 @@ stabilize 1 5
5->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
5->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->5 MsgApp Term:8 Log:8/21 Commit:21
> 5 receiving messages
1->5 MsgApp Term:8 Log:8/21 Commit:21
> 5 handling Ready
Ready MustSync=false:
Messages:
5->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
5->1 MsgAppResp Term:8 Log:0/21

stabilize 1 6
----
Expand Down Expand Up @@ -741,18 +705,6 @@ stabilize 1 6
6->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
6->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->6 MsgApp Term:8 Log:8/21 Commit:21
> 6 receiving messages
1->6 MsgApp Term:8 Log:8/21 Commit:21
> 6 handling Ready
Ready MustSync=false:
Messages:
6->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
6->1 MsgAppResp Term:8 Log:0/21

stabilize 1 7
----
Expand Down Expand Up @@ -816,15 +768,3 @@ stabilize 1 7
7->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
7->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->7 MsgApp Term:8 Log:8/21 Commit:21
> 7 receiving messages
1->7 MsgApp Term:8 Log:8/21 Commit:21
> 7 handling Ready
Ready MustSync=false:
Messages:
7->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
7->1 MsgAppResp Term:8 Log:0/21
11 changes: 0 additions & 11 deletions testdata/snapshot_succeed_via_app_resp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ stabilize 1
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/11
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/11 Commit:11

status 1
----
Expand All @@ -143,16 +139,9 @@ stabilize
----
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/11 Commit:11
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgHeartbeatResp Term:1 Log:0/0
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:1 Log:0/11
> 1 receiving messages
2->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgAppResp Term:1 Log:0/11
26 changes: 26 additions & 0 deletions tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ type Progress struct {
// In StateSnapshot, Next == PendingSnapshot + 1.
Next uint64

// sentCommit is the highest commit index in flight to the follower.
//
// In StateSnapshot, sentCommit == PendingSnapshot == Next-1.
sentCommit uint64

// State defines how the leader should interact with the follower.
//
// When in StateProbe, leader sends at most one replication message
Expand Down Expand Up @@ -131,6 +136,7 @@ func (pr *Progress) BecomeProbe() {
pr.ResetState(StateProbe)
pr.Next = pr.Match + 1
}
pr.sentCommit = min(pr.sentCommit, pr.Next-1)
}

// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
Expand All @@ -145,6 +151,7 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
pr.ResetState(StateSnapshot)
pr.PendingSnapshot = snapshoti
pr.Next = snapshoti + 1
pr.sentCommit = snapshoti
}

// UpdateOnEntriesSend updates the progress on the given number of consecutive
Expand Down Expand Up @@ -174,6 +181,21 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) {
}
}

// CanBumpCommit returns true if sending the given commit index can potentially
// advance the follower's commit index.
func (pr *Progress) CanBumpCommit(index uint64) bool {
// Sending the given commit index may bump the follower's commit index up to
// Next-1 in normal operation, or higher in some rare cases. Allow sending a
// commit index eagerly only if we haven't already sent one that bumps the
// follower's commit all the way to Next-1.
return index > pr.sentCommit && pr.sentCommit < pr.Next-1
}

// SentCommit updates the sentCommit.
func (pr *Progress) SentCommit(commit uint64) {
pr.sentCommit = commit
}

// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
// index acked by it. The method returns false if the given n index comes from
// an outdated message. Otherwise it updates the progress and returns true.
Expand Down Expand Up @@ -209,6 +231,8 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
//
// TODO(tbg): why not use matchHint if it's larger?
pr.Next = pr.Match + 1
// Regress the sentCommit since it unlikely has been applied.
pr.sentCommit = min(pr.sentCommit, pr.Next-1)
return true
}

Expand All @@ -220,6 +244,8 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
}

pr.Next = max(min(rejected, matchHint+1), pr.Match+1)
// Regress the sentCommit since it unlikely has been applied.
pr.sentCommit = min(pr.sentCommit, pr.Next-1)
pr.MsgAppFlowPaused = false
return true
}
Expand Down

0 comments on commit 4399f8f

Please sign in to comment.