diff --git a/raft.go b/raft.go index 5a150562..85b066ec 100644 --- a/raft.go +++ b/raft.go @@ -603,11 +603,15 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return false } - lastIndex, nextIndex := pr.Next-1, pr.Next - lastTerm, errt := r.raftLog.term(lastIndex) + prevIndex := pr.Next - 1 + prevTerm, err := r.raftLog.term(prevIndex) + if err != nil { + // The log probably got truncated at >= pr.Next, so we can't catch up the + // follower log anymore. Send a snapshot instead. + return r.maybeSendSnapshot(to, pr) + } var ents []pb.Entry - var erre error // In a throttled StateReplicate only send empty MsgApp, to ensure progress. // Otherwise, if we had a full Inflights and all inflight messages were in // fact dropped, replication to that follower would stall. Instead, an empty @@ -615,53 +619,55 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // leader to send an append), allowing it to be acked or rejected, both of // which will clear out Inflights. if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize) + ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize) } - if len(ents) == 0 && !sendIfEmpty { return false } - - if errt != nil || erre != nil { // send snapshot if we failed to get term or entries - if !pr.RecentActive { - r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) - return false - } - - snapshot, err := r.raftLog.snapshot() - if err != nil { - if err == ErrSnapshotTemporarilyUnavailable { - r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) - return false - } - panic(err) // TODO(bdarnell) - } - if IsEmptySnap(snapshot) { - panic("need non-empty snapshot") - } - sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term - r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", - r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) - pr.BecomeSnapshot(sindex) - r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - - r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) - return true + // TODO(pav-kv): move this check up to where err is returned. + if err != nil { // send a snapshot if we failed to get the entries + return r.maybeSendSnapshot(to, pr) } // Send the actual MsgApp otherwise, and update the progress accordingly. - if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil { - r.logger.Panicf("%x: %v", r.id, err) - } - // NB: pr has been updated, but we make sure to only use its old values below. r.send(pb.Message{ To: to, Type: pb.MsgApp, - Index: lastIndex, - LogTerm: lastTerm, + Index: prevIndex, + LogTerm: prevTerm, Entries: ents, Commit: r.raftLog.committed, }) + pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents))) + return true +} + +// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given +// node. Returns true iff the snapshot message has been emitted successfully. +func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { + if !pr.RecentActive { + r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) + return false + } + + snapshot, err := r.raftLog.snapshot() + if err != nil { + if err == ErrSnapshotTemporarilyUnavailable { + r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) + return false + } + panic(err) // TODO(bdarnell) + } + if IsEmptySnap(snapshot) { + panic("need non-empty snapshot") + } + sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term + r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", + r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) + pr.BecomeSnapshot(sindex) + r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) + + r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) return true } diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..e9d9657b 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2901,7 +2901,7 @@ func TestRecvMsgUnreachable(t *testing.T) { // set node 2 to state replicate r.trk.Progress[2].Match = 3 r.trk.Progress[2].BecomeReplicate() - r.trk.Progress[2].OptimisticUpdate(5) + r.trk.Progress[2].Next = 6 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) diff --git a/tracker/progress.go b/tracker/progress.go index b1a07fa0..dc4ac6b3 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -137,14 +137,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { // UpdateOnEntriesSend updates the progress on the given number of consecutive // entries being sent in a MsgApp, with the given total bytes size, appended at -// and after the given log index. -func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error { +// log indices >= pr.Next. +// +// Must be used with StateProbe or StateReplicate. +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { switch pr.State { case StateReplicate: if entries > 0 { - last := nextIndex + uint64(entries) - 1 - pr.OptimisticUpdate(last) - pr.Inflights.Add(last, bytes) + pr.Next += uint64(entries) + pr.Inflights.Add(pr.Next-1, bytes) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. @@ -157,9 +158,8 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) er pr.MsgAppFlowPaused = true } default: - return fmt.Errorf("sending append in unhandled state %s", pr.State) + panic(fmt.Sprintf("sending append in unhandled state %s", pr.State)) } - return nil } // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the @@ -176,10 +176,6 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { return updated } -// OptimisticUpdate signals that appends all the way up to and including index n -// are in-flight. As a result, Next is increased to n+1. -func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 } - // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The // arguments are the index of the append message rejected by the follower, and // the hint that we want to decrease to.