Skip to content

Commit

Permalink
raft,kvserver: introduce LogSnapshot API
Browse files Browse the repository at this point in the history
The API allows reading from the raft log while RawNode continues making
progress. Practically, this means we can unlock Replica.mu after having
obtained the log snapshot, and can safely read from it while only
holding Replica.raftMu.

Epic: none
Release note: none
  • Loading branch information
pav-kv committed Sep 25, 2024
1 parent 3e338af commit 3fdfbef
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 10 deletions.
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,27 @@ func (r *Replica) GetFirstIndex() kvpb.RaftIndex {
return r.raftFirstIndexRLocked()
}

// LogSnapshot returns an immutable point-in-time snapshot of the log storage.
//
// The method asserts that raftMu is held and that mu is read-held on entry.
// The returned value is the receiver itself rather than a copy, so the
// immutability of the snapshot is not enforced here; it relies on the caller
// keeping the log storage unmodified while the snapshot is in use, as explained
// in the comment below.
func (r *replicaRaftStorage) LogSnapshot() raft.LogStorageSnapshot {
r.raftMu.AssertHeld()
r.mu.AssertRHeld()
// TODO(pav-kv): return a wrapper which, in all methods, checks that the log
// storage hasn't been written to. A more relaxed version of it should assert
// that only the relevant part of the log hasn't been overwritten, e.g. a new
// term leader hasn't appended a log slice that truncated the log, or the log
// hasn't been wiped.
//
// This would require auditing and integrating with the write paths. Today,
// this type implements only reads, and writes are in various places like the
// logstore.LogStore type, or the code in the split handler which creates an
// empty range state.
//
// We don't need a fully fledged Pebble snapshot here. For our purposes, we
// can also make sure that raftMu is held for the entire period of using the
// LogSnapshot - this should guarantee its immutability.
return r
}

// GetLeaseAppliedIndex returns the lease index of the last applied command.
func (r *Replica) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
r.mu.RLock()
Expand Down
76 changes: 71 additions & 5 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,28 @@ import (
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
)

// LogSnapshot encapsulates a point-in-time state of the raft log accessible
// outside the raft package for reads.
//
// To access it safely, the user must not mutate the underlying raft log storage
// between when the snapshot is obtained and the reads are done.
//
// A LogSnapshot is built by raftLog.snap, which fills in every field below from
// the raftLog state at the time of the call.
//
// TODO(pav-kv): this should be part of the Ready API. Instead of pre-fetching
// entries (e.g. the committed entries subject to state machine application),
// allow the application to read them from LogSnapshot in the Ready handler.
// This gives the application direct control on resource allocation, and
// flexibility to do raft log IO without blocking RawNode operation.
type LogSnapshot struct {
// first is the first available log index, as returned by raftLog.firstIndex
// when the snapshot was taken. Reads at indices below first-1 are reported as
// ErrCompacted.
first uint64
// storage contains the stable log entries. It is whatever LogStorage the
// caller handed to raftLog.snap.
storage LogStorage
// unstable contains the unstable log entries. It is taken from
// raftLog.unstable.logSlice, and its last index is used as the upper bound of
// the indices readable through this snapshot.
unstable logSlice
// logger gives access to logging errors. It is used to report invalid
// requests via Panicf.
logger Logger
}

type raftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage
Expand Down Expand Up @@ -409,16 +431,20 @@ func (l *raftLog) lastEntryID() entryID {
return entryID{term: t, index: index}
}

// term returns the term of the log entry at index i. The lookup is delegated to
// a LogSnapshot taken over the current state of the log and its storage.
func (l *raftLog) term(i uint64) (uint64, error) {
	snapshot := l.snap(l.storage)
	return snapshot.term(i)
}

