Merge #131041
131041: kvserver: document raft Storage mental model r=nvanbenschoten a=pav-kv

This PR documents the use of `Replica.{raftMu,mu}` mutexes in `raft.{RawNode,Storage}`, and fixes a few minor things along the way.

Part of #130955

Co-authored-by: Pavel Kalinnikov <[email protected]>
craig[bot] and pav-kv committed Sep 24, 2024
2 parents a055dec + 30c04cd commit d26f61d
Showing 5 changed files with 137 additions and 64 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/logstore/sideload.go
@@ -56,8 +56,8 @@ type SideloadStorage interface {
// files that remain, or an error.
TruncateTo(_ context.Context, index kvpb.RaftIndex) (freed, retained int64, _ error)
// BytesIfTruncatedFromTo returns the number of bytes that would be freed,
// if one were to truncate [from, to). Additionally, it returns the the
// number of bytes that would be retained >= to.
// if one were to truncate [from, to). Additionally, it returns the number
// of bytes that would be retained >= to.
BytesIfTruncatedFromTo(_ context.Context, from kvpb.RaftIndex, to kvpb.RaftIndex) (freed, retained int64, _ error)
// Returns an absolute path to the file that Get() would return the contents
// of. Does not check whether the file actually exists.
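To make the `BytesIfTruncatedFromTo` contract above concrete, here is a minimal sketch (not part of the commit) of how a caller can obtain the total size of all sideloaded files; the new code in `ComputeRaftLogSize` below uses the same trick. The helper name is illustrative.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
)

// totalSideloadedBytes is an illustrative helper, not part of this commit.
// Truncating the empty prefix [0, 0) frees nothing, so the "retained" result
// of BytesIfTruncatedFromTo is the total size of all sideloaded files.
func totalSideloadedBytes(ctx context.Context, ss logstore.SideloadStorage) (int64, error) {
	_, retained, err := ss.BytesIfTruncatedFromTo(ctx, 0, 0)
	if err != nil {
		return 0, err
	}
	return retained, nil
}
```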
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica.go
@@ -316,7 +316,6 @@ type Replica struct {
// depending on which lock is being held.
stateLoader stateloader.StateLoader
// on-disk storage for sideloaded SSTables. Always non-nil.
// TODO(pavelkalinnikov): remove sideloaded == nil checks.
sideloaded logstore.SideloadStorage
// stateMachine is used to apply committed raft entries.
stateMachine replicaStateMachine
10 changes: 2 additions & 8 deletions pkg/kv/kvserver/replica_destroy.go
@@ -71,10 +71,6 @@ func (s destroyStatus) Removed() bool {
const mergedTombstoneReplicaID roachpb.ReplicaID = math.MaxInt32

func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCStats) error {
// NB: we need the nil check below because it's possible that we're GC'ing a
// Replica without a replicaID, in which case it does not have a sideloaded
// storage.
//
// TODO(tschottdorf): at node startup, we should remove all on-disk
// directories belonging to replicas which aren't present. A crash before a
// call to postDestroyRaftMuLocked will currently leave the files around
@@ -88,10 +84,8 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS
// TODO(pavelkalinnikov): coming back in 2023, the above may still happen if:
// (1) state machine syncs, (2) OS crashes before (3) sideloaded was able to
// sync the files removal. The files should be cleaned up on restart.
if r.raftMu.sideloaded != nil {
if err := r.raftMu.sideloaded.Clear(ctx); err != nil {
return err
}
if err := r.raftMu.sideloaded.Clear(ctx); err != nil {
return err
}

// Release the reference to this tenant in metrics, we know the tenant ID is
16 changes: 5 additions & 11 deletions pkg/kv/kvserver/replica_raft.go
@@ -2843,8 +2843,6 @@ func handleTruncatedStateBelowRaftPreApply(
// storage engine. This will iterate over the Raft log and sideloaded files, so
// depending on the size of these it can be mildly to extremely expensive and
// thus should not be called frequently.
//
// The sideloaded storage may be nil, in which case it is treated as empty.
func ComputeRaftLogSize(
ctx context.Context,
rangeID roachpb.RangeID,
@@ -2857,15 +2855,11 @@ func ComputeRaftLogSize(
if err != nil {
return 0, err
}
var totalSideloaded int64
if sideloaded != nil {
var err error
// The remaining bytes if one were to truncate [0, 0) gives us the total
// number of bytes in sideloaded files.
_, totalSideloaded, err = sideloaded.BytesIfTruncatedFromTo(ctx, 0, 0)
if err != nil {
return 0, err
}
// The remaining bytes if one were to truncate [0, 0) gives us the total
// number of bytes in sideloaded files.
_, totalSideloaded, err := sideloaded.BytesIfTruncatedFromTo(ctx, 0, 0)
if err != nil {
return 0, err
}
return ms.SysBytes + totalSideloaded, nil
}
170 changes: 128 additions & 42 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -57,50 +57,107 @@ var snapshotIngestAsWriteThreshold = settings.RegisterByteSizeSetting(
}())

// replicaRaftStorage implements the raft.Storage interface.
//
// All mutating calls to raft.RawNode require that r.mu is held. All read-only
// calls to raft.RawNode require that r.mu is held at least for reads.
//
// All methods implementing raft.Storage are called from within, or on behalf of
// a RawNode. When called from within RawNode, r.mu is necessarily held (and
// maybe r.raftMu). Conceptually, r.mu only needs to be locked for reading, but
// implementation details may require an exclusive lock (see method comments).
// When called from outside RawNode (on behalf of a RawNode "snapshot"), the
// caller must hold r.raftMu and/or r.mu.
//
// RawNode has the in-memory "unstable" state which services most of its needs.
// Most RawNode.Step updates are completed in memory, while only holding r.mu.
//
// RawNode falls back to reading from Storage when it does not have the needed
// state in memory. For example, the leader may need to read log entries from
// storage to construct a log append request for a follower, or a follower may
// need to interact with its storage upon receiving such a request to check
// whether the appended log slice is consistent with raft rules.
//
// (1) RawNode guarantees that everything it reads from Storage has no in-flight
// writes. Raft always reads state that it knows to be stable (meaning it does
// not have pending writes) and, in some cases, also synced / durable. Storage
// acknowledges completed writes / syncs back to RawNode, under r.mu, so that
// RawNode can correctly implement this guarantee.
//
// (2) The content of raft.Storage is always mutated while holding r.raftMu,
// which is an un-contended "IO" mutex and is allowed to be held longer. Most
// writes are extracted from RawNode while holding r.raftMu and r.mu (in the
// Ready() loop), and handed over to storage under r.raftMu. There are a few
// cases when CRDB synthesizes the writes (e.g. during a range split / merge, or
// raft log truncations) under r.raftMu.
//
// The guarantees explain why holding only r.mu is sufficient for RawNode or its
// snapshot to be in a consistent state. Under r.mu, new writes are blocked,
// because of (2), and by (1) reads never conflict with the in-flight writes.
//
// However, r.mu is a widely used mutex, and not recommended for IO. When doing
// work on behalf of RawNode that involves IO (like constructing log appends for a
// follower), we would like to release r.mu. The two guarantees make it possible
// to observe a consistent RawNode snapshot while only holding r.raftMu.
//
// While both r.raftMu and r.mu are held, we can take a shallow / COW copy of
// the RawNode or its relevant subset (e.g. the raft log; the Ready struct is
// also considered such). A subsequent release of r.mu allows RawNode to resume
// making progress. The raft.Storage does not observe any new writes while
// r.raftMu is still held, by the guarantee (2). Combined with guarantee (1), it
// means that both the original and the snapshot RawNode remain consistent. The
// shallow copy represents a valid past state of the RawNode.
//
// TODO(pav-kv): the snapshotting with only r.raftMu held is not implemented,
// but should be done soon.
//
// All the implementation methods assume that the required locks are held, and
// don't acquire them. The specific locking requirements are noted in each
// method's comment. The method names do not follow our "Locked" naming
// conventions, due to being an implementation of raft.Storage interface from a
// different package.
//
// Many of the methods defined in this file are wrappers around static
// functions. This is done to facilitate their use from Replica.Snapshot(),
// where it is important that all the data that goes into the snapshot comes
// from a consistent view of the database, and not the replica's in-memory state
// or via a reference to Replica.store.Engine().
type replicaRaftStorage Replica

var _ raft.Storage = (*replicaRaftStorage)(nil)
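The mental model above can be condensed into a short, self-contained sketch (not part of the commit) of the Ready-handling discipline: the Ready struct is extracted as a shallow copy while both mutexes are held, r.mu is released so the RawNode can keep making in-memory progress, the IO happens under r.raftMu only, and the completed write is acknowledged back under r.mu. The toy struct, field names, and the upstream `go.etcd.io/raft/v3` import (CockroachDB uses its own raft fork) are assumptions for illustration.

```go
package example

import (
	"sync"

	"go.etcd.io/raft/v3"
)

// replicaSketch is a toy stand-in for Replica, illustrating guarantees (1)
// and (2) from the comment above. It is not the actual CockroachDB code.
type replicaSketch struct {
	raftMu sync.Mutex // the "IO" mutex; all raft.Storage mutations happen under it
	mu     struct {
		sync.RWMutex
		rawNode *raft.RawNode // the in-memory raft state machine
	}
}

// handleReady extracts a Ready under both mutexes, writes it to storage under
// raftMu only, and acknowledges the write back to the RawNode under mu.
func (r *replicaSketch) handleReady(write func(raft.Ready) error) error {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()

	r.mu.Lock()
	rd := r.mu.rawNode.Ready() // shallow / COW copy of the relevant RawNode state
	r.mu.Unlock()              // RawNode may resume making in-memory progress

	// Guarantee (2): raft.Storage is only mutated under raftMu, so the copy
	// above remains a consistent past state of the RawNode during the
	// (possibly slow) IO below.
	if err := write(rd); err != nil {
		return err
	}

	// Acknowledge the completed write under mu, which lets RawNode uphold
	// guarantee (1): it never reads state that still has in-flight writes.
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.rawNode.Advance(rd)
	return nil
}
```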

// All calls to raft.RawNode require that both Replica.raftMu and
// Replica.mu are held. All of the functions exposed via the
// raft.Storage interface will in turn be called from RawNode, so none
// of these methods may acquire either lock, but they may require
// their caller to hold one or both locks (even though they do not
// follow our "Locked" naming convention). Specific locking
// requirements (e.g. whether r.mu must be held for reading or writing)
// are noted in each method's comments.
//
// Many of the methods defined in this file are wrappers around static
// functions. This is done to facilitate their use from
// Replica.Snapshot(), where it is important that all the data that
// goes into the snapshot comes from a consistent view of the
// database, and not the replica's in-memory state or via a reference
// to Replica.store.Engine().

// InitialState implements the raft.Storage interface.
// InitialState requires that r.mu is held for writing because it requires
// exclusive access to r.mu.stateLoader.
func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
// The call must synchronize with raft IO. Called when raft is initialized
// under both r.raftMu and r.mu. We don't technically need r.mu here, but we
// know it is held.
r.raftMu.AssertHeld()
r.mu.AssertHeld()

ctx := r.AnnotateCtx(context.TODO())
hs, err := r.mu.stateLoader.LoadHardState(ctx, r.store.TODOEngine())
// For uninitialized ranges, membership is unknown at this point.
if raft.IsEmptyHardState(hs) || err != nil {
if err != nil {
r.reportRaftStorageError(err)
}
hs, err := r.raftMu.stateLoader.LoadHardState(ctx, r.store.TODOEngine())
if err != nil {
r.reportRaftStorageError(err)
return raftpb.HardState{}, raftpb.ConfState{}, err
}
// For uninitialized ranges, membership is unknown at this point.
if raft.IsEmptyHardState(hs) {
return raftpb.HardState{}, raftpb.ConfState{}, nil
}
// NB: r.mu.state is guarded by both r.raftMu and r.mu.
cs := r.mu.state.Desc.Replicas().ConfState()
return hs, cs, nil
}
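For context (not part of the commit): raft calls InitialState exactly once, while the RawNode is being constructed, which is when both r.raftMu and r.mu are held as the comment above notes. A minimal sketch of that wiring, with placeholder configuration values and the upstream raft import standing in for CockroachDB's fork:

```go
package example

import "go.etcd.io/raft/v3"

// newRawNodeSketch shows how a raft.Storage implementation is handed to raft.
// NewRawNode reads Storage.InitialState (and the log bounds) during
// construction; the values below are placeholders, not CockroachDB's.
func newRawNodeSketch(s raft.Storage) (*raft.RawNode, error) {
	return raft.NewRawNode(&raft.Config{
		ID:              1,       // placeholder replica ID
		ElectionTick:    10,      // placeholder tick counts
		HeartbeatTick:   1,
		Storage:         s,       // e.g. (*replicaRaftStorage)(r) in CockroachDB
		MaxSizePerMsg:   1 << 20, // placeholder limits
		MaxInflightMsgs: 128,
	})
}
```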

// Entries implements the raft.Storage interface. Note that maxBytes is advisory
// and this method will always return at least one entry even if it exceeds
// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined.
// Entries requires that r.mu is held for writing because it requires exclusive
// access to r.mu.stateLoader.
// Entries implements the raft.Storage interface.
//
// NB: maxBytes is advisory, and this method returns at least one entry (unless
// there are none in the requested interval), even if its size exceeds maxBytes.
// Sideloaded entries count towards maxBytes with their payloads inlined.
//
// Entries can return log entries that are not yet stable in durable storage.
// Entries can return log entries that are not yet durable / synced in storage.
//
// Requires that r.mu is held for writing.
// TODO(pav-kv): make it possible to call with only raftMu held.
func (r *replicaRaftStorage) Entries(lo, hi uint64, maxBytes uint64) ([]raftpb.Entry, error) {
entries, err := r.TypedEntries(kvpb.RaftIndex(lo), kvpb.RaftIndex(hi), maxBytes)
if err != nil {
@@ -112,14 +169,37 @@ func (r *replicaRaftStorage) Entries(lo, hi uint64, maxBytes uint64) ([]raftpb.E
func (r *replicaRaftStorage) TypedEntries(
lo, hi kvpb.RaftIndex, maxBytes uint64,
) ([]raftpb.Entry, error) {
ctx := r.AnnotateCtx(context.TODO())
if r.raftMu.sideloaded == nil {
return nil, errors.New("sideloaded storage is uninitialized")
}
ents, _, loadedSize, err := logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes, &r.raftMu.bytesAccount)
// The call is always initiated by RawNode, under r.mu. Need it locked for
// writes, for r.mu.stateLoader.
//
// TODO(pav-kv): we have a large class of cases when we would rather only hold
// raftMu while reading the entries. The r.mu lock should be narrow.
r.mu.AssertHeld()
// Writes to the storage engine and the sideloaded storage are made under
// raftMu only. Since we are holding r.mu, but may or may not be holding
// raftMu, this read could be racing with a write.
//
// Such races are prevented at a higher level, in RawNode. Raft never reads at
// a log index for which there is at least one in-flight write to storage
// (possibly multiple, issued at different leader terms). It always reads
// "stable" entries.
//
// NB: without this guarantee, there would be a concern with the sideloaded
// storage: it doesn't provide a consistent snapshot to the reader, unlike the
// storage engine. Its Put method writes / syncs a file sequentially, so a
// racing reader would be able to read partial entries.
//
// TODO(pav-kv): we need better safety guardrails here. The log storage type
// can remember the readable bounds, and assert that reads do not cross them.
// TODO(pav-kv): r.raftMu.bytesAccount is broken - can't rely on raftMu here.
entries, _, loadedSize, err := logstore.LoadEntries(
r.AnnotateCtx(context.TODO()),
r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes,
&r.raftMu.bytesAccount,
)
r.store.metrics.RaftStorageReadBytes.Inc(int64(loadedSize))
return ents, err
return entries, err
}
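The guardrail TODO above could look roughly like the following self-contained sketch (hypothetical; no such type exists in the logstore package): a thin reader that remembers the interval of indices known to be stable and asserts that reads stay inside it.

```go
package example

import (
	"fmt"
	"sync"

	"go.etcd.io/raft/v3/raftpb"
)

// boundedLogReader is a hypothetical guardrail in the spirit of the TODO
// above. The write path advances the readable bounds once entries have been
// written to storage and acknowledged back to RawNode; readers assert that
// they never look at indices with in-flight writes.
type boundedLogReader struct {
	mu     sync.Mutex
	lo, hi uint64 // readable interval [lo, hi)
	read   func(lo, hi uint64) ([]raftpb.Entry, error)
}

// markStable is called by the write path once entries in [lo, hi) have been
// written to storage and acknowledged back to RawNode.
func (b *boundedLogReader) markStable(lo, hi uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.lo, b.hi = lo, hi
}

// Entries checks the requested interval against the readable bounds before
// delegating to the underlying reader.
func (b *boundedLogReader) Entries(lo, hi uint64) ([]raftpb.Entry, error) {
	b.mu.Lock()
	stableLo, stableHi := b.lo, b.hi
	b.mu.Unlock()
	if lo < stableLo || hi > stableHi {
		return nil, fmt.Errorf("read [%d,%d) outside readable bounds [%d,%d)", lo, hi, stableLo, stableHi)
	}
	return b.read(lo, hi)
}
```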

// raftEntriesLocked requires that r.mu is held for writing.
Expand All @@ -143,17 +223,23 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) {
return uint64(term), err
}

// TypedTerm requires that r.mu is held for writing because it requires exclusive
// access to r.mu.stateLoader.
// TypedTerm requires that r.mu is held for writing because it requires
// exclusive access to r.mu.stateLoader.
//
// TODO(pav-kv): make it possible to read with only raftMu held.
func (r *replicaRaftStorage) TypedTerm(i kvpb.RaftIndex) (kvpb.RaftTerm, error) {
r.mu.AssertHeld()
// TODO(nvanbenschoten): should we set r.mu.lastTermNotDurable when
// r.mu.lastIndexNotDurable == i && r.mu.lastTermNotDurable == invalidLastTerm?
// TODO(pav-kv): we should rather always remember the last entry term, and
// remove invalidLastTerm special case.
if r.mu.lastIndexNotDurable == i && r.mu.lastTermNotDurable != invalidLastTerm {
return r.mu.lastTermNotDurable, nil
}
ctx := r.AnnotateCtx(context.TODO())
return logstore.LoadTerm(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, i)
return logstore.LoadTerm(r.AnnotateCtx(context.TODO()),
r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, i,
)
}

// raftTermLocked requires that r.mu is locked for writing.
