Skip to content

Commit

Permalink
Merge #131413
Browse files Browse the repository at this point in the history
131413: rac2,replica_rac2: add interface changes for pull mode r=kvoli a=sumeerbhola

Informs #130433

Epic: CRDB-37515

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
craig[bot] and sumeerbhola committed Sep 26, 2024
2 parents 72ad9e1 + c18599a commit a9a171f
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 9 deletions.
61 changes: 57 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,57 @@ type RaftInterface interface {
//
// Requires Replica.mu to be held.
SendPingReplicaMuLocked(roachpb.ReplicaID) bool
// MakeMsgAppRaftMuLocked is used to construct a MsgApp for entries in
// [start, end) and must only be called in MsgAppPull mode for followers.
//
// REQUIRES:
// - replicaID i, is in StateReplicate.
// - start == Next(i)
// - end <= NextUnstableIndex
// - maxSize > 0.
//
// If the sum of all entries in [start,end) are <= maxSize, all will be
// returned. Else, entries will be returned until, and including, the first
// entry that causes maxSize to be equaled or exceeded. This implies at
// least one entry will be returned in the MsgApp on success.
//
// Returns an error if log truncated, or there is some other transient
// problem. If no error, there is at least one entry in the message, and
// Next is advanced to be equal to the index+1 of the last entry in the
// returned message.
//
// Requires Replica.raftMu to be held.
//
// TODO(pav-kv): There are some rare non log truncation cases, where the
// flow stays in StateReplicate. We should define or eliminate these cases.
//
// TODO(sumeer): This is a temporary API. LogSnapshot and LogSlice will
// replace it.
MakeMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, start, end uint64, maxSize int64) (raftpb.Message, error)
}

// RaftMsgAppMode specifies how Raft (at the leader) generates MsgApps. In
// both modes, Raft knows that (Match(i), Next(i)) are in-flight for a
// follower i.
type RaftMsgAppMode uint8

const (
// MsgAppPush is the classic way in which Raft operates, where the Ready
// includes MsgApps for followers. We want to preserve this mode for now due
// to confidence in its performance characteristics, and to lower the risk
// of a bug in replication flow control affecting MsgApps. In this mode Raft
// is responsible for flow control, i.e., deciding when to send the entries
// in [Next(i),NextUnstableIndex), to follower i.
MsgAppPush RaftMsgAppMode = iota
// MsgAppPull is the way in which Raft operates when the RangeController is
// using send tokens to pace sending of work to a follower. The MsgApps are
// generated by calling a method on RaftInterface, and Raft's flow control
// is disabled. That is, the caller asks Raft to generate MsgApps for a
// prefix of [Next(i),NextUnstableIndex), for follower i.
MsgAppPull
)

