Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: add rac2 v1 integration tests #130728

Merged
merged 8 commits into from
Sep 25, 2024
3,054 changes: 2,698 additions & 356 deletions pkg/kv/kvserver/flow_control_integration_test.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (h *Handle) deductTokensForInner(
}

if fn := h.knobs.OverrideTokenDeduction; fn != nil {
tokens = fn()
tokens = fn(tokens)
}

for _, c := range h.mu.connections {
Expand Down
26 changes: 9 additions & 17 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,6 @@ type RangeController interface {
// TODO(pav-kv): This interface is a placeholder for the interface containing raft
// methods. Replace this as part of #128019.
type RaftInterface interface {
// FollowerStateRaftMuLocked returns the current state of a follower. The
// value of Match, Next are populated iff in StateReplicate. All entries >=
// Next have not had MsgApps constructed during the lifetime of this
// StateReplicate (they may have been constructed previously).
//
// When a follower transitions from {StateProbe,StateSnapshot} =>
// StateReplicate, we start trying to send MsgApps. We should
// notice such transitions both in HandleRaftEvent and
// SetReplicasRaftMuLocked.
//
// Requires Replica.raftMu to be held, Replica.mu is not held.
FollowerStateRaftMuLocked(roachpb.ReplicaID) FollowerStateInfo
// SendPingRaftMuLocked sends a MsgApp ping to the given raft peer if there
// wasn't a recent MsgApp to this peer. The message is added to raft's message
// queue, and will be extracted and sent during the next Ready processing.
Expand All @@ -116,7 +104,7 @@ type RaftInterface interface {
SendPingRaftMuLocked(roachpb.ReplicaID) bool
}

type FollowerStateInfo struct {
type ReplicaStateInfo struct {
State tracker.StateType

// The remaining fields are only populated in StateReplicate.
Expand Down Expand Up @@ -160,6 +148,11 @@ type RaftEvent struct {
// A key can map to an empty slice, in order to reuse already allocated
// slice memory.
MsgApps map[roachpb.ReplicaID][]raftpb.Message
// ReplicasStateInfo contains the state of all replicas. This is used to
// determine if the state of a replica has changed, and if so, to update the
// flow control state. It also informs the RangeController of a replica's
// Match and Next.
ReplicasStateInfo map[roachpb.ReplicaID]ReplicaStateInfo
}

// RaftEventFromMsgStorageAppendAndMsgApps constructs a RaftEvent from the
Expand Down Expand Up @@ -460,8 +453,7 @@ retry:
func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error {
shouldWaitChange := false
for r, rs := range rc.replicaMap {
info := rc.opts.RaftInterface.FollowerStateRaftMuLocked(r)
shouldWaitChange = rs.handleReadyState(ctx, info) || shouldWaitChange
shouldWaitChange = rs.handleReadyState(ctx, e.ReplicasStateInfo[r]) || shouldWaitChange
}
// If there was a quorum change, update the voter sets, triggering the
// refresh channel for any requests waiting for eval tokens.
Expand Down Expand Up @@ -810,7 +802,7 @@ func (rs *replicaState) handleReadyEntries(ctx context.Context, entries []entryF
}
tokens := entry.tokens
if fn := rs.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn()
tokens = fn(tokens)
}
rs.sendStream.mu.tracker.Track(ctx, entry.term, entry.index, entry.pri, tokens)
rs.evalTokenCounter.Deduct(
Expand All @@ -824,7 +816,7 @@ func (rs *replicaState) handleReadyEntries(ctx context.Context, entries []entryF
// provided follower state information. If the state changes in a way that
// affects requests waiting for evaluation, returns true.
func (rs *replicaState) handleReadyState(
ctx context.Context, info FollowerStateInfo,
ctx context.Context, info ReplicaStateInfo,
) (shouldWaitChange bool) {
switch info.State {
case tracker.StateProbe:
Expand Down
30 changes: 18 additions & 12 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ func (s *testingRCState) getOrInitRange(t *testing.T, r testingRange) *testingRC
TenantID: r.tenantID,
LocalReplicaID: r.localReplicaID,
SSTokenCounter: s.ssTokenCounter,
RaftInterface: testRC,
Clock: s.clock,
CloseTimerScheduler: s.probeToCloseScheduler,
EvalWaitMetrics: s.evalMetrics,
Expand All @@ -258,7 +257,7 @@ func (s *testingRCState) getOrInitRange(t *testing.T, r testingRange) *testingRC
s.maybeSetInitialTokens(r)
// Send through an empty raft event to trigger creating necessary replica
// send streams for the range.
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(s.testCtx, RaftEvent{}))
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(s.testCtx, testRC.makeRaftEventWithReplicasState()))
return testRC
}

Expand Down Expand Up @@ -294,15 +293,21 @@ type testingRCRange struct {
}
}

func (r *testingRCRange) FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo {
// makeRaftEventWithReplicasState returns a RaftEvent whose only populated
// field is ReplicasStateInfo, set to the map returned by
// r.replicasStateInfo(). All other RaftEvent fields keep their zero values.
func (r *testingRCRange) makeRaftEventWithReplicasState() RaftEvent {
return RaftEvent{
ReplicasStateInfo: r.replicasStateInfo(),
}
}

func (r *testingRCRange) replicasStateInfo() map[roachpb.ReplicaID]ReplicaStateInfo {
r.mu.Lock()
defer r.mu.Unlock()

replica, ok := r.mu.r.replicaSet[replicaID]
if !ok {
return FollowerStateInfo{}
replicasStateInfo := map[roachpb.ReplicaID]ReplicaStateInfo{}
for _, replica := range r.mu.r.replicaSet {
replicasStateInfo[replica.desc.ReplicaID] = replica.info
}
return replica.info
return replicasStateInfo
}

func (r *testingRCRange) SendPingRaftMuLocked(roachpb.ReplicaID) bool {
Expand Down Expand Up @@ -386,7 +391,7 @@ const invalidTrackerState = tracker.StateSnapshot + 1

type testingReplica struct {
desc roachpb.ReplicaDescriptor
info FollowerStateInfo
info ReplicaStateInfo
}

func scanRanges(t *testing.T, input string) []testingRange {
Expand Down Expand Up @@ -503,7 +508,7 @@ func scanReplica(t *testing.T, line string) testingReplica {
ReplicaID: roachpb.ReplicaID(replicaID),
Type: replicaType,
},
info: FollowerStateInfo{State: state},
info: ReplicaStateInfo{State: state},
}
}

Expand Down Expand Up @@ -597,7 +602,7 @@ func (t *testingProbeToCloseTimerScheduler) ScheduleSendStreamCloseRaftMuLocked(
}
timer.MarkRead()
require.NoError(t.state.t,
t.state.ranges[rangeID].rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
t.state.ranges[rangeID].rc.HandleRaftEventRaftMuLocked(ctx, t.state.ranges[rangeID].makeRaftEventWithReplicasState()))
}()
}

Expand Down Expand Up @@ -791,7 +796,7 @@ func TestRangeController(t *testing.T) {
require.NoError(t, testRC.rc.SetReplicasRaftMuLocked(ctx, r.replicas()))
// Send an empty raft event in order to trigger any potential
// connectedState changes.
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(ctx, testRC.makeRaftEventWithReplicasState()))
}
// Sleep for a bit to allow any timers to fire.
time.Sleep(20 * time.Millisecond)
Expand Down Expand Up @@ -877,7 +882,8 @@ func TestRangeController(t *testing.T) {

propRangeEntries := func() {
event := RaftEvent{
Entries: make([]raftpb.Entry, len(buf)),
Entries: make([]raftpb.Entry, len(buf)),
ReplicasStateInfo: state.ranges[lastRangeID].replicasStateInfo(),
}
for i, state := range buf {
event.Entries[i] = testingCreateEntry(t, state)
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,8 @@ func (r *testingRCRange) testingDeductTokens(
r.mu.quorumPosition.Index++

require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{
Term: r.mu.quorumPosition.Term,
ReplicasStateInfo: r.replicasStateInfo(),
Term: r.mu.quorumPosition.Term,
Entries: []raftpb.Entry{testingCreateEntry(t, entryInfo{
term: r.mu.quorumPosition.Term,
index: r.mu.quorumPosition.Index,
Expand Down Expand Up @@ -1161,7 +1162,7 @@ func (r *testingRCRange) testingReturnTokens(
repl.info.Next = r.mu.quorumPosition.Index + 1
r.mu.r.replicaSet[rid] = repl
r.rc.AdmitRaftMuLocked(ctx, rs.desc.ReplicaID, av)
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, r.makeRaftEventWithReplicasState()))
}
}

Expand Down Expand Up @@ -1198,15 +1199,15 @@ func (r *testingRCRange) testingConnectStream(
ReplicaID: roachpb.ReplicaID(stream.StoreID),
Type: roachpb.VOTER_FULL,
},
info: FollowerStateInfo{
info: ReplicaStateInfo{
State: tracker.StateReplicate,
Match: position.Index,
Next: position.Index + 1,
},
}
// Send an empty raft event in order to trigger any state changes.
require.NoError(t, r.rc.SetReplicasRaftMuLocked(ctx, r.mu.r.replicas()))
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, r.makeRaftEventWithReplicasState()))
}

// testingDisconnectStream changes the tracker state of a given stream's
Expand All @@ -1223,7 +1224,7 @@ func (r *testingRCRange) testingDisconnectStream(
rs.info.State = tracker.StateSnapshot
r.mu.r.replicaSet[rid] = rs
// Send an empty raft event in order to trigger any state changes.
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, r.makeRaftEventWithReplicasState()))
}

// testingString returns a string representation of the tracker state for use
Expand Down
52 changes: 41 additions & 11 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ type RaftScheduler interface {
// reads Raft state at various points while holding raftMu, and expects those
// various reads to be mutually consistent.
type RaftNode interface {
// RaftInterface is an interface that abstracts the raft.RawNode for use in
// the RangeController.
rac2.RaftInterface
// TermLocked returns the current term of this replica.
TermLocked() uint64
Expand All @@ -114,6 +112,19 @@ type RaftNode interface {
// NB: NextUnstableIndex can regress when the node accepts appends or
// snapshots from a newer leader.
NextUnstableIndexLocked() uint64
// ReplicasStateLocked populates infoMap with the current state of all replicas.
// RACv2 uses the Match and Next indices only for replicas in StateReplicate.
// All entries >= Next have not had MsgApps constructed during the lifetime
// of this StateReplicate (they may have been constructed previously).
//
// When a follower transitions from {StateProbe,StateSnapshot} =>
// StateReplicate, we start trying to send MsgApps. We should notice such
// transitions both in rac2.HandleRaftEventRaftMuLocked and
// rac2.SetReplicasRaftMuLocked.
//
// infoMap is an in-out parameter. It is expected to be empty, and is
// populated with the ReplicaStateInfos for all replicas.
ReplicasStateLocked(infoMap map[roachpb.ReplicaID]rac2.ReplicaStateInfo)
}

// AdmittedPiggybacker is used to enqueue admitted vector messages addressed to
Expand Down Expand Up @@ -423,6 +434,10 @@ type processorImpl struct {
// leaseholderID is the currently known leaseholder replica.
leaseholderID roachpb.ReplicaID

// scratchInfoMap is used as a pre-allocated in-out parameter for calling
// ReplicasStateLocked when constructing a rac2.RaftEvent.
scratchInfoMap map[roachpb.ReplicaID]rac2.ReplicaStateInfo

// State at a follower.
follower struct {
// isLeaderUsingV2Protocol is true when the leaderID indicated that it's
Expand Down Expand Up @@ -511,6 +526,7 @@ func NewProcessor(opts ProcessorOptions) Processor {
opts: opts,
enabledWhenLeader: opts.EnabledWhenLeaderLevel,
v1EncodingPriorityMismatch: log.Every(time.Minute),
scratchInfoMap: make(map[roachpb.ReplicaID]rac2.ReplicaStateInfo),
}
}

Expand All @@ -519,7 +535,7 @@ func NewProcessor(opts ProcessorOptions) Processor {
func (p *processorImpl) isLeaderUsingV2ProcLocked() bool {
// We are the leader using V2, or a follower who learned that the leader is
// using the V2 protocol.
return p.leader.rc != nil || p.follower.isLeaderUsingV2Protocol
return p.leader.rc != nil || (p.opts.ReplicaID != p.leaderID && p.follower.isLeaderUsingV2Protocol)
}

// InitRaftLocked implements Processor.
Expand Down Expand Up @@ -775,6 +791,10 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
log.Fatal(ctx, "RaftNode is not initialized")
return
}

// We will use the scratchInfoMap to get the latest state of the replicas and
// construct the RaftEvent. Ensure that it is empty before we start.
clear(p.scratchInfoMap)
// NB: we need to call makeStateConsistentRaftMuLocked even if
// NotEnabledWhenLeader, since this replica could be a follower and the leader
// may switch to v2.
Expand All @@ -790,6 +810,7 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
leaderID = p.replMu.raftNode.LeaderLocked()
leaseholderID = p.opts.Replica.LeaseholderMuRLocked()
term = p.replMu.raftNode.TermLocked()
p.replMu.raftNode.ReplicasStateLocked(p.scratchInfoMap)
}()
if len(e.Entries) > 0 {
nextUnstableIndex = e.Entries[0].Index
Expand All @@ -803,6 +824,7 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
// NB: since we've registered the latest log/snapshot write (if any) above,
// our admitted vector is likely consistent with the latest leader term.
p.maybeSendAdmittedRaftMuLocked(ctx)
e.ReplicasStateInfo = p.scratchInfoMap
if rc := p.leader.rc; rc != nil {
if knobs := p.opts.Knobs; knobs == nil || !knobs.UseOnlyForScratchRanges ||
p.opts.Replica.IsScratchRange() {
Expand Down Expand Up @@ -911,7 +933,7 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
log.Infof(ctx,
"decoded v2 raft admission meta below-raft: pri=%v create-time=%d "+
"proposer=n%v receiver=[n%d,s%v] tenant=t%d tokens≈%d "+
"sideloaded=%t raft-entry=%d/%d",
"sideloaded=%t raft-entry=%d/%d lead-v2=%v",
raftpb.Priority(meta.AdmissionPriority),
meta.AdmissionCreateTime,
meta.AdmissionOriginNode,
Expand All @@ -922,12 +944,13 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
typ.IsSideloaded(),
entry.Term,
entry.Index,
p.isLeaderUsingV2ProcLocked(),
)
} else {
log.Infof(ctx,
"decoded v1 raft admission meta below-raft: pri=%v create-time=%d "+
"proposer=n%v receiver=[n%d,s%v] tenant=t%d tokens≈%d "+
"sideloaded=%t raft-entry=%d/%d",
"sideloaded=%t raft-entry=%d/%d lead-v2=%v",
admissionpb.WorkPriority(meta.AdmissionPriority),
meta.AdmissionCreateTime,
meta.AdmissionOriginNode,
Expand All @@ -938,6 +961,7 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
typ.IsSideloaded(),
entry.Term,
entry.Index,
p.isLeaderUsingV2ProcLocked(),
)
}
}
Expand Down Expand Up @@ -1018,18 +1042,24 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte
if p.destroyed {
return
}
// updates is left unset (so empty unallocated map) or refers to the map in
// p.leader.scratch, so can be read and written without holding
// pendingAdmittedMu.
var updates map[roachpb.ReplicaID]rac2.AdmittedVector
// Swap the updates map with the empty scratch. This is an optimization to
// minimize the time we hold the pendingAdmittedMu lock.
func() {
// Swap the pendingAdmittedMu.updates map with the empty scratch if
// non-empty. This is an optimization to minimize the time we hold the
// pendingAdmittedMu lock.
if updatesEmpty := func() bool {
p.leader.pendingAdmittedMu.Lock()
defer p.leader.pendingAdmittedMu.Unlock()
if updates = p.leader.pendingAdmittedMu.updates; len(updates) != 0 {
if len(p.leader.pendingAdmittedMu.updates) > 0 {
updates = p.leader.pendingAdmittedMu.updates
p.leader.pendingAdmittedMu.updates = p.leader.scratch
p.leader.scratch = updates
return false
}
}()
if len(updates) == 0 {
return true
}(); updatesEmpty {
return
}
for replicaID, state := range updates {
Expand Down
10 changes: 3 additions & 7 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,9 @@ func (rn *testRaftNode) NextUnstableIndexLocked() uint64 {
return rn.nextUnstableIndex
}

func (rn *testRaftNode) FollowerStateRaftMuLocked(
replicaID roachpb.ReplicaID,
) rac2.FollowerStateInfo {
rn.r.mu.AssertHeld()
fmt.Fprintf(rn.b, " RaftNode.FollowerStateRaftMuLocked(%v)\n", replicaID)
// TODO(kvoli,sumeerbhola): implement.
return rac2.FollowerStateInfo{}
// ReplicasStateLocked calls AssertRHeld on rn.r.mu and then writes a
// "RaftNode.ReplicasStateLocked" line to rn.b. The map argument is ignored,
// so it is never populated by this implementation.
func (rn *testRaftNode) ReplicasStateLocked(_ map[roachpb.ReplicaID]rac2.ReplicaStateInfo) {
rn.r.mu.AssertRHeld()
fmt.Fprint(rn.b, " RaftNode.ReplicasStateLocked\n")
}

func (rn *testRaftNode) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
Expand Down
Loading
Loading