Skip to content

Commit

Permalink
raft: don't panic when looking for term conflicts
Browse files Browse the repository at this point in the history
This commit fixes a hypothetical panic that may occur when a stale MsgApp
message arrives to a follower. The conflict searching algorithm in
findConflictByTerm may return a log index which is not present in the log, and
thus the raftLog.term() method may return an error. It is safe to ignore this
error and send MsgAppResp with the found index and a zero LogTerm.

This commit also restores the behaviour that existed before commit d0fb0cd.
Back then, the term() function would silently return 0 instead of an error, and
a zero LogTerm would be sent with the rejecting MsgAppResp. After that commit,
there is a new possible panic. We remove the possibility of this panic here.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Mar 15, 2023
1 parent c5dabf8 commit 0c22de0
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 40 deletions.
47 changes: 22 additions & 25 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,34 +162,31 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
return 0
}

// findConflictByTerm takes an (index, term) pair (indicating a conflicting log
// entry on a leader/follower during an append) and finds the largest index in
// log l with a term <= `term` and an index <= `index`. If no such index exists
// in the log, the log's first index is returned.
// findConflictByTerm returns a best guess on where this log ends matching
// another log, given that the only information known about the other log is the
// (index, term) of its single entry.
//
// The index provided MUST be equal to or less than l.lastIndex(). Invalid
// inputs log a warning and the input index is returned.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
if li := l.lastIndex(); index > li {
// NB: such calls should not exist, but since there is a straightfoward
// way to recover, do it.
//
// It is tempting to also check something about the first index, but
// there is odd behavior with peers that have no log, in which case
// lastIndex will return zero and firstIndex will return one, which
// leads to calls with an index of zero into this method.
l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
index, li)
return index
}
for {
logTerm, err := l.term(index)
if logTerm <= term || err != nil {
break
// Specifically, the first returned value is the max guessIndex <= index, such
// that term(guessIndex) <= term or term(guessIndex) is not known (because this
// index is compacted or not yet stored).
//
// The second returned value is the term(guessIndex), or 0 if it is unknown.
//
// This function is used by a follower and leader to resolve log conflicts after
// an unsuccessful append to a follower, and ultimately restore the steady flow
// of appends.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) {
for ; index > 0; index-- {
// If there is an error (likely ErrCompacted or ErrUnavailable), we don't
// know whether it's a match or not, so assume a possible match and return
// the index, with 0 term indicating an unknown term.
if ourTerm, err := l.term(index); err != nil {
return index, 0
} else if ourTerm <= term {
return index, ourTerm
}
index--
}
return index
return 0, 0
}

// nextUnstableEnts returns all entries that are available to be written to the
Expand Down
7 changes: 6 additions & 1 deletion log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ func TestFindConflictByTerm(t *testing.T) {
}})
l := newLog(st, raftLogger)
l.append(tt.ents[1:]...)
require.Equal(t, tt.want, l.findConflictByTerm(tt.index, tt.term))

index, term := l.findConflictByTerm(tt.index, tt.term)
require.Equal(t, tt.want, index)
wantTerm, err := l.term(index)
wantTerm = l.zeroTermOnOutOfBounds(wantTerm, err)
require.Equal(t, wantTerm, term)
})
}
}
Expand Down
27 changes: 15 additions & 12 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ func stepLeader(r *raft, m pb.Message) error {
// 7, the rejection points it at the end of the follower's log
// which is at a higher log term than the actually committed
// log.
nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
nextProbeIdx, _ = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
Expand Down Expand Up @@ -1652,24 +1652,27 @@ func (r *raft) handleAppendEntries(m pb.Message) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
return
}

r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

