Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: add entryID and logSlice types #145

Merged
merged 9 commits into from
Feb 5, 2024
Merged
68 changes: 38 additions & 30 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,25 @@ func (l *raftLog) String() string {

// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if !l.matchTerm(index, logTerm) {
func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) {
if !l.matchTerm(a.prev) {
return 0, false
}
// TODO(pav-kv): propagate logSlice down the stack. It will be used all the
// way down in unstable, for safety checks, and for useful bookkeeping.

lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
lastnewi = a.prev.index + uint64(len(a.entries))
ci := l.findConflict(a.entries)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
offset := a.prev.index + 1
if ci-offset > uint64(len(a.entries)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
}
l.append(ents[ci-offset:]...)
l.append(a.entries[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
Expand Down Expand Up @@ -150,13 +152,15 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 {
// a different term.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for _, ne := range ents {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
for i := range ents {
if id := pbEntryID(&ents[i]); !l.matchTerm(id) {
if id.index <= l.lastIndex() {
// TODO(pav-kv): can simply print %+v of the id. This will change the
// log format though.
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term)
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
}
return ne.Index
return id.index
}
}
return 0
Expand Down Expand Up @@ -360,7 +364,7 @@ func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable
i < l.maxAppliableIndex(allowUnstable)
}

func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) }

func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }

Expand All @@ -370,12 +374,14 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
// to Ready().
func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() }

