Skip to content

Commit

Permalink
log: clean-up log conflict search
Browse files Browse the repository at this point in the history
Use the new entryID type. Move the preceding entry check into the
conflict search method rather than do it outside.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 7, 2024
1 parent 2a0c44f commit 17f5596
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 54 deletions.
94 changes: 55 additions & 39 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,24 @@ func (l *raftLog) String() string {
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) {
if !l.matchTerm(a.prev) {
match, ok := l.findConflict(a)
if !ok {
return 0, false
}
// TODO(pav-kv): propagate logSlice down the stack. It will be used all the
// way down in unstable, for safety checks, and for useful bookkeeping.

lastnewi = a.prev.index + uint64(len(a.entries))
ci := l.findConflict(a.entries)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := a.prev.index + 1
if ci-offset > uint64(len(a.entries)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
}
l.append(a.entries[ci-offset:]...)
if match.index < a.lastIndex() && match.index < l.committed {
l.logger.Panicf("entry %d is already committed [committed(%d)]", match.index+1, l.committed)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true

// Fast-forward to the first mismatching or missing entry.
// NB: prev.index <= match.index <= a.lastIndex(), so the sub-slicing is safe.
a.entries = a.entries[match.index-a.prev.index:]
a.prev = match

// TODO(pav-kv): pass the logSlice down the stack, for safety checks and
// bookkeeping in the unstable structure.
l.append(a.entries...)
l.commitTo(min(committed, a.lastIndex()))
return a.lastIndex(), true
}

func (l *raftLog) append(ents ...pb.Entry) uint64 {
Expand All @@ -139,29 +136,48 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 {
return l.lastIndex()
}

// findConflict finds the index of the conflict.
// It returns the first pair of conflicting entries between the existing
// entries and the given entries, if there are any.
// If there is no conflicting entries, and the existing entries contains
// all the given entries, zero will be returned.
// If there is no conflicting entries, but the given entries contains new
// entries, the index of the first new entry will be returned.
// An entry is considered to be conflicting if it has the same index but
// a different term.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for i := range ents {
if id := pbEntryID(&ents[i]); !l.matchTerm(id) {
if id.index <= l.lastIndex() {
// TODO(pav-kv): can simply print %+v of the id. This will change the
// log format though.
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
}
return id.index
// findConflict finds the last entry in the given log slice that matches the
// log. The next entry either mismatches, or is missing.
//
// If the slice partially/fully matches, this method returns true. The returned
// entryID is the ID of the last matching entry. It can be s.prev if it is the
// only matching entry. It is guaranteed that the returned entryID.index is in
// the [s.prev.index, s.lastIndex()] range.
//
// All the entries up to the returned entryID are already present in the log,
// and do not need to be appended again. The caller can safely fast-forward an
// append request to the next entry after it.
//
// Returns false if the given slice mismatches the log entirely, i.e. the s.prev
// entry has a mismatching entryID.term. In this case an append request can not
// proceed.
func (l *raftLog) findConflict(s logSlice) (entryID, bool) {
// TODO(pav-kv): add a fast-path here. If s.term == raftLog.lastTerm, we can
// skip the match checks entirely. We can double-check only the last entry
// match, to be sure, but it is not necessary if raft invariants are true.
if !l.matchTerm(s.prev) {
return entryID{}, false
}

// TODO(pav-kv): every matchTerm call in the linear scan below can fall back
// to fetching an entry from storage. This is inefficient, we can improve it.
// NB: logs that don't match at one index, don't match at all indices above.
// So we can use binary search to find the fork.
match := s.prev
for i := range s.entries {
id := pbEntryID(&s.entries[i])
if l.matchTerm(id) {
match = id
continue
}
if id.index <= l.lastIndex() {
// TODO(pav-kv): should simply print %+v of the id.
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
}
return match, true
}
return 0
return match, true // all entries match
}

// findConflictByTerm returns a best guess on where this log ends matching
Expand Down
49 changes: 34 additions & 15 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,49 @@ import (

func TestFindConflict(t *testing.T) {
previousEnts := index(1).terms(1, 2, 3)
ids := make([]entryID, 1, len(previousEnts)+1) // dummy (0, 0) at index 0
for i := range previousEnts {
ids = append(ids, pbEntryID(&previousEnts[i]))
}
for _, tt := range []struct {
ents []pb.Entry
want uint64
prev entryID
ents []pb.Entry
notOk bool
want entryID
}{
// no conflict, empty entries
{ents: nil, want: 0},
{ents: nil, want: ids[0]},
// prev does not match the log
{prev: entryID{term: 10, index: 1}, notOk: true},
// no conflict
{ents: index(1).terms(1, 2, 3), want: 0},
{ents: index(2).terms(2, 3), want: 0},
{ents: index(3).terms(3), want: 0},
{prev: ids[0], ents: index(1).terms(1, 2, 3), want: ids[3]},
{prev: ids[1], ents: index(2).terms(2, 3), want: ids[3]},
{prev: ids[2], ents: index(3).terms(3), want: ids[3]},
// no conflict, but has new entries
{ents: index(1).terms(1, 2, 3, 4, 4), want: 4},
{ents: index(2).terms(2, 3, 4, 5), want: 4},
{ents: index(3).terms(3, 4, 4), want: 4},
{ents: index(4).terms(4, 4), want: 4},
// conflicts with existing entries
{ents: index(1).terms(4, 4), want: 1},
{ents: index(2).terms(1, 4, 4), want: 2},
{ents: index(3).terms(1, 2, 4, 4), want: 3},
{prev: ids[0], ents: index(1).terms(1, 2, 3, 4, 4), want: ids[3]},
{prev: ids[1], ents: index(2).terms(2, 3, 4, 4), want: ids[3]},
{prev: ids[2], ents: index(3).terms(3, 4, 4), want: ids[3]},
{prev: ids[3], ents: index(4).terms(4, 4), want: ids[3]},
// passes prev check, but conflicts with existing entries
{prev: ids[0], ents: index(1).terms(4, 4), want: ids[0]},
{prev: ids[1], ents: index(2).terms(1, 4, 4), want: ids[1]},
{prev: ids[2], ents: index(3).terms(2, 2, 4, 4), want: ids[2]},
// prev does not match
{prev: entryID{term: 4, index: 1}, ents: index(2).terms(4, 4), notOk: true},
{prev: entryID{term: 5, index: 2}, ents: index(3).terms(5, 6), notOk: true},
// out of bounds
{prev: entryID{term: 3, index: 10}, ents: index(11).terms(3), notOk: true},
// just touching the right bound, but still out of bounds
{prev: entryID{term: 3, index: 4}, ents: index(5).terms(3, 3, 4), notOk: true},
} {
t.Run("", func(t *testing.T) {
log := newLog(NewMemoryStorage(), discardLogger)
log.append(previousEnts...)
require.Equal(t, tt.want, log.findConflict(tt.ents))
app := logSlice{term: 100, prev: tt.prev, entries: tt.ents}
require.NoError(t, app.valid())
match, ok := log.findConflict(app)
require.Equal(t, !tt.notOk, ok)
require.Equal(t, tt.want, match)
})
}
}
Expand Down

0 comments on commit 17f5596

Please sign in to comment.