Skip to content

Commit

Permalink
Merge #130967
Browse files Browse the repository at this point in the history
130967: raft: introduce LogSnapshot API r=tbg a=pav-kv

The API allows reading from the raft log while `RawNode` continues making progress. Practically, this means we can unlock `Replica.mu` after having obtained the log snapshot, and can then safely read from it while holding only `Replica.raftMu`.

To be used for:
- Fetching committed entries in `Ready` handler while only holding `raftMu` (instead of both `mu` and `raftMu`).
- Building leader->follower `MsgApp` messages on demand, driven by RACv2. Today these messages can be constructed (and incur IO) eagerly on any `Step` while holding `mu`.

Part of #128779
Part of #130955

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Sep 25, 2024
2 parents b82225e + 3fdfbef commit d926d5e
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 10 deletions.
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,27 @@ func (r *Replica) GetFirstIndex() kvpb.RaftIndex {
return r.raftFirstIndexRLocked()
}

// LogSnapshot returns an immutable point-in-time snapshot of the log storage.
//
// Both raftMu and mu (the latter via AssertRHeld) are asserted to be held on
// entry. No copy is made: the receiver itself is returned, so the immutability
// of the result is not enforced here and depends on the caller, as described
// in the comments inside the body.
func (r *replicaRaftStorage) LogSnapshot() raft.LogStorageSnapshot {
	r.raftMu.AssertHeld()
	r.mu.AssertRHeld()
	// TODO(pav-kv): return a wrapper which, in all methods, checks that the log
	// storage hasn't been written to. A more relaxed version of it should assert
	// that only the relevant part of the log hasn't been overwritten, e.g. a new
	// term leader hasn't appended a log slice that truncated the log, or the log
	// hasn't been wiped.
	//
	// This would require auditing and integrating with the write paths. Today,
	// this type implements only reads, and writes are in various places like the
	// logstore.LogStore type, or the code in the split handler which creates an
	// empty range state.
	//
	// We don't need a fully fledged Pebble snapshot here. For our purposes, we
	// can also make sure that raftMu is held for the entire period of using the
	// LogSnapshot - this should guarantee its immutability.
	return r
}

// GetLeaseAppliedIndex returns the lease index of the last applied command.
func (r *Replica) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
r.mu.RLock()
Expand Down
76 changes: 71 additions & 5 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,28 @@ import (
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
)

// LogSnapshot encapsulates a point-in-time state of the raft log accessible
// outside the raft package for reads.
//
// To access it safely, the user must not mutate the underlying raft log storage
// between when the snapshot is obtained and the reads are done.
//
// All fields are unexported; reads go through the methods of this type.
//
// TODO(pav-kv): this should be part of the Ready API. Instead of pre-fetching
// entries (e.g. the committed entries subject to state machine application),
// allow the application to read them from LogSnapshot in the Ready handler.
// This gives the application direct control on resource allocation, and
// flexibility to do raft log IO without blocking RawNode operation.
type LogSnapshot struct {
	// first is the first available log index, recorded when the snapshot is
	// created rather than re-read from storage on each access.
	first uint64
	// storage contains the stable log entries.
	storage LogStorage
	// unstable contains the unstable log entries as of the moment the snapshot
	// was created.
	unstable logSlice
	// logger gives access to logging errors.
	logger Logger
}

type raftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage
Expand Down Expand Up @@ -409,16 +431,20 @@ func (l *raftLog) lastEntryID() entryID {
return entryID{term: t, index: index}
}

// term returns the term of the log entry at index i. It delegates to a
// LogSnapshot taken over l.storage, so the lookup logic lives in
// LogSnapshot.term.
func (l *raftLog) term(i uint64) (uint64, error) {
	return l.snap(l.storage).term(i)
}

