Skip to content

Commit

Permalink
kvserver: add rac2 v1 integration tests
Browse files Browse the repository at this point in the history
Introduce several tests in `flow_control_integration_test.go`, mirroring
the existing tests but applied to the replication flow control v2
machinery.

The tests largely follow an identical pattern to the existing v1 tests,
swapping in rac2 metric and vtables.

The following tests are added:

```
TestFlowControlBasic
TestFlowControlRangeSplitMerge
TestFlowControlBlockedAdmission
TestFlowControlAdmissionPostSplitMerge
TestFlowControlCrashedNode
TestFlowControlRaftSnapshot
TestFlowControlRaftMembership
TestFlowControlRaftMembershipRemoveSelf
TestFlowControlClassPrioritization
TestFlowControlTransferLease
TestFlowControlGranterAdmitOneByOne
```

Another two tests are ommitted, `TestFlowControlRaftTransportBreak` and
`TestFlowControlRaftTransportCulled`, because they behave identically to
`TestFlowControlCrashedNode` as rac2 is less tightly coupled to the raft
transport.

One test, `TestFlowControlLeaderNotLeaseholder` has divering
pre-evaluation admit behavior and is therefore delayed to a subsequent
commit.

Lastly, two tests, `TestFlowControlQuiescedRange` and
`TestFlowControlUnquiescedRange` are dependent on #129581, and delayed
to a subsequent commit as well.

Part of: #130187
Release note: None
  • Loading branch information
kvoli committed Sep 17, 2024
1 parent bf8dc2f commit a7d70cf
Show file tree
Hide file tree
Showing 28 changed files with 3,007 additions and 56 deletions.
1,791 changes: 1,764 additions & 27 deletions pkg/kv/kvserver/flow_control_integration_test.go

Large diffs are not rendered by default.

19 changes: 12 additions & 7 deletions pkg/kv/kvserver/flow_control_replica_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (f *replicaFlowControlIntegrationImpl) onRaftTransportDisconnected(
return // nothing to do
}

