diff --git a/raft.go b/raft.go index 7f591f26..a2e5ad0f 100644 --- a/raft.go +++ b/raft.go @@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) { // argument controls whether messages with no entries will be sent // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). +// +// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress +// struct contains all the state necessary for deciding whether to send a +// message. func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { pr := r.trk.Progress[to] if pr.IsPaused() { @@ -640,7 +644,8 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { Entries: ents, Commit: r.raftLog.committed, }) - pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents))) + pr.SentEntries(len(ents), uint64(payloadsSize(ents))) + pr.SentCommit(r.raftLog.committed) return true } @@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { + pr := r.trk.Progress[to] // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, // the receiver(follower) might not be matched with the leader // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. - commit := min(r.trk.Progress[to].Match, r.raftLog.committed) - m := pb.Message{ + commit := min(pr.Match, r.raftLog.committed) + r.send(pb.Message{ To: to, Type: pb.MsgHeartbeat, Commit: commit, Context: ctx, - } - - r.send(m) + }) + pr.SentCommit(commit) } // bcastAppend sends RPC, with entries to all peers that are not up-to-date @@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error { r.sendAppend(m.From) } } else { - oldPaused := pr.IsPaused() // We want to update our tracking if the response updates our // matched index or if the response can move a probing peer back // into StateReplicate (see heartbeat_rep_recovers_from_probing.txt @@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error { // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if oldPaused { - // If we were paused before, this node may be missing the - // latest commit index, so send it. + } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { + // This node may be missing the latest commit index, so send it. + // NB: this is not strictly necessary because the periodic heartbeat + // messages deliver commit indices too. However, a message sent now + // may arrive earlier than the next heartbeat fires. r.sendAppend(m.From) } // We've updated flow control information above, which may diff --git a/testdata/confchange_v1_add_single.txt b/testdata/confchange_v1_add_single.txt index e54a183f..f13d3077 100644 --- a/testdata/confchange_v1_add_single.txt +++ b/testdata/confchange_v1_add_single.txt @@ -72,7 +72,7 @@ stabilize DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] > 1 handling Ready Ready MustSync=false: Messages: @@ -94,15 +94,3 @@ stabilize > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:1 Log:0/4 -> 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/4 diff --git a/testdata/confchange_v2_add_double_auto.txt b/testdata/confchange_v2_add_double_auto.txt index bf1dfcbe..f290c980 100644 --- a/testdata/confchange_v2_add_double_auto.txt +++ b/testdata/confchange_v2_add_double_auto.txt @@ -95,7 +95,7 @@ stabilize 1 2 DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] > 1 handling Ready Ready MustSync=false: Messages: @@ -171,7 +171,7 @@ stabilize 1 3 DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 3 for index 3 DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=1 paused pendingSnap=5] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=5] > 1 handling Ready Ready MustSync=false: Messages: @@ -193,18 +193,6 @@ stabilize 1 3 > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/5 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=5 next=6 paused pendingSnap=5] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgApp Term:1 Log:1/5 Commit:5 -> 3 receiving messages - 1->3 MsgApp Term:1 Log:1/5 Commit:5 -> 3 handling Ready - Ready MustSync=false: - Messages: - 3->1 MsgAppResp Term:1 Log:0/5 -> 1 receiving messages - 3->1 MsgAppResp Term:1 Log:0/5 # Nothing else happens. stabilize diff --git a/testdata/confchange_v2_add_double_implicit.txt b/testdata/confchange_v2_add_double_implicit.txt index 536d66b8..2db3b7c1 100644 --- a/testdata/confchange_v2_add_double_implicit.txt +++ b/testdata/confchange_v2_add_double_implicit.txt @@ -78,7 +78,7 @@ stabilize 1 2 DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] > 1 handling Ready Ready MustSync=false: Messages: diff --git a/testdata/confchange_v2_add_single_auto.txt b/testdata/confchange_v2_add_single_auto.txt index 1c487da8..3c234159 100644 --- a/testdata/confchange_v2_add_single_auto.txt +++ b/testdata/confchange_v2_add_single_auto.txt @@ -73,7 +73,7 @@ stabilize DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] > 1 handling Ready Ready MustSync=false: Messages: @@ -95,15 +95,3 @@ stabilize > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:1 Log:0/4 -> 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/4 diff --git a/testdata/confchange_v2_add_single_explicit.txt b/testdata/confchange_v2_add_single_explicit.txt index 123cd17a..1d390e55 100644 --- a/testdata/confchange_v2_add_single_explicit.txt +++ b/testdata/confchange_v2_add_single_explicit.txt @@ -73,7 +73,7 @@ stabilize 1 2 DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] > 1 handling Ready Ready MustSync=false: Messages: @@ -95,18 +95,6 @@ stabilize 1 2 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:1 Log:0/4 -> 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/4 # Check that we're not allowed to change membership again while in the joint state. # This leads to an empty entry being proposed instead (index 5 in the stabilize block diff --git a/testdata/confchange_v2_replace_leader.txt b/testdata/confchange_v2_replace_leader.txt index ae43ce21..628cc447 100644 --- a/testdata/confchange_v2_replace_leader.txt +++ b/testdata/confchange_v2_replace_leader.txt @@ -143,18 +143,6 @@ stabilize 4->1 MsgAppResp Term:1 Log:0/4 > 1 receiving messages 4->1 MsgAppResp Term:1 Log:0/4 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->4 MsgApp Term:1 Log:1/4 Commit:4 -> 4 receiving messages - 1->4 MsgApp Term:1 Log:1/4 Commit:4 -> 4 handling Ready - Ready MustSync=false: - Messages: - 4->1 MsgAppResp Term:1 Log:0/4 -> 1 receiving messages - 4->1 MsgAppResp Term:1 Log:0/4 # Transfer leadership while in the joint config. @@ -284,12 +272,10 @@ stabilize CommittedEntries: 2/5 EntryNormal "" Messages: - 4->1 MsgApp Term:2 Log:2/5 Commit:4 4->1 MsgApp Term:2 Log:2/5 Commit:5 4->2 MsgApp Term:2 Log:2/5 Commit:5 4->3 MsgApp Term:2 Log:2/5 Commit:5 > 1 receiving messages - 4->1 MsgApp Term:2 Log:2/5 Commit:4 4->1 MsgApp Term:2 Log:2/5 Commit:5 > 2 receiving messages 4->2 MsgApp Term:2 Log:2/5 Commit:5 @@ -302,7 +288,6 @@ stabilize 2/5 EntryNormal "" Messages: 1->4 MsgAppResp Term:2 Log:0/5 - 1->4 MsgAppResp Term:2 Log:0/5 > 2 handling Ready Ready MustSync=false: HardState Term:2 Vote:4 Commit:5 @@ -318,7 +303,6 @@ stabilize Messages: 3->4 MsgAppResp Term:2 Log:0/5 > 4 receiving messages - 1->4 MsgAppResp Term:2 Log:0/5 1->4 MsgAppResp Term:2 Log:0/5 2->4 MsgAppResp Term:2 Log:0/5 3->4 MsgAppResp Term:2 Log:0/5 diff --git a/testdata/probe_and_replicate.txt b/testdata/probe_and_replicate.txt index c4100e97..d970183c 100644 --- a/testdata/probe_and_replicate.txt +++ b/testdata/probe_and_replicate.txt @@ -513,18 +513,6 @@ stabilize 1 2 2->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 2->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:8 Log:8/21 Commit:18 -> 2 receiving messages - 1->2 MsgApp Term:8 Log:8/21 Commit:18 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 2->1 MsgAppResp Term:8 Log:0/21 stabilize 1 3 ---- @@ -579,18 +567,6 @@ stabilize 1 3 3->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 3->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgApp Term:8 Log:8/21 Commit:18 -> 3 receiving messages - 1->3 MsgApp Term:8 Log:8/21 Commit:18 -> 3 handling Ready - Ready MustSync=false: - Messages: - 3->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 3->1 MsgAppResp Term:8 Log:0/21 stabilize 1 4 ---- @@ -674,18 +650,6 @@ stabilize 1 5 5->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 5->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->5 MsgApp Term:8 Log:8/21 Commit:21 -> 5 receiving messages - 1->5 MsgApp Term:8 Log:8/21 Commit:21 -> 5 handling Ready - Ready MustSync=false: - Messages: - 5->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 5->1 MsgAppResp Term:8 Log:0/21 stabilize 1 6 ---- @@ -741,18 +705,6 @@ stabilize 1 6 6->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 6->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->6 MsgApp Term:8 Log:8/21 Commit:21 -> 6 receiving messages - 1->6 MsgApp Term:8 Log:8/21 Commit:21 -> 6 handling Ready - Ready MustSync=false: - Messages: - 6->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 6->1 MsgAppResp Term:8 Log:0/21 stabilize 1 7 ---- @@ -816,15 +768,3 @@ stabilize 1 7 7->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 7->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->7 MsgApp Term:8 Log:8/21 Commit:21 -> 7 receiving messages - 1->7 MsgApp Term:8 Log:8/21 Commit:21 -> 7 handling Ready - Ready MustSync=false: - Messages: - 7->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 7->1 MsgAppResp Term:8 Log:0/21 diff --git a/testdata/snapshot_succeed_via_app_resp.txt b/testdata/snapshot_succeed_via_app_resp.txt index 5c4b0c61..b1fc4144 100644 --- a/testdata/snapshot_succeed_via_app_resp.txt +++ b/testdata/snapshot_succeed_via_app_resp.txt @@ -87,7 +87,7 @@ stabilize 1 > 1 receiving messages 3->1 MsgHeartbeatResp Term:1 Log:0/0 DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11] - DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=11 paused pendingSnap=11] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=12 paused pendingSnap=11] > 1 handling Ready Ready MustSync=false: Messages: @@ -98,7 +98,7 @@ status 1 ---- 1: StateReplicate match=11 next=12 2: StateReplicate match=11 next=12 -3: StateSnapshot match=0 next=11 paused pendingSnap=11 +3: StateSnapshot match=0 next=12 paused pendingSnap=11 # Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion. # The snapshot fully catches the follower up (i.e. there are no more log entries it @@ -127,10 +127,6 @@ stabilize 1 > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/11 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgApp Term:1 Log:1/11 Commit:11 status 1 ---- @@ -143,16 +139,9 @@ stabilize ---- > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 -> 3 receiving messages - 1->3 MsgApp Term:1 Log:1/11 Commit:11 > 2 handling Ready Ready MustSync=false: Messages: 2->1 MsgHeartbeatResp Term:1 Log:0/0 -> 3 handling Ready - Ready MustSync=false: - Messages: - 3->1 MsgAppResp Term:1 Log:0/11 > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 - 3->1 MsgAppResp Term:1 Log:0/11 diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt index b0f5883b..3bab832d 100644 --- a/testdata/snapshot_succeed_via_app_resp_behind.txt +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -124,7 +124,7 @@ stabilize 1 DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10 DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6] DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6] - DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=13 paused pendingSnap=12] > 1 handling Ready Ready MustSync=false: Messages: @@ -152,7 +152,7 @@ stabilize 1 ---- > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/11 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=12] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=13 paused pendingSnap=12] > 1 handling Ready Ready MustSync=false: Messages: diff --git a/tracker/progress.go b/tracker/progress.go index cb4312a9..b475002f 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -35,8 +35,19 @@ type Progress struct { // entries with indices in (Match, Next) interval are already in flight. // // Invariant: 0 <= Match < Next. + // NB: it follows that Next >= 1. + // + // In StateSnapshot, Next == PendingSnapshot + 1. Next uint64 + // sentCommit is the highest commit index in flight to the follower. + // + // Generally, it is monotonic, but con regress in some cases, e.g. when + // converting to `StateProbe` or when receiving a rejection from a follower. + // + // In StateSnapshot, sentCommit == PendingSnapshot == Next-1. + sentCommit uint64 + // State defines how the leader should interact with the follower. // // When in StateProbe, leader sends at most one replication message @@ -128,6 +139,7 @@ func (pr *Progress) BecomeProbe() { pr.ResetState(StateProbe) pr.Next = pr.Match + 1 } + pr.sentCommit = min(pr.sentCommit, pr.Next-1) } // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. @@ -141,14 +153,16 @@ func (pr *Progress) BecomeReplicate() { func (pr *Progress) BecomeSnapshot(snapshoti uint64) { pr.ResetState(StateSnapshot) pr.PendingSnapshot = snapshoti + pr.Next = snapshoti + 1 + pr.sentCommit = snapshoti } -// UpdateOnEntriesSend updates the progress on the given number of consecutive -// entries being sent in a MsgApp, with the given total bytes size, appended at -// log indices >= pr.Next. +// SentEntries updates the progress on the given number of consecutive entries +// being sent in a MsgApp, with the given total bytes size, appended at log +// indices >= pr.Next. // // Must be used with StateProbe or StateReplicate. -func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { +func (pr *Progress) SentEntries(entries int, bytes uint64) { switch pr.State { case StateReplicate: if entries > 0 { @@ -170,6 +184,21 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { } } +// CanBumpCommit returns true if sending the given commit index can potentially +// advance the follower's commit index. +func (pr *Progress) CanBumpCommit(index uint64) bool { + // Sending the given commit index may bump the follower's commit index up to + // Next-1 in normal operation, or higher in some rare cases. Allow sending a + // commit index eagerly only if we haven't already sent one that bumps the + // follower's commit all the way to Next-1. + return index > pr.sentCommit && pr.sentCommit < pr.Next-1 +} + +// SentCommit updates the sentCommit. +func (pr *Progress) SentCommit(commit uint64) { + pr.sentCommit = commit +} + // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. @@ -205,6 +234,8 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { // // TODO(tbg): why not use matchHint if it's larger? pr.Next = pr.Match + 1 + // Regress the sentCommit since it unlikely has been applied. + pr.sentCommit = min(pr.sentCommit, pr.Next-1) return true } @@ -216,6 +247,8 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { } pr.Next = max(min(rejected, matchHint+1), pr.Match+1) + // Regress the sentCommit since it unlikely has been applied. + pr.sentCommit = min(pr.sentCommit, pr.Next-1) pr.MsgAppFlowPaused = false return true }