// Return a hint to the leader about the maximum index and term that the two
// logs could be divergent at. Do this by searching through the follower's log
// for the maximum (index, term) pair with a term <= the MsgApp's LogTerm and
// an index <= the MsgApp's Index. This can help skip all indexes in the
// follower's uncommitted tail with terms greater than the MsgApp's LogTerm.
// Our log does not match the leader's at index m.Index. Return a hint to the
// leader - a guess on the maximal (index, term) at which the logs match. Do
// this by searching through the follower's log for the maximum (index, term)
// pair with a term <= the MsgApp's LogTerm and an index <= the MsgApp's
// Index. This can help skip all indexes in the follower's uncommitted tail
// with terms greater than the MsgApp's LogTerm.
//
// See the other caller for findConflictByTerm (in stepLeader) for a much more
// detailed explanation of this mechanism.

// NB: m.Index >= raftLog.committed by now (see the early return above), and
// raftLog.lastIndex() >= raftLog.committed by invariant, so min of the two is
// also >= raftLog.committed. Hence, the findConflictByTerm argument is within
// the valid interval, which then will return a valid (index, term) pair with
// a non-zero term (unless the log is empty). However, it is safe to send a zero
// LogTerm in this response in any case, so we don't verify it here.
hintIndex := min(m.Index, r.raftLog.lastIndex())
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
hintTerm, err := r.raftLog.term(hintIndex)
if err != nil {
panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
}
hintIndex, hintTerm := r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
r.send(pb.Message{
To: m.From,
Type: pb.MsgAppResp,
Expand Down
36 changes: 34 additions & 2 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4494,6 +4494,7 @@ func TestFastLogRejection(t *testing.T) {
tests := []struct {
leaderLog []pb.Entry // Logs on the leader
followerLog []pb.Entry // Logs on the follower
followerCompact uint64 // Index at which the follower log is compacted.
rejectHintTerm uint64 // Expected term included in rejected MsgAppResp.
rejectHintIndex uint64 // Expected index included in rejected MsgAppResp.
nextAppendTerm uint64 // Expected term when leader appends after rejected.
Expand Down Expand Up @@ -4698,10 +4699,35 @@ func TestFastLogRejection(t *testing.T) {
{Term: 4, Index: 7},
{Term: 4, Index: 8},
},
nextAppendTerm: 2,
nextAppendIndex: 1,
rejectHintTerm: 2,
rejectHintIndex: 1,
nextAppendTerm: 2,
nextAppendIndex: 1,
},
// A case when a stale MsgApp from leader arrives after the corresponding
// log index got compacted.
// A stale (type=MsgApp,index=3,logTerm=3,entries=[(term=3,index=4)]) is
// delivered to a follower who has already compacted beyond log index 3. The
// MsgAppResp rejection will return same index=3, with logTerm=0. The leader
// will rollback by one entry, and send MsgApp with index=2,logTerm=1.
{
leaderLog: []pb.Entry{
{Term: 1, Index: 1},
{Term: 1, Index: 2},
{Term: 3, Index: 3},
},
followerLog: []pb.Entry{
{Term: 1, Index: 1},
{Term: 1, Index: 2},
{Term: 3, Index: 3},
{Term: 3, Index: 4},
{Term: 3, Index: 5}, // <- this entry and below are compacted
},
followerCompact: 5,
rejectHintTerm: 0,
rejectHintIndex: 3,
nextAppendTerm: 1,
nextAppendIndex: 2,
},
}

Expand All @@ -4728,6 +4754,12 @@ func TestFastLogRejection(t *testing.T) {
Commit: 0,
})
n2 := newTestRaft(2, 10, 1, s2)
if test.followerCompact != 0 {
s2.Compact(test.followerCompact)
// NB: the state of n2 after this compaction isn't realistic because the
// commit index is still at 0. We do this to exercise a "doesn't happen"
// edge case behaviour, in case it still does happen in some other way.
}

require.NoError(t, n2.Step(pb.Message{From: 1, To: 2, Type: pb.MsgHeartbeat}))
msgs := n2.readMessages()
Expand Down
Empty file.

0 comments on commit 0c22de0

Please sign in to comment.