func (l *raftLog) lastTerm() uint64 {
t, err := l.term(l.lastIndex())
// lastEntryID returns the ID of the last entry in the log.
func (l *raftLog) lastEntryID() entryID {
index := l.lastIndex()
t, err := l.term(index)
if err != nil {
l.logger.Panicf("unexpected error when getting the last term (%v)", err)
l.logger.Panicf("unexpected error when getting the last term at %d: %v", index, err)
}
return t
return entryID{term: t, index: index}
}

func (l *raftLog) term(i uint64) (uint64, error) {
Expand Down Expand Up @@ -426,30 +432,32 @@ func (l *raftLog) allEntries() []pb.Entry {
panic(err)
}

// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
// isUpToDate determines if a log with the given last entry is more up-to-date
// by comparing the index and term of the last entries in the existing logs.
//
// If the logs have last entries with different terms, then the log with the
// later term is more up-to-date. If the logs end with the same term, then
// whichever log has the larger lastIndex is more up-to-date. If the logs are
// the same, the given log is up-to-date.
func (l *raftLog) isUpToDate(lasti, term uint64) bool {
return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
func (l *raftLog) isUpToDate(their entryID) bool {
our := l.lastEntryID()
return their.term > our.term || their.term == our.term && their.index >= our.index
}

func (l *raftLog) matchTerm(i, term uint64) bool {
t, err := l.term(i)
func (l *raftLog) matchTerm(id entryID) bool {
t, err := l.term(id.index)
if err != nil {
return false
}
return t == term
return t == id.term
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
// NB: term should never be 0 on a commit because the leader campaigns at
// least at term 1. But if it is 0 for some reason, we don't want to consider
// this a term match in case zeroTermOnOutOfBounds returns 0.
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
func (l *raftLog) maybeCommit(at entryID) bool {
// NB: term should never be 0 on a commit because the leader campaigned at
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
// least at term 1. But if it is 0 for some reason, we don't consider this a
// term match.
if at.term != 0 && at.index > l.committed && l.matchTerm(at) {
l.commitTo(at.index)
return true
}
return false
Expand Down
87 changes: 53 additions & 34 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestIsUpToDate(t *testing.T) {

for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, tt.wUpToDate, raftLog.isUpToDate(tt.lastIndex, tt.term))
require.Equal(t, tt.wUpToDate, raftLog.isUpToDate(entryID{term: tt.term, index: tt.lastIndex}))
})
}
}
Expand Down Expand Up @@ -208,9 +208,9 @@ func TestLogMaybeAppend(t *testing.T) {
lastterm := uint64(3)
commit := uint64(1)

// TODO(pav-kv): clean-up this test.
tests := []struct {
logTerm uint64
index uint64
prev entryID
committed uint64
ents []pb.Entry

Expand All @@ -221,71 +221,91 @@ func TestLogMaybeAppend(t *testing.T) {
}{
// not match: term is different
{
lastterm - 1, lastindex, lastindex, []pb.Entry{{Index: lastindex + 1, Term: 4}},
entryID{term: lastterm - 1, index: lastindex}, lastindex,
[]pb.Entry{{Index: lastindex + 1, Term: 4}},
0, false, commit, false,
},
// not match: index out of bound
{
lastterm, lastindex + 1, lastindex, []pb.Entry{{Index: lastindex + 2, Term: 4}},
entryID{term: lastterm, index: lastindex + 1}, lastindex,
[]pb.Entry{{Index: lastindex + 2, Term: 4}},
0, false, commit, false,
},
// match with the last existing entry
{
lastterm, lastindex, lastindex, nil,
entryID{term: lastterm, index: lastindex}, lastindex, nil,
lastindex, true, lastindex, false,
},
{
lastterm, lastindex, lastindex + 1, nil,
entryID{term: lastterm, index: lastindex}, lastindex + 1, nil,
lastindex, true, lastindex, false, // do not increase commit higher than lastnewi
},
{
lastterm, lastindex, lastindex - 1, nil,
entryID{term: lastterm, index: lastindex}, lastindex - 1, nil,
lastindex, true, lastindex - 1, false, // commit up to the commit in the message
},
{
lastterm, lastindex, 0, nil,
entryID{term: lastterm, index: lastindex}, 0, nil,
lastindex, true, commit, false, // commit do not decrease
},
{
0, 0, lastindex, nil,
entryID{}, lastindex, nil,
0, true, commit, false, // commit do not decrease
},
{
lastterm, lastindex, lastindex, []pb.Entry{{Index: lastindex + 1, Term: 4}},
entryID{term: lastterm, index: lastindex}, lastindex,
[]pb.Entry{{Index: lastindex + 1, Term: 4}},
lastindex + 1, true, lastindex, false,
},
{
lastterm, lastindex, lastindex + 1, []pb.Entry{{Index: lastindex + 1, Term: 4}},
entryID{term: lastterm, index: lastindex}, lastindex + 1,
[]pb.Entry{{Index: lastindex + 1, Term: 4}},
lastindex + 1, true, lastindex + 1, false,
},
{
lastterm, lastindex, lastindex + 2, []pb.Entry{{Index: lastindex + 1, Term: 4}},
entryID{term: lastterm, index: lastindex}, lastindex + 2,
[]pb.Entry{{Index: lastindex + 1, Term: 4}},
lastindex + 1, true, lastindex + 1, false, // do not increase commit higher than lastnewi
},
{
lastterm, lastindex, lastindex + 2, []pb.Entry{{Index: lastindex + 1, Term: 4}, {Index: lastindex + 2, Term: 4}},
entryID{term: lastterm, index: lastindex}, lastindex + 2,
[]pb.Entry{{Index: lastindex + 1, Term: 4}, {Index: lastindex + 2, Term: 4}},
lastindex + 2, true, lastindex + 2, false,
},
// match with the entry in the middle
{
lastterm - 1, lastindex - 1, lastindex, []pb.Entry{{Index: lastindex, Term: 4}},
entryID{term: lastterm - 1, index: lastindex - 1}, lastindex,
[]pb.Entry{{Index: lastindex, Term: 4}},
lastindex, true, lastindex, false,
},
{
lastterm - 2, lastindex - 2, lastindex, []pb.Entry{{Index: lastindex - 1, Term: 4}},
entryID{term: lastterm - 2, index: lastindex - 2}, lastindex,
[]pb.Entry{{Index: lastindex - 1, Term: 4}},
lastindex - 1, true, lastindex - 1, false,
},
{
lastterm - 3, lastindex - 3, lastindex, []pb.Entry{{Index: lastindex - 2, Term: 4}},
entryID{term: lastterm - 3, index: lastindex - 3}, lastindex,
[]pb.Entry{{Index: lastindex - 2, Term: 4}},
lastindex - 2, true, lastindex - 2, true, // conflict with existing committed entry
},
{
lastterm - 2, lastindex - 2, lastindex, []pb.Entry{{Index: lastindex - 1, Term: 4}, {Index: lastindex, Term: 4}},
entryID{term: lastterm - 2, index: lastindex - 2}, lastindex,
[]pb.Entry{{Index: lastindex - 1, Term: 4}, {Index: lastindex, Term: 4}},
lastindex, true, lastindex, false,
},
}

for i, tt := range tests {
// TODO(pav-kv): for now, we pick a high enough app.term so that it
// represents a valid append message. The maybeAppend currently ignores it,
// but it must check that the append does not regress the term.
app := logSlice{
term: 100,
prev: tt.prev,
entries: tt.ents,
}
require.NoError(t, app.valid())

raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.committed = commit
Expand All @@ -296,7 +316,7 @@ func TestLogMaybeAppend(t *testing.T) {
require.True(t, tt.wpanic)
}
}()
glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents...)
glasti, gappend := raftLog.maybeAppend(app, tt.committed)
require.Equal(t, tt.wlasti, glasti)
require.Equal(t, tt.wappend, gappend)
require.Equal(t, tt.wcommit, raftLog.committed)
Expand All @@ -316,7 +336,6 @@ func TestCompactionSideEffects(t *testing.T) {
// Populate the log with 1000 entries; 750 in stable storage and 250 in unstable.
lastIndex := uint64(1000)
unstableIndex := uint64(750)
lastTerm := lastIndex
storage := NewMemoryStorage()
for i = 1; i <= unstableIndex; i++ {
storage.Append([]pb.Entry{{Term: i, Index: i}})
Expand All @@ -326,7 +345,7 @@ func TestCompactionSideEffects(t *testing.T) {
raftLog.append(pb.Entry{Term: i + 1, Index: i + 1})
}

require.True(t, raftLog.maybeCommit(lastIndex, lastTerm))
require.True(t, raftLog.maybeCommit(raftLog.lastEntryID()))
raftLog.appliedTo(raftLog.committed, 0 /* size */)

offset := uint64(500)
Expand All @@ -338,7 +357,7 @@ func TestCompactionSideEffects(t *testing.T) {
}

for j := offset; j <= raftLog.lastIndex(); j++ {
require.True(t, raftLog.matchTerm(j, j))
require.True(t, raftLog.matchTerm(entryID{term: j, index: j}))
}

unstableEnts := raftLog.nextUnstableEnts()
Expand Down Expand Up @@ -397,8 +416,8 @@ func TestHasNextCommittedEnts(t *testing.T) {

raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -455,8 +474,8 @@ func TestNextCommittedEnts(t *testing.T) {

raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -514,8 +533,8 @@ func TestAcceptApplying(t *testing.T) {

raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(3, 0 /* size */)

raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable)
Expand Down Expand Up @@ -563,8 +582,8 @@ func TestAppliedTo(t *testing.T) {

raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(3, 0 /* size */)
raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */)

Expand Down Expand Up @@ -601,7 +620,7 @@ func TestNextUnstableEnts(t *testing.T) {

ents := raftLog.nextUnstableEnts()
if l := len(ents); l > 0 {
raftLog.stableTo(ents[l-1].Index, ents[l-1].Term)
raftLog.stableTo(pbEntryID(&ents[l-1]))
}
require.Equal(t, tt.wents, ents)
require.Equal(t, previousEnts[len(previousEnts)-1].Index+1, raftLog.unstable.offset)
Expand Down Expand Up @@ -652,7 +671,7 @@ func TestStableTo(t *testing.T) {
t.Run(fmt.Sprint(i), func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.stableTo(tt.stablei, tt.stablet)
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
})
}
Expand Down Expand Up @@ -689,7 +708,7 @@ func TestStableToWithSnap(t *testing.T) {
require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}))
raftLog := newLog(s, raftLogger)
raftLog.append(tt.newEnts...)
raftLog.stableTo(tt.stablei, tt.stablet)
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
})

Expand Down Expand Up @@ -723,7 +742,7 @@ func TestCompaction(t *testing.T) {
storage.Append([]pb.Entry{{Index: i}})
}
raftLog := newLog(storage, raftLogger)
raftLog.maybeCommit(tt.lastIndex, 0)
raftLog.maybeCommit(entryID{term: 0, index: tt.lastIndex}) // TODO(pav-kv): this is a no-op

raftLog.appliedTo(raftLog.committed, 0 /* size */)
for j := 0; j < len(tt.compact); j++ {
Expand Down
Loading
Loading