// term returns the term of the log entry at the given index.
func (l *raftLog) term(index uint64) (uint64, error) {
func (l LogSnapshot) term(index uint64) (uint64, error) {
// Check the unstable log first, even before computing the valid index range,
// which may need to access the storage. If we find the entry's term in the
// unstable log, we know it was in the valid range.
if index > l.unstable.lastIndex() {
return 0, ErrUnavailable
} else if index >= l.unstable.prev.index {
return l.unstable.termAt(index), nil
} else if index+1 < l.firstIndex() {
} else if index+1 < l.first {
return 0, ErrCompacted
}

Expand Down Expand Up @@ -518,6 +544,35 @@ func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.En
// The returned slice can be appended to, but the entries in it must not be
// mutated.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
return l.snap(l.storage).slice(lo, hi, maxSize)
}

// LogSlice reads a prefix of the (lo, hi] log index interval from the snapshot
// and returns it as a valid log slice. The total size of the returned entries
// does not exceed maxSize.
//
// At least one entry is returned if the interval contains any, so maxSize can
// only be exceeded when the first entry (lo+1) alone is larger than it.
//
// An error is returned if the term of the entry at lo cannot be determined
// (e.g. ErrCompacted when the log is compacted at an index > lo, or a custom
// storage error), or if reading the entries fails.
//
// TODO(pav-kv): export the logSlice type so that the caller can use it.
func (l LogSnapshot) LogSlice(lo, hi uint64, maxSize uint64) (logSlice, error) {
	// The slice is anchored at lo, so the term of that entry is needed before
	// any entries are fetched.
	prev := entryID{index: lo}
	var err error
	if prev.term, err = l.term(lo); err != nil {
		return logSlice{}, err
	}

	entries, err := l.slice(lo, hi, entryEncodingSize(maxSize))
	if err != nil {
		return logSlice{}, err
	}

	return logSlice{term: l.unstable.term, prev: prev, entries: entries}, nil
}

func (l LogSnapshot) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
return nil, err
} else if lo >= hi {
Expand Down Expand Up @@ -576,13 +631,13 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e

// mustCheckOutOfBounds checks that the (lo, hi] interval is within the bounds
// of this log snapshot: l.first-1 <= lo <= hi <= l.unstable.lastIndex().
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
func (l LogSnapshot) mustCheckOutOfBounds(lo, hi uint64) error {
if lo > hi {
l.logger.Panicf("invalid slice %d > %d", lo, hi)
}
if fi := l.firstIndex(); lo+1 < fi {
if fi := l.first; lo+1 < fi {
return ErrCompacted
} else if li := l.lastIndex(); hi > li {
} else if li := l.unstable.lastIndex(); hi > li {
l.logger.Panicf("slice(%d,%d] out of bound [%d,%d]", lo, hi, fi, li)
}
return nil
Expand All @@ -598,3 +653,14 @@ func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {
l.logger.Panicf("unexpected error (%v)", err)
return 0
}

// snap captures the current state of the raft log as a LogSnapshot backed by
// the given storage. The snapshot can be read from for as long as the
// underlying storage is not mutated.
func (l *raftLog) snap(storage LogStorage) LogSnapshot {
	var s LogSnapshot
	// Record the first index now, so that reads through the snapshot do not
	// have to ask the raftLog for it again.
	s.first = l.firstIndex()
	s.storage = storage
	s.unstable = l.unstable.logSlice
	s.logger = l.logger
	return s
}
2 changes: 1 addition & 1 deletion pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func TestIsOutOfBounds(t *testing.T) {
require.True(t, tt.wpanic)
}
}()
err := l.mustCheckOutOfBounds(tt.lo, tt.hi)
err := l.snap(l.storage).mustCheckOutOfBounds(tt.lo, tt.hi)
require.False(t, tt.wpanic)
require.False(t, tt.wErrCompacted && err != ErrCompacted)
require.False(t, !tt.wErrCompacted && err != nil)
Expand Down
18 changes: 18 additions & 0 deletions pkg/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,24 @@ func (rn *RawNode) Step(m pb.Message) error {
return rn.raft.Step(m)
}

// LogSnapshot returns a point-in-time read-only state of the raft log.
//
// The returned snapshot can be read from while RawNode continues operation, as
// long as the application guarantees immutability of the underlying log storage
// snapshot (returned from the LogStorage.LogSnapshot method) while the snapshot
// is being used.
//
// One way the application can implement an immutable snapshot is by blocking
// the entire log storage for new writes. This also means the Ready() handling
// loop isn't able to hand over log writes to storage.
//
// A more advanced implementation can grab an immutable storage engine snapshot
// that does not block writes. Not blocking writes is beneficial for commit tail
// latency, since it doesn't prevent MsgApp/Resp exchange with the leader.
func (rn *RawNode) LogSnapshot() LogSnapshot {
	// The storage snapshot comes from the application's LogStorage; the raft
	// log wraps it into a LogSnapshot via snap.
	return rn.raft.raftLog.snap(rn.raft.raftLog.storage.LogSnapshot())
}

// Ready returns the outstanding work that the application needs to handle. This
// includes appending and applying entries or a snapshot, updating the HardState,
// and sending messages. The returned Ready() *must* be handled and subsequently
Expand Down
28 changes: 24 additions & 4 deletions pkg/raft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ var ErrSnapOutOfDate = errors.New("requested index is older than the existing sn
// are unavailable.
var ErrUnavailable = errors.New("requested entry at index is unavailable")

// LogStorage is a read API for the raft log.
// LogStorage is a read-only API for the raft log.
type LogStorage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)

// Entries returns a slice of consecutive log entries in the range [lo, hi),
// starting from lo. The maxSize limits the total size of the log entries
// returned, but Entries returns at least one entry if any.
Expand Down Expand Up @@ -87,6 +84,16 @@ type LogStorage interface {
// TODO(pav-kv): replace this with a Prev() method equivalent to logSlice's
// prev field. The log storage is just a storage-backed logSlice.
FirstIndex() uint64

// LogSnapshot returns an immutable point-in-time log storage snapshot.
LogSnapshot() LogStorageSnapshot
}

// LogStorageSnapshot is a read-only API for the raft log which has extended
// immutability guarantees outside RawNode. The immutability must be provided by
// the application layer.
type LogStorageSnapshot interface {
	// LogStorage is embedded unchanged: the snapshot declares no methods of
	// its own, so it differs from LogStorage only in the immutability contract
	// described above.
	LogStorage
}

// StateStorage provides read access to the state machine storage.
Expand All @@ -103,6 +110,13 @@ type StateStorage interface {
//
// TODO(pav-kv): audit all error handling and document the contract.
type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	//
	// TODO(sep-raft-log): this would need to be fetched (fully or partially) from
	// both log and state machine storage on startup, to detect which of the two
	// storages is ahead, and initialize correctly.
	InitialState() (pb.HardState, pb.ConfState, error)

	// LogStorage is the read-only API for the raft log.
	LogStorage
	// StateStorage provides read access to the state machine storage.
	StateStorage
}
Expand Down Expand Up @@ -212,6 +226,12 @@ func (ms *MemoryStorage) firstIndex() uint64 {
return ms.ents[0].Index + 1
}

// LogSnapshot implements the LogStorage interface.
//
// The receiver itself is returned, not a copy, so the result is not isolated
// from later changes to this MemoryStorage.
func (ms *MemoryStorage) LogSnapshot() LogStorageSnapshot {
	// TODO(pav-kv): return an immutable subset of MemoryStorage.
	return ms
}

// Snapshot implements the Storage interface.
func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
ms.Lock()
Expand Down

0 comments on commit d926d5e

Please sign in to comment.