From de5c136dbc9b924fe8b34c1c5314e933e6f57fa1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 25 Jan 2024 16:31:43 +0000 Subject: [PATCH 1/5] raft: factor out maybeSendSnapshot method This is a mechanical change, for code readability purposes. Signed-off-by: Pavel Kalinnikov --- raft.go | 54 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/raft.go b/raft.go index 5a150562..7b55de0d 100644 --- a/raft.go +++ b/raft.go @@ -623,30 +623,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } if errt != nil || erre != nil { // send snapshot if we failed to get term or entries - if !pr.RecentActive { - r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) - return false - } - - snapshot, err := r.raftLog.snapshot() - if err != nil { - if err == ErrSnapshotTemporarilyUnavailable { - r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) - return false - } - panic(err) // TODO(bdarnell) - } - if IsEmptySnap(snapshot) { - panic("need non-empty snapshot") - } - sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term - r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", - r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) - pr.BecomeSnapshot(sindex) - r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - - r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) - return true + return r.maybeSendSnapshot(to, pr) } // Send the actual MsgApp otherwise, and update the progress accordingly. @@ -665,6 +642,35 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return true } +// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given +// node. Returns true iff the snapshot message has been emitted successfully. +func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { + if !pr.RecentActive { + r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) + return false + } + + snapshot, err := r.raftLog.snapshot() + if err != nil { + if err == ErrSnapshotTemporarilyUnavailable { + r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) + return false + } + panic(err) // TODO(bdarnell) + } + if IsEmptySnap(snapshot) { + panic("need non-empty snapshot") + } + sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term + r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", + r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) + pr.BecomeSnapshot(sindex) + r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) + + r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) + return true +} + // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // Attach the commit as min(to.matched, r.committed). From 50b6d04c2d1c4855a34e1db9b7db4a3b94b6f704 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 25 Jan 2024 16:37:43 +0000 Subject: [PATCH 2/5] tracker: remove redundant argument Signed-off-by: Pavel Kalinnikov --- raft.go | 2 +- tracker/progress.go | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/raft.go b/raft.go index 7b55de0d..558c5e23 100644 --- a/raft.go +++ b/raft.go @@ -627,7 +627,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } // Send the actual MsgApp otherwise, and update the progress accordingly. - if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil { + if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents))); err != nil { r.logger.Panicf("%x: %v", r.id, err) } // NB: pr has been updated, but we make sure to only use its old values below. diff --git a/tracker/progress.go b/tracker/progress.go index 4002b1ec..b1acd8a7 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -151,14 +151,13 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { // UpdateOnEntriesSend updates the progress on the given number of consecutive // entries being sent in a MsgApp, with the given total bytes size, appended at -// and after the given log index. -func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error { +// log indices >= pr.Next. +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) error { switch pr.State { case StateReplicate: if entries > 0 { - last := nextIndex + uint64(entries) - 1 - pr.OptimisticUpdate(last) - pr.Inflights.Add(last, bytes) + pr.Next += uint64(entries) + pr.Inflights.Add(pr.Next-1, bytes) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. From ee089614ac1bd77dda31f6a40d9d44114aa51d75 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 25 Jan 2024 16:42:54 +0000 Subject: [PATCH 3/5] tracker: make UpdateOnEntrieSend errorless The method will not be used in states other than StateProbe or StateReplicate, so there is little sense in having the error path in it. Signed-off-by: Pavel Kalinnikov --- raft.go | 5 +---- tracker/progress.go | 7 ++++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/raft.go b/raft.go index 558c5e23..7994877b 100644 --- a/raft.go +++ b/raft.go @@ -627,10 +627,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } // Send the actual MsgApp otherwise, and update the progress accordingly. - if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents))); err != nil { - r.logger.Panicf("%x: %v", r.id, err) - } - // NB: pr has been updated, but we make sure to only use its old values below. r.send(pb.Message{ To: to, Type: pb.MsgApp, @@ -639,6 +635,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { Entries: ents, Commit: r.raftLog.committed, }) + pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents))) return true } diff --git a/tracker/progress.go b/tracker/progress.go index b1acd8a7..8c618765 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -152,7 +152,9 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { // UpdateOnEntriesSend updates the progress on the given number of consecutive // entries being sent in a MsgApp, with the given total bytes size, appended at // log indices >= pr.Next. -func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) error { +// +// Must be used with StateProbe or StateReplicate. +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { switch pr.State { case StateReplicate: if entries > 0 { @@ -170,9 +172,8 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) error { pr.MsgAppFlowPaused = true } default: - return fmt.Errorf("sending append in unhandled state %s", pr.State) + panic(fmt.Sprintf("sending append in unhandled state %s", pr.State)) } - return nil } // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the From 40a19f183e6b76be5593c3fea9c678677ebe63f8 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 25 Jan 2024 16:55:22 +0000 Subject: [PATCH 4/5] raft: exit maybeSendAppend early if term is not found Previously, maybeSendAppend would also try to fetch entries, only to return a few lines below at the errors check. Signed-off-by: Pavel Kalinnikov --- raft.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/raft.go b/raft.go index 7994877b..85b066ec 100644 --- a/raft.go +++ b/raft.go @@ -603,11 +603,15 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return false } - lastIndex, nextIndex := pr.Next-1, pr.Next - lastTerm, errt := r.raftLog.term(lastIndex) + prevIndex := pr.Next - 1 + prevTerm, err := r.raftLog.term(prevIndex) + if err != nil { + // The log probably got truncated at >= pr.Next, so we can't catch up the + // follower log anymore. Send a snapshot instead. + return r.maybeSendSnapshot(to, pr) + } var ents []pb.Entry - var erre error // In a throttled StateReplicate only send empty MsgApp, to ensure progress. // Otherwise, if we had a full Inflights and all inflight messages were in // fact dropped, replication to that follower would stall. Instead, an empty @@ -615,14 +619,13 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // leader to send an append), allowing it to be acked or rejected, both of // which will clear out Inflights. if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize) + ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize) } - if len(ents) == 0 && !sendIfEmpty { return false } - - if errt != nil || erre != nil { // send snapshot if we failed to get term or entries + // TODO(pav-kv): move this check up to where err is returned. + if err != nil { // send a snapshot if we failed to get the entries return r.maybeSendSnapshot(to, pr) } @@ -630,8 +633,8 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { r.send(pb.Message{ To: to, Type: pb.MsgApp, - Index: lastIndex, - LogTerm: lastTerm, + Index: prevIndex, + LogTerm: prevTerm, Entries: ents, Commit: r.raftLog.committed, }) From c55352f998f2812d0c1f13ce1703df5f1cd0e8bc Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 26 Jan 2024 15:33:50 +0000 Subject: [PATCH 5/5] tracker: remove unused OptimisticUpdate method Signed-off-by: Pavel Kalinnikov --- raft_test.go | 2 +- tracker/progress.go | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..e9d9657b 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2901,7 +2901,7 @@ func TestRecvMsgUnreachable(t *testing.T) { // set node 2 to state replicate r.trk.Progress[2].Match = 3 r.trk.Progress[2].BecomeReplicate() - r.trk.Progress[2].OptimisticUpdate(5) + r.trk.Progress[2].Next = 6 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) diff --git a/tracker/progress.go b/tracker/progress.go index 8c618765..8a0f3116 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -190,10 +190,6 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { return updated } -// OptimisticUpdate signals that appends all the way up to and including index n -// are in-flight. As a result, Next is increased to n+1. -func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 } - // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The // arguments are the index of the append message rejected by the follower, and // the hint that we want to decrease to.