Skip to content

Commit

Permalink
Merge pull request etcd-io#136 from pav-kv/send-append-cleanup
Browse files Browse the repository at this point in the history
raft: clean-up maybeSendAppend area
  • Loading branch information
ahrtr committed Jan 30, 2024
2 parents 026484c + c55352f commit d14e61b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 49 deletions.
80 changes: 43 additions & 37 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,65 +603,71 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
return false
}

lastIndex, nextIndex := pr.Next-1, pr.Next
lastTerm, errt := r.raftLog.term(lastIndex)
prevIndex := pr.Next - 1
prevTerm, err := r.raftLog.term(prevIndex)
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.maybeSendSnapshot(to, pr)
}

var ents []pb.Entry
var erre error
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeats responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize)
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
}

if len(ents) == 0 && !sendIfEmpty {
return false
}

if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}

snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
// TODO(pav-kv): move this check up to where err is returned.
if err != nil { // send a snapshot if we failed to get the entries
return r.maybeSendSnapshot(to, pr)
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
}
// NB: pr has been updated, but we make sure to only use its old values below.
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: lastIndex,
LogTerm: lastTerm,
Index: prevIndex,
LogTerm: prevTerm,
Entries: ents,
Commit: r.raftLog.committed,
})
pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)))
return true
}

// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
// node. Returns true iff the snapshot message has been emitted successfully.
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}

snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
}

Expand Down
2 changes: 1 addition & 1 deletion raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2901,7 +2901,7 @@ func TestRecvMsgUnreachable(t *testing.T) {
// set node 2 to state replicate
r.trk.Progress[2].Match = 3
r.trk.Progress[2].BecomeReplicate()
r.trk.Progress[2].OptimisticUpdate(5)
r.trk.Progress[2].Next = 6

r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})

Expand Down
18 changes: 7 additions & 11 deletions tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) {

// UpdateOnEntriesSend updates the progress on the given number of consecutive
// entries being sent in a MsgApp, with the given total bytes size, appended at
// and after the given log index.
func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error {
// log indices >= pr.Next.
//
// Must be used with StateProbe or StateReplicate.
func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) {
switch pr.State {
case StateReplicate:
if entries > 0 {
last := nextIndex + uint64(entries) - 1
pr.OptimisticUpdate(last)
pr.Inflights.Add(last, bytes)
pr.Next += uint64(entries)
pr.Inflights.Add(pr.Next-1, bytes)
}
// If this message overflows the in-flights tracker, or it was already full,
// consider this message being a probe, so that the flow is paused.
Expand All @@ -157,9 +158,8 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) er
pr.MsgAppFlowPaused = true
}
default:
return fmt.Errorf("sending append in unhandled state %s", pr.State)
panic(fmt.Sprintf("sending append in unhandled state %s", pr.State))
}
return nil
}

// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
Expand All @@ -176,10 +176,6 @@ func (pr *Progress) MaybeUpdate(n uint64) bool {
return updated
}

// OptimisticUpdate signals that appends all the way up to and including index n
// are in-flight. As a result, Next is increased to n+1.
func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }

// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
// arguments are the index of the append message rejected by the follower, and
// the hint that we want to decrease to.
Expand Down

0 comments on commit d14e61b

Please sign in to comment.