type ReplicaStateInfo struct {
State tracker.StateType

Expand All @@ -125,6 +174,8 @@ type ReplicaStateInfo struct {

// RaftEvent carries a RACv2-relevant subset of raft state sent to storage.
type RaftEvent struct {
// MsgAppMode is the current mode. This is only relevant on the leader.
MsgAppMode RaftMsgAppMode
// Term is the leader term on whose behalf the entries or snapshot are
// written. Note that it may be behind the raft node's current term. Not
// populated if Entries is empty and Snap is nil.
Expand Down Expand Up @@ -171,17 +222,19 @@ type RaftEvent struct {
// msgAppScratch is used as the map in RaftEvent.MsgApps. Returns the zero
// value if the MsgStorageAppend is empty and there are no MsgApps.
func RaftEventFromMsgStorageAppendAndMsgApps(
mode RaftMsgAppMode,
replicaID roachpb.ReplicaID,
appendMsg raftpb.Message,
outboundMsgs []raftpb.Message,
msgAppScratch map[roachpb.ReplicaID][]raftpb.Message,
) RaftEvent {
var event RaftEvent
event := RaftEvent{MsgAppMode: mode}
if appendMsg.Type == raftpb.MsgStorageAppend {
event = RaftEvent{
Term: appendMsg.LogTerm,
Snap: appendMsg.Snapshot,
Entries: appendMsg.Entries,
MsgAppMode: event.MsgAppMode,
Term: appendMsg.LogTerm,
Snap: appendMsg.Snapshot,
Entries: appendMsg.Entries,
}
}
if len(outboundMsgs) == 0 {
Expand Down
18 changes: 14 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ func (r *testingRCRange) SendPingReplicaMuLocked(roachpb.ReplicaID) bool {
return false
}

func (r *testingRCRange) MakeMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, start, end uint64, maxSize int64,
) (raftpb.Message, error) {
panic("unimplemented")
}

func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority) {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -1278,25 +1284,29 @@ func TestRaftEventFromMsgStorageAppendAndMsgAppsBasic(t *testing.T) {
msgAppScratch := map[roachpb.ReplicaID][]raftpb.Message{}

// No outbound msgs.
event := RaftEventFromMsgStorageAppendAndMsgApps(20, appendMsg, nil, msgAppScratch)
event := RaftEventFromMsgStorageAppendAndMsgApps(
MsgAppPush, 20, appendMsg, nil, msgAppScratch)
require.Equal(t, uint64(10), event.Term)
require.Equal(t, appendMsg.Snapshot, event.Snap)
require.Equal(t, appendMsg.Entries, event.Entries)
require.Nil(t, event.MsgApps)
// Zero value.
event = RaftEventFromMsgStorageAppendAndMsgApps(20, raftpb.Message{}, nil, msgAppScratch)
event = RaftEventFromMsgStorageAppendAndMsgApps(
MsgAppPush, 20, raftpb.Message{}, nil, msgAppScratch)
require.Equal(t, RaftEvent{}, event)
// Outbound msgs contains no MsgApps for a follower, since the only MsgApp
// is for the leader.
event = RaftEventFromMsgStorageAppendAndMsgApps(20, appendMsg, outboundMsgs[:2], msgAppScratch)
event = RaftEventFromMsgStorageAppendAndMsgApps(
MsgAppPush, 20, appendMsg, outboundMsgs[:2], msgAppScratch)
require.Equal(t, uint64(10), event.Term)
require.Equal(t, appendMsg.Snapshot, event.Snap)
require.Equal(t, appendMsg.Entries, event.Entries)
require.Nil(t, event.MsgApps)
// Outbound msgs contains MsgApps for followers. We call this twice to
// ensure msgAppScratch is cleared before reuse.
for i := 0; i < 2; i++ {
event = RaftEventFromMsgStorageAppendAndMsgApps(19, appendMsg, outboundMsgs, msgAppScratch)
event = RaftEventFromMsgStorageAppendAndMsgApps(
MsgAppPush, 19, appendMsg, outboundMsgs, msgAppScratch)
require.Equal(t, uint64(10), event.Term)
require.Equal(t, appendMsg.Snapshot, event.Snap)
require.Equal(t, appendMsg.Entries, event.Entries)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,7 @@ func (r *testingRCRange) testingDeductTokens(
r.mu.r.replicaSet[k] = testR
}
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{
MsgAppMode: MsgAppPush,
ReplicasStateInfo: r.replicasStateInfo(),
Term: r.mu.quorumPosition.Term,
Entries: []raftpb.Entry{entry},
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (rn *testRaftNode) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool {
return true
}

func (rn *testRaftNode) MakeMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, start, end uint64, maxSize int64,
) (raftpb.Message, error) {
panic("unimplemented")
}

func (rn *testRaftNode) setMark(t *testing.T, mark rac2.LogMark) {
require.True(t, mark.After(rn.mark))
rn.mark = mark
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ func (rn raftNodeForRACv2) ReplicasStateLocked(
func (rn raftNodeForRACv2) SendPingReplicaMuLocked(to roachpb.ReplicaID) bool {
return rn.RawNode.SendPing(raftpb.PeerID(to))
}

// MakeMsgAppRaftMuLocked implements rac2.RaftInterface.
func (rn raftNodeForRACv2) MakeMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, start, end uint64, maxSize int64,
) (raftpb.Message, error) {
panic("unimplemented")
}
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Even if we don't have a Ready, or entries in Ready,
// replica_rac2.Processor may need to do some work.
raftEvent := rac2.RaftEventFromMsgStorageAppendAndMsgApps(
r.ReplicaID(), msgStorageAppend, outboundMsgs, r.raftMu.msgAppScratchForFlowControl)
rac2.MsgAppPush, r.ReplicaID(), msgStorageAppend, outboundMsgs,
r.raftMu.msgAppScratchForFlowControl)
r.flowControlV2.HandleRaftReadyRaftMuLocked(ctx, raftEvent)
if !hasReady {
// We must update the proposal quota even if we don't have a ready.
Expand Down

0 comments on commit a9a171f

Please sign in to comment.