diff --git a/raft.go b/raft.go index 7f591f26..16545836 100644 --- a/raft.go +++ b/raft.go @@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) { // argument controls whether messages with no entries will be sent // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). +// +// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress +// struct contains all the state necessary for deciding whether to send a +// message. func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { pr := r.trk.Progress[to] if pr.IsPaused() { @@ -641,6 +645,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { Commit: r.raftLog.committed, }) pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents))) + pr.SentCommit(r.raftLog.committed) return true } @@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { + pr := r.trk.Progress[to] // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, // the receiver(follower) might not be matched with the leader // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. 
- commit := min(r.trk.Progress[to].Match, r.raftLog.committed) - m := pb.Message{ + commit := min(pr.Match, r.raftLog.committed) + r.send(pb.Message{ To: to, Type: pb.MsgHeartbeat, Commit: commit, Context: ctx, - } - - r.send(m) + }) + pr.SentCommit(commit) } // bcastAppend sends RPC, with entries to all peers that are not up-to-date @@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error { r.sendAppend(m.From) } } else { - oldPaused := pr.IsPaused() // We want to update our tracking if the response updates our // matched index or if the response can move a probing peer back // into StateReplicate (see heartbeat_rep_recovers_from_probing.txt @@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error { // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if oldPaused { - // If we were paused before, this node may be missing the - // latest commit index, so send it. + } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { + // This node may be missing the latest commit index, so send it. + // NB: this is not strictly necessary because the periodic heartbeat + // messages deliver commit indices too. However, a message sent now + // may arrive earlier than the next heartbeat fires. 
r.sendAppend(m.From) } // We've updated flow control information above, which may diff --git a/testdata/confchange_v1_add_single.txt b/testdata/confchange_v1_add_single.txt index e473c688..f13d3077 100644 --- a/testdata/confchange_v1_add_single.txt +++ b/testdata/confchange_v1_add_single.txt @@ -94,15 +94,3 @@ stabilize > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:1 Log:0/4 -> 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/4 diff --git a/testdata/confchange_v2_add_double_auto.txt b/testdata/confchange_v2_add_double_auto.txt index b31ba4fb..f290c980 100644 --- a/testdata/confchange_v2_add_double_auto.txt +++ b/testdata/confchange_v2_add_double_auto.txt @@ -193,18 +193,6 @@ stabilize 1 3 > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/5 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=5 next=6 paused pendingSnap=5] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgApp Term:1 Log:1/5 Commit:5 -> 3 receiving messages - 1->3 MsgApp Term:1 Log:1/5 Commit:5 -> 3 handling Ready - Ready MustSync=false: - Messages: - 3->1 MsgAppResp Term:1 Log:0/5 -> 1 receiving messages - 3->1 MsgAppResp Term:1 Log:0/5 # Nothing else happens. 
stabilize diff --git a/testdata/confchange_v2_add_single_auto.txt b/testdata/confchange_v2_add_single_auto.txt index 301f10c4..3c234159 100644 --- a/testdata/confchange_v2_add_single_auto.txt +++ b/testdata/confchange_v2_add_single_auto.txt @@ -95,15 +95,3 @@ stabilize > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:1 Log:0/4 -> 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/4 diff --git a/testdata/confchange_v2_add_single_explicit.txt b/testdata/confchange_v2_add_single_explicit.txt index c910658c..1d390e55 100644 --- a/testdata/confchange_v2_add_single_explicit.txt +++ b/testdata/confchange_v2_add_single_explicit.txt @@ -95,18 +95,6 @@ stabilize 1 2 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/4 Commit:4 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:1 Log:0/4 -> 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/4 # Check that we're not allowed to change membership again while in the joint state. 
# This leads to an empty entry being proposed instead (index 5 in the stabilize block diff --git a/testdata/confchange_v2_replace_leader.txt b/testdata/confchange_v2_replace_leader.txt index ae43ce21..628cc447 100644 --- a/testdata/confchange_v2_replace_leader.txt +++ b/testdata/confchange_v2_replace_leader.txt @@ -143,18 +143,6 @@ stabilize 4->1 MsgAppResp Term:1 Log:0/4 > 1 receiving messages 4->1 MsgAppResp Term:1 Log:0/4 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->4 MsgApp Term:1 Log:1/4 Commit:4 -> 4 receiving messages - 1->4 MsgApp Term:1 Log:1/4 Commit:4 -> 4 handling Ready - Ready MustSync=false: - Messages: - 4->1 MsgAppResp Term:1 Log:0/4 -> 1 receiving messages - 4->1 MsgAppResp Term:1 Log:0/4 # Transfer leadership while in the joint config. @@ -284,12 +272,10 @@ stabilize CommittedEntries: 2/5 EntryNormal "" Messages: - 4->1 MsgApp Term:2 Log:2/5 Commit:4 4->1 MsgApp Term:2 Log:2/5 Commit:5 4->2 MsgApp Term:2 Log:2/5 Commit:5 4->3 MsgApp Term:2 Log:2/5 Commit:5 > 1 receiving messages - 4->1 MsgApp Term:2 Log:2/5 Commit:4 4->1 MsgApp Term:2 Log:2/5 Commit:5 > 2 receiving messages 4->2 MsgApp Term:2 Log:2/5 Commit:5 @@ -302,7 +288,6 @@ stabilize 2/5 EntryNormal "" Messages: 1->4 MsgAppResp Term:2 Log:0/5 - 1->4 MsgAppResp Term:2 Log:0/5 > 2 handling Ready Ready MustSync=false: HardState Term:2 Vote:4 Commit:5 @@ -318,7 +303,6 @@ stabilize Messages: 3->4 MsgAppResp Term:2 Log:0/5 > 4 receiving messages - 1->4 MsgAppResp Term:2 Log:0/5 1->4 MsgAppResp Term:2 Log:0/5 2->4 MsgAppResp Term:2 Log:0/5 3->4 MsgAppResp Term:2 Log:0/5 diff --git a/testdata/probe_and_replicate.txt b/testdata/probe_and_replicate.txt index c4100e97..d970183c 100644 --- a/testdata/probe_and_replicate.txt +++ b/testdata/probe_and_replicate.txt @@ -513,18 +513,6 @@ stabilize 1 2 2->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 2->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:8 Log:8/21 Commit:18 -> 2 
receiving messages - 1->2 MsgApp Term:8 Log:8/21 Commit:18 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 2->1 MsgAppResp Term:8 Log:0/21 stabilize 1 3 ---- @@ -579,18 +567,6 @@ stabilize 1 3 3->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 3->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgApp Term:8 Log:8/21 Commit:18 -> 3 receiving messages - 1->3 MsgApp Term:8 Log:8/21 Commit:18 -> 3 handling Ready - Ready MustSync=false: - Messages: - 3->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 3->1 MsgAppResp Term:8 Log:0/21 stabilize 1 4 ---- @@ -674,18 +650,6 @@ stabilize 1 5 5->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 5->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->5 MsgApp Term:8 Log:8/21 Commit:21 -> 5 receiving messages - 1->5 MsgApp Term:8 Log:8/21 Commit:21 -> 5 handling Ready - Ready MustSync=false: - Messages: - 5->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 5->1 MsgAppResp Term:8 Log:0/21 stabilize 1 6 ---- @@ -741,18 +705,6 @@ stabilize 1 6 6->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 6->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->6 MsgApp Term:8 Log:8/21 Commit:21 -> 6 receiving messages - 1->6 MsgApp Term:8 Log:8/21 Commit:21 -> 6 handling Ready - Ready MustSync=false: - Messages: - 6->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 6->1 MsgAppResp Term:8 Log:0/21 stabilize 1 7 ---- @@ -816,15 +768,3 @@ stabilize 1 7 7->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 7->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->7 MsgApp Term:8 Log:8/21 Commit:21 -> 7 receiving messages - 1->7 MsgApp Term:8 Log:8/21 Commit:21 -> 7 handling Ready - Ready MustSync=false: - Messages: - 7->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 7->1 MsgAppResp Term:8 
Log:0/21 diff --git a/testdata/snapshot_succeed_via_app_resp.txt b/testdata/snapshot_succeed_via_app_resp.txt index 68532bf8..b1fc4144 100644 --- a/testdata/snapshot_succeed_via_app_resp.txt +++ b/testdata/snapshot_succeed_via_app_resp.txt @@ -127,10 +127,6 @@ stabilize 1 > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/11 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgApp Term:1 Log:1/11 Commit:11 status 1 ---- @@ -143,16 +139,9 @@ stabilize ---- > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 -> 3 receiving messages - 1->3 MsgApp Term:1 Log:1/11 Commit:11 > 2 handling Ready Ready MustSync=false: Messages: 2->1 MsgHeartbeatResp Term:1 Log:0/0 -> 3 handling Ready - Ready MustSync=false: - Messages: - 3->1 MsgAppResp Term:1 Log:0/11 > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 - 3->1 MsgAppResp Term:1 Log:0/11 diff --git a/tracker/progress.go b/tracker/progress.go index 8c562fa9..65a2ee60 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -40,6 +40,11 @@ type Progress struct { // In StateSnapshot, Next == PendingSnapshot + 1. Next uint64 + // sentCommit is the highest commit index in flight to the follower. + // + // In StateSnapshot, sentCommit == PendingSnapshot == Next-1. + sentCommit uint64 + // State defines how the leader should interact with the follower. // // When in StateProbe, leader sends at most one replication message @@ -131,6 +136,7 @@ func (pr *Progress) BecomeProbe() { pr.ResetState(StateProbe) pr.Next = pr.Match + 1 } + pr.sentCommit = min(pr.sentCommit, pr.Next-1) } // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. 
@@ -145,6 +151,7 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { pr.ResetState(StateSnapshot) pr.PendingSnapshot = snapshoti pr.Next = snapshoti + 1 + pr.sentCommit = snapshoti } // UpdateOnEntriesSend updates the progress on the given number of consecutive @@ -174,6 +181,21 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { } } +// CanBumpCommit returns true if sending the given commit index can potentially +// advance the follower's commit index. +func (pr *Progress) CanBumpCommit(index uint64) bool { + // Sending the given commit index may bump the follower's commit index up to + // Next-1 in normal operation, or higher in some rare cases. Allow sending a + // commit index eagerly only if we haven't already sent one that bumps the + // follower's commit all the way to Next-1. + return index > pr.sentCommit && pr.sentCommit < pr.Next-1 +} + +// SentCommit updates the sentCommit. +func (pr *Progress) SentCommit(commit uint64) { + pr.sentCommit = commit +} + // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. @@ -209,6 +231,8 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { // // TODO(tbg): why not use matchHint if it's larger? pr.Next = pr.Match + 1 + // Regress the sentCommit since it is unlikely to have been applied. + pr.sentCommit = min(pr.sentCommit, pr.Next-1) return true } @@ -220,6 +244,8 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { } pr.Next = max(min(rejected, matchHint+1), pr.Match+1) + // Regress the sentCommit since it is unlikely to have been applied. + pr.sentCommit = min(pr.sentCommit, pr.Next-1) pr.MsgAppFlowPaused = false return true }