Merge #131360
131360: rac2,replica_rac2: clarify locking for sending pings r=sumeerbhola a=sumeerbhola

A consequence of Replica.mu being held when calling MaybeSendPingsLocked is that replicaSendStream.mu is ordered after Replica.mu.

Epic: CRDB-37515

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
craig[bot] and sumeerbhola committed Sep 26, 2024
2 parents b0098d9 + 5aa49cc commit bec01ef
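
Before the diff, a minimal, self-contained Go sketch of the ordering invariant the commit message describes (replica and sendStream are hypothetical stand-ins, not the actual CockroachDB types): once MaybeSendPingsLocked runs under Replica.mu and then takes replicaSendStream.mu, any path that acquires the two in the opposite order could deadlock, so replicaSendStream methods must never acquire Replica.mu.

```go
package main

import "sync"

// Hypothetical stand-ins for Replica and replicaSendStream; the real
// types live in pkg/kv/kvserver and pkg/kv/kvserver/kvflowcontrol/rac2.
type replica struct {
	mu sync.Mutex // ordered before sendStream.mu
}

type sendStream struct {
	mu sync.Mutex // ordered after replica.mu
}

// maybeSendPings mirrors the shape of MaybeSendPingsLocked: the caller
// holds replica.mu, and sendStream.mu is acquired underneath it.
func maybeSendPings(r *replica, s *sendStream) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s.mu.Lock() // consistent with the order replica.mu -> sendStream.mu
	// ... inspect stream state, decide whether to ping ...
	s.mu.Unlock()
}

// badPath would invert the order and can deadlock against maybeSendPings;
// this is exactly what the new comment on replicaSendStream.mu rules out.
// func (s *sendStream) badPath(r *replica) {
// 	s.mu.Lock()
// 	defer s.mu.Unlock()
// 	r.mu.Lock() // lock-order inversion: replica.mu under sendStream.mu
// 	r.mu.Unlock()
// }

func main() {
	maybeSendPings(&replica{}, &sendStream{})
}
```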
Showing 6 changed files with 50 additions and 38 deletions.
47 changes: 28 additions & 19 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -38,8 +38,9 @@ import (
// range at the leader. It must be created for a particular leader term, and
// closed if the term changes.
//
-// None of the methods are called with Replica.mu held. The caller should
-// typically order its mutexes before Replica.mu.
+// Almost none of the methods are called with Replica.mu held. The caller and
+// callee should typically order their mutexes before Replica.mu. The one
+// exception is MaybeSendPingsLocked.
type RangeController interface {
// WaitForEval seeks admission to evaluate a request at the given priority.
// This blocks until there are positive tokens available for the request to
@@ -70,12 +71,12 @@
//
// Requires replica.raftMu to be held.
AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, AdmittedVector)
-// MaybeSendPingsRaftMuLocked sends a MsgApp ping to each raft peer in
-// StateReplicate whose admitted vector is lagging, and there wasn't a recent
-// MsgApp to this peer.
+// MaybeSendPingsLocked sends a MsgApp ping to each raft peer in
+// StateReplicate whose admitted vector is lagging, and there wasn't a
+// recent MsgApp to this peer.
//
-// Requires replica.raftMu to be held.
-MaybeSendPingsRaftMuLocked()
+// Requires replica.raftMu and replica.mu to be held.
+MaybeSendPingsLocked()
// SetReplicasRaftMuLocked sets the replicas of the range. The caller will
// never mutate replicas, and neither should the callee.
//
@@ -94,17 +95,23 @@
InspectRaftMuLocked(ctx context.Context) kvflowinspectpb.Handle
}

// RaftInterface implements methods needed by RangeController.
//
+// Locking reminder: as noted in replicaSendStream, replicaSendStream.mu is
+// ordered after Replica.mu.
+//
// TODO(pav-kv): This interface is a placeholder for the interface containing raft
// methods. Replace this as part of #128019.
type RaftInterface interface {
-// SendPingRaftMuLocked sends a MsgApp ping to the given raft peer if there
-// wasn't a recent MsgApp to this peer. The message is added to raft's message
-// queue, and will be extracted and sent during the next Ready processing.
+// SendPingReplicaMuLocked sends a MsgApp ping to the given raft peer if
+// there wasn't a recent MsgApp to this peer. The message is added to raft's
+// message queue, and will be extracted and sent during the next Ready
+// processing.
//
// If the peer is not in StateReplicate, this call does nothing.
//
-// Requires Replica.raftMu to be held.
-SendPingRaftMuLocked(roachpb.ReplicaID) bool
+// Requires Replica.mu to be held.
+SendPingReplicaMuLocked(roachpb.ReplicaID) bool
}

type ReplicaStateInfo struct {
@@ -701,18 +708,16 @@ func (rc *rangeController) AdmitRaftMuLocked(
}
}

-// MaybeSendPingsRaftMuLocked sends a MsgApp ping to each raft peer in
-// StateReplicate whose admitted vector is lagging, and there wasn't a recent
-// MsgApp to this peer.
-//
-// Requires replica.raftMu to be held.
-func (rc *rangeController) MaybeSendPingsRaftMuLocked() {
+// MaybeSendPingsLocked implements RangeController.
+func (rc *rangeController) MaybeSendPingsLocked() {
+// NB: Replica.mu is already held, so no IO is permitted.
for id, state := range rc.replicaMap {
if id == rc.opts.LocalReplicaID {
continue
}
+// s.shouldPing acquires replicaSendStream.mu.
if s := state.sendStream; s != nil && s.shouldPing() {
-rc.opts.RaftInterface.SendPingRaftMuLocked(id)
+rc.opts.RaftInterface.SendPingReplicaMuLocked(id)
}
}
}
@@ -918,6 +923,10 @@ func NewReplicaState(
type replicaSendStream struct {
parent *replicaState

+// Because of MaybeSendPingsLocked being called with Replica.mu held, this
+// mutex is ordered after Replica.mu. This ordering is a bit unfortunate, in
+// that we need to ensure that we never need to acquire Replica.mu while
+// holding replicaSendStream.mu.
mu struct {
syncutil.Mutex
// connectedStateStart is the time when the connectedState was last
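
The loop above calls s.shouldPing, whose body is not part of this diff. A hypothetical sketch of its locking shape, assuming a lastMsgApp timestamp and a pingInterval threshold (both invented for illustration): it takes only replicaSendStream.mu, and per the new ordering it must not touch Replica.mu, which the caller already holds.

```go
package rac2sketch

import (
	"sync"
	"time"
)

// pingInterval is an assumed threshold, for illustration only.
const pingInterval = time.Second

// replicaSendStream is a self-contained stand-in; the real type uses
// syncutil.Mutex and holds more state. lastMsgApp is an assumed field.
type replicaSendStream struct {
	mu struct {
		sync.Mutex
		lastMsgApp time.Time // when a MsgApp was last sent to this peer
	}
}

// shouldPing acquires only replicaSendStream.mu. Because the caller
// (MaybeSendPingsLocked) already holds Replica.mu, reaching for
// Replica.mu here would invert the documented order and risk deadlock.
func (s *replicaSendStream) shouldPing(now time.Time) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return now.Sub(s.mu.lastMsgApp) > pingInterval
}
```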
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
@@ -319,7 +319,7 @@ func (r *testingRCRange) replicasStateInfo() map[roachpb.ReplicaID]ReplicaStateI
return replicasStateInfo
}

-func (r *testingRCRange) SendPingRaftMuLocked(roachpb.ReplicaID) bool {
+func (r *testingRCRange) SendPingReplicaMuLocked(roachpb.ReplicaID) bool {
return false
}

26 changes: 14 additions & 12 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -261,13 +261,14 @@ type SideChannelInfoUsingRaftMessageRequest struct {
// We *strongly* prefer methods to be called without holding Replica.mu, since
// then the callee (implementation of Processor) does not need to worry about
// (a) deadlocks, since it sometimes needs to lock Replica.mu itself, (b) the
-// amount of work it is doing under this critical section. The only exception is
-// OnDescChangedLocked, where this was hard to achieve.
+// amount of work it is doing under this critical section. There are three
+// exceptions to this, due to difficulty in changing the calling code:
+// InitRaftLocked, OnDescChangedLocked and MaybeSendPingsLocked.
type Processor interface {
// InitRaftLocked is called when RaftNode is initialized for the Replica.
// NB: can be called twice before the Replica is fully initialized.
//
-// Both Replica mu and raftMu are held.
+// Both Replica.raftMu and Replica.mu are held.
InitRaftLocked(context.Context, RaftNode)

// OnDestroyRaftMuLocked is called when the Replica is being destroyed.
@@ -299,7 +300,7 @@ type Processor interface {
// OnDescChangedLocked provides a possibly updated RangeDescriptor. The
// tenantID passed in all calls must be the same.
//
-// Both Replica mu and raftMu are held.
+// Both Replica.raftMu and Replica.mu are held.
OnDescChangedLocked(
ctx context.Context, desc *roachpb.RangeDescriptor, tenantID roachpb.TenantID)

@@ -375,15 +376,15 @@ type Processor interface {
// raftMu is held.
AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, rac2.AdmittedVector)

-// MaybeSendPingsRaftMuLocked sends a MsgApp ping to each raft peer whose
-// admitted vector is lagging, and there wasn't a recent MsgApp to this peer.
-// The messages are added to raft's message queue, and will be extracted from
+// MaybeSendPingsLocked sends a MsgApp ping to each raft peer whose admitted
+// vector is lagging, and there wasn't a recent MsgApp to this peer. The
+// messages are added to raft's message queue, and will be extracted from
// raft and sent during the next Ready processing.
//
// If the replica is not the leader, this call does nothing.
//
-// raftMu is held.
-MaybeSendPingsRaftMuLocked()
+// Both Replica.raftMu and Replica.mu are held.
+MaybeSendPingsLocked()

// AdmitForEval is called to admit work that wants to evaluate at the
// leaseholder.
@@ -1130,11 +1131,12 @@ func (p *processorImpl) AdmitRaftMuLocked(
}
}

-// MaybeSendPingsRaftMuLocked implements Processor.
-func (p *processorImpl) MaybeSendPingsRaftMuLocked() {
+// MaybeSendPingsLocked implements Processor.
+func (p *processorImpl) MaybeSendPingsLocked() {
p.opts.Replica.RaftMuAssertHeld()
+p.opts.Replica.MuAssertHeld()
if rc := p.leader.rc; rc != nil {
-rc.MaybeSendPingsRaftMuLocked()
+rc.MaybeSendPingsLocked()
}
}

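
processorImpl.MaybeSendPingsLocked guards the new contract with RaftMuAssertHeld and MuAssertHeld. A rough sketch of that guard pattern under stated assumptions (plain sync.Mutex and a TryLock-based check; CockroachDB's syncutil provides real assertion support):

```go
package procsketch

import "sync"

// assertHeld is a best-effort stand-in for syncutil's AssertHeld: if the
// mutex can be acquired here, nobody held it and the contract was broken.
// (It cannot tell which goroutine holds the lock; CockroachDB's
// deadlock-detecting mutexes can.)
func assertHeld(m *sync.Mutex) {
	if m.TryLock() {
		m.Unlock()
		panic("mutex unexpectedly not held")
	}
}

// replicaHandle is a hypothetical stand-in for the Replica handle that
// processorImpl reaches via RaftMuAssertHeld / MuAssertHeld.
type replicaHandle struct {
	raftMu, mu sync.Mutex
}

// maybeSendPingsLocked mirrors processorImpl.MaybeSendPingsLocked:
// verify both mutexes are held before touching the range controller.
func (r *replicaHandle) maybeSendPingsLocked(sendPings func()) {
	assertHeld(&r.raftMu)
	assertHeld(&r.mu)
	if sendPings != nil { // nil models "not the leader": nothing to do
		sendPings()
	}
}
```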
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
@@ -138,8 +138,8 @@ func (rn *testRaftNode) ReplicasStateLocked(_ map[roachpb.ReplicaID]rac2.Replica
fmt.Fprint(rn.b, " RaftNode.ReplicasStateLocked\n")
}

-func (rn *testRaftNode) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
-fmt.Fprintf(rn.b, " RaftNode.SendPingRaftMuLocked(%d)\n", to)
+func (rn *testRaftNode) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool {
+fmt.Fprintf(rn.b, " RaftNode.SendPingReplicaMuLocked(%d)\n", to)
return true
}

@@ -246,8 +246,8 @@ func (c *testRangeController) AdmitRaftMuLocked(
fmt.Fprintf(c.b, " RangeController.AdmitRaftMuLocked(%s, %+v)\n", replicaID, av)
}

-func (c *testRangeController) MaybeSendPingsRaftMuLocked() {
-fmt.Fprintf(c.b, " RangeController.MaybeSendPingsRaftMuLocked()\n")
+func (c *testRangeController) MaybeSendPingsLocked() {
+fmt.Fprintf(c.b, " RangeController.MaybeSendPingsLocked()\n")
}

func (c *testRangeController) SetReplicasRaftMuLocked(
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go
@@ -55,6 +55,7 @@ func (rn raftNodeForRACv2) ReplicasStateLocked(
})
}

-func (rn raftNodeForRACv2) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
+// SendPingReplicaMuLocked implements rac2.RaftInterface.
+func (rn raftNodeForRACv2) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool {
return rn.RawNode.SendPing(raftpb.PeerID(to))
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
@@ -1448,7 +1448,7 @@ func (r *Replica) tick(

// NB: since we are returning true below, there will be a Ready handling
// immediately after this call, so any pings stashed in raft will be sent.
-r.flowControlV2.MaybeSendPingsRaftMuLocked()
+r.flowControlV2.MaybeSendPingsLocked()
return true, nil
}

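
To tie the pieces together, a condensed, illustrative version of this call site (names and signatures simplified, not the real Replica.tick): tick holds both Replica.raftMu and Replica.mu, stashes pings via MaybeSendPingsLocked, and returns true so the subsequent Ready cycle extracts and sends them.

```go
package ticksketch

// processor is a narrowed, hypothetical view of the RACv2 Processor.
type processor interface {
	MaybeSendPingsLocked() // requires Replica.raftMu and Replica.mu
}

type replica struct {
	flowControlV2 processor
}

// tick is called with both Replica.raftMu and Replica.mu held. Stashing
// pings here is cheap because sending is deferred: the true return value
// triggers a Ready cycle right after, which extracts and sends them.
func (r *replica) tick() (requeue bool, _ error) {
	r.flowControlV2.MaybeSendPingsLocked()
	return true, nil
}
```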
