Skip to content

Commit

Permalink
raft: pass the leader term to append/commit methods
Browse files Browse the repository at this point in the history
This change improves safety of the append operations on raftLog. This
helped fixing some tests which made incorrect assumptions about the log.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 29, 2024
1 parent 87ac09e commit 4e08f52
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 103 deletions.
2 changes: 1 addition & 1 deletion bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {

ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
}
rn.raft.raftLog.append(ents...)
rn.raft.raftLog.append(rn.raft.Term, ents...)

// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
Expand Down
79 changes: 57 additions & 22 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,43 +141,72 @@ func (l *raftLog) String() string {
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
//
// TODO(pav-kv): pass in the term of the leader who sent this update. It is only
// safe to handle this append if this term is >= l.leaderTerm. It is only safe
// to override an uncommitted suffix of entries if term > l.leaderTerm.
//
// TODO(pav-kv): introduce a struct that consolidates the append metadata. The
// (prevEntryIndex, prevEntryTerm, leaderTerm) tuple must always be carried
// together, and safety of this append must be checked at the lowest layer here,
// (leaderTerm, prevIndex, prevTerm) tuple must always be carried together, so
// that safety properties for this append are checked at the lowest layers
// rather than up in raft.go.
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if !l.matchTerm(index, logTerm) {
func (l *raftLog) maybeAppend(leaderTerm, prevIndex, prevTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
// Can not accept append requests from an outdated leader.
if leaderTerm < l.leaderTerm {
return 0, false
}
// Can not accept append requests that are not consistent with our log.
//
// NB: it is unnecessary to check matchTerm() if leaderTerm == l.leaderTerm,
// because the leader always sends self-consistent appends. For ensuring raft
// safety, this check is only necessary if leaderTerm > l.leaderTerm.
//
// TODO(pav-kv): however, we should log an error if leaderTerm == l.leaderTerm
// and the entry does not match. This means either the leader is sending
// inconsistent appends, or there is some state corruption in general.
if !l.matchTerm(prevIndex, prevTerm) {
return 0, false
}

lastnewi = index + uint64(len(ents))
lastnewi = prevIndex + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
offset := prevIndex + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
}
l.append(ents[ci-offset:]...)
l.append(leaderTerm, ents[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
// TODO(pav-kv): call commitTo from outside of this method, for a smaller API.
// TODO(pav-kv): it is safe to pass committed index as is here instead of min,
// but it breaks some tests that make incorrect assumptions. Fix this.
l.commitTo(leaderTerm, min(committed, lastnewi))
return lastnewi, true
}

func (l *raftLog) append(ents ...pb.Entry) uint64 {
if len(ents) == 0 {
func (l *raftLog) append(leaderTerm uint64, ents ...pb.Entry) uint64 {
// Can not accept append requests from an outdated leader.
if leaderTerm < l.leaderTerm {
return l.lastIndex()
}
if len(ents) == 0 { // no-op
return l.lastIndex()
}
if after := ents[0].Index - 1; after < l.committed {
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
}

// INVARIANT: l.term(i) <= l.leaderTerm, for any entry in the log.
//
// TODO(pav-kv): we should more generally check that the content of ents slice
// is correct: all entries have consecutive indices, and terms do not regress.
// We should do this validation once, on every incoming message, and pass the
// append in a type-safe "validated append" wrapper. This wrapper can provide
// convenient accessors to the prev/last entry, instead of raw slices access.
if lastTerm := ents[len(ents)-1].Term; lastTerm > leaderTerm {
l.logger.Panicf("leader at term %d tries to append a higher term %d", leaderTerm, lastTerm)
}
l.leaderTerm = leaderTerm // l.leaderTerm never regresses here

l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}
Expand Down Expand Up @@ -358,12 +387,16 @@ func (l *raftLog) lastIndex() uint64 {
return i
}

func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
if l.lastIndex() < tocommit {
l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
}
func (l *raftLog) commitTo(leaderTerm, tocommit uint64) {
// Do not accept the commit index update from a leader if our log is not
// consistent with the leader's log.
if leaderTerm != l.leaderTerm {
return
}
// Otherwise, we have the guarantee that our log is a prefix of the leader's
// log. All entries <= min(tocommit, lastIndex) can thus be committed.
tocommit = min(tocommit, l.lastIndex())
if tocommit > l.committed {
l.committed = tocommit
}
}
Expand Down Expand Up @@ -487,12 +520,14 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
return t == term
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
// TODO(pav-kv): clarify that (maxIndex, term) is the ID of the entry at the
// committed index. Clean this up.
func (l *raftLog) maybeCommit(leaderTerm, maxIndex, term uint64) bool {
// NB: term should never be 0 on a commit because the leader campaigns at
// least at term 1. But if it is 0 for some reason, we don't want to consider
// this a term match in case zeroTermOnOutOfBounds returns 0.
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
l.commitTo(leaderTerm, maxIndex)
return true
}
return false
Expand Down
70 changes: 34 additions & 36 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestFindConflict(t *testing.T) {
for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.append(10, previousEnts...)
require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents))
})
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestFindConflictByTerm(t *testing.T) {
Term: tt.ents[0].Term,
}})
l := newLog(st, raftLogger)
l.append(tt.ents[1:]...)
l.append(10, tt.ents[1:]...)