// term returns the term of the log entry at the given index.
func (l *raftLog) term(index uint64) (uint64, error) {
func (l LogSnapshot) term(index uint64) (uint64, error) {
// Check the unstable log first, even before computing the valid index range,
// which may need to access the storage. If we find the entry's term in the
// unstable log, we know it was in the valid range.
if index > l.unstable.lastIndex() {
return 0, ErrUnavailable
} else if index >= l.unstable.prev.index {
return l.unstable.termAt(index), nil
} else if index+1 < l.firstIndex() {
} else if index+1 < l.first {
return 0, ErrCompacted
}

Expand Down Expand Up @@ -518,6 +544,35 @@ func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.En
// The returned slice can be appended to, but the entries in it must not be
// mutated.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
	// Read through a snapshot of the current log state, so that raftLog and
	// LogSnapshot share a single implementation of the slicing logic.
	snapshot := l.snap(l.storage)
	return snapshot.slice(lo, hi, maxSize)
}

// LogSlice reads a prefix of the (lo, hi] log index interval and returns it as
// a valid log slice whose total entries size does not exceed maxSize.
//
// If the interval is non-empty, at least one entry is returned. The maxSize
// limit is exceeded only when the first entry (lo+1) alone is larger than it.
//
// An error is returned if the term of the entry at index lo cannot be
// determined, or if reading the entries fails. In both cases the returned
// logSlice is the zero value.
//
// TODO(pav-kv): export the logSlice type so that the caller can use it.
func (l LogSnapshot) LogSlice(lo, hi uint64, maxSize uint64) (logSlice, error) {
	var none logSlice

	// The entry at lo precedes the returned entries, so its term is needed to
	// form the slice. A failure here typically means the log is compacted at
	// an index > lo (ErrCompacted), but it can also be a custom storage error.
	prevTerm, err := l.term(lo)
	if err != nil {
		return none, err
	}

	entries, err := l.slice(lo, hi, entryEncodingSize(maxSize))
	if err != nil {
		return none, err
	}

	res := logSlice{term: l.unstable.term, entries: entries}
	res.prev = entryID{term: prevTerm, index: lo}
	return res, nil
}

func (l LogSnapshot) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
return nil, err
} else if lo >= hi {
Expand Down Expand Up @@ -576,13 +631,13 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e

// mustCheckOutOfBounds checks that the (lo, hi] interval is within the bounds
// of this raft log: l.firstIndex()-1 <= lo <= hi <= l.lastIndex().
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
func (l LogSnapshot) mustCheckOutOfBounds(lo, hi uint64) error {
if lo > hi {
l.logger.Panicf("invalid slice %d > %d", lo, hi)
}
if fi := l.firstIndex(); lo+1 < fi {
if fi := l.first; lo+1 < fi {
return ErrCompacted
} else if li := l.lastIndex(); hi > li {
} else if li := l.unstable.lastIndex(); hi > li {
l.logger.Panicf("slice(%d,%d] out of bound [%d,%d]", lo, hi, fi, li)
}
return nil
Expand All @@ -598,3 +653,14 @@ func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {
l.logger.Panicf("unexpected error (%v)", err)
return 0
}

// snap builds a point-in-time snapshot of the raft log on top of the given
// storage. The snapshot can be read from for as long as the underlying storage
// is not mutated.
func (l *raftLog) snap(storage LogStorage) LogSnapshot {
	var s LogSnapshot
	// Capture the first available index once, so that reads through the
	// snapshot compare against this stored value.
	s.first = l.firstIndex()
	s.storage = storage
	s.unstable = l.unstable.logSlice
	s.logger = l.logger
	return s
}
2 changes: 1 addition & 1 deletion pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func TestIsOutOfBounds(t *testing.T) {
require.True(t, tt.wpanic)
}
}()
err := l.mustCheckOutOfBounds(tt.lo, tt.hi)
err := l.snap(l.storage).mustCheckOutOfBounds(tt.lo, tt.hi)
require.False(t, tt.wpanic)
require.False(t, tt.wErrCompacted && err != ErrCompacted)
require.False(t, !tt.wErrCompacted && err != nil)
Expand Down
18 changes: 18 additions & 0 deletions pkg/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,24 @@ func (rn *RawNode) Step(m pb.Message) error {
return rn.raft.Step(m)
}

// LogSnapshot captures the current state of the raft log as a read-only,
// point-in-time view.
//
// Reads from the returned snapshot remain valid while RawNode keeps operating,
// provided the application keeps the underlying log storage snapshot (the one
// returned from the LogStorage.LogSnapshot method) immutable for as long as the
// snapshot is in use.
//
// The simplest way for the application to provide this guarantee is to block
// the entire log storage for new writes. The downside is that the Ready()
// handling loop then cannot hand log writes over to storage either.
//
// A more advanced implementation can grab an immutable storage engine snapshot
// that does not block writes. This is better for commit tail latency, because
// the MsgApp/Resp exchange with the leader is not held up.
func (rn *RawNode) LogSnapshot() LogSnapshot {
	// Obtain the storage-level snapshot first, then pair it with the in-memory
	// state of the raft log.
	storageSnap := rn.raft.raftLog.storage.LogSnapshot()
	return rn.raft.raftLog.snap(storageSnap)
}

// Ready returns the outstanding work that the application needs to handle. This
// includes appending and applying entries or a snapshot, updating the HardState,
// and sending messages. The returned Ready() *must* be handled and subsequently
Expand Down
28 changes: 24 additions & 4 deletions pkg/raft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ var ErrSnapOutOfDate = errors.New("requested index is older than the existing sn
// are unavailable.
var ErrUnavailable = errors.New("requested entry at index is unavailable")

// LogStorage is a read API for the raft log.
// LogStorage is a read-only API for the raft log.
type LogStorage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)

// Entries returns a slice of consecutive log entries in the range [lo, hi),
// starting from lo. The maxSize limits the total size of the log entries
// returned, but Entries returns at least one entry if any.
Expand Down Expand Up @@ -87,6 +84,16 @@ type LogStorage interface {
// TODO(pav-kv): replace this with a Prev() method equivalent to logSlice's
// prev field. The log storage is just a storage-backed logSlice.
FirstIndex() uint64

// LogSnapshot returns an immutable point-in-time log storage snapshot.
LogSnapshot() LogStorageSnapshot
}

// LogStorageSnapshot is a read-only API for the raft log which has extended
// immutability guarantees outside RawNode. The immutability must be provided by
// the application layer.
type LogStorageSnapshot interface {
// LogStorage is embedded as is: a snapshot exposes the same read methods as
// the log storage and declares no methods of its own.
LogStorage
}

// StateStorage provides read access to the state machine storage.
Expand All @@ -103,6 +110,13 @@ type StateStorage interface {
//
// TODO(pav-kv): audit all error handling and document the contract.
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
//
// TODO(sep-raft-log): this would need to be fetched (fully or partially) from
// both log and state machine storage on startup, to detect which of the two
// storages is ahead, and initialize correctly.
InitialState() (pb.HardState, pb.ConfState, error)

// LogStorage is the read-only API for the raft log.
LogStorage
// StateStorage provides read access to the state machine storage.
StateStorage
}
Expand Down Expand Up @@ -212,6 +226,12 @@ func (ms *MemoryStorage) firstIndex() uint64 {
return ms.ents[0].Index + 1
}

// LogSnapshot implements the LogStorage interface.
//
// The receiver itself is returned rather than a copy, so the result is not
// isolated from later writes to this MemoryStorage; see the TODO below.
func (ms *MemoryStorage) LogSnapshot() LogStorageSnapshot {
// TODO(pav-kv): return an immutable subset of MemoryStorage.
return ms
}

// Snapshot implements the Storage interface.
func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
ms.Lock()
Expand Down

0 comments on commit 3fdfbef

Please sign in to comment.