Skip to content

Commit

Permalink
kvserver: move racv2 enabled level to kvflowcontrol and add test knob
Browse files Browse the repository at this point in the history
Move `EnabledWhenLeaderLevel` from `replica_rac2` to the parent package
`kvflowcontrol` and rename `V2EnabledWhenLeaderLevel` to reflect the
move to a shared v1/v2 package.

Also move the corresponding function `racV2EnabledWhenLeaderLevel` to
`kvflowcontrol`. `GetV2EnabledWhenLeaderLevel` will check if there are
testing knob overrides for the enabled level, and if not continue
returning `V2NotEnabledWhenLeader`. Some commentary and todos are also
left around this function, for when we enable the protocol and
separately, pull mode.

Part of: #130187
Release note: None
  • Loading branch information
kvoli committed Sep 20, 2024
1 parent f455bc3 commit 7cf9492
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 47 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (sh *storeForFlowControl) LookupReplicationAdmissionHandle(
}
// NB: Admit is called soon after this lookup.
level := repl.flowControlV2.GetEnabledWhenLeader()
useV1 := level == replica_rac2.NotEnabledWhenLeader
useV1 := level == kvflowcontrol.V2NotEnabledWhenLeader
var v1Handle kvflowcontrol.ReplicationAdmissionHandle
if useV1 {
repl.mu.Lock()
Expand Down Expand Up @@ -453,7 +453,7 @@ func (h admissionDemuxHandle) Admit(
// can cause either value of admitted. See the comment in
// ReplicationAdmissionHandle.
level := h.r.flowControlV2.GetEnabledWhenLeader()
if level == replica_rac2.NotEnabledWhenLeader {
if level == kvflowcontrol.V2NotEnabledWhenLeader {
return admitted, err
}
// Transition from v1 => v2 happened while waiting. Fall through to wait
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/metamorphic",
"@com_github_cockroachdb_redact//:redact",
Expand Down
53 changes: 53 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -118,6 +119,58 @@ var validateTokenRange = settings.WithValidateInt(func(b int64) error {
return nil
})

// V2EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when
// this replica is the leader.
//
// State transitions are V2NotEnabledWhenLeader =>
// V2EnabledWhenLeaderV1Encoding => V2EnabledWhenLeaderV2Encoding, i.e., the
// level will never regress.
type V2EnabledWhenLeaderLevel = uint32

const (
V2NotEnabledWhenLeader V2EnabledWhenLeaderLevel = iota
V2EnabledWhenLeaderV1Encoding
V2EnabledWhenLeaderV2Encoding
)

// GetV2EnabledWhenLeaderLevel returns the level at which RACV2 is enabled when
// this replica is the leader.
//
// The level is determined by the cluster version, and is ratcheted up as the
// cluster version advances. The level is used to determine:
//
// 1. Whether the leader should use the RACv2 protocol.
// 2. Whether the leader should use the V1 or V2 entry encoding iff (1) is
// true.
//
// Upon the leader first seeing V24_3_UseRACV2WithV1EntryEncoding, it will
// create a RangeController and use the V1 entry encoding, operating in Push
// mode. Upon the leader first seeing V24_3_UseRACV2Full, it will continue
// using the RACV2 protocol, but will switch to the V2 entry encoding. Note the
// necessary migration for V2NotEnabledWhenLeader =>
// V2EnabledWhenLeaderV1Encoding occurs before anything else in
// kvserver.handleRaftReadyRaftMuLocked.
//
// TODO(kvoli,sumeerbhola,pav-kv): When we introduce pull mode (and associated
// cluster setting), update this comment to mention that the cluster setting is
// only relevant when at V2EnabledWhenLeaderV2Encoding level.
func GetV2EnabledWhenLeaderLevel(
ctx context.Context, st *cluster.Settings, knobs *TestingKnobs,
) V2EnabledWhenLeaderLevel {
if knobs != nil && knobs.OverrideV2EnabledWhenLeaderLevel != nil {
return knobs.OverrideV2EnabledWhenLeaderLevel()
}
// TODO(kvoli): Enable once #130619 merges and tests affected by enabling v2
// are addressed:
// if st.Version.IsActive(ctx, clusterversion.V24_3_UseRACV2Full) {
// return V2EnabledWhenLeaderV2Encoding
// }
// if st.Version.IsActive(ctx, clusterversion.V24_3_UseRACV2WithV1EntryEncoding) {
// return V2EnabledWhenLeaderV1Encoding
// }
return V2NotEnabledWhenLeader
}

// Stream models the stream over which we replicate data traffic, the
// transmission for which we regulate using flow control. It's segmented by the
// specific store the traffic is bound for and the tenant driving it. Despite
Expand Down
30 changes: 9 additions & 21 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,6 @@ type RangeControllerFactory interface {
New(ctx context.Context, state rangeControllerInitState) rac2.RangeController
}

// EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when
// this replica is the leader.
//
// State transitions are NotEnabledWhenLeader => EnabledWhenLeaderV1Encoding
// => EnabledWhenLeaderV2Encoding, i.e., the level will never regress.
type EnabledWhenLeaderLevel = uint32

const (
NotEnabledWhenLeader EnabledWhenLeaderLevel = iota
EnabledWhenLeaderV1Encoding
EnabledWhenLeaderV2Encoding
)

// ProcessorOptions are specified when creating a new Processor.
type ProcessorOptions struct {
// Various constant fields that are duplicated from Replica, since we
Expand All @@ -216,7 +203,7 @@ type ProcessorOptions struct {
Settings *cluster.Settings
EvalWaitMetrics *rac2.EvalWaitMetrics

EnabledWhenLeaderLevel EnabledWhenLeaderLevel
EnabledWhenLeaderLevel kvflowcontrol.V2EnabledWhenLeaderLevel
Knobs *kvflowcontrol.TestingKnobs
}

Expand Down Expand Up @@ -289,14 +276,14 @@ type Processor interface {
// This may be a noop if the level has already been reached.
//
// raftMu is held.
SetEnabledWhenLeaderRaftMuLocked(ctx context.Context, level EnabledWhenLeaderLevel)
SetEnabledWhenLeaderRaftMuLocked(ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel)
// GetEnabledWhenLeader returns the current level. It may be used in
// highly concurrent settings at the leaseholder, when waiting for eval,
// and when encoding a proposal. Note that if the leaseholder is not the
// leader and the leader has switched to a higher level, there is no harm
// done, since the leaseholder can continue waiting for v1 tokens and use
// the v1 entry encoding.
GetEnabledWhenLeader() EnabledWhenLeaderLevel
GetEnabledWhenLeader() kvflowcontrol.V2EnabledWhenLeaderLevel

// OnDescChangedLocked provides a possibly updated RangeDescriptor. The
// tenantID passed in all calls must be the same.
Expand Down Expand Up @@ -502,7 +489,7 @@ type processorImpl struct {
// enabledWhenLeader indicates the RACv2 mode of operation when this replica
// is the leader. Atomic value, for serving GetEnabledWhenLeader. Updated only
// while holding raftMu. Can be read non-atomically if raftMu is held.
enabledWhenLeader EnabledWhenLeaderLevel
enabledWhenLeader kvflowcontrol.V2EnabledWhenLeaderLevel

v1EncodingPriorityMismatch log.EveryN
}
Expand Down Expand Up @@ -547,14 +534,15 @@ func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {

// SetEnabledWhenLeaderRaftMuLocked implements Processor.
func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked(
ctx context.Context, level EnabledWhenLeaderLevel,
ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel,
) {
p.opts.Replica.RaftMuAssertHeld()
if p.destroyed || p.enabledWhenLeader >= level {
return
}
atomic.StoreUint32(&p.enabledWhenLeader, level)
if level != EnabledWhenLeaderV1Encoding || p.desc.replicas == nil {
if level != kvflowcontrol.V2EnabledWhenLeaderV1Encoding ||
p.desc.replicas == nil {
return
}
// May need to create RangeController.
Expand All @@ -576,7 +564,7 @@ func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked(
}

// GetEnabledWhenLeader implements Processor.
func (p *processorImpl) GetEnabledWhenLeader() EnabledWhenLeaderLevel {
func (p *processorImpl) GetEnabledWhenLeader() kvflowcontrol.V2EnabledWhenLeaderLevel {
return atomic.LoadUint32(&p.enabledWhenLeader)
}

Expand Down Expand Up @@ -693,7 +681,7 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked(
return
}
// Is the leader.
if p.enabledWhenLeader == NotEnabledWhenLeader {
if p.enabledWhenLeader == kvflowcontrol.V2NotEnabledWhenLeader {
return
}
if p.leader.rc != nil && termChanged {
Expand Down
22 changes: 12 additions & 10 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestProcessorBasic(t *testing.T) {
var st *cluster.Settings
var p *processorImpl
tenantID := roachpb.MustMakeTenantID(4)
reset := func(enabled EnabledWhenLeaderLevel) {
reset := func(enabled kvflowcontrol.V2EnabledWhenLeaderLevel) {
b.Reset()
r = newTestReplica(&b)
sched = testRaftScheduler{b: &b}
Expand Down Expand Up @@ -533,31 +533,33 @@ func parseAdmissionPriority(t *testing.T, td *datadriven.TestData) admissionpb.W
return admissionpb.NormalPri
}

func parseEnabledLevel(t *testing.T, td *datadriven.TestData) EnabledWhenLeaderLevel {
func parseEnabledLevel(
t *testing.T, td *datadriven.TestData,
) kvflowcontrol.V2EnabledWhenLeaderLevel {
if td.HasArg("enabled-level") {
var str string
td.ScanArgs(t, "enabled-level", &str)
switch str {
case "not-enabled":
return NotEnabledWhenLeader
return kvflowcontrol.V2NotEnabledWhenLeader
case "v1-encoding":
return EnabledWhenLeaderV1Encoding
return kvflowcontrol.V2EnabledWhenLeaderV1Encoding
case "v2-encoding":
return EnabledWhenLeaderV2Encoding
return kvflowcontrol.V2EnabledWhenLeaderV2Encoding
default:
t.Fatalf("unrecoginized level %s", str)
}
}
return NotEnabledWhenLeader
return kvflowcontrol.V2NotEnabledWhenLeader
}

func enabledLevelString(enabledLevel EnabledWhenLeaderLevel) string {
func enabledLevelString(enabledLevel kvflowcontrol.V2EnabledWhenLeaderLevel) string {
switch enabledLevel {
case NotEnabledWhenLeader:
case kvflowcontrol.V2NotEnabledWhenLeader:
return "not-enabled"
case EnabledWhenLeaderV1Encoding:
case kvflowcontrol.V2EnabledWhenLeaderV1Encoding:
return "v1-encoding"
case EnabledWhenLeaderV2Encoding:
case kvflowcontrol.V2EnabledWhenLeaderV2Encoding:
return "v2-encoding"
}
return "unknown-level"
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type TestingKnobs struct {
// OverrideTokenDeduction is used to override how many tokens are deducted
// post-evaluation.
OverrideTokenDeduction func() Tokens
// OverrideV2EnabledWhenLeaderLevel is used to override the level at which
// RACv2 is enabled when a replica is the leader.
OverrideV2EnabledWhenLeaderLevel func() V2EnabledWhenLeaderLevel
}

// TestingKnobsV1 are the testing knobs that appply to replication flow control
Expand Down
10 changes: 2 additions & 8 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -328,7 +329,7 @@ type Replica struct {
// being applied to the state machine.
bytesAccount logstore.BytesAccount

flowControlLevel replica_rac2.EnabledWhenLeaderLevel
flowControlLevel kvflowcontrol.V2EnabledWhenLeaderLevel

// Scratch for populating RaftEvent for flowControlV2.
msgAppScratchForFlowControl map[roachpb.ReplicaID][]raftpb.Message
Expand Down Expand Up @@ -2527,13 +2528,6 @@ func (r *Replica) GetMutexForTesting() *ReplicaMutex {
return &r.mu.ReplicaMutex
}

func racV2EnabledWhenLeaderLevel(
ctx context.Context, st *cluster.Settings,
) replica_rac2.EnabledWhenLeaderLevel {
// TODO(sumeer): implement fully, once all the dependencies are implemented.
return replica_rac2.NotEnabledWhenLeader
}

// maybeEnqueueProblemRange will enqueue the replica for processing into the
// replicate queue iff:
//
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
Expand Down Expand Up @@ -224,7 +225,8 @@ func newUninitializedReplicaWithoutRaftGroup(
makeStoreFlowControlHandleFactory(r.store),
r.store.TestingKnobs().FlowControlTestingKnobs,
)
r.raftMu.flowControlLevel = racV2EnabledWhenLeaderLevel(r.raftCtx, store.cfg.Settings)
r.raftMu.flowControlLevel = kvflowcontrol.GetV2EnabledWhenLeaderLevel(
r.raftCtx, store.ClusterSettings(), store.TestingKnobs().FlowControlTestingKnobs)
r.raftMu.msgAppScratchForFlowControl = map[roachpb.ReplicaID][]raftpb.Message{}
r.flowControlV2 = replica_rac2.NewProcessor(replica_rac2.ProcessorOptions{
NodeID: store.NodeID(),
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (r *Replica) evalAndPropose(
}

func (r *Replica) encodePriorityForRACv2() bool {
return r.flowControlV2.GetEnabledWhenLeader() == replica_rac2.EnabledWhenLeaderV2Encoding
return r.flowControlV2.GetEnabledWhenLeader() == kvflowcontrol.V2EnabledWhenLeaderV2Encoding
}

// propose encodes a command, starts tracking it, and proposes it to Raft.
Expand Down Expand Up @@ -833,11 +833,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Before doing anything, including calling Ready(), see if we need to
// ratchet up the flow control level. This code will go away when RACv1 =>
// RACv2 transition is complete and RACv1 code is removed.
if r.raftMu.flowControlLevel < replica_rac2.EnabledWhenLeaderV2Encoding {
if r.raftMu.flowControlLevel < kvflowcontrol.V2EnabledWhenLeaderV2Encoding {
// Not already at highest level.
level := racV2EnabledWhenLeaderLevel(ctx, r.store.cfg.Settings)
level := kvflowcontrol.GetV2EnabledWhenLeaderLevel(
ctx, r.store.ClusterSettings(), r.store.TestingKnobs().FlowControlTestingKnobs)
if level > r.raftMu.flowControlLevel {
if r.raftMu.flowControlLevel == replica_rac2.NotEnabledWhenLeader {
if r.raftMu.flowControlLevel == kvflowcontrol.V2NotEnabledWhenLeader {
func() {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -1938,7 +1939,7 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= replica_rac2.EnabledWhenLeaderV1Encoding,
UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding,
}
// For RACv2, annotate successful MsgAppResp messages with the vector of
// admitted log indices, by priority.
Expand Down

0 comments on commit 7cf9492

Please sign in to comment.