129692: kv: don't quiesce leader leases r=nvanbenschoten a=nvanbenschoten

Closes #129688.

Once #125248 is addressed, a fortified raft leader will not send raft heartbeats, so quiescence will no longer be needed. All liveness decisions will instead be based on store liveness communication, which is cheap enough that a notion of quiescence is unnecessary.
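
As a rough illustration (hypothetical names, not the actual CockroachDB quiescence code), the decision amounts to short-circuiting the quiescence check whenever the range is held under a leader lease:

```go
package main

import "fmt"

// LeaseType is an illustrative stand-in for the lease variants relevant here;
// these identifiers are not the real CockroachDB ones.
type LeaseType int

const (
	LeaseExpiration LeaseType = iota
	LeaseEpoch
	LeaseLeader
)

// shouldQuiesce sketches the idea: a range under a leader lease never
// quiesces, because the fortified leader sends no raft heartbeats and
// liveness is tracked through store liveness instead.
func shouldQuiesce(lease LeaseType, ticksSinceProposal, quiesceThreshold int) bool {
	if lease == LeaseLeader {
		return false
	}
	return ticksSinceProposal >= quiesceThreshold
}

func main() {
	fmt.Println(shouldQuiesce(LeaseEpoch, 10, 6))  // true: idle epoch-leased range may quiesce
	fmt.Println(shouldQuiesce(LeaseLeader, 10, 6)) // false: leader leases never quiesce
}
```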

We should not merge this until after we address #125248.

Release note: None

131439: rac2,replica_rac2: order replicaSendStream.mu before Replica.mu r=kvoli a=sumeerbhola

The opposite lock ordering is too hard to work with once we add pull mode. We will call MakeMsgAppAndAssumeSent from inside a replicaSendStream and will adjust replicaSendStream state before and after that call. Replica.mu needs to be held during that call, since Raft state for this follower (such as Next) is being modified, and it is not convenient to release and reacquire replicaSendStream.mu just for this purpose.
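
A minimal sketch of the resulting lock order, using stand-in types (`replica`, `replicaSendStream`, and `makeMsgAppAndAssumeSent` are illustrative, not the real kvserver code): replicaSendStream.mu stays held across the nested call, which acquires Replica.mu internally.

```go
package main

import (
	"fmt"
	"sync"
)

// replica is an illustrative stand-in; the real Replica carries much more state.
type replica struct {
	mu   sync.Mutex
	next uint64 // e.g. the follower's Next index, protected by mu
}

type replicaSendStream struct {
	mu      sync.Mutex
	r       *replica
	pending int
}

// makeMsgAppAndAssumeSent stands in for MakeMsgAppAndAssumeSent: it must hold
// Replica.mu while mutating per-follower raft state such as Next.
func (r *replica) makeMsgAppAndAssumeSent(n uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next += n
}

// send adjusts replicaSendStream state before and after the nested call,
// without ever dropping replicaSendStream.mu. This is only safe if
// replicaSendStream.mu is ordered before Replica.mu everywhere.
func (s *replicaSendStream) send(n uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending++                    // pre-call bookkeeping under replicaSendStream.mu
	s.r.makeMsgAppAndAssumeSent(n) // acquires Replica.mu while replicaSendStream.mu is held
	s.pending--                    // post-call bookkeeping, still under replicaSendStream.mu
}

func main() {
	s := &replicaSendStream{r: &replica{}}
	s.send(3)
	fmt.Println(s.r.next) // 3
}
```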

Informs #130433

Epic: CRDB-37515

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
3 people committed Sep 26, 2024
3 parents c7cc32f + 684ef02 + 8d4977f commit c56def7
Showing 10 changed files with 167 additions and 130 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/flow_control_replica_integration.go
@@ -491,3 +491,13 @@ func (r *replicaForRACv2) LeaseholderMuRLocked() roachpb.ReplicaID {
func (r *replicaForRACv2) IsScratchRange() bool {
return (*Replica)(r).IsScratchRange()
}

// MuLock implements replica_rac2.ReplicaForRaftNode.
func (r *replicaForRACv2) MuLock() {
r.mu.Lock()
}

// MuUnlock implements replica_rac2.ReplicaForRaftNode.
func (r *replicaForRACv2) MuUnlock() {
r.mu.Unlock()
}
41 changes: 19 additions & 22 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -38,9 +38,8 @@ import (
// range at the leader. It must be created for a particular leader term, and
// closed if the term changes.
//
// Almost none of the methods are called with Replica.mu held. The caller and
// callee should typically order their mutexes before Replica.mu. The one
// exception is MaybeSendPingsLocked.
// None of the methods are called with Replica.mu held. The caller and callee
// should order their mutexes before Replica.mu.
type RangeController interface {
// WaitForEval seeks admission to evaluate a request at the given priority.
// This blocks until there are positive tokens available for the request to
@@ -71,12 +70,12 @@ type RangeController interface {
//
// Requires replica.raftMu to be held.
AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, AdmittedVector)
// MaybeSendPingsLocked sends a MsgApp ping to each raft peer in
// MaybeSendPingsRaftMuLocked sends a MsgApp ping to each raft peer in
// StateReplicate whose admitted vector is lagging, and there wasn't a
// recent MsgApp to this peer.
//
// Requires replica.raftMu and replica.mu to be held.
MaybeSendPingsLocked()
// Requires replica.raftMu to be held.
MaybeSendPingsRaftMuLocked()
// SetReplicasRaftMuLocked sets the replicas of the range. The caller will
// never mutate replicas, and neither should the callee.
//
@@ -97,21 +96,20 @@ type RangeController interface {

// RaftInterface implements methods needed by RangeController.
//
// Locking reminder: as noted in replicaSendStream, replicaSendStream.mu is
// ordered after Replica.mu.
// Replica.mu is not held when calling any methods. Replica.raftMu is held,
// though is not needed, and is mentioned in the method names purely from an
// informational perspective.
//
// TODO(pav-kv): This interface is a placeholder for the interface containing raft
// methods. Replace this as part of #128019.
type RaftInterface interface {
// SendPingReplicaMuLocked sends a MsgApp ping to the given raft peer if
// SendPingRaftMuLocked sends a MsgApp ping to the given raft peer if
// there wasn't a recent MsgApp to this peer. The message is added to raft's
// message queue, and will be extracted and sent during the next Ready
// processing.
//
// If the peer is not in StateReplicate, this call does nothing.
//
// Requires Replica.mu to be held.
SendPingReplicaMuLocked(roachpb.ReplicaID) bool
SendPingRaftMuLocked(roachpb.ReplicaID) bool
// MakeMsgAppRaftMuLocked is used to construct a MsgApp for entries in
// [start, end) and must only be called in MsgAppPull mode for followers.
//
@@ -137,7 +135,11 @@ type RaftInterface interface {
// flow stays in StateReplicate. We should define or eliminate these cases.
//
// TODO(sumeer): This is a temporary API. LogSnapshot and LogSlice will
// replace it.
// replace it, and we will do this in two steps: (a) create a LogSlice while
// holding raftMu, which will not use RaftInterface, (b) call the following
// method with the LogSlice, and the callee will make the MsgApp and behave
// as if it was sent (i.e., update Next). Since we are not holding
// Replica.mu, the callee will need to acquire Replica.mu.
MakeMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, start, end uint64, maxSize int64) (raftpb.Message, error)
}
@@ -761,16 +763,14 @@ func (rc *rangeController) AdmitRaftMuLocked(
}
}

// MaybeSendPingsLocked implements RangeController.
func (rc *rangeController) MaybeSendPingsLocked() {
// NB: Replica.mu is already held, so no IO is permitted.
// MaybeSendPingsRaftMuLocked implements RangeController.
func (rc *rangeController) MaybeSendPingsRaftMuLocked() {
for id, state := range rc.replicaMap {
if id == rc.opts.LocalReplicaID {
continue
}
// s.shouldPing acquires replicaSendStream.mu.
if s := state.sendStream; s != nil && s.shouldPing() {
rc.opts.RaftInterface.SendPingReplicaMuLocked(id)
rc.opts.RaftInterface.SendPingRaftMuLocked(id)
}
}
}
@@ -976,10 +976,7 @@ func NewReplicaState(
type replicaSendStream struct {
parent *replicaState

// Because of MaybeSendPingsLocked being called with Replica.mu held, this
// mutex is ordered after Replica.mu. This ordering is a bit unfortunate, in
// that we need to ensure that we never need to acquire Replica.mu while
// holding replicaSendStream.mu.
// Mutex is ordered before Replica.mu.
mu struct {
syncutil.Mutex
// connectedStateStart is the time when the connectedState was last
@@ -319,7 +319,7 @@ func (r *testingRCRange) replicasStateInfo() map[roachpb.ReplicaID]ReplicaStateI
return replicasStateInfo
}

func (r *testingRCRange) SendPingReplicaMuLocked(roachpb.ReplicaID) bool {
func (r *testingRCRange) SendPingRaftMuLocked(roachpb.ReplicaID) bool {
return false
}

23 changes: 11 additions & 12 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -261,9 +261,9 @@ type SideChannelInfoUsingRaftMessageRequest struct {
// We *strongly* prefer methods to be called without holding Replica.mu, since
// then the callee (implementation of Processor) does not need to worry about
// (a) deadlocks, since it sometimes needs to lock Replica.mu itself, (b) the
// amount of work it is doing under this critical section. There are three
// amount of work it is doing under this critical section. There are two
// exceptions to this, due to difficulty in changing the calling code:
// InitRaftLocked, OnDescChangedLocked and MaybeSendPingsLocked.
// InitRaftLocked, OnDescChangedLocked.
type Processor interface {
// InitRaftLocked is called when RaftNode is initialized for the Replica.
// NB: can be called twice before the Replica is fully initialized.
@@ -376,15 +376,15 @@ type Processor interface {
// raftMu is held.
AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, rac2.AdmittedVector)

// MaybeSendPingsLocked sends a MsgApp ping to each raft peer whose admitted
// vector is lagging, and there wasn't a recent MsgApp to this peer. The
// messages are added to raft's message queue, and will be extracted from
// raft and sent during the next Ready processing.
// MaybeSendPingsRaftMuLocked sends a MsgApp ping to each raft peer whose
// admitted vector is lagging, and there wasn't a recent MsgApp to this
// peer. The messages are added to raft's message queue, and will be
// extracted from raft and sent during the next Ready processing.
//
// If the replica is not the leader, this call does nothing.
//
// Both Replica.raftMu and Replica.mu are held.
MaybeSendPingsLocked()
// raftMu is held.
MaybeSendPingsRaftMuLocked()

// AdmitForEval is called to admit work that wants to evaluate at the
// leaseholder.
@@ -1131,12 +1131,11 @@ func (p *processorImpl) AdmitRaftMuLocked(
}
}

// MaybeSendPingsLocked implements Processor.
func (p *processorImpl) MaybeSendPingsLocked() {
// MaybeSendPingsRaftMuLocked implements Processor.
func (p *processorImpl) MaybeSendPingsRaftMuLocked() {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
if rc := p.leader.rc; rc != nil {
rc.MaybeSendPingsLocked()
rc.MaybeSendPingsRaftMuLocked()
}
}

8 changes: 4 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
@@ -138,8 +138,8 @@ func (rn *testRaftNode) ReplicasStateLocked(_ map[roachpb.ReplicaID]rac2.Replica
fmt.Fprint(rn.b, " RaftNode.ReplicasStateLocked\n")
}

func (rn *testRaftNode) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool {
fmt.Fprintf(rn.b, " RaftNode.SendPingReplicaMuLocked(%d)\n", to)
func (rn *testRaftNode) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
fmt.Fprintf(rn.b, " RaftNode.SendPingRaftMuLocked(%d)\n", to)
return true
}

@@ -252,8 +252,8 @@ func (c *testRangeController) AdmitRaftMuLocked(
fmt.Fprintf(c.b, " RangeController.AdmitRaftMuLocked(%s, %+v)\n", replicaID, av)
}

func (c *testRangeController) MaybeSendPingsLocked() {
fmt.Fprintf(c.b, " RangeController.MaybeSendPingsLocked()\n")
func (c *testRangeController) MaybeSendPingsRaftMuLocked() {
fmt.Fprintf(c.b, " RangeController.MaybeSendPingsRaftMuLocked()\n")
}

func (c *testRangeController) SetReplicasRaftMuLocked(
18 changes: 14 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go
@@ -20,11 +20,19 @@ import (

type raftNodeForRACv2 struct {
*raft.RawNode
r ReplicaForRaftNode
}

type ReplicaForRaftNode interface {
// MuLock acquires Replica.mu.
MuLock()
// MuUnlock releases Replica.mu.
MuUnlock()
}

// NewRaftNode creates a RaftNode implementation from the given RawNode.
func NewRaftNode(rn *raft.RawNode) RaftNode {
return raftNodeForRACv2{RawNode: rn}
func NewRaftNode(rn *raft.RawNode, r ReplicaForRaftNode) RaftNode {
return raftNodeForRACv2{RawNode: rn, r: r}
}

func (rn raftNodeForRACv2) TermLocked() uint64 {
@@ -55,8 +63,10 @@ func (rn raftNodeForRACv2) ReplicasStateLocked(
})
}

// SendPingReplicaMuLocked implements rac2.RaftInterface.
func (rn raftNodeForRACv2) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool {
// SendPingRaftMuLocked implements rac2.RaftInterface.
func (rn raftNodeForRACv2) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
rn.r.MuLock()
defer rn.r.MuUnlock()
return rn.RawNode.SendPing(raftpb.PeerID(to))
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -325,7 +325,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
return err
}
r.mu.internalRaftGroup = rg
r.flowControlV2.InitRaftLocked(ctx, replica_rac2.NewRaftNode(rg))
r.flowControlV2.InitRaftLocked(ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r)))
return nil
}
