Skip to content

Commit

Permalink
raft: use entryID in raftLog.stableTo
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 30, 2024
1 parent 5c5cd24 commit 3efb5cd
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 23 deletions.
2 changes: 1 addition & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable
i < l.maxAppliableIndex(allowUnstable)
}

func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) }

func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }

Expand Down
14 changes: 7 additions & 7 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func TestHasNextCommittedEnts(t *testing.T) {

raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestNextCommittedEnts(t *testing.T) {

raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestAcceptApplying(t *testing.T) {

raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(3, 0 /* size */)

Expand Down Expand Up @@ -566,7 +566,7 @@ func TestAppliedTo(t *testing.T) {

raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(3, 0 /* size */)
raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */)
Expand Down Expand Up @@ -604,7 +604,7 @@ func TestNextUnstableEnts(t *testing.T) {

ents := raftLog.nextUnstableEnts()
if l := len(ents); l > 0 {
raftLog.stableTo(ents[l-1].Index, ents[l-1].Term)
raftLog.stableTo(entryID{term: ents[l-1].Term, index: ents[l-1].Index})
}
require.Equal(t, tt.wents, ents)
require.Equal(t, previousEnts[len(previousEnts)-1].Index+1, raftLog.unstable.offset)
Expand Down Expand Up @@ -655,7 +655,7 @@ func TestStableTo(t *testing.T) {
t.Run(fmt.Sprint(i), func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.stableTo(tt.stablei, tt.stablet)
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
})
}
Expand Down Expand Up @@ -692,7 +692,7 @@ func TestStableToWithSnap(t *testing.T) {
require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}))
raftLog := newLog(s, raftLogger)
raftLog.append(tt.newEnts...)
raftLog.stableTo(tt.stablei, tt.stablet)
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
})

Expand Down
18 changes: 9 additions & 9 deletions log_unstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,30 +131,30 @@ func (u *unstable) acceptInProgress() {
// The method should only be called when the caller can attest that the entries
// can not be overwritten by an in-progress log append. See the related comment
// in newStorageAppendRespMsg.
func (u *unstable) stableTo(i, t uint64) {
gt, ok := u.maybeTerm(i)
func (u *unstable) stableTo(id entryID) {
gt, ok := u.maybeTerm(id.index)
if !ok {
// Unstable entry missing. Ignore.
u.logger.Infof("entry at index %d missing from unstable log; ignoring", i)
u.logger.Infof("entry at index %d missing from unstable log; ignoring", id.index)
return
}
if i < u.offset {
if id.index < u.offset {
// Index matched unstable snapshot, not unstable entry. Ignore.
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", i)
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", id.index)
return
}
if gt != t {
if gt != id.term {
// Term mismatch between unstable entry and specified entry. Ignore.
// This is possible if part or all of the unstable log was replaced
// between that time that a set of entries started to be written to
// stable storage and when they finished.
u.logger.Infof("entry at (index,term)=(%d,%d) mismatched with "+
"entry at (%d,%d) in unstable log; ignoring", i, t, i, gt)
"entry at (%d,%d) in unstable log; ignoring", id.index, id.term, id.index, gt)
return
}
num := int(i + 1 - u.offset)
num := int(id.index + 1 - u.offset)
u.entries = u.entries[num:]
u.offset = i + 1
u.offset = id.index + 1
u.offsetInProgress = max(u.offsetInProgress, u.offset)
u.shrinkEntriesArray()
}
Expand Down
2 changes: 1 addition & 1 deletion log_unstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func TestUnstableStableTo(t *testing.T) {
snapshot: tt.snap,
logger: raftLogger,
}
u.stableTo(tt.index, tt.term)
u.stableTo(entryID{term: tt.term, index: tt.index})
require.Equal(t, tt.woffset, u.offset)
require.Equal(t, tt.woffsetInProgress, u.offsetInProgress)
require.Equal(t, tt.wlen, len(u.entries))
Expand Down
2 changes: 1 addition & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,7 @@ func (r *raft) Step(m pb.Message) error {

case pb.MsgStorageAppendResp:
if m.Index != 0 {
r.raftLog.stableTo(m.Index, m.LogTerm)
r.raftLog.stableTo(entryID{term: m.LogTerm, index: m.Index})
}
if m.Snapshot != nil {
r.appliedSnap(m.Snapshot)
Expand Down
3 changes: 1 addition & 2 deletions raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,8 +923,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) {
r.readMessages()
s.Append(r.raftLog.nextUnstableEnts())
r.raftLog.appliedTo(r.raftLog.committed, 0 /* size */)
tip := r.raftLog.tip()
r.raftLog.stableTo(tip.index, tip.term) // TODO(pav-kv): pass tip directly
r.raftLog.stableTo(r.raftLog.tip())
}

func acceptAndReply(m pb.Message) pb.Message {
Expand Down
3 changes: 1 addition & 2 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ import (
func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
// Append unstable entries.
s.Append(r.raftLog.nextUnstableEnts())
tip := r.raftLog.tip()
r.raftLog.stableTo(tip.index, tip.term) // TODO(pav-kv): pass tip directly
r.raftLog.stableTo(r.raftLog.tip())

// Run post-append steps.
r.advanceMessagesAfterAppend()
Expand Down

0 comments on commit 3efb5cd

Please sign in to comment.