if fn := f.knobs.MaintainStreamsForBrokenRaftTransport; fn != nil && fn() {
if fn := f.knobs.V1.MaintainStreamsForBrokenRaftTransport; fn != nil && fn() {
return // nothing to do
}

Expand Down Expand Up @@ -311,12 +311,12 @@ func (f *replicaFlowControlIntegrationImpl) notActivelyReplicatingTo() []roachpb
inactiveFollowers := f.replicaForFlowControl.getInactiveFollowers()
disconnectedFollowers := f.replicaForFlowControl.getDisconnectedFollowers()

maintainStreamsForBrokenRaftTransport := f.knobs.MaintainStreamsForBrokenRaftTransport != nil &&
f.knobs.MaintainStreamsForBrokenRaftTransport()
maintainStreamsForInactiveFollowers := f.knobs.MaintainStreamsForInactiveFollowers != nil &&
f.knobs.MaintainStreamsForInactiveFollowers()
maintainStreamsForBehindFollowers := f.knobs.MaintainStreamsForBehindFollowers != nil &&
f.knobs.MaintainStreamsForBehindFollowers()
maintainStreamsForBrokenRaftTransport := f.knobs.V1.MaintainStreamsForBrokenRaftTransport != nil &&
f.knobs.V1.MaintainStreamsForBrokenRaftTransport()
maintainStreamsForInactiveFollowers := f.knobs.V1.MaintainStreamsForInactiveFollowers != nil &&
f.knobs.V1.MaintainStreamsForInactiveFollowers()
maintainStreamsForBehindFollowers := f.knobs.V1.MaintainStreamsForBehindFollowers != nil &&
f.knobs.V1.MaintainStreamsForBehindFollowers()

notActivelyReplicatingTo := make(map[roachpb.ReplicaDescriptor]struct{})
ourReplicaID := f.replicaForFlowControl.getReplicaID()
Expand Down Expand Up @@ -486,3 +486,8 @@ func (r *replicaForRACv2) MuUnlock() {
func (r *replicaForRACv2) LeaseholderMuLocked() roachpb.ReplicaID {
return r.mu.state.Lease.Replica.ReplicaID
}

// IsScratchRange implements replica_rac2.Replica.
func (r *replicaForRACv2) IsScratchRange() bool {
return (*Replica)(r).IsScratchRange()
}
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ func (ss *storesForRACv2) lookup(
if r == nil || r.replicaID != replicaID {
return nil
}
if flowTestKnobs := r.store.TestingKnobs().FlowControlTestingKnobs; flowTestKnobs != nil &&
flowTestKnobs.UseOnlyForScratchRanges && !r.IsScratchRange() {
return nil
}
return r.flowControlV2
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -356,7 +357,7 @@ func (n *controllerImpl) AdmitKVWork(
// TODO(sumeerbhola,kvoli): The priority needs to be converted to a
// raftpb.Priority when v2 encoding is enabled. e.g.,
// rac2.AdmissionToRaftPriority().
AdmissionPriority: int32(admissionInfo.Priority),
AdmissionPriority: int32(rac2.AdmissionToRaftPriority(admissionInfo.Priority)),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func (h *Handle) deductTokensForInner(
return nil // unused return value in production code
}

if h.knobs.OverrideTokenDeduction != nil {
tokens = h.knobs.OverrideTokenDeduction()
if fn := h.knobs.OverrideTokenDeduction; fn != nil {
tokens = fn()
}

for _, c := range h.mu.connections {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (dt *Tracker) Untrack(
break
}

if fn := dt.knobs.UntrackTokensInterceptor; fn != nil {
if fn := dt.knobs.V1.UntrackTokensInterceptor; fn != nil {
fn(deduction.tokens, deduction.position)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestTracker(t *testing.T) {
count := 0
var buf strings.Builder
buf.WriteString(fmt.Sprintf("pri=%s\n", pri))
knobs.UntrackTokensInterceptor = func(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) {
knobs.V1.UntrackTokensInterceptor = func(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) {
count += 1
buf.WriteString(fmt.Sprintf(" tokens=%s %s\n",
testingPrintTrimmedTokens(tokens), position))
Expand Down
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ type RangeControllerOptions struct {
Clock *hlc.Clock
CloseTimerScheduler ProbeToCloseTimerScheduler
EvalWaitMetrics *EvalWaitMetrics
Knobs *kvflowcontrol.TestingKnobs
}

// RangeControllerInitState is the initial state at the time of creation.
Expand Down Expand Up @@ -771,11 +772,15 @@ func (rs *replicaState) handleReadyEntries(ctx context.Context, entries []entryF
if !entry.usesFlowControl {
continue
}
rs.sendStream.mu.tracker.Track(ctx, entry.term, entry.index, entry.pri, entry.tokens)
tokens := entry.tokens
if fn := rs.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn()
}
rs.sendStream.mu.tracker.Track(ctx, entry.term, entry.index, entry.pri, tokens)
rs.evalTokenCounter.Deduct(
ctx, WorkClassFromRaftPriority(entry.pri), entry.tokens)
ctx, WorkClassFromRaftPriority(entry.pri), tokens)
rs.sendTokenCounter.Deduct(
ctx, WorkClassFromRaftPriority(entry.pri), entry.tokens)
ctx, WorkClassFromRaftPriority(entry.pri), tokens)
}
}

Expand Down
18 changes: 16 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type Replica interface {
// At least Replica mu is held. The caller does not make any claims about
// whether it holds raftMu or not.
LeaseholderMuLocked() roachpb.ReplicaID
// IsScratchRange returns true if this is range is a scratch range (i.e.
// overlaps with the scratch span and has a start key <=
// keys.ScratchRangeMin).
IsScratchRange() bool
}

// RaftScheduler abstracts kvserver.raftScheduler.
Expand Down Expand Up @@ -213,6 +217,7 @@ type ProcessorOptions struct {
EvalWaitMetrics *rac2.EvalWaitMetrics

EnabledWhenLeaderLevel EnabledWhenLeaderLevel
Knobs *kvflowcontrol.TestingKnobs
}

// SideChannelInfoUsingRaftMessageRequest is used to provide a follower
Expand Down Expand Up @@ -517,6 +522,8 @@ func NewProcessor(opts ProcessorOptions) Processor {
func (p *processorImpl) isLeaderUsingV2ProcLocked() bool {
// We are the leader using V2, or a follower who learned that the leader is
// using the V2 protocol.
// TODO(kvoli): Why doesn't this work currently?
// return true
return p.leader.rc != nil || p.follower.isLeaderUsingV2Protocol
}

Expand Down Expand Up @@ -800,8 +807,11 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
// our admitted vector is likely consistent with the latest leader term.
p.maybeSendAdmittedRaftMuLocked(ctx)
if rc := p.leader.rc; rc != nil {
if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
if knobs := p.opts.Knobs; knobs == nil || !knobs.UseOnlyForScratchRanges ||
p.opts.Replica.IsScratchRange() {
if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
}
}
}
}
Expand Down Expand Up @@ -1141,19 +1151,22 @@ type RangeControllerFactoryImpl struct {
evalWaitMetrics *rac2.EvalWaitMetrics
streamTokenCounterProvider *rac2.StreamTokenCounterProvider
closeTimerScheduler rac2.ProbeToCloseTimerScheduler
knobs *kvflowcontrol.TestingKnobs
}

func NewRangeControllerFactoryImpl(
clock *hlc.Clock,
evalWaitMetrics *rac2.EvalWaitMetrics,
streamTokenCounterProvider *rac2.StreamTokenCounterProvider,
closeTimerScheduler rac2.ProbeToCloseTimerScheduler,
knobs *kvflowcontrol.TestingKnobs,
) RangeControllerFactoryImpl {
return RangeControllerFactoryImpl{
clock: clock,
evalWaitMetrics: evalWaitMetrics,
streamTokenCounterProvider: streamTokenCounterProvider,
closeTimerScheduler: closeTimerScheduler,
knobs: knobs,
}
}

Expand All @@ -1172,6 +1185,7 @@ func (f RangeControllerFactoryImpl) New(
Clock: f.clock,
CloseTimerScheduler: f.closeTimerScheduler,
EvalWaitMetrics: f.evalWaitMetrics,
Knobs: f.knobs,
},
rac2.RangeControllerInitState{
ReplicaSet: state.replicaSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ HandleRaftReady:
Replica.LeaseholderMuLocked
RaftNode.TermLocked() = 52
Replica.MuUnlock
RangeController.AdmitRaftMuLocked(5, term:52, admitted:[LowPri:26,NormalPri:26,AboveNormalPri:26,HighPri:26])
RangeController.HandleRaftEventRaftMuLocked([])
.....

Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ import (
// TestingKnobs provide fine-grained control over the various kvflowcontrol
// components for testing.
type TestingKnobs struct {
// UntrackTokensInterceptor is invoked whenever tokens are untracked, along
// with their corresponding log positions.
UntrackTokensInterceptor func(Tokens, kvflowcontrolpb.RaftLogPosition)
V1 TestingKnobsV1
UseOnlyForScratchRanges bool
// OverrideTokenDeduction is used to override how many tokens are deducted
// post-evaluation.
OverrideTokenDeduction func() Tokens
}

// TestingKnobsV1 are the testing knobs that appply to replication flow control
// v1, which is mostly contained in the kvflowcontroller, kvflowdispatch,
// kvflowhandle and kvflowtokentracker packages.
type TestingKnobsV1 struct {
// UntrackTokensInterceptor is invoked whenever tokens are untracked, along
// with their corresponding log positions.
UntrackTokensInterceptor func(Tokens, kvflowcontrolpb.RaftLogPosition)
// MaintainStreamsForBehindFollowers is used in tests to maintain
// replication streams for behind followers.
MaintainStreamsForBehindFollowers func() bool
Expand All @@ -34,9 +42,6 @@ type TestingKnobs struct {
// replication streams for followers we're no longer connected to via the
// RaftTransport.
MaintainStreamsForBrokenRaftTransport func() bool
// UseOnlyForScratchRanges enables the use of kvflowcontrol
// only for scratch ranges.
UseOnlyForScratchRanges bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,9 @@ func racV2EnabledWhenLeaderLevel(
ctx context.Context, st *cluster.Settings,
) replica_rac2.EnabledWhenLeaderLevel {
// TODO(sumeer): implement fully, once all the dependencies are implemented.
return replica_rac2.NotEnabledWhenLeader
// TODO(kvoli): Should this be a cluster setting, or are we ratcheting it up
// via cluster version.
return replica_rac2.EnabledWhenLeaderV2Encoding
}

// maybeEnqueueProblemRange will enqueue the replica for processing into the
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func newUninitializedReplicaWithoutRaftGroup(
EvalWaitMetrics: r.store.cfg.KVFlowEvalWaitMetrics,
RangeControllerFactory: r.store.kvflowRangeControllerFactory,
EnabledWhenLeaderLevel: r.raftMu.flowControlLevel,
Knobs: r.store.TestingKnobs().FlowControlTestingKnobs,
})
return r
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,11 +1923,12 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {

req := newRaftMessageRequest()
*req = kvserverpb.RaftMessageRequest{
RangeID: r.RangeID,
ToReplica: toReplica,
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
RangeID: r.RangeID,
ToReplica: toReplica,
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() == replica_rac2.EnabledWhenLeaderV2Encoding,
}
// For RACv2, annotate successful MsgAppResp messages with the vector of
// admitted log indices, by priority.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,7 @@ func NewStore(
s.cfg.KVFlowStreamTokenProvider,
replica_rac2.NewStreamCloseScheduler(
s.stopper, timeutil.DefaultTimeSource{}, s.scheduler),
s.TestingKnobs().FlowControlTestingKnobs,
)

// Run a log SyncWaiter loop for every 32 raft scheduler goroutines.
Expand Down
Loading

0 comments on commit a7d70cf

Please sign in to comment.