Skip to content

Commit

Permalink
kvserver: add rac2 v1 integration tests
Browse files Browse the repository at this point in the history
Introduce several tests in `flow_control_integration_test.go`, mirroring
the existing tests but applied to the replication flow control v2
machinery.

Part of: #130187
Release note: None
  • Loading branch information
kvoli committed Sep 17, 2024
1 parent bf8dc2f commit 9d9e279
Show file tree
Hide file tree
Showing 28 changed files with 2,997 additions and 56 deletions.
1,781 changes: 1,754 additions & 27 deletions pkg/kv/kvserver/flow_control_integration_test.go

Large diffs are not rendered by default.

19 changes: 12 additions & 7 deletions pkg/kv/kvserver/flow_control_replica_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (f *replicaFlowControlIntegrationImpl) onRaftTransportDisconnected(
return // nothing to do
}

if fn := f.knobs.MaintainStreamsForBrokenRaftTransport; fn != nil && fn() {
if fn := f.knobs.V1.MaintainStreamsForBrokenRaftTransport; fn != nil && fn() {
return // nothing to do
}

Expand Down Expand Up @@ -311,12 +311,12 @@ func (f *replicaFlowControlIntegrationImpl) notActivelyReplicatingTo() []roachpb
inactiveFollowers := f.replicaForFlowControl.getInactiveFollowers()
disconnectedFollowers := f.replicaForFlowControl.getDisconnectedFollowers()

maintainStreamsForBrokenRaftTransport := f.knobs.MaintainStreamsForBrokenRaftTransport != nil &&
f.knobs.MaintainStreamsForBrokenRaftTransport()
maintainStreamsForInactiveFollowers := f.knobs.MaintainStreamsForInactiveFollowers != nil &&
f.knobs.MaintainStreamsForInactiveFollowers()
maintainStreamsForBehindFollowers := f.knobs.MaintainStreamsForBehindFollowers != nil &&
f.knobs.MaintainStreamsForBehindFollowers()
maintainStreamsForBrokenRaftTransport := f.knobs.V1.MaintainStreamsForBrokenRaftTransport != nil &&
f.knobs.V1.MaintainStreamsForBrokenRaftTransport()
maintainStreamsForInactiveFollowers := f.knobs.V1.MaintainStreamsForInactiveFollowers != nil &&
f.knobs.V1.MaintainStreamsForInactiveFollowers()
maintainStreamsForBehindFollowers := f.knobs.V1.MaintainStreamsForBehindFollowers != nil &&
f.knobs.V1.MaintainStreamsForBehindFollowers()

notActivelyReplicatingTo := make(map[roachpb.ReplicaDescriptor]struct{})
ourReplicaID := f.replicaForFlowControl.getReplicaID()
Expand Down Expand Up @@ -486,3 +486,8 @@ func (r *replicaForRACv2) MuUnlock() {
func (r *replicaForRACv2) LeaseholderMuLocked() roachpb.ReplicaID {
return r.mu.state.Lease.Replica.ReplicaID
}

// IsScratchRange implements replica_rac2.Replica.
func (r *replicaForRACv2) IsScratchRange() bool {
return (*Replica)(r).IsScratchRange()
}
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ func (ss *storesForRACv2) lookup(
if r == nil || r.replicaID != replicaID {
return nil
}
if flowTestKnobs := r.store.TestingKnobs().FlowControlTestingKnobs; flowTestKnobs != nil &&
flowTestKnobs.UseOnlyForScratchRanges && !r.IsScratchRange() {
return nil
}
return r.flowControlV2
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -356,7 +357,7 @@ func (n *controllerImpl) AdmitKVWork(
// TODO(sumeerbhola,kvoli): The priority needs to be converted to a
// raftpb.Priority when v2 encoding is enabled. e.g.,
// rac2.AdmissionToRaftPriority().
AdmissionPriority: int32(admissionInfo.Priority),
AdmissionPriority: int32(rac2.AdmissionToRaftPriority(admissionInfo.Priority)),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func (h *Handle) deductTokensForInner(
return nil // unused return value in production code
}

if h.knobs.OverrideTokenDeduction != nil {
tokens = h.knobs.OverrideTokenDeduction()
if fn := h.knobs.OverrideTokenDeduction; fn != nil {
tokens = fn()
}

for _, c := range h.mu.connections {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (dt *Tracker) Untrack(
break
}

if fn := dt.knobs.UntrackTokensInterceptor; fn != nil {
if fn := dt.knobs.V1.UntrackTokensInterceptor; fn != nil {
fn(deduction.tokens, deduction.position)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestTracker(t *testing.T) {
count := 0
var buf strings.Builder
buf.WriteString(fmt.Sprintf("pri=%s\n", pri))
knobs.UntrackTokensInterceptor = func(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) {
knobs.V1.UntrackTokensInterceptor = func(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) {
count += 1
buf.WriteString(fmt.Sprintf(" tokens=%s %s\n",
testingPrintTrimmedTokens(tokens), position))
Expand Down
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ type RangeControllerOptions struct {
Clock *hlc.Clock
CloseTimerScheduler ProbeToCloseTimerScheduler
EvalWaitMetrics *EvalWaitMetrics
Knobs *kvflowcontrol.TestingKnobs
}

// RangeControllerInitState is the initial state at the time of creation.
Expand Down Expand Up @@ -771,11 +772,15 @@ func (rs *replicaState) handleReadyEntries(ctx context.Context, entries []entryF
if !entry.usesFlowControl {
continue
}
rs.sendStream.mu.tracker.Track(ctx, entry.term, entry.index, entry.pri, entry.tokens)
tokens := entry.tokens
if fn := rs.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn()
}
rs.sendStream.mu.tracker.Track(ctx, entry.term, entry.index, entry.pri, tokens)
rs.evalTokenCounter.Deduct(
ctx, WorkClassFromRaftPriority(entry.pri), entry.tokens)
ctx, WorkClassFromRaftPriority(entry.pri), tokens)
rs.sendTokenCounter.Deduct(
ctx, WorkClassFromRaftPriority(entry.pri), entry.tokens)
ctx, WorkClassFromRaftPriority(entry.pri), tokens)
}
}

Expand Down
18 changes: 16 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type Replica interface {
// At least Replica mu is held. The caller does not make any claims about
// whether it holds raftMu or not.
LeaseholderMuLocked() roachpb.ReplicaID
// IsScratchRange returns true if this is range is a scratch range (i.e.
// overlaps with the scratch span and has a start key <=
// keys.ScratchRangeMin).
IsScratchRange() bool
}

// RaftScheduler abstracts kvserver.raftScheduler.
Expand Down Expand Up @@ -213,6 +217,7 @@ type ProcessorOptions struct {
EvalWaitMetrics *rac2.EvalWaitMetrics

EnabledWhenLeaderLevel EnabledWhenLeaderLevel
Knobs *kvflowcontrol.TestingKnobs
}

// SideChannelInfoUsingRaftMessageRequest is used to provide a follower
Expand Down Expand Up @@ -517,6 +522,8 @@ func NewProcessor(opts ProcessorOptions) Processor {
func (p *processorImpl) isLeaderUsingV2ProcLocked() bool {
// We are the leader using V2, or a follower who learned that the leader is
// using the V2 protocol.
// TODO(kvoli): Why doesn't this work currently?
// return true
return p.leader.rc != nil || p.follower.isLeaderUsingV2Protocol
}

Expand Down Expand Up @@ -800,8 +807,11 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
// our admitted vector is likely consistent with the latest leader term.
p.maybeSendAdmittedRaftMuLocked(ctx)
if rc := p.leader.rc; rc != nil {
if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
if knobs := p.opts.Knobs; knobs == nil || !knobs.UseOnlyForScratchRanges ||
p.opts.Replica.IsScratchRange() {
if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
}
}
}
}
Expand Down Expand Up @@ -1141,19 +1151,22 @@ type RangeControllerFactoryImpl struct {
evalWaitMetrics *rac2.EvalWaitMetrics
streamTokenCounterProvider *rac2.StreamTokenCounterProvider
closeTimerScheduler rac2.ProbeToCloseTimerScheduler
knobs *kvflowcontrol.TestingKnobs
}

func NewRangeControllerFactoryImpl(
clock *hlc.Clock,
evalWaitMetrics *rac2.EvalWaitMetrics,
streamTokenCounterProvider *rac2.StreamTokenCounterProvider,
closeTimerScheduler rac2.ProbeToCloseTimerScheduler,
knobs *kvflowcontrol.TestingKnobs,
) RangeControllerFactoryImpl {
return RangeControllerFactoryImpl{
clock: clock,
evalWaitMetrics: evalWaitMetrics,
streamTokenCounterProvider: streamTokenCounterProvider,
closeTimerScheduler: closeTimerScheduler,
knobs: knobs,
}
}

Expand All @@ -1172,6 +1185,7 @@ func (f RangeControllerFactoryImpl) New(
Clock: f.clock,
CloseTimerScheduler: f.closeTimerScheduler,
EvalWaitMetrics: f.evalWaitMetrics,
Knobs: f.knobs,
},
rac2.RangeControllerInitState{
ReplicaSet: state.replicaSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ HandleRaftReady:
Replica.LeaseholderMuLocked
RaftNode.TermLocked() = 52
Replica.MuUnlock
RangeController.AdmitRaftMuLocked(5, term:52, admitted:[LowPri:26,NormalPri:26,AboveNormalPri:26,HighPri:26])
RangeController.HandleRaftEventRaftMuLocked([])
.....

Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ import (
// TestingKnobs provide fine-grained control over the various kvflowcontrol
// components for testing.
type TestingKnobs struct {
// UntrackTokensInterceptor is invoked whenever tokens are untracked, along
// with their corresponding log positions.
UntrackTokensInterceptor func(Tokens, kvflowcontrolpb.RaftLogPosition)
V1 TestingKnobsV1
UseOnlyForScratchRanges bool
// OverrideTokenDeduction is used to override how many tokens are deducted
// post-evaluation.
OverrideTokenDeduction func() Tokens
}

// TestingKnobsV1 are the testing knobs that appply to replication flow control
// v1, which is mostly contained in the kvflowcontroller, kvflowdispatch,
// kvflowhandle and kvflowtokentracker packages.
type TestingKnobsV1 struct {
// UntrackTokensInterceptor is invoked whenever tokens are untracked, along
// with their corresponding log positions.
UntrackTokensInterceptor func(Tokens, kvflowcontrolpb.RaftLogPosition)
// MaintainStreamsForBehindFollowers is used in tests to maintain
// replication streams for behind followers.
MaintainStreamsForBehindFollowers func() bool
Expand All @@ -34,9 +42,6 @@ type TestingKnobs struct {
// replication streams for followers we're no longer connected to via the
// RaftTransport.
MaintainStreamsForBrokenRaftTransport func() bool
// UseOnlyForScratchRanges enables the use of kvflowcontrol
// only for scratch ranges.
UseOnlyForScratchRanges bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,9 @@ func racV2EnabledWhenLeaderLevel(
ctx context.Context, st *cluster.Settings,
) replica_rac2.EnabledWhenLeaderLevel {
// TODO(sumeer): implement fully, once all the dependencies are implemented.
return replica_rac2.NotEnabledWhenLeader
// TODO(kvoli): Should this be a cluster setting, or are we ratcheting it up
// via cluster version.
return replica_rac2.EnabledWhenLeaderV2Encoding
}

// maybeEnqueueProblemRange will enqueue the replica for processing into the
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func newUninitializedReplicaWithoutRaftGroup(
EvalWaitMetrics: r.store.cfg.KVFlowEvalWaitMetrics,
RangeControllerFactory: r.store.kvflowRangeControllerFactory,
EnabledWhenLeaderLevel: r.raftMu.flowControlLevel,
Knobs: r.store.TestingKnobs().FlowControlTestingKnobs,
})
return r
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,11 +1923,12 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {

req := newRaftMessageRequest()
*req = kvserverpb.RaftMessageRequest{
RangeID: r.RangeID,
ToReplica: toReplica,
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
RangeID: r.RangeID,
ToReplica: toReplica,
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() == replica_rac2.EnabledWhenLeaderV2Encoding,
}
// For RACv2, annotate successful MsgAppResp messages with the vector of
// admitted log indices, by priority.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,7 @@ func NewStore(
s.cfg.KVFlowStreamTokenProvider,
replica_rac2.NewStreamCloseScheduler(
s.stopper, timeutil.DefaultTimeSource{}, s.scheduler),
s.TestingKnobs().FlowControlTestingKnobs,
)

// Run a log SyncWaiter loop for every 32 raft scheduler goroutines.
Expand Down
Loading

0 comments on commit 9d9e279

Please sign in to comment.