Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clusterversion: introduce rac2 cluster version gates #131106

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,4 @@ trace.snapshot.rate duration 0s if non-zero, interval at which background trace
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez application
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000024.2-upgrading-to-1000024.3-step-016 set the active cluster version in the format '<major>.<minor>' application
version version 1000024.2-upgrading-to-1000024.3-step-020 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,6 @@
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.2-upgrading-to-1000024.3-step-016</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.2-upgrading-to-1000024.3-step-020</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
11 changes: 11 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,15 @@ const (
// policies.
V24_3_MaybePreventUpgradeForCoreLicenseDeprecation

// V24_3_UseRACV2WithV1EntryEncoding is the earliest version which supports
// ranges using replication flow control v2, still with v1 entry encoding.
V24_3_UseRACV2WithV1EntryEncoding

// V24_3_UseRACV2Full is the earliest version which supports ranges using
// replication flow control v2, with v2 entry encoding. Replication flow
// control v1 is unsupported at this version.
V24_3_UseRACV2Full

// *************************************************
// Step (1) Add new versions above this comment.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -302,6 +311,8 @@ var versionTable = [numKeys]roachpb.Version{
V24_3_AdvanceCommitIndexViaMsgApps: {Major: 24, Minor: 2, Internal: 12},
V24_3_SQLInstancesAddDraining: {Major: 24, Minor: 2, Internal: 14},
V24_3_MaybePreventUpgradeForCoreLicenseDeprecation: {Major: 24, Minor: 2, Internal: 16},
V24_3_UseRACV2WithV1EntryEncoding: {Major: 24, Minor: 2, Internal: 18},
V24_3_UseRACV2Full: {Major: 24, Minor: 2, Internal: 20},

// *************************************************
// Step (2): Add new versions above this comment.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (sh *storeForFlowControl) LookupReplicationAdmissionHandle(
}
// NB: Admit is called soon after this lookup.
level := repl.flowControlV2.GetEnabledWhenLeader()
useV1 := level == replica_rac2.NotEnabledWhenLeader
useV1 := level == kvflowcontrol.V2NotEnabledWhenLeader
var v1Handle kvflowcontrol.ReplicationAdmissionHandle
if useV1 {
repl.mu.Lock()
Expand Down Expand Up @@ -453,7 +453,7 @@ func (h admissionDemuxHandle) Admit(
// can cause either value of admitted. See the comment in
// ReplicationAdmissionHandle.
level := h.r.flowControlV2.GetEnabledWhenLeader()
if level == replica_rac2.NotEnabledWhenLeader {
if level == kvflowcontrol.V2NotEnabledWhenLeader {
return admitted, err
}
// Transition from v1 => v2 happened while waiting. Fall through to wait
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/metamorphic",
"@com_github_cockroachdb_redact//:redact",
Expand Down
53 changes: 53 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -118,6 +119,58 @@ var validateTokenRange = settings.WithValidateInt(func(b int64) error {
return nil
})

// V2EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when
// this replica is the leader.
//
// State transitions are V2NotEnabledWhenLeader =>
// V2EnabledWhenLeaderV1Encoding => V2EnabledWhenLeaderV2Encoding, i.e., the
// level will never regress.
type V2EnabledWhenLeaderLevel = uint32

const (
V2NotEnabledWhenLeader V2EnabledWhenLeaderLevel = iota
V2EnabledWhenLeaderV1Encoding
V2EnabledWhenLeaderV2Encoding
)

// GetV2EnabledWhenLeaderLevel returns the level at which RACV2 is enabled when
// this replica is the leader.
//
// The level is determined by the cluster version, and is ratcheted up as the
// cluster version advances. The level is used to determine:
//
// 1. Whether the leader should use the RACv2 protocol.
// 2. Whether the leader should use the V1 or V2 entry encoding iff (1) is
// true.
//
// Upon the leader first seeing V24_3_UseRACV2WithV1EntryEncoding, it will
// create a RangeController and use the V1 entry encoding, operating in Push
// mode. Upon the leader first seeing V24_3_UseRACV2Full, it will continue
// using the RACV2 protocol, but will switch to the V2 entry encoding. Note the
// necessary migration for V2NotEnabledWhenLeader =>
// V2EnabledWhenLeaderV1Encoding occurs before anything else in
// kvserver.handleRaftReadyRaftMuLocked.
//
// TODO(kvoli,sumeerbhola,pav-kv): When we introduce pull mode (and associated
// cluster setting), update this comment to mention that the cluster setting is
// only relevant when at V2EnabledWhenLeaderV2Encoding level.
func GetV2EnabledWhenLeaderLevel(
ctx context.Context, st *cluster.Settings, knobs *TestingKnobs,
) V2EnabledWhenLeaderLevel {
if knobs != nil && knobs.OverrideV2EnabledWhenLeaderLevel != nil {
return knobs.OverrideV2EnabledWhenLeaderLevel()
}
// TODO(kvoli): Enable once #130619 merges and tests affected by enabling v2
// are addressed:
// if st.Version.IsActive(ctx, clusterversion.V24_3_UseRACV2Full) {
// return V2EnabledWhenLeaderV2Encoding
// }
// if st.Version.IsActive(ctx, clusterversion.V24_3_UseRACV2WithV1EntryEncoding) {
// return V2EnabledWhenLeaderV1Encoding
// }
return V2NotEnabledWhenLeader
}

// Stream models the stream over which we replicate data traffic, the
// transmission for which we regulate using flow control. It's segmented by the
// specific store the traffic is bound for and the tenant driving it. Despite
Expand Down
30 changes: 9 additions & 21 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,6 @@ type RangeControllerFactory interface {
New(ctx context.Context, state rangeControllerInitState) rac2.RangeController
}

// EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when
// this replica is the leader.
//
// State transitions are NotEnabledWhenLeader => EnabledWhenLeaderV1Encoding
// => EnabledWhenLeaderV2Encoding, i.e., the level will never regress.
type EnabledWhenLeaderLevel = uint32

const (
NotEnabledWhenLeader EnabledWhenLeaderLevel = iota
EnabledWhenLeaderV1Encoding
EnabledWhenLeaderV2Encoding
)

// ProcessorOptions are specified when creating a new Processor.
type ProcessorOptions struct {
// Various constant fields that are duplicated from Replica, since we
Expand All @@ -216,7 +203,7 @@ type ProcessorOptions struct {
Settings *cluster.Settings
EvalWaitMetrics *rac2.EvalWaitMetrics

EnabledWhenLeaderLevel EnabledWhenLeaderLevel
EnabledWhenLeaderLevel kvflowcontrol.V2EnabledWhenLeaderLevel
Knobs *kvflowcontrol.TestingKnobs
}

Expand Down Expand Up @@ -289,14 +276,14 @@ type Processor interface {
// This may be a noop if the level has already been reached.
//
// raftMu is held.
SetEnabledWhenLeaderRaftMuLocked(ctx context.Context, level EnabledWhenLeaderLevel)
SetEnabledWhenLeaderRaftMuLocked(ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel)
// GetEnabledWhenLeader returns the current level. It may be used in
// highly concurrent settings at the leaseholder, when waiting for eval,
// and when encoding a proposal. Note that if the leaseholder is not the
// leader and the leader has switched to a higher level, there is no harm
// done, since the leaseholder can continue waiting for v1 tokens and use
// the v1 entry encoding.
GetEnabledWhenLeader() EnabledWhenLeaderLevel
GetEnabledWhenLeader() kvflowcontrol.V2EnabledWhenLeaderLevel

// OnDescChangedLocked provides a possibly updated RangeDescriptor. The
// tenantID passed in all calls must be the same.
Expand Down Expand Up @@ -502,7 +489,7 @@ type processorImpl struct {
// enabledWhenLeader indicates the RACv2 mode of operation when this replica
// is the leader. Atomic value, for serving GetEnabledWhenLeader. Updated only
// while holding raftMu. Can be read non-atomically if raftMu is held.
enabledWhenLeader EnabledWhenLeaderLevel
enabledWhenLeader kvflowcontrol.V2EnabledWhenLeaderLevel

v1EncodingPriorityMismatch log.EveryN
}
Expand Down Expand Up @@ -547,14 +534,15 @@ func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {

// SetEnabledWhenLeaderRaftMuLocked implements Processor.
func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked(
ctx context.Context, level EnabledWhenLeaderLevel,
ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel,
) {
p.opts.Replica.RaftMuAssertHeld()
if p.destroyed || p.enabledWhenLeader >= level {
return
}
atomic.StoreUint32(&p.enabledWhenLeader, level)
if level != EnabledWhenLeaderV1Encoding || p.desc.replicas == nil {
if level != kvflowcontrol.V2EnabledWhenLeaderV1Encoding ||
p.desc.replicas == nil {
return
}
// May need to create RangeController.
Expand All @@ -576,7 +564,7 @@ func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked(
}

// GetEnabledWhenLeader implements Processor.
func (p *processorImpl) GetEnabledWhenLeader() EnabledWhenLeaderLevel {
func (p *processorImpl) GetEnabledWhenLeader() kvflowcontrol.V2EnabledWhenLeaderLevel {
return atomic.LoadUint32(&p.enabledWhenLeader)
}

Expand Down Expand Up @@ -693,7 +681,7 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked(
return
}
// Is the leader.
if p.enabledWhenLeader == NotEnabledWhenLeader {
if p.enabledWhenLeader == kvflowcontrol.V2NotEnabledWhenLeader {
return
}
if p.leader.rc != nil && termChanged {
Expand Down
22 changes: 12 additions & 10 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestProcessorBasic(t *testing.T) {
var st *cluster.Settings
var p *processorImpl
tenantID := roachpb.MustMakeTenantID(4)
reset := func(enabled EnabledWhenLeaderLevel) {
reset := func(enabled kvflowcontrol.V2EnabledWhenLeaderLevel) {
b.Reset()
r = newTestReplica(&b)
sched = testRaftScheduler{b: &b}
Expand Down Expand Up @@ -533,31 +533,33 @@ func parseAdmissionPriority(t *testing.T, td *datadriven.TestData) admissionpb.W
return admissionpb.NormalPri
}

func parseEnabledLevel(t *testing.T, td *datadriven.TestData) EnabledWhenLeaderLevel {
func parseEnabledLevel(
t *testing.T, td *datadriven.TestData,
) kvflowcontrol.V2EnabledWhenLeaderLevel {
if td.HasArg("enabled-level") {
var str string
td.ScanArgs(t, "enabled-level", &str)
switch str {
case "not-enabled":
return NotEnabledWhenLeader
return kvflowcontrol.V2NotEnabledWhenLeader
case "v1-encoding":
return EnabledWhenLeaderV1Encoding
return kvflowcontrol.V2EnabledWhenLeaderV1Encoding
case "v2-encoding":
return EnabledWhenLeaderV2Encoding
return kvflowcontrol.V2EnabledWhenLeaderV2Encoding
default:
t.Fatalf("unrecoginized level %s", str)
}
}
return NotEnabledWhenLeader
return kvflowcontrol.V2NotEnabledWhenLeader
}

func enabledLevelString(enabledLevel EnabledWhenLeaderLevel) string {
func enabledLevelString(enabledLevel kvflowcontrol.V2EnabledWhenLeaderLevel) string {
switch enabledLevel {
case NotEnabledWhenLeader:
case kvflowcontrol.V2NotEnabledWhenLeader:
return "not-enabled"
case EnabledWhenLeaderV1Encoding:
case kvflowcontrol.V2EnabledWhenLeaderV1Encoding:
return "v1-encoding"
case EnabledWhenLeaderV2Encoding:
case kvflowcontrol.V2EnabledWhenLeaderV2Encoding:
return "v2-encoding"
}
return "unknown-level"
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type TestingKnobs struct {
// OverrideTokenDeduction is used to override how many tokens are deducted
// post-evaluation.
OverrideTokenDeduction func() Tokens
// OverrideV2EnabledWhenLeaderLevel is used to override the level at which
// RACv2 is enabled when a replica is the leader.
OverrideV2EnabledWhenLeaderLevel func() V2EnabledWhenLeaderLevel
}

// TestingKnobsV1 are the testing knobs that appply to replication flow control
Expand Down
10 changes: 2 additions & 8 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -328,7 +329,7 @@ type Replica struct {
// being applied to the state machine.
bytesAccount logstore.BytesAccount

flowControlLevel replica_rac2.EnabledWhenLeaderLevel
flowControlLevel kvflowcontrol.V2EnabledWhenLeaderLevel

// Scratch for populating RaftEvent for flowControlV2.
msgAppScratchForFlowControl map[roachpb.ReplicaID][]raftpb.Message
Expand Down Expand Up @@ -2527,13 +2528,6 @@ func (r *Replica) GetMutexForTesting() *ReplicaMutex {
return &r.mu.ReplicaMutex
}

func racV2EnabledWhenLeaderLevel(
ctx context.Context, st *cluster.Settings,
) replica_rac2.EnabledWhenLeaderLevel {
// TODO(sumeer): implement fully, once all the dependencies are implemented.
return replica_rac2.NotEnabledWhenLeader
}

// maybeEnqueueProblemRange will enqueue the replica for processing into the
// replicate queue iff:
//
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
Expand Down Expand Up @@ -224,7 +225,8 @@ func newUninitializedReplicaWithoutRaftGroup(
makeStoreFlowControlHandleFactory(r.store),
r.store.TestingKnobs().FlowControlTestingKnobs,
)
r.raftMu.flowControlLevel = racV2EnabledWhenLeaderLevel(r.raftCtx, store.cfg.Settings)
r.raftMu.flowControlLevel = kvflowcontrol.GetV2EnabledWhenLeaderLevel(
r.raftCtx, store.ClusterSettings(), store.TestingKnobs().FlowControlTestingKnobs)
r.raftMu.msgAppScratchForFlowControl = map[roachpb.ReplicaID][]raftpb.Message{}
r.flowControlV2 = replica_rac2.NewProcessor(replica_rac2.ProcessorOptions{
NodeID: store.NodeID(),
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (r *Replica) evalAndPropose(
}

func (r *Replica) encodePriorityForRACv2() bool {
return r.flowControlV2.GetEnabledWhenLeader() == replica_rac2.EnabledWhenLeaderV2Encoding
return r.flowControlV2.GetEnabledWhenLeader() == kvflowcontrol.V2EnabledWhenLeaderV2Encoding
}

// propose encodes a command, starts tracking it, and proposes it to Raft.
Expand Down Expand Up @@ -833,11 +833,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Before doing anything, including calling Ready(), see if we need to
// ratchet up the flow control level. This code will go away when RACv1 =>
// RACv2 transition is complete and RACv1 code is removed.
if r.raftMu.flowControlLevel < replica_rac2.EnabledWhenLeaderV2Encoding {
if r.raftMu.flowControlLevel < kvflowcontrol.V2EnabledWhenLeaderV2Encoding {
// Not already at highest level.
level := racV2EnabledWhenLeaderLevel(ctx, r.store.cfg.Settings)
level := kvflowcontrol.GetV2EnabledWhenLeaderLevel(
ctx, r.store.ClusterSettings(), r.store.TestingKnobs().FlowControlTestingKnobs)
if level > r.raftMu.flowControlLevel {
if r.raftMu.flowControlLevel == replica_rac2.NotEnabledWhenLeader {
if r.raftMu.flowControlLevel == kvflowcontrol.V2NotEnabledWhenLeader {
func() {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -1938,7 +1939,7 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= replica_rac2.EnabledWhenLeaderV1Encoding,
UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding,
}
// For RACv2, annotate successful MsgAppResp messages with the vector of
// admitted log indices, by priority.
Expand Down
Loading