From 863ff68863730137490fc28e903404c580875efc Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 29 Jan 2024 13:10:11 +0000 Subject: [PATCH 1/3] tracker: throttle empty probes Signed-off-by: Pavel Kalinnikov --- tracker/progress.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tracker/progress.go b/tracker/progress.go index b475002f..cb566eac 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -173,12 +173,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) { // consider this message being a probe, so that the flow is paused. pr.MsgAppFlowPaused = pr.Inflights.Full() case StateProbe: - // TODO(pavelkalinnikov): this condition captures the previous behaviour, - // but we should set MsgAppFlowPaused unconditionally for simplicity, because any - // MsgApp in StateProbe is a probe, not only non-empty ones. - if entries > 0 { - pr.MsgAppFlowPaused = true - } + pr.MsgAppFlowPaused = true default: panic(fmt.Sprintf("sending append in unhandled state %s", pr.State)) } From 1ce78f5b42d01cb4c1ca041e9779acd7d8f4844b Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 23 Feb 2024 11:21:33 +0000 Subject: [PATCH 2/3] tracker: consolidate MsgApp decisions in Progress This commit consolidates all decision-making about sending append messages into a single maybeSendAppend method. Previously, the behaviour depended on the sendIfEmpty flag which was set/unset depending on the circumstances in which the method is called. This is unnecessary because the Progress struct contains enough information about the leader->follower flow state, so maybeSendAppend can be made stand-alone. Signed-off-by: Pavel Kalinnikov --- doc.go | 8 +- raft.go | 132 +++++++------------- raft_test.go | 14 +-- testdata/replicate_pause.txt | 3 + testdata/slow_follower_after_compaction.txt | 2 + tracker/inflights.go | 2 + tracker/progress.go | 74 ++++++++--- tracker/progress_test.go | 11 +- 8 files changed, 128 insertions(+), 118 deletions(-) diff --git a/doc.go b/doc.go index 06253f4e..45138cb1 100644 --- a/doc.go +++ b/doc.go @@ -315,7 +315,7 @@ stale log entries: rafthttp package. 'MsgApp' contains log entries to replicate. A leader calls bcastAppend, - which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp' + which calls maybeSendAppend, which sends soon-to-be-replicated logs in 'MsgApp' type. When 'MsgApp' is passed to candidate's Step method, candidate reverts back to follower, because it indicates that there is a valid leader sending 'MsgApp' messages. Candidate and follower respond to this message in @@ -353,8 +353,8 @@ stale log entries: 'MsgSnap' requests to install a snapshot message. When a node has just become a leader or the leader receives 'MsgProp' message, it calls - 'bcastAppend' method, which then calls 'sendAppend' method to each - follower. In 'sendAppend', if a leader fails to get term or entries, + 'bcastAppend' method, which then calls 'maybeSendAppend' method to each + follower. In 'maybeSendAppend', if a leader fails to get term or entries, the leader requests snapshot by sending 'MsgSnap' type message. 'MsgSnapStatus' tells the result of snapshot install message. When a @@ -376,7 +376,7 @@ stale log entries: 'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp' is passed to leader's Step method, the leader knows which follower responded. And only when the leader's last committed index is greater than - follower's Match index, the leader runs 'sendAppend` method. + follower's Match index, the leader runs 'maybeSendAppend` method. 'MsgUnreachable' tells that request(message) wasn't delivered. When 'MsgUnreachable' is passed to leader's Step method, the leader discovers diff --git a/raft.go b/raft.go index a2e5ad0f..912c5c05 100644 --- a/raft.go +++ b/raft.go @@ -588,24 +588,24 @@ func (r *raft) send(m pb.Message) { } } -// sendAppend sends an append RPC with new entries (if any) and the -// current commit index to the given peer. -func (r *raft) sendAppend(to uint64) { - r.maybeSendAppend(to, true) -} - -// maybeSendAppend sends an append RPC with new entries to the given peer, -// if necessary. Returns true if a message was sent. The sendIfEmpty -// argument controls whether messages with no entries will be sent -// ("empty" messages are useful to convey updated Commit indexes, but -// are undesirable when we're sending multiple messages in a batch). +// maybeSendAppend sends an append RPC with log entries (if any) that are not +// yet known to be replicated in the given peer's log, as well as the current +// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the +// log has been compacted) it can send a MsgSnap. +// +// In some cases, the MsgApp message can have zero entries, and yet being sent. +// When the follower log is not fully up-to-date, we must send a MsgApp +// periodically so that eventually the flow is either accepted or rejected. Not +// doing so can result in replication stall, in cases when a MsgApp is dropped. // -// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress -// struct contains all the state necessary for deciding whether to send a -// message. -func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { +// Returns true if a message was sent, or false otherwise. A message is not sent +// if the follower log and commit index are up-to-date, the flow is paused (for +// reasons like in-flight limits), or the message could not be constructed. +func (r *raft) maybeSendAppend(to uint64) bool { pr := r.trk.Progress[to] - if pr.IsPaused() { + + last, commit := r.raftLog.lastIndex(), r.raftLog.committed + if !pr.ShouldSendMsgApp(last, commit) { return false } @@ -617,35 +617,25 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return r.maybeSendSnapshot(to, pr) } - var ents []pb.Entry - // In a throttled StateReplicate only send empty MsgApp, to ensure progress. - // Otherwise, if we had a full Inflights and all inflight messages were in - // fact dropped, replication to that follower would stall. Instead, an empty - // MsgApp will eventually reach the follower (heartbeats responses prompt the - // leader to send an append), allowing it to be acked or rejected, both of - // which will clear out Inflights. - if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize) - } - if len(ents) == 0 && !sendIfEmpty { - return false - } - // TODO(pav-kv): move this check up to where err is returned. - if err != nil { // send a snapshot if we failed to get the entries - return r.maybeSendSnapshot(to, pr) + var entries []pb.Entry + if pr.CanSendEntries(last) { + if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil { + // Send a snapshot if we failed to get the entries. + return r.maybeSendSnapshot(to, pr) + } } - // Send the actual MsgApp otherwise, and update the progress accordingly. + // Send the MsgApp, and update the progress accordingly. r.send(pb.Message{ To: to, Type: pb.MsgApp, Index: prevIndex, LogTerm: prevTerm, - Entries: ents, - Commit: r.raftLog.committed, + Entries: entries, + Commit: commit, }) - pr.SentEntries(len(ents), uint64(payloadsSize(ents))) - pr.SentCommit(r.raftLog.committed) + pr.SentEntries(len(entries), uint64(payloadsSize(entries))) + pr.SentCommit(commit) return true } @@ -704,7 +694,7 @@ func (r *raft) bcastAppend() { if id == r.id { return } - r.sendAppend(id) + r.maybeSendAppend(id) }) } @@ -1482,7 +1472,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State == tracker.StateReplicate { pr.BecomeProbe() } - r.sendAppend(m.From) + r.maybeSendAppend(m.From) } } else { // We want to update our tracking if the response updates our @@ -1521,21 +1511,13 @@ func stepLeader(r *raft, m pb.Message) error { // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { - // This node may be missing the latest commit index, so send it. - // NB: this is not strictly necessary because the periodic heartbeat - // messages deliver commit indices too. However, a message sent now - // may arrive earlier than the next heartbeat fires. - r.sendAppend(m.From) } - // We've updated flow control information above, which may - // allow us to send multiple (size-limited) in-flight messages - // at once (such as when transitioning from probe to - // replicate, or when freeTo() covers multiple messages). If - // we have more entries to send, send as many messages as we - // can (without sending empty messages for the commit index) + // We've updated flow control information above, which may allow us to + // send multiple (size-limited) in-flight messages at once (such as when + // transitioning from probe to replicate, or when freeTo() covers + // multiple messages). Send as many messages as we can. if r.id != m.From { - for r.maybeSendAppend(m.From, false /* sendIfEmpty */) { + for r.maybeSendAppend(m.From) { } } // Transfer leadership is in progress. @@ -1547,24 +1529,8 @@ func stepLeader(r *raft, m pb.Message) error { } case pb.MsgHeartbeatResp: pr.RecentActive = true - pr.MsgAppFlowPaused = false - - // NB: if the follower is paused (full Inflights), this will still send an - // empty append, allowing it to recover from situations in which all the - // messages that filled up Inflights in the first place were dropped. Note - // also that the outgoing heartbeat already communicated the commit index. - // - // If the follower is fully caught up but also in StateProbe (as can happen - // if ReportUnreachable was called), we also want to send an append (it will - // be empty) to allow the follower to transition back to StateReplicate once - // it responds. - // - // Note that StateSnapshot typically satisfies pr.Match < lastIndex, but - // `pr.Paused()` is always true for StateSnapshot, so sendAppend is a - // no-op. - if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe { - r.sendAppend(m.From) - } + pr.PauseMsgAppProbes(false) + r.maybeSendAppend(m.From) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { return nil @@ -1634,7 +1600,8 @@ func stepLeader(r *raft, m pb.Message) error { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - r.sendAppend(leadTransferee) + pr.PauseMsgAppProbes(false) + r.maybeSendAppend(leadTransferee) } } return nil @@ -1982,21 +1949,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co return cs } - if r.maybeCommit() { - // If the configuration change means that more entries are committed now, - // broadcast/append to everyone in the updated config. - r.bcastAppend() - } else { - // Otherwise, still probe the newly added replicas; there's no reason to - // let them wait out a heartbeat interval (or the next incoming - // proposal). - r.trk.Visit(func(id uint64, pr *tracker.Progress) { - if id == r.id { - return - } - r.maybeSendAppend(id, false /* sendIfEmpty */) - }) - } + r.maybeCommit() + // If the configuration change means that more entries are committed now, + // broadcast/append to everyone in the updated config. + // + // Otherwise, still probe the newly added replicas; there's no reason to let + // them wait out a heartbeat interval (or the next incoming proposal). + r.bcastAppend() + // If the leadTransferee was removed or demoted, abort the leadership transfer. if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 { r.abortLeaderTransfer() diff --git a/raft_test.go b/raft_test.go index b72c2324..c22e322f 100644 --- a/raft_test.go +++ b/raft_test.go @@ -132,7 +132,7 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { assert.False(t, r.trk.Progress[2].MsgAppFlowPaused) r.trk.Progress[2].MsgAppFlowPaused = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - assert.False(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) } func TestProgressPaused(t *testing.T) { @@ -2299,7 +2299,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // loop. After that, the follower is paused until a heartbeat response is // received. mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2) msg := r.readMessages() assert.Len(t, msg, 1) assert.Zero(t, msg[0].Index) @@ -2308,7 +2308,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2) assert.Empty(t, r.readMessages()) } @@ -2341,7 +2341,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2) msgs := r.readMessages() assert.Len(t, msgs, 1, "#%d", i) } @@ -2356,7 +2356,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2) msgs := r.readMessages() assert.Empty(t, msgs, "#%d", i) } @@ -3876,10 +3876,10 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { // r1 sends 2 MsgApp messages to r2. mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.sendAppend(2) + r1.maybeSendAppend(2) req1 := expectOneMessage(t, r1) mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.sendAppend(2) + r1.maybeSendAppend(2) req2 := expectOneMessage(t, r1) // r2 receives the second MsgApp first due to reordering. diff --git a/testdata/replicate_pause.txt b/testdata/replicate_pause.txt index d9cee59f..4931480e 100644 --- a/testdata/replicate_pause.txt +++ b/testdata/replicate_pause.txt @@ -76,6 +76,9 @@ deliver-msgs drop=3 dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"] dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"] dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"] +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 # Repeat committing 3 entries. diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt index 0d3d48c8..2ce02ada 100644 --- a/testdata/slow_follower_after_compaction.txt +++ b/testdata/slow_follower_after_compaction.txt @@ -88,6 +88,8 @@ deliver-msgs drop=3 ---- dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"] dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"] +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15 +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16 # Truncate the leader's log beyond node 3 log size. compact 1 17 diff --git a/tracker/inflights.go b/tracker/inflights.go index cb091e54..e22bcb8d 100644 --- a/tracker/inflights.go +++ b/tracker/inflights.go @@ -32,6 +32,8 @@ type Inflights struct { count int // number of inflight messages in the buffer bytes uint64 // number of inflight bytes + // TODO(pav-kv): do not store the limits here, pass them to methods. For flow + // control, we need to support dynamic limits. size int // the max number of inflight messages maxBytes uint64 // the max total byte size of inflight messages diff --git a/tracker/progress.go b/tracker/progress.go index cb566eac..f57c3bd2 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -27,6 +27,9 @@ import ( // NB(tbg): Progress is basically a state machine whose transitions are mostly // strewn around `*raft.raft`. Additionally, some fields are only used when in a // certain State. All of this isn't ideal. +// +// TODO(pav-kv): consolidate all flow control state changes here. Much of the +// transitions in raft.go logically belong here. type Progress struct { // Match is the index up to which the follower's log is known to match the // leader's. @@ -119,7 +122,7 @@ type Progress struct { // ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { - pr.MsgAppFlowPaused = false + pr.PauseMsgAppProbes(false) pr.PendingSnapshot = 0 pr.State = state pr.Inflights.reset() @@ -163,20 +166,25 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { // // Must be used with StateProbe or StateReplicate. func (pr *Progress) SentEntries(entries int, bytes uint64) { - switch pr.State { - case StateReplicate: - if entries > 0 { - pr.Next += uint64(entries) - pr.Inflights.Add(pr.Next-1, bytes) - } - // If this message overflows the in-flights tracker, or it was already full, - // consider this message being a probe, so that the flow is paused. - pr.MsgAppFlowPaused = pr.Inflights.Full() - case StateProbe: - pr.MsgAppFlowPaused = true - default: - panic(fmt.Sprintf("sending append in unhandled state %s", pr.State)) + if pr.State == StateReplicate && entries > 0 { + pr.Next += uint64(entries) + pr.Inflights.Add(pr.Next-1, bytes) } + pr.PauseMsgAppProbes(true) +} + +// PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on +// the passed-in bool. +func (pr *Progress) PauseMsgAppProbes(pause bool) { + pr.MsgAppFlowPaused = pause +} + +// CanSendEntries returns true if the flow control state allows sending at least +// one log entry to this follower. +// +// Must be used with StateProbe or StateReplicate. +func (pr *Progress) CanSendEntries(lastIndex uint64) bool { + return pr.Next <= lastIndex && (pr.State == StateProbe || !pr.Inflights.Full()) } // CanBumpCommit returns true if sending the given commit index can potentially @@ -203,7 +211,6 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { } pr.Match = n pr.Next = max(pr.Next, n+1) // invariant: Match < Next - pr.MsgAppFlowPaused = false return true } @@ -244,7 +251,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { pr.Next = max(min(rejected, matchHint+1), pr.Match+1) // Regress the sentCommit since it unlikely has been applied. pr.sentCommit = min(pr.sentCommit, pr.Next-1) - pr.MsgAppFlowPaused = false + pr.PauseMsgAppProbes(false) return true } @@ -259,7 +266,7 @@ func (pr *Progress) IsPaused() bool { case StateProbe: return pr.MsgAppFlowPaused case StateReplicate: - return pr.MsgAppFlowPaused + return pr.MsgAppFlowPaused && pr.Inflights.Full() case StateSnapshot: return true default: @@ -267,6 +274,39 @@ func (pr *Progress) IsPaused() bool { } } +// ShouldSendMsgApp returns true if the leader should send a MsgApp to the +// follower represented by this Progress. The given last and commit index of the +// leader log help determining if there is outstanding workload, and contribute +// to this decision-making. +// +// In StateProbe, a message is sent periodically. The flow is paused after every +// message, and un-paused on a heartbeat response. This ensures that probes are +// not too frequent, and eventually the MsgApp is either accepted or rejected. +// +// In StateReplicate, generally a message is sent if there are log entries that +// are not yet in-flight, and the in-flight limits are not exceeded. Otherwise, +// we don't send a message, or send a "probe" message in a few situations. +// +// A probe message (containing no log entries) is sent if the follower's commit +// index can be updated, or there hasn't been a probe message recently. We must +// send a message periodically even if all log entries are in-flight, in order +// to guarantee that eventually the flow is either accepted or rejected. +// +// In StateSnapshot, we do not send append messages. +func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool { + switch pr.State { + case StateProbe: + return !pr.MsgAppFlowPaused + case StateReplicate: + return pr.CanBumpCommit(commit) || + pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last)) + case StateSnapshot: + return false + default: + panic("unexpected state") + } +} + func (pr *Progress) String() string { var buf strings.Builder fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next) diff --git a/tracker/progress_test.go b/tracker/progress_test.go index 49dedb53..5ceaa59f 100644 --- a/tracker/progress_test.go +++ b/tracker/progress_test.go @@ -47,7 +47,7 @@ func TestProgressIsPaused(t *testing.T) { {StateProbe, false, false}, {StateProbe, true, true}, {StateReplicate, false, false}, - {StateReplicate, true, true}, + {StateReplicate, true, false}, {StateSnapshot, false, true}, {StateSnapshot, true, true}, } @@ -61,8 +61,11 @@ func TestProgressIsPaused(t *testing.T) { } } -// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset -// MsgAppFlowPaused. +// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and +// MaybeUpdate does not. +// +// TODO(pav-kv): there is little sense in testing these micro-behaviours in the +// struct. We should test the visible behaviour instead. func TestProgressResume(t *testing.T) { p := &Progress{ Next: 2, @@ -72,7 +75,7 @@ func TestProgressResume(t *testing.T) { assert.False(t, p.MsgAppFlowPaused) p.MsgAppFlowPaused = true p.MaybeUpdate(2) - assert.False(t, p.MsgAppFlowPaused) + assert.True(t, p.MsgAppFlowPaused) } func TestProgressBecomeProbe(t *testing.T) { From 91981c3f884a25f342795de031ee734974ca71be Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 23 Feb 2024 11:28:08 +0000 Subject: [PATCH 3/3] tracker: rename the paused probes flow field Signed-off-by: Pavel Kalinnikov --- raft.go | 2 +- raft_snap_test.go | 8 ++++---- raft_test.go | 16 ++++++++-------- tracker/progress.go | 31 ++++++++++++++++++------------- tracker/progress_test.go | 19 +++++++++---------- 5 files changed, 40 insertions(+), 36 deletions(-) diff --git a/raft.go b/raft.go index 912c5c05..270ad6f3 100644 --- a/raft.go +++ b/raft.go @@ -1563,7 +1563,7 @@ func stepLeader(r *raft, m pb.Message) error { // If snapshot finish, wait for the MsgAppResp from the remote node before sending // out the next MsgApp. // If snapshot failure, wait for a heartbeat interval before next try - pr.MsgAppFlowPaused = true + pr.PauseMsgAppProbes(true) case pb.MsgUnreachable: // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. diff --git a/raft_snap_test.go b/raft_snap_test.go index e6058c68..a69d1993 100644 --- a/raft_snap_test.go +++ b/raft_snap_test.go @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) { if sm.trk.Progress[2].Next != 1 { t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("msgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) } } @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) { if sm.trk.Progress[2].Next != 12 { t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) } } diff --git a/raft_test.go b/raft_test.go index c22e322f..d46f2065 100644 --- a/raft_test.go +++ b/raft_test.go @@ -123,16 +123,16 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].MsgAppFlowPaused = true + r.trk.Progress[2].PauseMsgAppProbes(true) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) r.trk.Progress[2].BecomeReplicate() - assert.False(t, r.trk.Progress[2].MsgAppFlowPaused) - r.trk.Progress[2].MsgAppFlowPaused = true + assert.False(t, r.trk.Progress[2].MsgAppProbesPaused) + r.trk.Progress[2].PauseMsgAppProbes(true) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) } func TestProgressPaused(t *testing.T) { @@ -2305,7 +2305,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { assert.Zero(t, msg[0].Index) } - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) r.maybeSendAppend(2) @@ -2316,7 +2316,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) // consume the heartbeat msg := r.readMessages() @@ -2329,7 +2329,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { msg := r.readMessages() assert.Len(t, msg, 1) assert.Zero(t, msg[0].Index) - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) } func TestSendAppendForProgressReplicate(t *testing.T) { diff --git a/tracker/progress.go b/tracker/progress.go index f57c3bd2..0cf075cd 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -93,13 +93,13 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This - // happens in StateProbe, or StateReplicate with saturated Inflights. In both - // cases, we need to continue sending MsgApp once in a while to guarantee - // progress, but we only do so when MsgAppFlowPaused is false (it is reset on - // receiving a heartbeat response), to not overflow the receiver. See - // IsPaused(). - MsgAppFlowPaused bool + // MsgAppProbesPaused set to true prevents sending "probe" MsgApp messages to + // this follower. Used in StateProbe, or StateReplicate when all entries are + // in-flight or the in-flight volume exceeds limits. See ShouldSendMsgApp(). + // + // TODO(pav-kv): unexport this field. It is used by a few tests, but should be + // replaced by PauseMsgAppProbes() and ShouldSendMsgApp(). + MsgAppProbesPaused bool // Inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. @@ -119,7 +119,7 @@ type Progress struct { IsLearner bool } -// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, +// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { pr.PauseMsgAppProbes(false) @@ -176,7 +176,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) { // PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on // the passed-in bool. func (pr *Progress) PauseMsgAppProbes(pause bool) { - pr.MsgAppFlowPaused = pause + pr.MsgAppProbesPaused = pause } // CanSendEntries returns true if the flow control state allows sending at least @@ -261,12 +261,17 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { // operation, this is false. A throttled node will be contacted less frequently // until it has reached a state in which it's able to accept a steady stream of // log entries again. +// +// TODO(pav-kv): this method is deprecated, remove it. It is still used in tests +// and String(), find a way to avoid this. The problem is that the actual flow +// control state depends on the log size and commit index, which are not part of +// this Progress struct - they are passed-in to methods like ShouldSendMsgApp(). func (pr *Progress) IsPaused() bool { switch pr.State { case StateProbe: - return pr.MsgAppFlowPaused + return pr.MsgAppProbesPaused case StateReplicate: - return pr.MsgAppFlowPaused && pr.Inflights.Full() + return pr.MsgAppProbesPaused && pr.Inflights.Full() case StateSnapshot: return true default: @@ -296,10 +301,10 @@ func (pr *Progress) IsPaused() bool { func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool { switch pr.State { case StateProbe: - return !pr.MsgAppFlowPaused + return !pr.MsgAppProbesPaused case StateReplicate: return pr.CanBumpCommit(commit) || - pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last)) + pr.Match < last && (!pr.MsgAppProbesPaused || pr.CanSendEntries(last)) case StateSnapshot: return false default: diff --git a/tracker/progress_test.go b/tracker/progress_test.go index 5ceaa59f..9acd6a8a 100644 --- a/tracker/progress_test.go +++ b/tracker/progress_test.go @@ -29,7 +29,6 @@ func TestProgressString(t *testing.T) { State: StateSnapshot, PendingSnapshot: 123, RecentActive: false, - MsgAppFlowPaused: true, IsLearner: true, Inflights: ins, } @@ -53,29 +52,29 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, - MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256, 0), + State: tt.state, + MsgAppProbesPaused: tt.paused, + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } } -// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and +// TestProgressResume ensures that MaybeDecrTo resets MsgAppProbesPaused, and // MaybeUpdate does not. // // TODO(pav-kv): there is little sense in testing these micro-behaviours in the // struct. We should test the visible behaviour instead. func TestProgressResume(t *testing.T) { p := &Progress{ - Next: 2, - MsgAppFlowPaused: true, + Next: 2, + MsgAppProbesPaused: true, } p.MaybeDecrTo(1, 1) - assert.False(t, p.MsgAppFlowPaused) - p.MsgAppFlowPaused = true + assert.False(t, p.MsgAppProbesPaused) + p.MsgAppProbesPaused = true p.MaybeUpdate(2) - assert.True(t, p.MsgAppFlowPaused) + assert.True(t, p.MsgAppProbesPaused) } func TestProgressBecomeProbe(t *testing.T) {