Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracker: track in-flight commit index #171

Merged
merged 3 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) {
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
//
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
// struct contains all the state necessary for deciding whether to send a
// message.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {
Expand Down Expand Up @@ -640,7 +644,8 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
Entries: ents,
Commit: r.raftLog.committed,
})
pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)))
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
pr.SentCommit(r.raftLog.committed)
return true
}

Expand Down Expand Up @@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
pr := r.trk.Progress[to]
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.trk.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
commit := min(pr.Match, r.raftLog.committed)
r.send(pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}

r.send(m)
})
pr.SentCommit(commit)
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
Expand Down Expand Up @@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
Expand Down Expand Up @@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error {
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
// This node may be missing the latest commit index, so send it.
// NB: this is not strictly necessary because the periodic heartbeat
// messages deliver commit indices too. However, a message sent now
// may arrive earlier than the next heartbeat fires.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
Expand Down
14 changes: 1 addition & 13 deletions testdata/confchange_v1_add_single.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ stabilize
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -94,15 +94,3 @@ stabilize
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
16 changes: 2 additions & 14 deletions testdata/confchange_v2_add_double_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ stabilize 1 2
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand Down Expand Up @@ -171,7 +171,7 @@ stabilize 1 3
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 3 for index 3
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=1 paused pendingSnap=5]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=5]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -193,18 +193,6 @@ stabilize 1 3
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/5
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=5 next=6 paused pendingSnap=5]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/5 Commit:5
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/5 Commit:5
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:1 Log:0/5
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/5

# Nothing else happens.
stabilize
Expand Down
2 changes: 1 addition & 1 deletion testdata/confchange_v2_add_double_implicit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ stabilize 1 2
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand Down
14 changes: 1 addition & 13 deletions testdata/confchange_v2_add_single_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ stabilize
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -95,15 +95,3 @@ stabilize
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
14 changes: 1 addition & 13 deletions testdata/confchange_v2_add_single_explicit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ stabilize 1 2
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -95,18 +95,6 @@ stabilize 1 2
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4

# Check that we're not allowed to change membership again while in the joint state.
# This leads to an empty entry being proposed instead (index 5 in the stabilize block
Expand Down
16 changes: 0 additions & 16 deletions testdata/confchange_v2_replace_leader.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,6 @@ stabilize
4->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
4->1 MsgAppResp Term:1 Log:0/4
> 1 handling Ready
Ready MustSync=false:
Messages:
1->4 MsgApp Term:1 Log:1/4 Commit:4
> 4 receiving messages
1->4 MsgApp Term:1 Log:1/4 Commit:4
> 4 handling Ready
Ready MustSync=false:
Messages:
4->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
4->1 MsgAppResp Term:1 Log:0/4


# Transfer leadership while in the joint config.
Expand Down Expand Up @@ -284,12 +272,10 @@ stabilize
CommittedEntries:
2/5 EntryNormal ""
Messages:
4->1 MsgApp Term:2 Log:2/5 Commit:4
4->1 MsgApp Term:2 Log:2/5 Commit:5
4->2 MsgApp Term:2 Log:2/5 Commit:5
4->3 MsgApp Term:2 Log:2/5 Commit:5
> 1 receiving messages
4->1 MsgApp Term:2 Log:2/5 Commit:4
4->1 MsgApp Term:2 Log:2/5 Commit:5
> 2 receiving messages
4->2 MsgApp Term:2 Log:2/5 Commit:5
Expand All @@ -302,7 +288,6 @@ stabilize
2/5 EntryNormal ""
Messages:
1->4 MsgAppResp Term:2 Log:0/5
1->4 MsgAppResp Term:2 Log:0/5
> 2 handling Ready
Ready MustSync=false:
HardState Term:2 Vote:4 Commit:5
Expand All @@ -318,7 +303,6 @@ stabilize
Messages:
3->4 MsgAppResp Term:2 Log:0/5
> 4 receiving messages
1->4 MsgAppResp Term:2 Log:0/5
1->4 MsgAppResp Term:2 Log:0/5
2->4 MsgAppResp Term:2 Log:0/5
3->4 MsgAppResp Term:2 Log:0/5
Expand Down
60 changes: 0 additions & 60 deletions testdata/probe_and_replicate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -513,18 +513,6 @@ stabilize 1 2
2->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
2->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:8 Log:8/21 Commit:18
> 2 receiving messages
1->2 MsgApp Term:8 Log:8/21 Commit:18
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
2->1 MsgAppResp Term:8 Log:0/21

stabilize 1 3
----
Expand Down Expand Up @@ -579,18 +567,6 @@ stabilize 1 3
3->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
3->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:8 Log:8/21 Commit:18
> 3 receiving messages
1->3 MsgApp Term:8 Log:8/21 Commit:18
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
3->1 MsgAppResp Term:8 Log:0/21

stabilize 1 4
----
Expand Down Expand Up @@ -674,18 +650,6 @@ stabilize 1 5
5->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
5->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->5 MsgApp Term:8 Log:8/21 Commit:21
> 5 receiving messages
1->5 MsgApp Term:8 Log:8/21 Commit:21
> 5 handling Ready
Ready MustSync=false:
Messages:
5->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
5->1 MsgAppResp Term:8 Log:0/21

stabilize 1 6
----
Expand Down Expand Up @@ -741,18 +705,6 @@ stabilize 1 6
6->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
6->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->6 MsgApp Term:8 Log:8/21 Commit:21
> 6 receiving messages
1->6 MsgApp Term:8 Log:8/21 Commit:21
> 6 handling Ready
Ready MustSync=false:
Messages:
6->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
6->1 MsgAppResp Term:8 Log:0/21

stabilize 1 7
----
Expand Down Expand Up @@ -816,15 +768,3 @@ stabilize 1 7
7->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
7->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->7 MsgApp Term:8 Log:8/21 Commit:21
> 7 receiving messages
1->7 MsgApp Term:8 Log:8/21 Commit:21
> 7 handling Ready
Ready MustSync=false:
Messages:
7->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
7->1 MsgAppResp Term:8 Log:0/21
15 changes: 2 additions & 13 deletions testdata/snapshot_succeed_via_app_resp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ stabilize 1
> 1 receiving messages
3->1 MsgHeartbeatResp Term:1 Log:0/0
DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=11 paused pendingSnap=11]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=12 paused pendingSnap=11]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -98,7 +98,7 @@ status 1
----
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateSnapshot match=0 next=11 paused pendingSnap=11
3: StateSnapshot match=0 next=12 paused pendingSnap=11

# Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion.
# The snapshot fully catches the follower up (i.e. there are no more log entries it
Expand Down Expand Up @@ -127,10 +127,6 @@ stabilize 1
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/11
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/11 Commit:11

status 1
----
Expand All @@ -143,16 +139,9 @@ stabilize
----
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/11 Commit:11
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgHeartbeatResp Term:1 Log:0/0
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:1 Log:0/11
> 1 receiving messages
2->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgAppResp Term:1 Log:0/11
4 changes: 2 additions & 2 deletions testdata/snapshot_succeed_via_app_resp_behind.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ stabilize 1
DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6]
DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=13 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand Down Expand Up @@ -152,7 +152,7 @@ stabilize 1
----
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/11
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=12]
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=13 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand Down
Loading
Loading