index, term := l.findConflictByTerm(tt.index, tt.term)
require.Equal(t, tt.want, index)
Expand All @@ -115,7 +115,7 @@ func TestFindConflictByTerm(t *testing.T) {
func TestIsUpToDate(t *testing.T) {
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.append(3, previousEnts...)
tests := []struct {
lastIndex uint64
term uint64
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestAppend(t *testing.T) {
storage := NewMemoryStorage()
storage.Append(previousEnts)
raftLog := newLog(storage, raftLogger)
require.Equal(t, tt.windex, raftLog.append(tt.ents...))
require.Equal(t, tt.windex, raftLog.append(10, tt.ents...))
g, err := raftLog.entries(1, noLimit)
require.NoError(t, err)
require.Equal(t, tt.wents, g)
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestLogMaybeAppend(t *testing.T) {

for i, tt := range tests {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.append(10, previousEnts...)
raftLog.committed = commit

t.Run(fmt.Sprint(i), func(t *testing.T) {
Expand All @@ -296,7 +296,7 @@ func TestLogMaybeAppend(t *testing.T) {
require.True(t, tt.wpanic)
}
}()
glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents...)
glasti, gappend := raftLog.maybeAppend(10, tt.index, tt.logTerm, tt.committed, tt.ents...)
require.Equal(t, tt.wlasti, glasti)
require.Equal(t, tt.wappend, gappend)
require.Equal(t, tt.wcommit, raftLog.committed)
Expand All @@ -323,10 +323,10 @@ func TestCompactionSideEffects(t *testing.T) {
}
raftLog := newLog(storage, raftLogger)
for i = unstableIndex; i < lastIndex; i++ {
raftLog.append(pb.Entry{Term: i + 1, Index: i + 1})
raftLog.append(lastIndex, pb.Entry{Term: i + 1, Index: i + 1})
}

require.True(t, raftLog.maybeCommit(lastIndex, lastTerm))
require.True(t, raftLog.maybeCommit(lastIndex, lastIndex, lastTerm))
raftLog.appliedTo(raftLog.committed, 0 /* size */)

offset := uint64(500)
Expand All @@ -346,7 +346,7 @@ func TestCompactionSideEffects(t *testing.T) {
require.Equal(t, uint64(751), unstableEnts[0].Index)

prev := raftLog.lastIndex()
raftLog.append(pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1})
raftLog.append(lastIndex, pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex()})
require.Equal(t, prev+1, raftLog.lastIndex())

ents, err := raftLog.entries(raftLog.lastIndex(), noLimit)
Expand Down Expand Up @@ -396,9 +396,9 @@ func TestHasNextCommittedEnts(t *testing.T) {
require.NoError(t, storage.Append(ents[:1]))

raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.append(raftLog.leaderTerm, ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.maybeCommit(raftLog.leaderTerm, 5, 1)
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -454,9 +454,9 @@ func TestNextCommittedEnts(t *testing.T) {
require.NoError(t, storage.Append(ents[:1]))

raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.append(raftLog.leaderTerm, ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.maybeCommit(raftLog.leaderTerm, 5, 1)
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -513,9 +513,9 @@ func TestAcceptApplying(t *testing.T) {
require.NoError(t, storage.Append(ents[:1]))

raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.append(raftLog.leaderTerm, ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.maybeCommit(raftLog.leaderTerm, 5, 1)
raftLog.appliedTo(3, 0 /* size */)

raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable)
Expand Down Expand Up @@ -562,9 +562,9 @@ func TestAppliedTo(t *testing.T) {
require.NoError(t, storage.Append(ents[:1]))

raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.append(raftLog.leaderTerm, ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.maybeCommit(raftLog.leaderTerm, 5, 1)
raftLog.appliedTo(3, 0 /* size */)
raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */)

Expand Down Expand Up @@ -597,7 +597,7 @@ func TestNextUnstableEnts(t *testing.T) {

// append unstable entries to raftlog
raftLog := newLog(storage, raftLogger)
raftLog.append(previousEnts[tt.unstable-1:]...)
raftLog.append(10, previousEnts[tt.unstable-1:]...)

ents := raftLog.nextUnstableEnts()
if l := len(ents); l > 0 {
Expand All @@ -612,26 +612,24 @@ func TestNextUnstableEnts(t *testing.T) {
func TestCommitTo(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}}
commit := uint64(2)
const leadTerm = 10
tests := []struct {
term uint64
commit uint64
wcommit uint64
wpanic bool
}{
{3, 3, false},
{1, 2, false}, // never decrease
{4, 0, true}, // commit out of range -> panic
{term: leadTerm, commit: 3, wcommit: 3},
{term: leadTerm, commit: 1, wcommit: 2}, // never decrease
{term: leadTerm, commit: 4, wcommit: 3}, // commit out of range -> cut at the last entry
{term: leadTerm - 1, commit: 3, wcommit: 2}, // outdated leader can't commit
{term: leadTerm + 1, commit: 3, wcommit: 2}, // newer leader can't commit if log is out of sync
}
for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
defer func() {
if r := recover(); r != nil {
require.True(t, tt.wpanic)
}
}()
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.append(leadTerm, previousEnts...)
raftLog.committed = commit
raftLog.commitTo(tt.commit)
raftLog.commitTo(tt.term, tt.commit)
require.Equal(t, tt.wcommit, raftLog.committed)
})
}
Expand All @@ -651,7 +649,7 @@ func TestStableTo(t *testing.T) {
for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.append(10, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.stableTo(tt.stablei, tt.stablet)
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
})
Expand Down Expand Up @@ -688,7 +686,7 @@ func TestStableToWithSnap(t *testing.T) {
s := NewMemoryStorage()
require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}))
raftLog := newLog(s, raftLogger)
raftLog.append(tt.newEnts...)
raftLog.append(raftLog.leaderTerm, tt.newEnts...)
raftLog.stableTo(tt.stablei, tt.stablet)
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
})
Expand Down Expand Up @@ -723,7 +721,7 @@ func TestCompaction(t *testing.T) {
storage.Append([]pb.Entry{{Index: i}})
}
raftLog := newLog(storage, raftLogger)
raftLog.maybeCommit(tt.lastIndex, 0)
raftLog.maybeCommit(raftLog.leaderTerm, tt.lastIndex, 0)

raftLog.appliedTo(raftLog.committed, 0 /* size */)
for j := 0; j < len(tt.compact); j++ {
Expand Down Expand Up @@ -761,7 +759,7 @@ func TestIsOutOfBounds(t *testing.T) {
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
l := newLog(storage, raftLogger)
for i := uint64(1); i <= num; i++ {
l.append(pb.Entry{Index: i + offset})
l.append(l.leaderTerm, pb.Entry{Index: i + offset})
}

first := offset + 1
Expand Down Expand Up @@ -835,7 +833,7 @@ func TestTerm(t *testing.T) {
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}})
l := newLog(storage, raftLogger)
for i := uint64(1); i < num; i++ {
l.append(pb.Entry{Index: offset + i, Term: i})
l.append(num, pb.Entry{Index: offset + i, Term: i})
}

for i, tt := range []struct {
Expand Down Expand Up @@ -909,7 +907,7 @@ func TestSlice(t *testing.T) {
Metadata: pb.SnapshotMetadata{Index: offset}}))
require.NoError(t, storage.Append(entries(offset+1, half)))
l := newLog(storage, raftLogger)
l.append(entries(half, last)...)
l.append(last, entries(half, last)...)

for _, tt := range []struct {
lo uint64
Expand Down Expand Up @@ -999,7 +997,7 @@ func TestScan(t *testing.T) {
Metadata: pb.SnapshotMetadata{Index: offset}}))
require.NoError(t, storage.Append(entries(offset+1, half)))
l := newLog(storage, raftLogger)
l.append(entries(half, last)...)
l.append(last, entries(half, last)...)

// Test that scan() returns the same entries as slice(), on all inputs.
for _, pageSize := range []entryEncodingSize{0, 1, 10, 100, entrySize, entrySize + 1} {
Expand Down
Loading

0 comments on commit 4e08f52

Please sign in to comment.