From 3fdfbef4d12cf98914774497786838ebdd26a7f8 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 18 Sep 2024 19:26:56 +0100 Subject: [PATCH] raft,kvserver: introduce LogSnapshot API The API allows reading from raft log while RawNode continues making progress. Practically, this means we can unlock Replica.mu after having obtained the log snapshot, and can safely read from it while only holding Replica.raftMu. Epic: none Release note: none --- pkg/kv/kvserver/replica_raftstorage.go | 21 +++++++ pkg/raft/log.go | 76 ++++++++++++++++++++++++-- pkg/raft/log_test.go | 2 +- pkg/raft/rawnode.go | 18 ++++++ pkg/raft/storage.go | 28 ++++++++-- 5 files changed, 135 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ba7c5a0c8826..23086d72165a 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -214,6 +214,27 @@ func (r *Replica) GetFirstIndex() kvpb.RaftIndex { return r.raftFirstIndexRLocked() } +// LogSnapshot returns an immutable point-in-time snapshot of the log storage. +func (r *replicaRaftStorage) LogSnapshot() raft.LogStorageSnapshot { + r.raftMu.AssertHeld() + r.mu.AssertRHeld() + // TODO(pav-kv): return a wrapper which, in all methods, checks that the log + // storage hasn't been written to. A more relaxed version of it should assert + // that only the relevant part of the log hasn't been overwritten, e.g. a new + // term leader hasn't appended a log slice that truncated the log, or the log + // hasn't been wiped. + // + // This would require auditing and integrating with the write paths. Today, + // this type implements only reads, and writes are in various places like the + // logstore.LogStore type, or the code in the split handler which creates an + // empty range state. + // + // We don't need a fully fledged Pebble snapshot here. For our purposes, we + // can also make sure that raftMu is held for the entire period of using the + // LogSnapshot - this should guarantee its immutability. + return r +} + // GetLeaseAppliedIndex returns the lease index of the last applied command. func (r *Replica) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex { r.mu.RLock() diff --git a/pkg/raft/log.go b/pkg/raft/log.go index 09ab8a1d35f8..779b2bd2c7e8 100644 --- a/pkg/raft/log.go +++ b/pkg/raft/log.go @@ -23,6 +23,28 @@ import ( pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" ) +// LogSnapshot encapsulates a point-in-time state of the raft log accessible +// outside the raft package for reads. +// +// To access it safely, the user must not mutate the underlying raft log storage +// between when the snapshot is obtained and the reads are done. +// +// TODO(pav-kv): this should be part of the Ready API. Instead of pre-fetching +// entries (e.g. the committed entries subject to state machine application), +// allow the application to read them from LogSnapshot in the Ready handler. +// This gives the application direct control on resource allocation, and +// flexibility to do raft log IO without blocking RawNode operation. +type LogSnapshot struct { + // first is the first available log index. + first uint64 + // storage contains the stable log entries. + storage LogStorage + // unstable contains the unstable log entries. + unstable logSlice + // logger gives access to logging errors. + logger Logger +} + type raftLog struct { // storage contains all stable entries since the last snapshot. storage Storage @@ -409,8 +431,12 @@ func (l *raftLog) lastEntryID() entryID { return entryID{term: t, index: index} } +func (l *raftLog) term(i uint64) (uint64, error) { + return l.snap(l.storage).term(i) +} + // term returns the term of the log entry at the given index. -func (l *raftLog) term(index uint64) (uint64, error) { +func (l LogSnapshot) term(index uint64) (uint64, error) { // Check the unstable log first, even before computing the valid index range, // which may need to access the storage. If we find the entry's term in the // unstable log, we know it was in the valid range. @@ -418,7 +444,7 @@ func (l *raftLog) term(index uint64) (uint64, error) { return 0, ErrUnavailable } else if index >= l.unstable.prev.index { return l.unstable.termAt(index), nil - } else if index+1 < l.firstIndex() { + } else if index+1 < l.first { return 0, ErrCompacted } @@ -518,6 +544,35 @@ func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.En // The returned slice can be appended to, but the entries in it must not be // mutated. func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { + return l.snap(l.storage).slice(lo, hi, maxSize) +} + +// LogSlice returns a valid log slice for a prefix of the (lo, hi] log index +// interval, with the total entries size not exceeding maxSize. +// +// Returns at least one entry if the interval contains any. The maxSize can only +// be exceeded if the first entry (lo+1) is larger. +// +// TODO(pav-kv): export the logSlice type so that the caller can use it. +func (l LogSnapshot) LogSlice(lo, hi uint64, maxSize uint64) (logSlice, error) { + prevTerm, err := l.term(lo) + if err != nil { + // The log is probably compacted at index > lo (err == ErrCompacted), or it + // can be a custom storage error. + return logSlice{}, err + } + ents, err := l.slice(lo, hi, entryEncodingSize(maxSize)) + if err != nil { + return logSlice{}, err + } + return logSlice{ + term: l.unstable.term, + prev: entryID{term: prevTerm, index: lo}, + entries: ents, + }, nil +} + +func (l LogSnapshot) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { if err := l.mustCheckOutOfBounds(lo, hi); err != nil { return nil, err } else if lo >= hi { @@ -576,13 +631,13 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e // mustCheckOutOfBounds checks that the (lo, hi] interval is within the bounds // of this raft log: l.firstIndex()-1 <= lo <= hi <= l.lastIndex(). -func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { +func (l LogSnapshot) mustCheckOutOfBounds(lo, hi uint64) error { if lo > hi { l.logger.Panicf("invalid slice %d > %d", lo, hi) } - if fi := l.firstIndex(); lo+1 < fi { + if fi := l.first; lo+1 < fi { return ErrCompacted - } else if li := l.lastIndex(); hi > li { + } else if li := l.unstable.lastIndex(); hi > li { l.logger.Panicf("slice(%d,%d] out of bound [%d,%d]", lo, hi, fi, li) } return nil @@ -598,3 +653,14 @@ func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 { l.logger.Panicf("unexpected error (%v)", err) return 0 } + +// snap returns a point-in-time snapshot of the raft log. This snapshot can be +// read from while the underlying storage is not mutated. +func (l *raftLog) snap(storage LogStorage) LogSnapshot { + return LogSnapshot{ + first: l.firstIndex(), + storage: storage, + unstable: l.unstable.logSlice, + logger: l.logger, + } +} diff --git a/pkg/raft/log_test.go b/pkg/raft/log_test.go index eb0ca2601bc7..13c2a8c0ce48 100644 --- a/pkg/raft/log_test.go +++ b/pkg/raft/log_test.go @@ -706,7 +706,7 @@ func TestIsOutOfBounds(t *testing.T) { require.True(t, tt.wpanic) } }() - err := l.mustCheckOutOfBounds(tt.lo, tt.hi) + err := l.snap(l.storage).mustCheckOutOfBounds(tt.lo, tt.hi) require.False(t, tt.wpanic) require.False(t, tt.wErrCompacted && err != ErrCompacted) require.False(t, !tt.wErrCompacted && err != nil) diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index 4bb4b6990818..cd119d6be4b2 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -115,6 +115,24 @@ func (rn *RawNode) Step(m pb.Message) error { return rn.raft.Step(m) } +// LogSnapshot returns a point-in-time read-only state of the raft log. +// +// The returned snapshot can be read from while RawNode continues operation, as +// long as the application guarantees immutability of the underlying log storage +// snapshot (returned from the LogStorage.LogSnapshot method) while the snapshot +// is being used. +// +// One way the application can implement an immutable snapshot is by blocking +// the entire log storage for new writes. This also means the Ready() handling +// loop isn't able to hand over log writes to storage. +// +// A more advanced implementation can grab an immutable storage engine snapshot +// that does not block writes. Not blocking writes is beneficial for commit tail +// latency, since it doesn't prevent MsgApp/Resp exchange with the leader. +func (rn *RawNode) LogSnapshot() LogSnapshot { + return rn.raft.raftLog.snap(rn.raft.raftLog.storage.LogSnapshot()) +} + // Ready returns the outstanding work that the application needs to handle. This // includes appending and applying entries or a snapshot, updating the HardState, // and sending messages. The returned Ready() *must* be handled and subsequently diff --git a/pkg/raft/storage.go b/pkg/raft/storage.go index de4b03c53891..a445a2deb712 100644 --- a/pkg/raft/storage.go +++ b/pkg/raft/storage.go @@ -38,11 +38,8 @@ var ErrSnapOutOfDate = errors.New("requested index is older than the existing sn // are unavailable. var ErrUnavailable = errors.New("requested entry at index is unavailable") -// LogStorage is a read API for the raft log. +// LogStorage is a read-only API for the raft log. type LogStorage interface { - // InitialState returns the saved HardState and ConfState information. - InitialState() (pb.HardState, pb.ConfState, error) - // Entries returns a slice of consecutive log entries in the range [lo, hi), // starting from lo. The maxSize limits the total size of the log entries // returned, but Entries returns at least one entry if any. @@ -87,6 +84,16 @@ type LogStorage interface { // TODO(pav-kv): replace this with a Prev() method equivalent to logSlice's // prev field. The log storage is just a storage-backed logSlice. FirstIndex() uint64 + + // LogSnapshot returns an immutable point-in-time log storage snapshot. + LogSnapshot() LogStorageSnapshot +} + +// LogStorageSnapshot is a read-only API for the raft log which has extended +// immutability guarantees outside RawNode. The immutability must be provided by +// the application layer. +type LogStorageSnapshot interface { + LogStorage } // StateStorage provides read access to the state machine storage. @@ -103,6 +110,13 @@ type StateStorage interface { // // TODO(pav-kv): audit all error handling and document the contract. type Storage interface { + // InitialState returns the saved HardState and ConfState information. + // + // TODO(sep-raft-log): this would need to be fetched (fully or partially) from + // both log and state machine storage on startup, to detect which of the two + // storages is ahead, and initialize correctly. + InitialState() (pb.HardState, pb.ConfState, error) + LogStorage StateStorage } @@ -212,6 +226,12 @@ func (ms *MemoryStorage) firstIndex() uint64 { return ms.ents[0].Index + 1 } +// LogSnapshot implements the LogStorage interface. +func (ms *MemoryStorage) LogSnapshot() LogStorageSnapshot { + // TODO(pav-kv): return an immutable subset of MemoryStorage. + return ms +} + // Snapshot implements the Storage interface. func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) { ms.Lock()