From 5aa49ccbc91d8fa9199694a01b31901a49928d71 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Wed, 25 Sep 2024 12:10:29 -0400 Subject: [PATCH] rac2,replica_rac2: clarify locking for sending pings A consequence of Replica.mu being held when calling MaybeSendPingsLocked is that replicaSendStream.mu is ordered after Replica.mu. Epic: CRDB-37515 Release note: None --- .../kvflowcontrol/rac2/range_controller.go | 47 +++++++++++-------- .../rac2/range_controller_test.go | 2 +- .../kvflowcontrol/replica_rac2/processor.go | 26 +++++----- .../replica_rac2/processor_test.go | 8 ++-- .../kvflowcontrol/replica_rac2/raft_node.go | 3 +- pkg/kv/kvserver/replica_raft.go | 2 +- 6 files changed, 50 insertions(+), 38 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 46c5dab3606a..cde095c21d72 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -35,8 +35,9 @@ import ( // RangeController provides flow control for replication traffic in KV, for a // range at the leader. // -// None of the methods are called with Replica.mu held. The caller should -// typically order its mutexes before Replica.mu. +// Almost none of the methods are called with Replica.mu held. The caller and +// callee should typically order their mutexes before Replica.mu. The one +// exception is MaybeSendPingsLocked. type RangeController interface { // WaitForEval seeks admission to evaluate a request at the given priority. // This blocks until there are positive tokens available for the request to @@ -67,12 +68,12 @@ type RangeController interface { // // Requires replica.raftMu to be held. AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, AdmittedVector) - // MaybeSendPingsRaftMuLocked sends a MsgApp ping to each raft peer in - // StateReplicate whose admitted vector is lagging, and there wasn't a recent - // MsgApp to this peer. + // MaybeSendPingsLocked sends a MsgApp ping to each raft peer in + // StateReplicate whose admitted vector is lagging, and there wasn't a + // recent MsgApp to this peer. // - // Requires replica.raftMu to be held. - MaybeSendPingsRaftMuLocked() + // Requires replica.raftMu and replica.mu to be held. + MaybeSendPingsLocked() // SetReplicasRaftMuLocked sets the replicas of the range. The caller will // never mutate replicas, and neither should the callee. // @@ -91,17 +92,23 @@ type RangeController interface { InspectRaftMuLocked(ctx context.Context) kvflowinspectpb.Handle } +// RaftInterface implements methods needed by RangeController. +// +// Locking reminder: as noted in replicaSendStream, replicaSendStream.mu is +// ordered after Replica.mu. +// // TODO(pav-kv): This interface a placeholder for the interface containing raft // methods. Replace this as part of #128019. type RaftInterface interface { - // SendPingRaftMuLocked sends a MsgApp ping to the given raft peer if there - // wasn't a recent MsgApp to this peer. The message is added to raft's message - // queue, and will be extracted and sent during the next Ready processing. + // SendPingReplicaMuLocked sends a MsgApp ping to the given raft peer if + // there wasn't a recent MsgApp to this peer. The message is added to raft's + // message queue, and will be extracted and sent during the next Ready + // processing. // // If the peer is not in StateReplicate, this call does nothing. // - // Requires Replica.raftMu to be held. - SendPingRaftMuLocked(roachpb.ReplicaID) bool + // Requires Replica.mu to be held. + SendPingReplicaMuLocked(roachpb.ReplicaID) bool } type ReplicaStateInfo struct { @@ -494,18 +501,16 @@ func (rc *rangeController) AdmitRaftMuLocked( } } -// MaybeSendPingsRaftMuLocked sends a MsgApp ping to each raft peer in -// StateReplicate whose admitted vector is lagging, and there wasn't a recent -// MsgApp to this peer. -// -// Requires replica.raftMu to be held. -func (rc *rangeController) MaybeSendPingsRaftMuLocked() { +// MaybeSendPingsLocked implements RangeController. +func (rc *rangeController) MaybeSendPingsLocked() { + // NB: Replica.mu is already held, so no IO is permitted. for id, state := range rc.replicaMap { if id == rc.opts.LocalReplicaID { continue } + // s.shouldPing acquires replicaSendStream.mu. if s := state.sendStream; s != nil && s.shouldPing() { - rc.opts.RaftInterface.SendPingRaftMuLocked(id) + rc.opts.RaftInterface.SendPingReplicaMuLocked(id) } } } @@ -708,6 +713,10 @@ func NewReplicaState( type replicaSendStream struct { parent *replicaState + // Because of MaybeSendPingsLocked being called with Replica.mu held, this + // mutex is ordered after Replica.mu. This ordering is a bit unfortunate, in + // that we need to ensure that we never need to acquire Replica.mu while + // holding replicaSendStream.mu. mu struct { syncutil.Mutex // connectedStateStart is the time when the connectedState was last diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 00f476a1b528..5a5d2c35f255 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -310,7 +310,7 @@ func (r *testingRCRange) replicasStateInfo() map[roachpb.ReplicaID]ReplicaStateI return replicasStateInfo } -func (r *testingRCRange) SendPingRaftMuLocked(roachpb.ReplicaID) bool { +func (r *testingRCRange) SendPingReplicaMuLocked(roachpb.ReplicaID) bool { return false } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 8d65c6a42164..e3b4d8a167a5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -261,13 +261,14 @@ type SideChannelInfoUsingRaftMessageRequest struct { // We *strongly* prefer methods to be called without holding Replica.mu, since // then the callee (implementation of Processor) does not need to worry about // (a) deadlocks, since it sometimes needs to lock Replica.mu itself, (b) the -// amount of work it is doing under this critical section. The only exception is -// OnDescChangedLocked, where this was hard to achieve. +// amount of work it is doing under this critical section. There are three +// exceptions to this, due to difficulty in changing the calling code: +// InitRaftLocked, OnDescChangedLocked and MaybeSendPingsLocked. type Processor interface { // InitRaftLocked is called when RaftNode is initialized for the Replica. // NB: can be called twice before the Replica is fully initialized. // - // Both Replica mu and raftMu are held. + // Both Replica.raftMu and Replica.mu are held. InitRaftLocked(context.Context, RaftNode) // OnDestroyRaftMuLocked is called when the Replica is being destroyed. @@ -299,7 +300,7 @@ type Processor interface { // OnDescChangedLocked provides a possibly updated RangeDescriptor. The // tenantID passed in all calls must be the same. // - // Both Replica mu and raftMu are held. + // Both Replica.raftMu and Replica.mu are held. OnDescChangedLocked( ctx context.Context, desc *roachpb.RangeDescriptor, tenantID roachpb.TenantID) @@ -375,15 +376,15 @@ type Processor interface { // raftMu is held. AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, rac2.AdmittedVector) - // MaybeSendPingsRaftMuLocked sends a MsgApp ping to each raft peer whose - // admitted vector is lagging, and there wasn't a recent MsgApp to this peer. - // The messages are added to raft's message queue, and will be extracted from + // MaybeSendPingsLocked sends a MsgApp ping to each raft peer whose admitted + // vector is lagging, and there wasn't a recent MsgApp to this peer. The + // messages are added to raft's message queue, and will be extracted from // raft and sent during the next Ready processing. // // If the replica is not the leader, this call does nothing. // - // raftMu is held. - MaybeSendPingsRaftMuLocked() + // Both Replica.raftMu and Replica.mu are held. + MaybeSendPingsLocked() // AdmitForEval is called to admit work that wants to evaluate at the // leaseholder. @@ -1130,11 +1131,12 @@ func (p *processorImpl) AdmitRaftMuLocked( } } -// MaybeSendPingsRaftMuLocked implements Processor. -func (p *processorImpl) MaybeSendPingsRaftMuLocked() { +// MaybeSendPingsLocked implements Processor. +func (p *processorImpl) MaybeSendPingsLocked() { p.opts.Replica.RaftMuAssertHeld() + p.opts.Replica.MuAssertHeld() if rc := p.leader.rc; rc != nil { - rc.MaybeSendPingsRaftMuLocked() + rc.MaybeSendPingsLocked() } } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 86640a137aa1..05b4584304a2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -138,8 +138,8 @@ func (rn *testRaftNode) ReplicasStateLocked(_ map[roachpb.ReplicaID]rac2.Replica fmt.Fprint(rn.b, " RaftNode.ReplicasStateLocked\n") } -func (rn *testRaftNode) SendPingRaftMuLocked(to roachpb.ReplicaID) bool { - fmt.Fprintf(rn.b, " RaftNode.SendPingRaftMuLocked(%d)\n", to) +func (rn *testRaftNode) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool { + fmt.Fprintf(rn.b, " RaftNode.SendPingReplicaMuLocked(%d)\n", to) return true } @@ -246,8 +246,8 @@ func (c *testRangeController) AdmitRaftMuLocked( fmt.Fprintf(c.b, " RangeController.AdmitRaftMuLocked(%s, %+v)\n", replicaID, av) } -func (c *testRangeController) MaybeSendPingsRaftMuLocked() { - fmt.Fprintf(c.b, " RangeController.MaybeSendPingsRaftMuLocked()\n") +func (c *testRangeController) MaybeSendPingsLocked() { + fmt.Fprintf(c.b, " RangeController.MaybeSendPingsLocked()\n") } func (c *testRangeController) SetReplicasRaftMuLocked( diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go index 4974ed665f53..30d7e6d960cb 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go @@ -55,6 +55,7 @@ func (rn raftNodeForRACv2) ReplicasStateLocked( }) } -func (rn raftNodeForRACv2) SendPingRaftMuLocked(to roachpb.ReplicaID) bool { +// SendPingReplicaMuLocked implements rac2.RaftInterface. +func (rn raftNodeForRACv2) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool { return rn.RawNode.SendPing(raftpb.PeerID(to)) } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 22f9116cd623..e938ac73e690 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1448,7 +1448,7 @@ func (r *Replica) tick( // NB: since we are returning true below, there will be a Ready handling // immediately after this call, so any pings stashed in raft will be sent. - r.flowControlV2.MaybeSendPingsRaftMuLocked() + r.flowControlV2.MaybeSendPingsLocked() return true, nil }