diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index f4ecd015feef..ebefe53ae51e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -112,8 +112,57 @@ type RaftInterface interface { // // Requires Replica.mu to be held. SendPingReplicaMuLocked(roachpb.ReplicaID) bool + // MakeMsgAppRaftMuLocked is used to construct a MsgApp for entries in + // [start, end) and must only be called in MsgAppPull mode for followers. + // + // REQUIRES: + // - replicaID i, is in StateReplicate. + // - start == Next(i) + // - end <= NextUnstableIndex + // - maxSize > 0. + // + // If the sum of all entries in [start,end) are <= maxSize, all will be + // returned. Else, entries will be returned until, and including, the first + // entry that causes maxSize to be equaled or exceeded. This implies at + // least one entry will be returned in the MsgApp on success. + // + // Returns an error if log truncated, or there is some other transient + // problem. If no error, there is at least one entry in the message, and + // Next is advanced to be equal to the index+1 of the last entry in the + // returned message. + // + // Requires Replica.raftMu to be held. + // + // TODO(pav-kv): There are some rare non log truncation cases, where the + // flow stays in StateReplicate. We should define or eliminate these cases. + // + // TODO(sumeer): This is a temporary API. LogSnapshot and LogSlice will + // replace it. + MakeMsgAppRaftMuLocked( + replicaID roachpb.ReplicaID, start, end uint64, maxSize int64) (raftpb.Message, error) } +// RaftMsgAppMode specifies how Raft (at the leader) generates MsgApps. In +// both modes, Raft knows that (Match(i), Next(i)) are in-flight for a +// follower i. +type RaftMsgAppMode uint8 + +const ( + // MsgAppPush is the classic way in which Raft operates, where the Ready + // includes MsgApps for followers. We want to preserve this mode for now due + // to confidence in its performance characteristics, and to lower the risk + // of a bug in replication flow control affecting MsgApps. In this mode Raft + // is responsible for flow control, i.e., deciding when to send the entries + // in [Next(i),NextUnstableIndex), to follower i. + MsgAppPush RaftMsgAppMode = iota + // MsgAppPull is the way in which Raft operates when the RangeController is + // using send tokens to pace sending of work to a follower. The MsgApps are + // generated by calling a method on RaftInterface, and Raft's flow control + // is disabled. That is, the caller asks Raft to generate MsgApps for a + // prefix of [Next(i),NextUnstableIndex), for follower i. + MsgAppPull +) + type ReplicaStateInfo struct { State tracker.StateType @@ -125,6 +174,8 @@ type ReplicaStateInfo struct { // RaftEvent carries a RACv2-relevant subset of raft state sent to storage. type RaftEvent struct { + // MsgAppMode is the current mode. This is only relevant on the leader. + MsgAppMode RaftMsgAppMode // Term is the leader term on whose behalf the entries or snapshot are // written. Note that it may be behind the raft node's current term. Not // populated if Entries is empty and Snap is nil. @@ -171,17 +222,19 @@ type RaftEvent struct { // msgAppScratch is used as the map in RaftEvent.MsgApps. Returns the zero // value if the MsgStorageAppend is empty and there are no MsgApps. func RaftEventFromMsgStorageAppendAndMsgApps( + mode RaftMsgAppMode, replicaID roachpb.ReplicaID, appendMsg raftpb.Message, outboundMsgs []raftpb.Message, msgAppScratch map[roachpb.ReplicaID][]raftpb.Message, ) RaftEvent { - var event RaftEvent + event := RaftEvent{MsgAppMode: mode} if appendMsg.Type == raftpb.MsgStorageAppend { event = RaftEvent{ - Term: appendMsg.LogTerm, - Snap: appendMsg.Snapshot, - Entries: appendMsg.Entries, + MsgAppMode: event.MsgAppMode, + Term: appendMsg.LogTerm, + Snap: appendMsg.Snapshot, + Entries: appendMsg.Entries, } } if len(outboundMsgs) == 0 { diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 4d2c5f31aabb..36e55ef9d874 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -323,6 +323,12 @@ func (r *testingRCRange) SendPingReplicaMuLocked(roachpb.ReplicaID) bool { return false } +func (r *testingRCRange) MakeMsgAppRaftMuLocked( + replicaID roachpb.ReplicaID, start, end uint64, maxSize int64, +) (raftpb.Message, error) { + panic("unimplemented") +} + func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority) { r.mu.Lock() defer r.mu.Unlock() @@ -1278,17 +1284,20 @@ func TestRaftEventFromMsgStorageAppendAndMsgAppsBasic(t *testing.T) { msgAppScratch := map[roachpb.ReplicaID][]raftpb.Message{} // No outbound msgs. - event := RaftEventFromMsgStorageAppendAndMsgApps(20, appendMsg, nil, msgAppScratch) + event := RaftEventFromMsgStorageAppendAndMsgApps( + MsgAppPush, 20, appendMsg, nil, msgAppScratch) require.Equal(t, uint64(10), event.Term) require.Equal(t, appendMsg.Snapshot, event.Snap) require.Equal(t, appendMsg.Entries, event.Entries) require.Nil(t, event.MsgApps) // Zero value. - event = RaftEventFromMsgStorageAppendAndMsgApps(20, raftpb.Message{}, nil, msgAppScratch) + event = RaftEventFromMsgStorageAppendAndMsgApps( + MsgAppPush, 20, raftpb.Message{}, nil, msgAppScratch) require.Equal(t, RaftEvent{}, event) // Outbound msgs contains no MsgApps for a follower, since the only MsgApp // is for the leader. - event = RaftEventFromMsgStorageAppendAndMsgApps(20, appendMsg, outboundMsgs[:2], msgAppScratch) + event = RaftEventFromMsgStorageAppendAndMsgApps( + MsgAppPush, 20, appendMsg, outboundMsgs[:2], msgAppScratch) require.Equal(t, uint64(10), event.Term) require.Equal(t, appendMsg.Snapshot, event.Snap) require.Equal(t, appendMsg.Entries, event.Entries) @@ -1296,7 +1305,8 @@ func TestRaftEventFromMsgStorageAppendAndMsgAppsBasic(t *testing.T) { // Outbound msgs contains MsgApps for followers. We call this twice to // ensure msgAppScratch is cleared before reuse. for i := 0; i < 2; i++ { - event = RaftEventFromMsgStorageAppendAndMsgApps(19, appendMsg, outboundMsgs, msgAppScratch) + event = RaftEventFromMsgStorageAppendAndMsgApps( + MsgAppPush, 19, appendMsg, outboundMsgs, msgAppScratch) require.Equal(t, uint64(10), event.Term) require.Equal(t, appendMsg.Snapshot, event.Snap) require.Equal(t, appendMsg.Entries, event.Entries) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go index b36726276978..600a8fbc8bec 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go @@ -1106,6 +1106,7 @@ func (r *testingRCRange) testingDeductTokens( r.mu.r.replicaSet[k] = testR } require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{ + MsgAppMode: MsgAppPush, ReplicasStateInfo: r.replicasStateInfo(), Term: r.mu.quorumPosition.Term, Entries: []raftpb.Entry{entry}, diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 05b4584304a2..bc67a150b0b4 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -143,6 +143,12 @@ func (rn *testRaftNode) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool { return true } +func (rn *testRaftNode) MakeMsgAppRaftMuLocked( + replicaID roachpb.ReplicaID, start, end uint64, maxSize int64, +) (raftpb.Message, error) { + panic("unimplemented") +} + func (rn *testRaftNode) setMark(t *testing.T, mark rac2.LogMark) { require.True(t, mark.After(rn.mark)) rn.mark = mark diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go index 30d7e6d960cb..e5236f880d55 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go @@ -59,3 +59,10 @@ func (rn raftNodeForRACv2) ReplicasStateLocked( func (rn raftNodeForRACv2) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool { return rn.RawNode.SendPing(raftpb.PeerID(to)) } + +// MakeMsgAppRaftMuLocked implements rac2.RaftInterface. +func (rn raftNodeForRACv2) MakeMsgAppRaftMuLocked( + replicaID roachpb.ReplicaID, start, end uint64, maxSize int64, +) (raftpb.Message, error) { + panic("unimplemented") +} diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index e6307812a91b..29956835835c 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -934,7 +934,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // Even if we don't have a Ready, or entries in Ready, // replica_rac2.Processor may need to do some work. raftEvent := rac2.RaftEventFromMsgStorageAppendAndMsgApps( - r.ReplicaID(), msgStorageAppend, outboundMsgs, r.raftMu.msgAppScratchForFlowControl) + rac2.MsgAppPush, r.ReplicaID(), msgStorageAppend, outboundMsgs, + r.raftMu.msgAppScratchForFlowControl) r.flowControlV2.HandleRaftReadyRaftMuLocked(ctx, raftEvent) if !hasReady { // We must update the proposal quota even if we don't have a ready.