diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ba7c5a0c8826..23086d72165a 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -214,6 +214,27 @@ func (r *Replica) GetFirstIndex() kvpb.RaftIndex { return r.raftFirstIndexRLocked() } +// LogSnapshot returns an immutable point-in-time snapshot of the log storage. +func (r *replicaRaftStorage) LogSnapshot() raft.LogStorageSnapshot { + r.raftMu.AssertHeld() + r.mu.AssertRHeld() + // TODO(pav-kv): return a wrapper which, in all methods, checks that the log + // storage hasn't been written to. A more relaxed version of it should assert + // that only the relevant part of the log hasn't been overwritten, e.g. a new + // term leader hasn't appended a log slice that truncated the log, or the log + // hasn't been wiped. + // + // This would require auditing and integrating with the write paths. Today, + // this type implements only reads, and writes are in various places like the + // logstore.LogStore type, or the code in the split handler which creates an + // empty range state. + // + // We don't need a fully fledged Pebble snapshot here. For our purposes, we + // can also make sure that raftMu is held for the entire period of using the + // LogSnapshot - this should guarantee its immutability. + return r +} + // GetLeaseAppliedIndex returns the lease index of the last applied command. func (r *Replica) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex { r.mu.RLock() diff --git a/pkg/raft/log.go b/pkg/raft/log.go index 09ab8a1d35f8..779b2bd2c7e8 100644 --- a/pkg/raft/log.go +++ b/pkg/raft/log.go @@ -23,6 +23,28 @@ import ( pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" ) +// LogSnapshot encapsulates a point-in-time state of the raft log accessible +// outside the raft package for reads. 
+// +// To access it safely, the user must not mutate the underlying raft log storage +// between when the snapshot is obtained and the reads are done. +// +// TODO(pav-kv): this should be part of the Ready API. Instead of pre-fetching +// entries (e.g. the committed entries subject to state machine application), +// allow the application to read them from LogSnapshot in the Ready handler. +// This gives the application direct control over resource allocation, and +// flexibility to do raft log IO without blocking RawNode operation. +type LogSnapshot struct { + // first is the first available log index. + first uint64 + // storage contains the stable log entries. + storage LogStorage + // unstable contains the unstable log entries. + unstable logSlice + // logger gives access to logging errors. + logger Logger +} + type raftLog struct { // storage contains all stable entries since the last snapshot. storage Storage @@ -409,8 +431,12 @@ func (l *raftLog) lastEntryID() entryID { return entryID{term: t, index: index} } +func (l *raftLog) term(i uint64) (uint64, error) { + return l.snap(l.storage).term(i) +} + // term returns the term of the log entry at the given index. -func (l *raftLog) term(index uint64) (uint64, error) { +func (l LogSnapshot) term(index uint64) (uint64, error) { // Check the unstable log first, even before computing the valid index range, // which may need to access the storage. If we find the entry's term in the // unstable log, we know it was in the valid range. @@ -418,7 +444,7 @@ func (l *raftLog) term(index uint64) (uint64, error) { return 0, ErrUnavailable } else if index >= l.unstable.prev.index { return l.unstable.termAt(index), nil - } else if index+1 < l.firstIndex() { + } else if index+1 < l.first { return 0, ErrCompacted } @@ -518,6 +544,35 @@ func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.En // The returned slice can be appended to, but the entries in it must not be // mutated. 
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { + return l.snap(l.storage).slice(lo, hi, maxSize) +} + +// LogSlice returns a valid log slice for a prefix of the (lo, hi] log index +// interval, with the total entries size not exceeding maxSize. +// +// Returns at least one entry if the interval contains any. The maxSize can only +// be exceeded if the first entry (lo+1) is larger. +// +// TODO(pav-kv): export the logSlice type so that the caller can use it. +func (l LogSnapshot) LogSlice(lo, hi uint64, maxSize uint64) (logSlice, error) { + prevTerm, err := l.term(lo) + if err != nil { + // The log is probably compacted at index > lo (err == ErrCompacted), or it + // can be a custom storage error. + return logSlice{}, err + } + ents, err := l.slice(lo, hi, entryEncodingSize(maxSize)) + if err != nil { + return logSlice{}, err + } + return logSlice{ + term: l.unstable.term, + prev: entryID{term: prevTerm, index: lo}, + entries: ents, + }, nil +} + +func (l LogSnapshot) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { if err := l.mustCheckOutOfBounds(lo, hi); err != nil { return nil, err } else if lo >= hi { @@ -576,13 +631,13 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e // mustCheckOutOfBounds checks that the (lo, hi] interval is within the bounds // of this raft log: l.firstIndex()-1 <= lo <= hi <= l.lastIndex(). 
-func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { +func (l LogSnapshot) mustCheckOutOfBounds(lo, hi uint64) error { if lo > hi { l.logger.Panicf("invalid slice %d > %d", lo, hi) } - if fi := l.firstIndex(); lo+1 < fi { + if fi := l.first; lo+1 < fi { return ErrCompacted - } else if li := l.lastIndex(); hi > li { + } else if li := l.unstable.lastIndex(); hi > li { l.logger.Panicf("slice(%d,%d] out of bound [%d,%d]", lo, hi, fi, li) } return nil @@ -598,3 +653,14 @@ func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 { l.logger.Panicf("unexpected error (%v)", err) return 0 } + +// snap returns a point-in-time snapshot of the raft log. This snapshot can be +// read from while the underlying storage is not mutated. +func (l *raftLog) snap(storage LogStorage) LogSnapshot { + return LogSnapshot{ + first: l.firstIndex(), + storage: storage, + unstable: l.unstable.logSlice, + logger: l.logger, + } +} diff --git a/pkg/raft/log_test.go b/pkg/raft/log_test.go index eb0ca2601bc7..13c2a8c0ce48 100644 --- a/pkg/raft/log_test.go +++ b/pkg/raft/log_test.go @@ -706,7 +706,7 @@ func TestIsOutOfBounds(t *testing.T) { require.True(t, tt.wpanic) } }() - err := l.mustCheckOutOfBounds(tt.lo, tt.hi) + err := l.snap(l.storage).mustCheckOutOfBounds(tt.lo, tt.hi) require.False(t, tt.wpanic) require.False(t, tt.wErrCompacted && err != ErrCompacted) require.False(t, !tt.wErrCompacted && err != nil) diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index 4bb4b6990818..cd119d6be4b2 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -115,6 +115,24 @@ func (rn *RawNode) Step(m pb.Message) error { return rn.raft.Step(m) } +// LogSnapshot returns a point-in-time read-only state of the raft log. 
+// +// The returned snapshot can be read from while RawNode continues operation, as +// long as the application guarantees immutability of the underlying log storage +// snapshot (returned from the LogStorage.LogSnapshot method) while the snapshot +// is being used. +// +// One way the application can implement an immutable snapshot is by blocking +// the entire log storage for new writes. This also means the Ready() handling +// loop isn't able to hand over log writes to storage. +// +// A more advanced implementation can grab an immutable storage engine snapshot +// that does not block writes. Not blocking writes is beneficial for commit tail +// latency, since it doesn't prevent MsgApp/Resp exchange with the leader. +func (rn *RawNode) LogSnapshot() LogSnapshot { + return rn.raft.raftLog.snap(rn.raft.raftLog.storage.LogSnapshot()) +} + // Ready returns the outstanding work that the application needs to handle. This // includes appending and applying entries or a snapshot, updating the HardState, // and sending messages. The returned Ready() *must* be handled and subsequently diff --git a/pkg/raft/storage.go b/pkg/raft/storage.go index de4b03c53891..a445a2deb712 100644 --- a/pkg/raft/storage.go +++ b/pkg/raft/storage.go @@ -38,11 +38,8 @@ var ErrSnapOutOfDate = errors.New("requested index is older than the existing sn // are unavailable. var ErrUnavailable = errors.New("requested entry at index is unavailable") -// LogStorage is a read API for the raft log. +// LogStorage is a read-only API for the raft log. type LogStorage interface { - // InitialState returns the saved HardState and ConfState information. - InitialState() (pb.HardState, pb.ConfState, error) - // Entries returns a slice of consecutive log entries in the range [lo, hi), // starting from lo. The maxSize limits the total size of the log entries // returned, but Entries returns at least one entry if any. 
@@ -87,6 +84,16 @@ type LogStorage interface { // TODO(pav-kv): replace this with a Prev() method equivalent to logSlice's // prev field. The log storage is just a storage-backed logSlice. FirstIndex() uint64 + + // LogSnapshot returns an immutable point-in-time log storage snapshot. + LogSnapshot() LogStorageSnapshot +} + +// LogStorageSnapshot is a read-only API for the raft log which has extended +// immutability guarantees outside RawNode. The immutability must be provided by +// the application layer. +type LogStorageSnapshot interface { + LogStorage } // StateStorage provides read access to the state machine storage. @@ -103,6 +110,13 @@ type StateStorage interface { // // TODO(pav-kv): audit all error handling and document the contract. type Storage interface { + // InitialState returns the saved HardState and ConfState information. + // + // TODO(sep-raft-log): this would need to be fetched (fully or partially) from + // both log and state machine storage on startup, to detect which of the two + // storages is ahead, and initialize correctly. + InitialState() (pb.HardState, pb.ConfState, error) + LogStorage StateStorage } @@ -212,6 +226,12 @@ func (ms *MemoryStorage) firstIndex() uint64 { return ms.ents[0].Index + 1 } +// LogSnapshot implements the LogStorage interface. +func (ms *MemoryStorage) LogSnapshot() LogStorageSnapshot { + // TODO(pav-kv): return an immutable subset of MemoryStorage. + return ms +} + // Snapshot implements the Storage interface. func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) { ms.Lock()