From 9d9e279c07190f610b39cda24617c7b66a131444 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 16 Sep 2024 12:24:46 -0400 Subject: [PATCH] kvserver: add rac2 v1 integration tests Introduce several tests in `flow_control_integration_test.go`, mirroring the existing tests but applied to the replication flow control v2 machinery. Part of: cockroachdb#130187 Release note: None --- .../kvserver/flow_control_integration_test.go | 1781 ++++++++++++++++- .../flow_control_replica_integration.go | 19 +- pkg/kv/kvserver/flow_control_stores.go | 4 + pkg/kv/kvserver/kvadmission/BUILD.bazel | 1 + pkg/kv/kvserver/kvadmission/kvadmission.go | 3 +- .../kvflowhandle/kvflowhandle.go | 4 +- .../kvflowtokentracker/tracker.go | 2 +- .../kvflowtokentracker/tracker_test.go | 2 +- .../kvflowcontrol/rac2/range_controller.go | 11 +- .../kvflowcontrol/replica_rac2/processor.go | 18 +- .../replica_rac2/testdata/processor | 1 + .../kvserver/kvflowcontrol/testing_knobs.go | 17 +- pkg/kv/kvserver/replica.go | 4 +- pkg/kv/kvserver/replica_init.go | 1 + pkg/kv/kvserver/replica_raft.go | 11 +- pkg/kv/kvserver/store.go | 1 + .../admission_post_split_merge | 138 ++ .../flow_control_integration_v2/basic | 89 + .../blocked_admission | 97 + .../class_prioritization | 91 + .../flow_control_integration_v2/crashed_node | 82 + .../granter_admit_one_by_one | 74 + .../leader_not_leaseholder | 4 + .../raft_membership | 124 ++ .../raft_membership_remove_self | 106 + .../flow_control_integration_v2/raft_snapshot | 209 ++ .../flow_control_integration_v2/split_merge | 113 ++ .../transfer_lease | 46 + 28 files changed, 2997 insertions(+), 56 deletions(-) create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/basic create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/crashed_node create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/granter_admit_one_by_one create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_snapshot create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 0c9fc216260a..6ea0bddba5e9 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -671,8 +671,10 @@ func TestFlowControlCrashedNode(t *testing.T) { Store: &kvserver.StoreTestingKnobs{ FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ UseOnlyForScratchRanges: true, - MaintainStreamsForBrokenRaftTransport: func() bool { - return maintainStreamsForBrokenRaftTransport.Load() + V1: kvflowcontrol.TestingKnobsV1{ + MaintainStreamsForBrokenRaftTransport: func() bool { + return maintainStreamsForBrokenRaftTransport.Load() + }, }, }, }, @@ -804,14 +806,16 @@ func TestFlowControlRaftSnapshot(t *testing.T) { // deductions/returns. 
return kvflowcontrol.Tokens(1 << 20 /* 1MiB */) }, - MaintainStreamsForBehindFollowers: func() bool { - return maintainStreamsForBehindFollowers.Load() - }, - MaintainStreamsForInactiveFollowers: func() bool { - return maintainStreamsForInactiveFollowers.Load() - }, - MaintainStreamsForBrokenRaftTransport: func() bool { - return maintainStreamsForBrokenRaftTransport.Load() + V1: kvflowcontrol.TestingKnobsV1{ + MaintainStreamsForBehindFollowers: func() bool { + return maintainStreamsForBehindFollowers.Load() + }, + MaintainStreamsForInactiveFollowers: func() bool { + return maintainStreamsForInactiveFollowers.Load() + }, + MaintainStreamsForBrokenRaftTransport: func() bool { + return maintainStreamsForBrokenRaftTransport.Load() + }, }, }, }, @@ -1092,8 +1096,10 @@ func TestFlowControlRaftTransportBreak(t *testing.T) { Store: &kvserver.StoreTestingKnobs{ FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ UseOnlyForScratchRanges: true, - MaintainStreamsForInactiveFollowers: func() bool { - return maintainStreamsForInactiveFollowers.Load() + V1: kvflowcontrol.TestingKnobsV1{ + MaintainStreamsForInactiveFollowers: func() bool { + return maintainStreamsForInactiveFollowers.Load() + }, }, }, }, @@ -1212,6 +1218,7 @@ func TestFlowControlRaftTransportCulled(t *testing.T) { // differences. return kvflowcontrol.Tokens(1 << 20 /* 1MiB */) }, + V1: kvflowcontrol.TestingKnobsV1{}, }, }, AdmissionControl: &admission.TestingKnobs{ @@ -1717,6 +1724,7 @@ func TestFlowControlQuiescedRange(t *testing.T) { Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, OverrideTokenDeduction: func() kvflowcontrol.Tokens { // This test asserts on the exact values of tracked // tokens. In non-test code, the tokens deducted are @@ -1725,7 +1733,6 @@ func TestFlowControlQuiescedRange(t *testing.T) { // differences. return kvflowcontrol.Tokens(1 << 20 /* 1MiB */) }, - UseOnlyForScratchRanges: true, }, }, AdmissionControl: &admission.TestingKnobs{ @@ -1855,6 +1862,7 @@ func TestFlowControlUnquiescedRange(t *testing.T) { Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, OverrideTokenDeduction: func() kvflowcontrol.Tokens { // This test asserts on the exact values of tracked // tokens. In non-test code, the tokens deducted are @@ -1863,12 +1871,13 @@ func TestFlowControlUnquiescedRange(t *testing.T) { // differences. return kvflowcontrol.Tokens(1 << 20 /* 1MiB */) }, - UseOnlyForScratchRanges: true, - MaintainStreamsForInactiveFollowers: func() bool { - // This test deals with quiesced ranges where - // followers have no activity. We don't want to - // disconnect streams due to this inactivity. - return true + V1: kvflowcontrol.TestingKnobsV1{ + MaintainStreamsForInactiveFollowers: func() bool { + // This test deals with quiesced ranges where + // followers have no activity. We don't want to + // disconnect streams due to this inactivity. + return true + }, }, }, }, @@ -2228,14 +2237,6 @@ func TestFlowControlGranterAdmitOneByOne(t *testing.T) { Store: &kvserver.StoreTestingKnobs{ FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ UseOnlyForScratchRanges: true, - MaintainStreamsForBehindFollowers: func() bool { - // TODO(irfansharif): This test is flakey without - // this change -- we disconnect one stream or - // another because raft says we're no longer - // actively replicating through it. Why? 
Something
-							// to do with the many proposals we're issuing?
-							return true
-						},
 						OverrideTokenDeduction: func() kvflowcontrol.Tokens {
 							// This test asserts on the exact values of tracked
 							// tokens. In non-test code, the tokens deducted are
@@ -2244,6 +2245,16 @@ func TestFlowControlGranterAdmitOneByOne(t *testing.T) {
 							// differences.
 							return kvflowcontrol.Tokens(1 << 10 /* 1KiB */)
 						},
+						V1: kvflowcontrol.TestingKnobsV1{
+							MaintainStreamsForBehindFollowers: func() bool {
+								// TODO(irfansharif): This test is flakey without
+								// this change -- we disconnect one stream or
+								// another because raft says we're no longer
+								// actively replicating through it. Why? Something
+								// to do with the many proposals we're issuing?
+								return true
+							},
+						},
 					},
 				},
 				AdmissionControl: &admission.TestingKnobs{
@@ -2477,3 +2488,1719 @@ func (h *flowControlTestHelper) put(
 func (h *flowControlTestHelper) close(filename string) {
 	echotest.Require(h.t, h.buf.String(), datapathutils.TestDataPath(h.t, "flow_control_integration", filename))
 }
+
+// TODO(kvoli): The following tests are ports to v2 of the v1 flow control
+// tests above. We aim to port as many as possible. The tests are:
+//
+// - [x] TestFlowControlBasic
+// - [x] TestFlowControlRangeSplitMerge
+// - [x] TestFlowControlBlockedAdmission
+// - [x] TestFlowControlAdmissionPostSplitMerge
+// - [x] TestFlowControlCrashedNode
+// - [-] TestFlowControlRaftSnapshot
+// - [-] TestFlowControlRaftTransportBreak
+// - [-] TestFlowControlRaftTransportCulled
+// - [x] TestFlowControlRaftMembership
+// - [x] TestFlowControlRaftMembershipRemoveSelf
+// - [x] TestFlowControlClassPrioritization
+// - [-] TestFlowControlQuiescedRange
+// - [-] TestFlowControlUnquiescedRange
+// - [x] TestFlowControlTransferLease
+// - [-] TestFlowControlLeaderNotLeaseholder
+// - [x] TestFlowControlGranterAdmitOneByOne
+
+// TestFlowControlBasicV2 runs a basic end-to-end test of the v2 kvflowcontrol
+// machinery, replicating + admitting a single 1MiB regular write. The vmodule
+// flags for running these tests with full logging are:
+//
+//	--vmodule='replica_raft=1,replica_proposal_buf=1,raft_transport=2,
+//	kvadmission=1,work_queue=1,replica_flow_control=1,
+//	tracker=1,client_raft_helpers_test=1,range_controller=2,
+//	token_counter=2,token_tracker=2,processor=2'
+func TestFlowControlBasicV2(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testutils.RunTrueAndFalse(t, "always-enqueue", func(t *testing.T, alwaysEnqueue bool) {
+		ctx := context.Background()
+		settings := cluster.MakeTestingClusterSettings()
+		tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				Settings: settings,
+				Knobs: base.TestingKnobs{
+					Store: &kvserver.StoreTestingKnobs{
+						FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
+							UseOnlyForScratchRanges: true,
+						},
+					},
+					AdmissionControl: &admission.TestingKnobs{
+						DisableWorkQueueFastPath: alwaysEnqueue,
+					},
+				},
+			},
+		})
+		defer tc.Stopper().Stop(ctx)
+
+		// Set up the test state with 3 voters, one on each of the three
+		// node/stores.
+		k := tc.ScratchRange(t)
+		tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)
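+
+		// The helper overrides the flow control cluster settings on every
+		// node (see (*rac2TestHelper).init) and accumulates annotated output
+		// that is compared against the named testdata file in close.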
+ h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("basic") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc) + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- Flow token metrics, before issuing the regular 1MiB replicated write.`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Issuing + admitting a regular 1MiB, triply replicated write...)`) + h.log("sending put request") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Stream counts as seen by n1 post-write. We should see three {regular,elastic} +-- streams given there are three nodes and we're using a replication factor of +-- three. +`) + h.query(n1, ` + SELECT name, value + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%stream%' +ORDER BY name ASC; +`) + + h.comment(`-- Another view of the stream count, using /inspectz-backed vtables.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +HAVING count(*) = 3 +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(` +-- Flow token metrics from n1 after issuing the regular 1MiB replicated write, +-- and it being admitted on n1, n2 and n3. We should see 3*1MiB = 3MiB of +-- {regular,elastic} tokens deducted and returned, and {8*3=24MiB,16*3=48MiB} of +-- {regular,elastic} tokens available. Everything should be accounted for. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + }) +} + +// TestFlowControlRangeSplitMergeV2 walks through what happens to flow tokens +// when a range splits/merges. +func TestFlowControlRangeSplitMergeV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("split_merge") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.log("sending put request to pre-split range") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request to pre-split range") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after issuing + admitting the regular 1MiB 3x +-- replicated write to the pre-split range. There should be 3MiB of +-- {regular,elastic} tokens {deducted,returned}. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Splitting range.)`) + left, right := tc.SplitRangeOrFatal(t, k.Next()) + h.waitForConnectedStreams(ctx, right.RangeID, 3) + + h.log("sending 2MiB put request to post-split LHS") + h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri) + h.log("sent 2MiB put request to post-split LHS") + + h.log("sending 3MiB put request to post-split RHS") + h.put(ctx, roachpb.Key(right.StartKey), 3<<20 /* 3MiB */, admissionpb.NormalPri) + h.log("sent 3MiB put request to post-split RHS") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- {deducted,returned}, which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 3MiB+15MiB=18MiB now. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the newly split off replica, with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Merging ranges.)`) + merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey()) + + h.log("sending 4MiB put request to post-merge range") + h.put(ctx, roachpb.Key(merged.StartKey), 4<<20 /* 4MiB */, admissionpb.NormalPri) + h.log("sent 4MiB put request to post-merged range") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens {deducted,returned}, +-- which comes from 4MiB*3=12MiB. So we stand at 18MiB+12MiB=30MiB now. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe only the merged replica with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") +} + +// TestFlowControlBlockedAdmissionV2 tests token tracking behavior by explicitly +// blocking below-raft admission. +func TestFlowControlBlockedAdmissionV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) 
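+
+	// Below-raft admission is currently blocked: the DisableWorkQueueGranting
+	// knob above returns true until the test flips disableWorkQueueGranting
+	// further down, at which point the held tokens should be returned.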
+ + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("blocked_admission") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 5 regular 1MiB, 3x replicated write that's not admitted.)`) + h.log("sending put requests") + for i := 0; i < 5; i++ { + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + } + h.log("sent put requests") + + h.comment(` +-- Flow token metrics from n1 after issuing 5 regular 1MiB 3x replicated writes +-- that are yet to get admitted. We see 5*1MiB*3=15MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the total tracked tokens per-stream on n1.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- Observe the individual tracked tokens per-stream on the scratch range.`) + h.query(n1, ` + SELECT range_id, store_id, priority, crdb_internal.humanize_bytes(tokens::INT8) + FROM crdb_internal.kv_flow_token_deductions_v2 +`, "range_id", "store_id", "priority", "tokens") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see 15MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlAdmissionPostSplitMergeV2 walks through what happens with flow +// tokens when a range after undergoes splits/merges. It does this by blocking +// and later unblocking below-raft admission, verifying: +// - tokens for the RHS are released at the post-merge subsuming leaseholder, +// - admission for the RHS post-merge does not cause a double return of tokens, +// - admission for the LHS can happen post-merge, +// - admission for the LHS and RHS can happen post-split. +func TestFlowControlAdmissionPostSplitMergeV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) 
+ + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("admission_post_split_merge") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.log("sending put request to pre-split range") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.put(ctx, k.Next(), 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request to pre-split range") + + h.comment(` +-- Flow token metrics from n1 after issuing a regular 2*1MiB 3x replicated write +-- that are yet to get admitted. We see 2*3*1MiB=6MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. The 2*1MiB writes +-- happened on what is soon going to be the LHS and RHS of a range being split. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Splitting range.)`) + left, right := tc.SplitRangeOrFatal(t, k.Next()) + h.waitForConnectedStreams(ctx, right.RangeID, 3) + + h.log("sending 2MiB put request to post-split LHS") + h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri) + h.log("sent 2MiB put request to post-split LHS") + + h.log("sending 3MiB put request to post-split RHS") + h.put(ctx, roachpb.Key(right.StartKey), 3<<20 /* 3MiB */, admissionpb.NormalPri) + h.log("sent 3MiB put request to post-split RHS") + + h.comment(` +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- deducted which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 6MiB+15MiB=21MiB now. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the newly split off replica, with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Merging ranges.)`) + merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey()) + + h.log("sending 4MiB put request to post-merge range") + h.put(ctx, roachpb.Key(merged.StartKey), 4<<20 /* 4MiB */, admissionpb.NormalPri) + h.log("sent 4MiB put request to post-merged range") + + h.comment(` +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens deducted which comes +-- from 4MiB*3=12MiB. So we stand at 21MiB+12MiB=33MiB tokens deducted now. The +-- RHS of the range is gone now, and the previously 3*3MiB=9MiB of tokens +-- deducted for it are released at the subsuming LHS leaseholder. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe only the merged replica with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see all outstanding +-- {regular,elastic} tokens returned, including those from: +-- - the LHS before the merge, and +-- - the LHS and RHS before the original split. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlCrashedNodeV2 tests flow token behavior in the presence of +// crashed nodes. +func TestFlowControlCrashedNodeV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + kvserver.ExpirationLeasesOnly.Override(ctx, &settings.SV, true) + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + RaftConfig: base.RaftConfig{ + // Suppress timeout-based elections. This test doesn't want to + // deal with leadership changing hands. + RaftElectionTimeoutTicks: 1000000, + // Reduce the RangeLeaseDuration to speeds up failure detection + // below. + RangeLeaseDuration: time.Second, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return true + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("crashed_node") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 2) + + h.comment(`-- (Issuing regular 5x1MiB, 2x replicated writes that are not admitted.)`) + h.log("sending put requests") + for i := 0; i < 5; i++ { + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + } + h.log("sent put requests") + + h.comment(` +-- Flow token metrics from n1 after issuing 5 regular 1MiB 2x replicated writes +-- that are yet to get admitted. We see 5*1MiB*2=10MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + h.comment(`-- Observe the per-stream tracked tokens on n1, before n2 is crashed.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Crashing n2)`) + tc.StopServer(1) + h.waitForConnectedStreams(ctx, desc.RangeID, 1) + + h.comment(` +-- Observe the per-stream tracked tokens on n1, after n2 crashed. We're no +-- longer tracking the 5MiB held by n2. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(` +-- Flow token metrics from n1 after n2 crashed. Observe that we've returned the +-- 5MiB previously held by n2. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlRaftSnapshotV2 tests flow token behavior when one replica +// needs to be caught up via raft snapshot. +func TestFlowControlRaftSnapshotV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const numServers int = 5 + stickyServerArgs := make(map[int]base.TestServerArgs) + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + for i := 0; i < numServers; i++ { + stickyServerArgs[i] = base.TestServerArgs{ + Settings: settings, + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), + }, + }, + RaftConfig: base.RaftConfig{ + // Suppress timeout-based elections. This test doesn't want to + // deal with leadership changing hands. + RaftElectionTimeoutTicks: 1000000, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyVFSRegistry: fs.NewStickyRegistry(), + }, + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func() kvflowcontrol.Tokens { + // This test makes use of (small) increment + // requests, but wants to see large token + // deductions/returns. + return kvflowcontrol.Tokens(1 << 20 /* 1MiB */) + }, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + RaftTransport: &kvserver.RaftTransportTestingKnobs{ + OverrideIdleTimeout: func() time.Duration { + // Effectively disable token returns due to underlying + // raft transport streams disconnecting due to + // inactivity. + return time.Hour + }, + }, + }, + } + } + + tc := testcluster.StartTestCluster(t, numServers, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: stickyServerArgs, + }) + defer tc.Stopper().Stop(ctx) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("raft_snapshot") + + store := tc.GetFirstStoreFromServer(t, 0) + + incA := int64(5) + incB := int64(7) + incAB := incA + incB + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) 
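+
+	// The scratch range is first made 3x replicated here; two more voters are
+	// added immediately below so that it ends up replicated across all five
+	// nodes.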
+ + tc.AddVotersOrFatal(t, k, tc.Targets(3, 4)...) + repl := store.LookupReplica(roachpb.RKey(k)) + require.NotNil(t, repl) + h.waitForConnectedStreams(ctx, repl.RangeID, 5) + + // Set up a key to replicate across the cluster. We're going to modify this + // key and truncate the raft logs from that command after killing one of the + // nodes to check that it gets the new value after it comes up. + incArgs := incrementArgs(k, incA) + if _, err := kv.SendWrappedWithAdmission(ctx, tc.Server(0).DB().NonTransactionalSender(), kvpb.Header{}, kvpb.AdmissionHeader{ + Priority: int32(admissionpb.HighPri), + Source: kvpb.AdmissionHeader_FROM_SQL, + }, incArgs); err != nil { + t.Fatal(err) + } + + h.comment(` +-- Flow token metrics from n1 after issuing 1 regular 1MiB 5x replicated write +-- that's not admitted. Since this test is ignoring crashed nodes for token +-- deduction purposes, we see a deduction of 5MiB {regular,elastic} tokens. + `) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + `) + h.comment(` +-- Observe the total tracked tokens per-stream on n1. 1MiB is tracked for n1-n5. + `) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + `, "range_id", "store_id", "total_tracked_tokens") + + tc.WaitForValues(t, k, []int64{incA, incA, incA, incA, incA}) + + h.comment(` +-- (Killing n2 and n3, but preventing their tokens from being returned + +-- artificially allowing tokens to get deducted.)`) + + // Kill stores 1 + 2, increment the key on the other stores and truncate + // their logs to make sure that when store 1 + 2 comes back up they will + // require a snapshot from Raft. + tc.StopServer(1) + tc.StopServer(2) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1. 1MiB is (still) tracked +-- for n1-n5, because they are not in StateSnapshot yet and it have likely been +-- in StateProbe for less than the close timer. + `) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + `, "range_id", "store_id", "total_tracked_tokens") + + h.comment(` +-- (Issuing another 1MiB of 5x replicated writes while n2 and n3 are down and +-- below-raft admission is paused.) +`) + incArgs = incrementArgs(k, incB) + if _, err := kv.SendWrappedWithAdmission(ctx, tc.Server(0).DB().NonTransactionalSender(), kvpb.Header{}, kvpb.AdmissionHeader{ + Priority: int32(admissionpb.HighPri), + Source: kvpb.AdmissionHeader_FROM_SQL, + }, incArgs); err != nil { + t.Fatal(err) + } + + h.comment(` +-- Flow token metrics from n1 after issuing 1 regular 1MiB 5x replicated write +-- that's not admitted. We'll have deducted another 5*1MiB=5MiB worth of tokens. + `) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + `) + h.comment(` +-- Observe the total tracked tokens per-stream on n1. 2MiB is tracked for n1-n5; +-- see last comment for an explanation why we're still deducting for n2, n3. 
+`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + `, "range_id", "store_id", "total_tracked_tokens") + + tc.WaitForValues(t, k, []int64{incAB, 0 /* stopped */, 0 /* stopped */, incAB, incAB}) + + index := repl.GetLastIndex() + h.comment(`-- (Truncating raft log.)`) + + // Truncate the log at index+1 (log entries < N are removed, so this + // includes the increment). + truncArgs := truncateLogArgs(index+1, repl.GetRangeID()) + if _, err := kv.SendWrappedWithAdmission(ctx, tc.Server(0).DB().NonTransactionalSender(), kvpb.Header{}, kvpb.AdmissionHeader{ + Priority: int32(admissionpb.HighPri), + Source: kvpb.AdmissionHeader_FROM_SQL, + }, truncArgs); err != nil { + t.Fatal(err) + } + + h.comment(`-- (Restarting n2 and n3.)`) + require.NoError(t, tc.RestartServer(1)) + require.NoError(t, tc.RestartServer(2)) + + tc.WaitForValues(t, k, []int64{incAB, incAB, incAB, incAB, incAB}) + + h.comment(` +-- Flow token metrics from n1 after restarting n2 and n3. We've returned the +-- 2MiB previously held by those nodes (2MiB each). We're reacting to it's raft +-- progress state, noting that since we've truncated our log, we need to catch +-- it up via snapshot. So we release all held tokens. + `) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1. There's nothing tracked +-- for n2 and n3 anymore. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + WHERE total_tracked_tokens > 0 +`, "range_id", "store_id", "total_tracked_tokens") + + h.waitForConnectedStreams(ctx, repl.RangeID, 5) + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + + h.waitForAllTokensReturned(ctx, 5) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see the remaining +-- 6MiB of {regular,elastic} tokens returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1; there should be nothing. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- Another view of tokens, using /inspectz-backed vtables.`) + h.query(n1, ` +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; +`, "range_id", "eval_regular_available", "eval_elastic_available") +} + +// TestFlowControlRaftMembershipV2 tests flow token behavior when the raft +// membership changes. 
+func TestFlowControlRaftMembershipV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("raft_membership") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Adding a voting replica on n4.)`) + tc.AddVotersOrFatal(t, k, tc.Target(3)) + h.waitForConnectedStreams(ctx, desc.RangeID, 4) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1. s1-s3 should have 1MiB +-- tracked each, and s4 should have none.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Issuing 1x1MiB, 4x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s3 +-- should have 2MiB tracked (they've observed 2x1MiB writes), s4 should have +-- 1MiB. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Removing voting replica from n3.)`) + tc.RemoveVotersOrFatal(t, k, tc.Target(2)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Adding non-voting replica to n5.)`) + tc.AddNonVotersOrFatal(t, k, tc.Target(4)) + h.waitForConnectedStreams(ctx, desc.RangeID, 4) + + h.comment(`-- (Issuing 1x1MiB, 4x replicated write (w/ one non-voter) that's not admitted.`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s2 +-- should have 3MiB tracked (they've observed 3x1MiB writes), there should be +-- no s3 since it was removed, s4 and s5 should have 2MiB and 1MiB +-- respectively. 
+`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 5) + + h.comment(`-- Observe that there no tracked tokens across s1,s2,s4,s5.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned, including from when s3 was removed as a raft +-- member. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlRaftMembershipRemoveSelf tests flow token behavior when the +// raft leader removes itself from the raft group. +func TestFlowControlRaftMembershipRemoveSelfV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "transfer-lease-first", func(t *testing.T, transferLeaseFirst bool) { + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + // Note this test behaves identically independent of we transfer the lease + // first. + defer h.close("raft_membership_remove_self") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + // Make sure the lease is on n1 and that we're triply connected. + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Replacing current raft leader on n1 in raft group with new n4 replica.)`) + testutils.SucceedsSoon(t, func() error { + // Relocate range from n1 -> n4. + if err := tc.Servers[2].DB(). 
+ AdminRelocateRange( + context.Background(), desc.StartKey.AsRawKey(), + tc.Targets(1, 2, 3), nil, transferLeaseFirst); err != nil { + return err + } + leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil) + if err != nil { + return err + } + if leaseHolder.Equal(tc.Target(0)) { + return errors.Errorf("expected leaseholder to not be on n1") + } + return nil + }) + h.waitForAllTokensReturned(ctx, 4) + + h.comment(` +-- Flow token metrics from n1 after raft leader removed itself from raft group. +-- All {regular,elastic} tokens deducted are returned. Note that the available +-- tokens increases, as n1 has seen 4 replication streams, s1,s2,s3,s4. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(` +-- n1 should have no connected streams now after transferring the lease to n2. +-- While, n2 should have 3 connected streams to s2,s3,s4. Query the stream count +-- on n1, then on n2. +`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- Observe the stream count on n2.`) + h.query(n2, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 4) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. Tokens were already +-- returned earlier, so there's no change. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + }) +} + +// TestFlowControlClassPrioritizationV2 shows how tokens are managed for both +// regular and elastic work. It does so by replicating + admitting a single +// 1MiB {regular,elastic} write. +func TestFlowControlClassPrioritizationV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("class_prioritization") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB elastic 3x replicated write +-- that's not admitted. 
We see 1*1MiB*3=3MiB deductions of elastic tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated regular write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of {regular,elastic} +-- tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlTransferLeaseV2 tests flow control behavior when the range +// lease is transferred, and the raft leadership along with it. +func TestFlowControlTransferLeaseV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("transfer_lease") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Transferring range lease to n2 and allowing leadership to follow.)`) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + testutils.SucceedsSoon(t, func() error { + if leader := tc.GetRaftLeader(t, roachpb.RKey(k)); leader.NodeID() != tc.Target(1).NodeID { + return errors.Errorf("expected raft leadership to transfer to n1, found n%d", leader.NodeID()) + } + return nil + }) + h.waitForAllTokensReturned(ctx, 3) + + h.comment(` +-- Flow token metrics from n1 having lost the lease and raft leadership. All +-- deducted tokens are returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) +} + +// TestFlowControlLeaderNotLeaseholderV2 tests flow control behavior when the +// range leaseholder is not the raft leader. +func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &settings.SV, true) + + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // Disable leader transfers during leaseholder changes so + // that we can easily create leader-not-leaseholder + // scenarios. + DisableLeaderFollowsLeaseholder: true, + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("leader_not_leaseholder") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Transferring only range lease, not raft leadership, to n2.)`) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + require.Equal(t, tc.GetRaftLeader(t, roachpb.RKey(k)).NodeID(), tc.Target(0).NodeID) + + h.comment(` +-- Flow token metrics from n1 having lost the lease but retained raft +-- leadership. No deducted tokens are released. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(` +-- (Issuing another 1x1MiB, 3x replicated write that's not admitted while in +-- this leader != leaseholder state.) +`) + // TODO(kvoli): This write blocks at the store work queue, since granting is + // disabled and synchronous. Why does the v1 test not encounter the same + // problem? The put must be bypassing kvadmission with the handle being + // closed or something. + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Looking at n1's flow token metrics, there's no change. No additional tokens +-- are deducted since the write is not being proposed here. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(` +-- Looking at n2's flow token metrics, there's no activity. n2 never acquired +-- the raft leadership. +`) + h.query(n2, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- All deducted flow tokens are returned back to where the raft leader is. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) +} + +// TestFlowControlGranterAdmitOneByOneV2 is a reproduction for #105185. +// Internal admission code that relied on admitting at most one waiting request +// was in fact admitting more than one, and doing so recursively with call +// stacks as deep as the admit chain. This triggered panics (and is also just +// undesirable, design-wise). This test intentionally queues a 1000+ small +// requests, to that end. +func TestFlowControlGranterAdmitOneByOneV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func() kvflowcontrol.Tokens { + // This test asserts on the exact values of tracked + // tokens. In non-test code, the tokens deducted are + // a few bytes off (give or take) from the size of + // the proposals. We don't care about such + // differences. + return kvflowcontrol.Tokens(1 << 10 /* 1KiB */) + }, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + AlwaysTryGrantWhenAdmitted: true, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) 
+ + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("granter_admit_one_by_one") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing regular 1024*1KiB, 3x replicated writes that are not admitted.)`) + h.log("sending put requests") + for i := 0; i < 1024; i++ { + h.put(ctx, k, 1<<10 /* 1KiB */, admissionpb.NormalPri) + } + h.log("sent put requests") + + h.comment(` +-- Flow token metrics from n1 after issuing 1024KiB, i.e. 1MiB 3x replicated writes +-- that are yet to get admitted. We see 3*1MiB=3MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the total tracked tokens per-stream on n1.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see 3MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. In #105185, by now we would've observed panics. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// rac2TestHelper is a helper for tests that exercise the RACv2 machinery. It +// aims to follow the same pattern as the flowControlTestHelper and is +// therefore useful when comparing behavior between v1 and v2 flow control. +type rac2TestHelper struct { + t *testing.T + tc *testcluster.TestCluster + st *cluster.Settings + buf *strings.Builder + rng *rand.Rand +} + +func newRAC2TestHelper( + t *testing.T, tc *testcluster.TestCluster, st *cluster.Settings, +) *rac2TestHelper { + rng, _ := randutil.NewPseudoRand() + buf := &strings.Builder{} + return &rac2TestHelper{ + t: t, + tc: tc, + buf: buf, + rng: rng, + st: st, + } +} + +func (h *rac2TestHelper) init() { + // Reach into each server's cluster setting and override. This causes any + // registered change callbacks to run immediately, which is important since + // running them with some lag (which happens when using SQL and `SET CLUSTER + // SETTING`) interferes with the later activities in these tests. + for _, s := range h.tc.Servers { + kvflowcontrol.Enabled.Override(context.Background(), &s.ClusterSettings().SV, true) + kvflowcontrol.Mode.Override(context.Background(), &s.ClusterSettings().SV, kvflowcontrol.ApplyToAll) + } +} + +// waitForAllTokensReturned waits for all tokens to be returned across all +// streams. The expected number of streams is passed in as an argument. +func (h *rac2TestHelper) waitForAllTokensReturned(ctx context.Context, expStreamCount int) { + testutils.SucceedsSoon(h.t, func() error { + return h.checkAllTokensReturned(ctx, expStreamCount) + }) +} + +// checkAllTokensReturned checks that all tokens have been returned across all +// streams. It also checks that the expected number of streams are present. 
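+// regular and elastic tokens per stream are expected to be available, as
+// configured by the kvflowcontrol.{Regular,Elastic}TokensPerStream cluster
+// settings.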
The default +func (h *rac2TestHelper) checkAllTokensReturned(ctx context.Context, expStreamCount int) error { + elasticTokensPerStream := kvflowcontrol.ElasticTokensPerStream.Get(&h.st.SV) + regularTokensPerStream := kvflowcontrol.RegularTokensPerStream.Get(&h.st.SV) + streams := h.tc.GetFirstStoreFromServer(h.t, 0).GetStoreConfig().KVFlowStreamTokenProvider.Inspect(ctx) + if len(streams) != expStreamCount { + return fmt.Errorf("expected %d replication streams, got %d [%+v]", expStreamCount, len(streams), streams) + } + + checkTokens := func( + expTokens, actualTokens int64, + stream kvflowcontrol.Stream, + typName string, + ) error { + if actualTokens != expTokens { + return fmt.Errorf("expected %v of %s flow tokens for %v, got %v", + humanize.IBytes(uint64(expTokens)), typName, stream, + humanize.IBytes(uint64(actualTokens)), + ) + } + return nil + } + + for _, stream := range streams { + s := kvflowcontrol.Stream{ + TenantID: stream.TenantID, + StoreID: stream.StoreID, + } + if err := checkTokens( + regularTokensPerStream, stream.AvailableEvalRegularTokens, s, "regular eval", + ); err != nil { + return err + } + if err := checkTokens( + elasticTokensPerStream, stream.AvailableEvalElasticTokens, s, "elastic eval", + ); err != nil { + return err + } + if err := checkTokens( + regularTokensPerStream, stream.AvailableSendRegularTokens, s, "regular send", + ); err != nil { + return err + } + if err := checkTokens( + elasticTokensPerStream, stream.AvailableSendElasticTokens, s, "elastic send", + ); err != nil { + return err + } + } + return nil +} + +// waitForConnectedStreams waits for the given range to have the expected +// number of connected streams (replicas with a send stream in +// rac2.RangeController). +func (h *rac2TestHelper) waitForConnectedStreams( + ctx context.Context, rangeID roachpb.RangeID, expConnectedStreams int, +) { + testutils.SucceedsSoon(h.t, func() error { + state, found := kvserver.MakeStoresForRACv2( + h.tc.Server(0).GetStores().(*kvserver.Stores)).LookupInspect(rangeID) + if !found { + return fmt.Errorf("handle for %s not found", rangeID) + } + require.True(h.t, found) + if len(state.ConnectedStreams) != expConnectedStreams { + return fmt.Errorf("expected %d connected streams, got %d [%+v]", + expConnectedStreams, len(state.ConnectedStreams), state.ConnectedStreams) + } + return nil + }) +} + +// waitForTotalTrackedTokens waits for the total tracked tokens across all +// streams for the given range to reach the expected value. It is agnostic of +// priority, the total tracked tokens is the aggregate across all priorities. +func (h *rac2TestHelper) waitForTotalTrackedTokens( + ctx context.Context, rangeID roachpb.RangeID, expTotalTrackedTokens int64, +) { + testutils.SucceedsSoon(h.t, func() error { + state, found := kvserver.MakeStoresForRACv2( + h.tc.Server(0).GetStores().(*kvserver.Stores)).LookupInspect(rangeID) + if !found { + return fmt.Errorf("handle for %s not found", rangeID) + } + require.True(h.t, found) + var totalTracked int64 + for _, stream := range state.ConnectedStreams { + for _, tracked := range stream.TrackedDeductions { + totalTracked += tracked.Tokens + } + } + if totalTracked != expTotalTrackedTokens { + return fmt.Errorf("expected to track %d tokens in aggregate, got %d", + kvflowcontrol.Tokens(expTotalTrackedTokens), kvflowcontrol.Tokens(totalTracked)) + } + return nil + }) +} + +// comment appends the comment string to the testdata file buffer. 
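+// Interleaving comment and query calls builds up the buffer that close later
+// compares against the testdata file. As an illustrative sketch only
+// (mirroring the tests above; the filename here is hypothetical):
+//
+//	h := newRAC2TestHelper(t, tc, settings)
+//	h.init()
+//	defer h.close("some_test")
+//	h.comment(`-- (Issuing a regular 1MiB write.)`)
+//	h.put(ctx, k, 1<<20, admissionpb.NormalPri)
+//	h.query(n1, `SELECT ...`, "header1", "header2")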
+func (h *rac2TestHelper) comment(comment string) {
+	if h.buf.Len() > 0 {
+		h.buf.WriteString("\n\n")
+	}
+
+	comment = strings.TrimSpace(comment)
+	h.buf.WriteString(fmt.Sprintf("%s\n", comment))
+	h.log(comment)
+}
+
+// log logs the given message if logging is enabled.
+func (h *rac2TestHelper) log(msg string) {
+	if log.ShowLogs() {
+		log.Infof(context.Background(), "%s", msg)
+	}
+}
+
+// query runs the given SQL query against the given SQLRunner, and appends the
+// output to the testdata file buffer.
+func (h *rac2TestHelper) query(runner *sqlutils.SQLRunner, sql string, headers ...string) {
+	// NB: We update metric gauges here to ensure that periodically updated
+	// metrics (via the node metrics loop) are up-to-date.
+	h.tc.GetFirstStoreFromServer(h.t, 0).GetStoreConfig().KVFlowStreamTokenProvider.UpdateMetricGauges()
+	sql = strings.TrimSpace(sql)
+	h.log(sql)
+	h.buf.WriteString(fmt.Sprintf("%s\n\n", sql))
+
+	rows := runner.Query(h.t, sql)
+	tbl := tablewriter.NewWriter(h.buf)
+	output, err := sqlutils.RowsToStrMatrix(rows)
+	require.NoError(h.t, err)
+	tbl.SetAlignment(tablewriter.ALIGN_LEFT)
+	tbl.AppendBulk(output)
+	tbl.SetBorder(false)
+	tbl.SetHeader(headers)
+	tbl.SetAutoFormatHeaders(false)
+	tbl.Render()
+}
+
+// put issues a put request for the given key at the priority specified,
+// against the first server in the cluster.
+func (h *rac2TestHelper) put(
+	ctx context.Context, key roachpb.Key, size int, pri admissionpb.WorkPriority,
+) *kvpb.BatchRequest {
+	value := roachpb.MakeValueFromString(randutil.RandString(h.rng, size, randutil.PrintableKeyAlphabet))
+	ba := &kvpb.BatchRequest{}
+	ba.Add(kvpb.NewPut(key, value))
+	ba.AdmissionHeader.Priority = int32(pri)
+	ba.AdmissionHeader.Source = kvpb.AdmissionHeader_FROM_SQL
+	if _, pErr := h.tc.Server(0).DB().NonTransactionalSender().Send(
+		ctx, ba,
+	); pErr != nil {
+		h.t.Fatal(pErr.GoError())
+	}
+	return ba
+}
+
+// close writes the buffer to a file in the testdata directory and compares it
+// against the expected output.
+func (h *rac2TestHelper) close(filename string) {
+	echotest.Require(h.t, h.buf.String(), datapathutils.TestDataPath(h.t, "flow_control_integration_v2", filename))
+}
+
+// enableVerboseRaftMsgLoggingForRange installs a raft handler on each node,
+// which in turn enables verbose message logging.
+func (h *rac2TestHelper) enableVerboseRaftMsgLoggingForRange(desc roachpb.RangeDescriptor) {
+	for i := 0; i < len(h.tc.Servers); i++ {
+		si, err := h.tc.Server(i).GetStores().(*kvserver.Stores).GetStore(h.tc.Server(i).GetFirstStoreID())
+		require.NoError(h.t, err)
+		h.tc.Servers[i].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(si.StoreID(),
+			&unreliableRaftHandler{
+				rangeID:                    desc.RangeID,
+				IncomingRaftMessageHandler: si,
+				unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
+					dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
+						// Install a raft handler to get verbose raft logging.
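+						// Returning false means no messages are actually
+						// dropped; the handler exists solely for its verbose
+						// logging side effect.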
+ return false + }, + }, + }) + } +} diff --git a/pkg/kv/kvserver/flow_control_replica_integration.go b/pkg/kv/kvserver/flow_control_replica_integration.go index bb0d7b46f504..d0e399439efe 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration.go +++ b/pkg/kv/kvserver/flow_control_replica_integration.go @@ -226,7 +226,7 @@ func (f *replicaFlowControlIntegrationImpl) onRaftTransportDisconnected( return // nothing to do } - if fn := f.knobs.MaintainStreamsForBrokenRaftTransport; fn != nil && fn() { + if fn := f.knobs.V1.MaintainStreamsForBrokenRaftTransport; fn != nil && fn() { return // nothing to do } @@ -311,12 +311,12 @@ func (f *replicaFlowControlIntegrationImpl) notActivelyReplicatingTo() []roachpb inactiveFollowers := f.replicaForFlowControl.getInactiveFollowers() disconnectedFollowers := f.replicaForFlowControl.getDisconnectedFollowers() - maintainStreamsForBrokenRaftTransport := f.knobs.MaintainStreamsForBrokenRaftTransport != nil && - f.knobs.MaintainStreamsForBrokenRaftTransport() - maintainStreamsForInactiveFollowers := f.knobs.MaintainStreamsForInactiveFollowers != nil && - f.knobs.MaintainStreamsForInactiveFollowers() - maintainStreamsForBehindFollowers := f.knobs.MaintainStreamsForBehindFollowers != nil && - f.knobs.MaintainStreamsForBehindFollowers() + maintainStreamsForBrokenRaftTransport := f.knobs.V1.MaintainStreamsForBrokenRaftTransport != nil && + f.knobs.V1.MaintainStreamsForBrokenRaftTransport() + maintainStreamsForInactiveFollowers := f.knobs.V1.MaintainStreamsForInactiveFollowers != nil && + f.knobs.V1.MaintainStreamsForInactiveFollowers() + maintainStreamsForBehindFollowers := f.knobs.V1.MaintainStreamsForBehindFollowers != nil && + f.knobs.V1.MaintainStreamsForBehindFollowers() notActivelyReplicatingTo := make(map[roachpb.ReplicaDescriptor]struct{}) ourReplicaID := f.replicaForFlowControl.getReplicaID() @@ -486,3 +486,8 @@ func (r *replicaForRACv2) MuUnlock() { func (r *replicaForRACv2) LeaseholderMuLocked() roachpb.ReplicaID { return r.mu.state.Lease.Replica.ReplicaID } + +// IsScratchRange implements replica_rac2.Replica. 
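+// It backs the UseOnlyForScratchRanges testing knob, which restricts the v2
+// flow control machinery to scratch ranges in these integration tests (see
+// the flow_control_stores.go and processor.go changes elsewhere in this
+// patch).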
+func (r *replicaForRACv2) IsScratchRange() bool { + return (*Replica)(r).IsScratchRange() +} diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go index bc8fa8e8f4b5..c46a62e0b6e4 100644 --- a/pkg/kv/kvserver/flow_control_stores.go +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -368,6 +368,10 @@ func (ss *storesForRACv2) lookup( if r == nil || r.replicaID != replicaID { return nil } + if flowTestKnobs := r.store.TestingKnobs().FlowControlTestingKnobs; flowTestKnobs != nil && + flowTestKnobs.UseOnlyForScratchRanges && !r.IsScratchRange() { + return nil + } return r.flowControlV2 } diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 5df579399fbd..49239170414e 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/kv/kvpb", "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/kvflowcontrol/replica_rac2", "//pkg/kv/kvserver/raftlog", "//pkg/raft/raftpb", diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index e28ecfd5de48..02ed7b6dee3d 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" @@ -356,7 +357,7 @@ func (n *controllerImpl) AdmitKVWork( // TODO(sumeerbhola,kvoli): The priority needs to be converted to a // raftpb.Priority when v2 encoding is enabled. e.g., // rac2.AdmissionToRaftPriority(). 
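+		// NB: rac2.AdmissionToRaftPriority buckets the admissionpb work
+		// priority into one of the four raft priorities RACv2 tracks
+		// (LowPri, NormalPri, AboveNormalPri, HighPri), as exercised in the
+		// replica_rac2 processor testdata below.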
- AdmissionPriority: int32(admissionInfo.Priority), + AdmissionPriority: int32(rac2.AdmissionToRaftPriority(admissionInfo.Priority)), AdmissionCreateTime: admissionInfo.CreateTime, AdmissionOriginNode: n.nodeID.Get(), } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 57405eae2d49..4cddf1ebbf1c 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -162,8 +162,8 @@ func (h *Handle) deductTokensForInner( return nil // unused return value in production code } - if h.knobs.OverrideTokenDeduction != nil { - tokens = h.knobs.OverrideTokenDeduction() + if fn := h.knobs.OverrideTokenDeduction; fn != nil { + tokens = fn() } for _, c := range h.mu.connections { diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go index ba3dc3466378..9c2745d5c7d6 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go @@ -154,7 +154,7 @@ func (dt *Tracker) Untrack( break } - if fn := dt.knobs.UntrackTokensInterceptor; fn != nil { + if fn := dt.knobs.V1.UntrackTokensInterceptor; fn != nil { fn(deduction.tokens, deduction.position) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go index 17cf51f5520b..217fb99a2f44 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go @@ -112,7 +112,7 @@ func TestTracker(t *testing.T) { count := 0 var buf strings.Builder buf.WriteString(fmt.Sprintf("pri=%s\n", pri)) - knobs.UntrackTokensInterceptor = func(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) { + knobs.V1.UntrackTokensInterceptor = func(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) { count += 1 buf.WriteString(fmt.Sprintf(" tokens=%s %s\n", testingPrintTrimmedTokens(tokens), position)) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 677a355a99d9..20cc42f4116e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -261,6 +261,7 @@ type RangeControllerOptions struct { Clock *hlc.Clock CloseTimerScheduler ProbeToCloseTimerScheduler EvalWaitMetrics *EvalWaitMetrics + Knobs *kvflowcontrol.TestingKnobs } // RangeControllerInitState is the initial state at the time of creation. 
@@ -771,11 +772,15 @@ func (rs *replicaState) handleReadyEntries(ctx context.Context, entries []entryF
 		if !entry.usesFlowControl {
 			continue
 		}
-		rs.sendStream.mu.tracker.Track(ctx, entry.term, entry.index, entry.pri, entry.tokens)
+		tokens := entry.tokens
+		if fn := rs.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
+			tokens = fn()
+		}
+		rs.sendStream.mu.tracker.Track(ctx, entry.term, entry.index, entry.pri, tokens)
 		rs.evalTokenCounter.Deduct(
-			ctx, WorkClassFromRaftPriority(entry.pri), entry.tokens)
+			ctx, WorkClassFromRaftPriority(entry.pri), tokens)
 		rs.sendTokenCounter.Deduct(
-			ctx, WorkClassFromRaftPriority(entry.pri), entry.tokens)
+			ctx, WorkClassFromRaftPriority(entry.pri), tokens)
 	}
 }
 
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
index 328308f3a2e4..c8de44474527 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -50,6 +50,10 @@ type Replica interface {
 	// At least Replica mu is held. The caller does not make any claims about
 	// whether it holds raftMu or not.
 	LeaseholderMuLocked() roachpb.ReplicaID
+	// IsScratchRange returns true if this range is a scratch range (i.e.
+	// overlaps with the scratch span and has a start key <=
+	// keys.ScratchRangeMin).
+	IsScratchRange() bool
 }
 
 // RaftScheduler abstracts kvserver.raftScheduler.
@@ -213,6 +217,7 @@ type ProcessorOptions struct {
 	EvalWaitMetrics *rac2.EvalWaitMetrics
 
 	EnabledWhenLeaderLevel EnabledWhenLeaderLevel
+	Knobs                  *kvflowcontrol.TestingKnobs
 }
 
 // SideChannelInfoUsingRaftMessageRequest is used to provide a follower
@@ -517,6 +522,8 @@ func NewProcessor(opts ProcessorOptions) Processor {
 func (p *processorImpl) isLeaderUsingV2ProcLocked() bool {
 	// We are the leader using V2, or a follower who learned that the leader is
 	// using the V2 protocol.
+	// TODO(kvoli): Why doesn't this work currently?
+	// return true
 	return p.leader.rc != nil || p.follower.isLeaderUsingV2Protocol
 }
 
@@ -800,8 +807,11 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
 	// our admitted vector is likely consistent with the latest leader term.
p.maybeSendAdmittedRaftMuLocked(ctx) if rc := p.leader.rc; rc != nil { - if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil { - log.Errorf(ctx, "error handling raft event: %v", err) + if knobs := p.opts.Knobs; knobs == nil || !knobs.UseOnlyForScratchRanges || + p.opts.Replica.IsScratchRange() { + if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil { + log.Errorf(ctx, "error handling raft event: %v", err) + } } } } @@ -1141,6 +1151,7 @@ type RangeControllerFactoryImpl struct { evalWaitMetrics *rac2.EvalWaitMetrics streamTokenCounterProvider *rac2.StreamTokenCounterProvider closeTimerScheduler rac2.ProbeToCloseTimerScheduler + knobs *kvflowcontrol.TestingKnobs } func NewRangeControllerFactoryImpl( @@ -1148,12 +1159,14 @@ func NewRangeControllerFactoryImpl( evalWaitMetrics *rac2.EvalWaitMetrics, streamTokenCounterProvider *rac2.StreamTokenCounterProvider, closeTimerScheduler rac2.ProbeToCloseTimerScheduler, + knobs *kvflowcontrol.TestingKnobs, ) RangeControllerFactoryImpl { return RangeControllerFactoryImpl{ clock: clock, evalWaitMetrics: evalWaitMetrics, streamTokenCounterProvider: streamTokenCounterProvider, closeTimerScheduler: closeTimerScheduler, + knobs: knobs, } } @@ -1172,6 +1185,7 @@ func (f RangeControllerFactoryImpl) New( Clock: f.clock, CloseTimerScheduler: f.closeTimerScheduler, EvalWaitMetrics: f.evalWaitMetrics, + Knobs: f.knobs, }, rac2.RangeControllerInitState{ ReplicaSet: state.replicaSet, diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 590c1e1f2e6d..aab5d49f93d3 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -476,6 +476,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 52 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, term:52, admitted:[LowPri:26,NormalPri:26,AboveNormalPri:26,HighPri:26]) RangeController.HandleRaftEventRaftMuLocked([]) ..... diff --git a/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go b/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go index 6231cfd1cc48..bbac739698c2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go +++ b/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go @@ -18,12 +18,20 @@ import ( // TestingKnobs provide fine-grained control over the various kvflowcontrol // components for testing. type TestingKnobs struct { - // UntrackTokensInterceptor is invoked whenever tokens are untracked, along - // with their corresponding log positions. - UntrackTokensInterceptor func(Tokens, kvflowcontrolpb.RaftLogPosition) + V1 TestingKnobsV1 + UseOnlyForScratchRanges bool // OverrideTokenDeduction is used to override how many tokens are deducted // post-evaluation. OverrideTokenDeduction func() Tokens +} + +// TestingKnobsV1 are the testing knobs that appply to replication flow control +// v1, which is mostly contained in the kvflowcontroller, kvflowdispatch, +// kvflowhandle and kvflowtokentracker packages. +type TestingKnobsV1 struct { + // UntrackTokensInterceptor is invoked whenever tokens are untracked, along + // with their corresponding log positions. + UntrackTokensInterceptor func(Tokens, kvflowcontrolpb.RaftLogPosition) // MaintainStreamsForBehindFollowers is used in tests to maintain // replication streams for behind followers. 
MaintainStreamsForBehindFollowers func() bool @@ -34,9 +42,6 @@ type TestingKnobs struct { // replication streams for followers we're no longer connected to via the // RaftTransport. MaintainStreamsForBrokenRaftTransport func() bool - // UseOnlyForScratchRanges enables the use of kvflowcontrol - // only for scratch ranges. - UseOnlyForScratchRanges bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 36620181e6ae..828c3977b938 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2531,7 +2531,9 @@ func racV2EnabledWhenLeaderLevel( ctx context.Context, st *cluster.Settings, ) replica_rac2.EnabledWhenLeaderLevel { // TODO(sumeer): implement fully, once all the dependencies are implemented. - return replica_rac2.NotEnabledWhenLeader + // TODO(kvoli): Should this be a cluster setting, or are we ratcheting it up + // via cluster version. + return replica_rac2.EnabledWhenLeaderV2Encoding } // maybeEnqueueProblemRange will enqueue the replica for processing into the diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 21a62b9a7b6d..c03682ffedf5 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -239,6 +239,7 @@ func newUninitializedReplicaWithoutRaftGroup( EvalWaitMetrics: r.store.cfg.KVFlowEvalWaitMetrics, RangeControllerFactory: r.store.kvflowRangeControllerFactory, EnabledWhenLeaderLevel: r.raftMu.flowControlLevel, + Knobs: r.store.TestingKnobs().FlowControlTestingKnobs, }) return r } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 6e54673d41d2..77f2ca6945c2 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1923,11 +1923,12 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { req := newRaftMessageRequest() *req = kvserverpb.RaftMessageRequest{ - RangeID: r.RangeID, - ToReplica: toReplica, - FromReplica: fromReplica, - Message: msg, - RangeStartKey: startKey, // usually nil + RangeID: r.RangeID, + ToReplica: toReplica, + FromReplica: fromReplica, + Message: msg, + RangeStartKey: startKey, // usually nil + UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() == replica_rac2.EnabledWhenLeaderV2Encoding, } // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index a2ac927d52e6..0e4f0be09826 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1560,6 +1560,7 @@ func NewStore( s.cfg.KVFlowStreamTokenProvider, replica_rac2.NewStreamCloseScheduler( s.stopper, timeutil.DefaultTimeSource{}, s.scheduler), + s.TestingKnobs().FlowControlTestingKnobs, ) // Run a log SyncWaiter loop for every 32 raft scheduler goroutines. diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge b/pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge new file mode 100644 index 000000000000..ce55bf39eb03 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge @@ -0,0 +1,138 @@ +echo +---- +---- +-- Flow token metrics from n1 after issuing a regular 2*1MiB 3x replicated write +-- that are yet to get admitted. We see 2*3*1MiB=6MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. 
The 2*1MiB writes +-- happened on what is soon going to be the LHS and RHS of a range being split. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 18 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 42 MiB + kvflowcontrol.tokens.eval.regular.deducted | 6.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 18 MiB + kvflowcontrol.tokens.send.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 42 MiB + kvflowcontrol.tokens.send.regular.deducted | 6.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Splitting range.) + + +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- deducted which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 6MiB+15MiB=21MiB now. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 21 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 27 MiB + kvflowcontrol.tokens.eval.regular.deducted | 21 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 3.0 MiB + kvflowcontrol.tokens.send.elastic.deducted | 21 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 27 MiB + kvflowcontrol.tokens.send.regular.deducted | 21 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the newly split off replica, with its own three streams. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + 71 | 3 + + +-- (Merging ranges.) + + +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens deducted which comes +-- from 4MiB*3=12MiB. So we stand at 21MiB+12MiB=33MiB tokens deducted now. The +-- RHS of the range is gone now, and the previously 3*3MiB=9MiB of tokens +-- deducted for it are released at the subsuming LHS leaseholder. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.regular.available | 24 MiB + kvflowcontrol.tokens.eval.regular.deducted | 33 MiB + kvflowcontrol.tokens.eval.regular.returned | 9.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 24 MiB + kvflowcontrol.tokens.send.regular.deducted | 33 MiB + kvflowcontrol.tokens.send.regular.returned | 9.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe only the merged replica with its own three streams. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. We see all outstanding +-- {regular,elastic} tokens returned, including those from: +-- - the LHS before the merge, and +-- - the LHS and RHS before the original split. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 33 MiB + kvflowcontrol.tokens.eval.elastic.returned | 33 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 33 MiB + kvflowcontrol.tokens.eval.regular.returned | 33 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 33 MiB + kvflowcontrol.tokens.send.elastic.returned | 33 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 33 MiB + kvflowcontrol.tokens.send.regular.returned | 33 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic b/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic new file mode 100644 index 000000000000..3b277ef34cb7 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic @@ -0,0 +1,89 @@ +echo +---- +---- +-- Flow token metrics, before issuing the regular 1MiB replicated write. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 0 B + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 0 B + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 0 B + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 0 B + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Issuing + admitting a regular 1MiB, triply replicated write...) + + +-- Stream counts as seen by n1 post-write. We should see three {regular,elastic} +-- streams given there are three nodes and we're using a replication factor of +-- three. +SELECT name, value + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%stream%' +ORDER BY name ASC; + + kvflowcontrol.streams.eval.elastic.blocked_count | 0 + kvflowcontrol.streams.eval.elastic.total_count | 3 + kvflowcontrol.streams.eval.regular.blocked_count | 0 + kvflowcontrol.streams.eval.regular.total_count | 3 + kvflowcontrol.streams.send.elastic.blocked_count | 0 + kvflowcontrol.streams.send.elastic.total_count | 3 + kvflowcontrol.streams.send.regular.blocked_count | 0 + kvflowcontrol.streams.send.regular.total_count | 3 + + +-- Another view of the stream count, using /inspectz-backed vtables. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +HAVING count(*) = 3 +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + + +-- Flow token metrics from n1 after issuing the regular 1MiB replicated write, +-- and it being admitted on n1, n2 and n3. We should see 3*1MiB = 3MiB of +-- {regular,elastic} tokens deducted and returned, and {8*3=24MiB,16*3=48MiB} of +-- {regular,elastic} tokens available. Everything should be accounted for. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission b/pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission new file mode 100644 index 000000000000..0063556ca7fa --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission @@ -0,0 +1,97 @@ +echo +---- +---- +-- (Issuing 5 regular 1MiB, 3x replicated write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 5 regular 1MiB 3x replicated writes +-- that are yet to get admitted. We see 5*1MiB*3=15MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 9.0 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 15 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 33 MiB + kvflowcontrol.tokens.eval.regular.deducted | 15 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 9.0 MiB + kvflowcontrol.tokens.send.elastic.deducted | 15 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 33 MiB + kvflowcontrol.tokens.send.regular.deducted | 15 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 5.0 MiB + 70 | 2 | 5.0 MiB + 70 | 3 | 5.0 MiB + + +-- Observe the individual tracked tokens per-stream on the scratch range. 
+SELECT range_id, store_id, priority, crdb_internal.humanize_bytes(tokens::INT8) + FROM crdb_internal.kv_flow_token_deductions_v2 + + range_id | store_id | priority | tokens +-----------+----------+------------+---------- + 70 | 1 | normal-pri | 1.0 MiB + 70 | 1 | normal-pri | 1.0 MiB + 70 | 1 | normal-pri | 1.0 MiB + 70 | 1 | normal-pri | 1.0 MiB + 70 | 1 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. We see 15MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 15 MiB + kvflowcontrol.tokens.eval.elastic.returned | 15 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 15 MiB + kvflowcontrol.tokens.eval.regular.returned | 15 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 15 MiB + kvflowcontrol.tokens.send.elastic.returned | 15 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 15 MiB + kvflowcontrol.tokens.send.regular.returned | 15 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization b/pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization new file mode 100644 index 000000000000..dcab380ca055 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization @@ -0,0 +1,91 @@ +echo +---- +---- +-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB elastic 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of elastic tokens with +-- no corresponding returns. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 21 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 0 B + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 21 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 0 B + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Issuing 1x1MiB, 3x replicated regular write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of {regular,elastic} +-- tokens with no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 18 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 18 MiB + kvflowcontrol.tokens.send.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 6.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/crashed_node b/pkg/kv/kvserver/testdata/flow_control_integration_v2/crashed_node new file mode 100644 index 000000000000..aed49b390bf9 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/crashed_node @@ -0,0 +1,82 @@ +echo +---- +---- +-- (Issuing regular 5x1MiB, 2x replicated writes that are not admitted.) + + +-- Flow token metrics from n1 after issuing 5 regular 1MiB 2x replicated writes +-- that are yet to get admitted. We see 5*1MiB*2=10MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 22 MiB + kvflowcontrol.tokens.eval.regular.deducted | 10 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 6.0 MiB + kvflowcontrol.tokens.send.elastic.deducted | 10 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 22 MiB + kvflowcontrol.tokens.send.regular.deducted | 10 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the per-stream tracked tokens on n1, before n2 is crashed. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 5.0 MiB + 70 | 2 | 5.0 MiB + + +-- (Crashing n2) + + +-- Observe the per-stream tracked tokens on n1, after n2 crashed. We're no +-- longer tracking the 5MiB held by n2. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 5.0 MiB + + +-- Flow token metrics from n1 after n2 crashed. Observe that we've returned the +-- 5MiB previously held by n2. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 11 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB + kvflowcontrol.tokens.eval.elastic.returned | 5.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 27 MiB + kvflowcontrol.tokens.eval.regular.deducted | 10 MiB + kvflowcontrol.tokens.eval.regular.returned | 5.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 11 MiB + kvflowcontrol.tokens.send.elastic.deducted | 10 MiB + kvflowcontrol.tokens.send.elastic.returned | 5.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 27 MiB + kvflowcontrol.tokens.send.regular.deducted | 10 MiB + kvflowcontrol.tokens.send.regular.returned | 5.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/granter_admit_one_by_one b/pkg/kv/kvserver/testdata/flow_control_integration_v2/granter_admit_one_by_one new file mode 100644 index 000000000000..b3a79a59a080 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/granter_admit_one_by_one @@ -0,0 +1,74 @@ +echo +---- +---- +-- (Issuing regular 1024*1KiB, 3x replicated writes that are not admitted.) + + +-- Flow token metrics from n1 after issuing 1024KiB, i.e. 1MiB 3x replicated writes +-- that are yet to get admitted. We see 3*1MiB=3MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 21 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 21 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 1.0 MiB + 70 | 2 | 1.0 MiB + 70 | 3 | 1.0 MiB + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. We see 3MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. In #105185, by now we would've observed panics. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder b/pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder new file mode 100644 index 000000000000..ee6c4277ee47 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder @@ -0,0 +1,4 @@ +echo +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership new file mode 100644 index 000000000000..0612133c1655 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership @@ -0,0 +1,124 @@ +echo +---- +---- +-- (Issuing 1x1MiB, 3x replicated write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Adding a voting replica on n4.) + + +-- Observe the total tracked tokens per-stream on n1. s1-s3 should have 1MiB +-- tracked each, and s4 should have none. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 1.0 MiB + 70 | 2 | 1.0 MiB + 70 | 3 | 1.0 MiB + 70 | 4 | 0 B + + +-- (Issuing 1x1MiB, 4x replicated write that's not admitted.) + + +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s3 +-- should have 2MiB tracked (they've observed 2x1MiB writes), s4 should have +-- 1MiB. 
+SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 2.0 MiB + 70 | 2 | 2.0 MiB + 70 | 3 | 2.0 MiB + 70 | 4 | 1.0 MiB + + +-- (Removing voting replica from n3.) + + +-- (Adding non-voting replica to n5.) + + +-- (Issuing 1x1MiB, 4x replicated write (w/ one non-voter) that's not admitted. + + +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s2 +-- should have 3MiB tracked (they've observed 3x1MiB writes), there should be +-- no s3 since it was removed, s4 and s5 should have 2MiB and 1MiB +-- respectively. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 3.0 MiB + 70 | 2 | 3.0 MiB + 70 | 4 | 2.0 MiB + 70 | 5 | 1.0 MiB + + +-- (Allow below-raft admission to proceed.) + + +-- Observe that there no tracked tokens across s1,s2,s4,s5. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 0 B + 70 | 2 | 0 B + 70 | 4 | 0 B + 70 | 5 | 0 B + + +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned, including from when s3 was removed as a raft +-- member. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 40 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 11 MiB + kvflowcontrol.tokens.eval.elastic.returned | 11 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 80 MiB + kvflowcontrol.tokens.eval.regular.deducted | 11 MiB + kvflowcontrol.tokens.eval.regular.returned | 11 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 40 MiB + kvflowcontrol.tokens.send.elastic.deducted | 11 MiB + kvflowcontrol.tokens.send.elastic.returned | 11 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 80 MiB + kvflowcontrol.tokens.send.regular.deducted | 11 MiB + kvflowcontrol.tokens.send.regular.returned | 11 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self new file mode 100644 index 000000000000..70f727643eb7 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self @@ -0,0 +1,106 @@ +echo +---- +---- +-- (Issuing 1x1MiB, 3x replicated write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Replacing current raft leader on n1 in raft group with new n4 replica.) + + +-- Flow token metrics from n1 after raft leader removed itself from raft group. +-- All {regular,elastic} tokens deducted are returned. Note that the available +-- tokens increases, as n1 has seen 4 replication streams, s1,s2,s3,s4. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 32 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 64 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 32 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 64 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- n1 should have no connected streams now after transferring the lease to n2. +-- While, n2 should have 3 connected streams to s2,s3,s4. Query the stream count +-- on n1, then on n2. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + + +-- Observe the stream count on n2. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. Tokens were already +-- returned earlier, so there's no change. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 32 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 64 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 32 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 64 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_snapshot b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_snapshot new file mode 100644 index 000000000000..80a25cb5e6cd --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_snapshot @@ -0,0 +1,209 @@ +echo +---- +---- +-- Flow token metrics from n1 after issuing 1 regular 1MiB 5x replicated write +-- that's not admitted. Since this test is ignoring crashed nodes for token +-- deduction purposes, we see a deduction of 5MiB {regular,elastic} tokens. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 35 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 5.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 75 MiB + kvflowcontrol.tokens.eval.regular.deducted | 5.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 35 MiB + kvflowcontrol.tokens.send.elastic.deducted | 5.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 75 MiB + kvflowcontrol.tokens.send.regular.deducted | 5.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1. 1MiB is tracked for n1-n5. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 1.0 MiB + 70 | 2 | 1.0 MiB + 70 | 3 | 1.0 MiB + 70 | 4 | 1.0 MiB + 70 | 5 | 1.0 MiB + + +-- (Killing n2 and n3, but preventing their tokens from being returned + +-- artificially allowing tokens to get deducted.) + + +-- Observe the total tracked tokens per-stream on n1. 1MiB is (still) tracked +-- for n1-n5, because they are not in StateSnapshot yet and it have likely been +-- in StateProbe for less than the close timer. 
+SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
+ FROM crdb_internal.kv_flow_control_handles_v2
+
+ range_id | store_id | total_tracked_tokens
+-----------+----------+-----------------------
+ 70 | 1 | 1.0 MiB
+ 70 | 2 | 1.0 MiB
+ 70 | 3 | 1.0 MiB
+ 70 | 4 | 1.0 MiB
+ 70 | 5 | 1.0 MiB
+
+
+-- (Issuing another 1MiB of 5x replicated writes while n2 and n3 are down and
+-- below-raft admission is paused.)
+
+
+-- Flow token metrics from n1 after issuing 1 regular 1MiB 5x replicated write
+-- that's not admitted. We'll have deducted another 5*1MiB=5MiB worth of tokens.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%tokens%'
+ORDER BY name ASC;
+
+ kvflowcontrol.tokens.eval.elastic.available | 30 MiB
+ kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB
+ kvflowcontrol.tokens.eval.elastic.returned | 0 B
+ kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.eval.regular.available | 70 MiB
+ kvflowcontrol.tokens.eval.regular.deducted | 10 MiB
+ kvflowcontrol.tokens.eval.regular.returned | 0 B
+ kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
+ kvflowcontrol.tokens.send.elastic.available | 30 MiB
+ kvflowcontrol.tokens.send.elastic.deducted | 10 MiB
+ kvflowcontrol.tokens.send.elastic.returned | 0 B
+ kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.send.regular.available | 70 MiB
+ kvflowcontrol.tokens.send.regular.deducted | 10 MiB
+ kvflowcontrol.tokens.send.regular.returned | 0 B
+ kvflowcontrol.tokens.send.regular.unaccounted | 0 B
+
+
+-- Observe the total tracked tokens per-stream on n1. 2MiB is tracked for n1-n5;
+-- see last comment for an explanation why we're still deducting for n2, n3.
+SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
+ FROM crdb_internal.kv_flow_control_handles_v2
+
+ range_id | store_id | total_tracked_tokens
+-----------+----------+-----------------------
+ 70 | 1 | 2.0 MiB
+ 70 | 2 | 2.0 MiB
+ 70 | 3 | 2.0 MiB
+ 70 | 4 | 2.0 MiB
+ 70 | 5 | 2.0 MiB
+
+
+-- (Truncating raft log.)
+
+
+-- (Restarting n2 and n3.)
+
+
+-- Flow token metrics from n1 after restarting n2 and n3. We've returned the
+-- 4MiB previously held by those nodes (2MiB each). We're reacting to their raft
+-- progress state, noting that since we've truncated our log, we need to catch
+-- them up via snapshot. So we release all held tokens.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%tokens%'
+ORDER BY name ASC;
+
+ kvflowcontrol.tokens.eval.elastic.available | 34 MiB
+ kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB
+ kvflowcontrol.tokens.eval.elastic.returned | 4.0 MiB
+ kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.eval.regular.available | 74 MiB
+ kvflowcontrol.tokens.eval.regular.deducted | 10 MiB
+ kvflowcontrol.tokens.eval.regular.returned | 4.0 MiB
+ kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
+ kvflowcontrol.tokens.send.elastic.available | 34 MiB
+ kvflowcontrol.tokens.send.elastic.deducted | 10 MiB
+ kvflowcontrol.tokens.send.elastic.returned | 4.0 MiB
+ kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.send.regular.available | 74 MiB
+ kvflowcontrol.tokens.send.regular.deducted | 10 MiB
+ kvflowcontrol.tokens.send.regular.returned | 4.0 MiB
+ kvflowcontrol.tokens.send.regular.unaccounted | 0 B
+
+
+-- Observe the total tracked tokens per-stream on n1. There's nothing tracked
+-- for n2 and n3 anymore.
+SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
+ FROM crdb_internal.kv_flow_control_handles_v2
+ WHERE total_tracked_tokens > 0
+
+ range_id | store_id | total_tracked_tokens
+-----------+----------+-----------------------
+ 70 | 1 | 2.0 MiB
+ 70 | 4 | 2.0 MiB
+ 70 | 5 | 2.0 MiB
+
+
+-- (Allow below-raft admission to proceed.)
+
+
+-- Flow token metrics from n1 after work gets admitted. We see the remaining
+-- 6MiB of {regular,elastic} tokens returned.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%tokens%'
+ORDER BY name ASC;
+
+ kvflowcontrol.tokens.eval.elastic.available | 40 MiB
+ kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB
+ kvflowcontrol.tokens.eval.elastic.returned | 10 MiB
+ kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.eval.regular.available | 80 MiB
+ kvflowcontrol.tokens.eval.regular.deducted | 10 MiB
+ kvflowcontrol.tokens.eval.regular.returned | 10 MiB
+ kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
+ kvflowcontrol.tokens.send.elastic.available | 40 MiB
+ kvflowcontrol.tokens.send.elastic.deducted | 10 MiB
+ kvflowcontrol.tokens.send.elastic.returned | 10 MiB
+ kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.send.regular.available | 80 MiB
+ kvflowcontrol.tokens.send.regular.deducted | 10 MiB
+ kvflowcontrol.tokens.send.regular.returned | 10 MiB
+ kvflowcontrol.tokens.send.regular.unaccounted | 0 B
+
+
+-- Observe the total tracked tokens per-stream on n1; there should be nothing.
+SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
+ FROM crdb_internal.kv_flow_control_handles_v2
+
+ range_id | store_id | total_tracked_tokens
+-----------+----------+-----------------------
+ 70 | 1 | 0 B
+ 70 | 2 | 0 B
+ 70 | 3 | 0 B
+ 70 | 4 | 0 B
+ 70 | 5 | 0 B
+
+
+-- Another view of tokens, using /inspectz-backed vtables.
+SELECT store_id,
+ crdb_internal.humanize_bytes(available_eval_regular_tokens),
+ crdb_internal.humanize_bytes(available_eval_elastic_tokens)
+ FROM crdb_internal.kv_flow_controller_v2
+ ORDER BY store_id ASC;
+
+ store_id | eval_regular_available | eval_elastic_available
+-----------+------------------------+-------------------------
+ 1 | 16 MiB | 8.0 MiB
+ 2 | 16 MiB | 8.0 MiB
+ 3 | 16 MiB | 8.0 MiB
+ 4 | 16 MiB | 8.0 MiB
+ 5 | 16 MiB | 8.0 MiB
+----
+----
+
+# vim:ft=sql
diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge b/pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge
new file mode 100644
index 000000000000..472cb19a1b20
--- /dev/null
+++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge
@@ -0,0 +1,113 @@
+echo
+----
+----
+-- Flow token metrics from n1 after issuing + admitting the regular 1MiB 3x
+-- replicated write to the pre-split range. There should be 3MiB of
+-- {regular,elastic} tokens {deducted,returned}.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%tokens%'
+ORDER BY name ASC;
+
+ kvflowcontrol.tokens.eval.elastic.available | 24 MiB
+ kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB
+ kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB
+ kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.eval.regular.available | 48 MiB
+ kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB
+ kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB
+ kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
+ kvflowcontrol.tokens.send.elastic.available | 24 MiB
+ kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB
+ kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB
+ kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.send.regular.available | 48 MiB
+ kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB
+ kvflowcontrol.tokens.send.regular.returned | 3.0 MiB
+ kvflowcontrol.tokens.send.regular.unaccounted | 0 B
+
+
+-- (Splitting range.)
+
+
+-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to
+-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens
+-- {deducted,returned}, which comes from (2MiB+3MiB)*3=15MiB. So we stand at
+-- 3MiB+15MiB=18MiB now.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%tokens%'
+ORDER BY name ASC;
+
+ kvflowcontrol.tokens.eval.elastic.available | 24 MiB
+ kvflowcontrol.tokens.eval.elastic.deducted | 18 MiB
+ kvflowcontrol.tokens.eval.elastic.returned | 18 MiB
+ kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.eval.regular.available | 48 MiB
+ kvflowcontrol.tokens.eval.regular.deducted | 18 MiB
+ kvflowcontrol.tokens.eval.regular.returned | 18 MiB
+ kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
+ kvflowcontrol.tokens.send.elastic.available | 24 MiB
+ kvflowcontrol.tokens.send.elastic.deducted | 18 MiB
+ kvflowcontrol.tokens.send.elastic.returned | 18 MiB
+ kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.send.regular.available | 48 MiB
+ kvflowcontrol.tokens.send.regular.deducted | 18 MiB
+ kvflowcontrol.tokens.send.regular.returned | 18 MiB
+ kvflowcontrol.tokens.send.regular.unaccounted | 0 B
+
+
+-- Observe the newly split off replica, with its own three streams.
+SELECT range_id, count(*) AS streams
+ FROM crdb_internal.kv_flow_control_handles_v2
+GROUP BY (range_id)
+ORDER BY streams DESC;
+
+ range_id | stream_count
+-----------+---------------
+ 70 | 3
+ 71 | 3
+
+
+-- (Merging ranges.)
+
+
+-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to
+-- the post-merged range. We should see 12MiB extra tokens {deducted,returned},
+-- which comes from 4MiB*3=12MiB. So we stand at 18MiB+12MiB=30MiB now.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%tokens%'
+ORDER BY name ASC;
+
+ kvflowcontrol.tokens.eval.elastic.available | 24 MiB
+ kvflowcontrol.tokens.eval.elastic.deducted | 30 MiB
+ kvflowcontrol.tokens.eval.elastic.returned | 30 MiB
+ kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.eval.regular.available | 48 MiB
+ kvflowcontrol.tokens.eval.regular.deducted | 30 MiB
+ kvflowcontrol.tokens.eval.regular.returned | 30 MiB
+ kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
+ kvflowcontrol.tokens.send.elastic.available | 24 MiB
+ kvflowcontrol.tokens.send.elastic.deducted | 30 MiB
+ kvflowcontrol.tokens.send.elastic.returned | 30 MiB
+ kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
+ kvflowcontrol.tokens.send.regular.available | 48 MiB
+ kvflowcontrol.tokens.send.regular.deducted | 30 MiB
+ kvflowcontrol.tokens.send.regular.returned | 30 MiB
+ kvflowcontrol.tokens.send.regular.unaccounted | 0 B
+
+
+-- Observe only the merged replica with its own three streams.
+SELECT range_id, count(*) AS streams
+ FROM crdb_internal.kv_flow_control_handles_v2
+GROUP BY (range_id)
+ORDER BY streams DESC;
+
+ range_id | stream_count
+-----------+---------------
+ 70 | 3
+----
+----
+
+# vim:ft=sql
diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease b/pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease
new file mode 100644
index 000000000000..6cc3ce5cdba6
--- /dev/null
+++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease
@@ -0,0 +1,46 @@
+echo
+----
+----
+-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)
+
+
+-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write
+-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with
+-- no corresponding returns.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%tokens%regular%'
+ORDER BY name ASC;
+
+ kvflowcontrol.tokens.eval.regular.available | 45 MiB
+ kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB
+ kvflowcontrol.tokens.eval.regular.returned | 0 B
+ kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
+ kvflowcontrol.tokens.send.regular.available | 45 MiB
+ kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB
+ kvflowcontrol.tokens.send.regular.returned | 0 B
+ kvflowcontrol.tokens.send.regular.unaccounted | 0 B
+
+
+-- (Transferring range lease to n2 and allowing leadership to follow.)
+
+
+-- Flow token metrics from n1 having lost the lease and raft leadership. All
+-- deducted tokens are returned.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%tokens%regular%'
+ORDER BY name ASC;
+
+ kvflowcontrol.tokens.eval.regular.available | 48 MiB
+ kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB
+ kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB
+ kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
+ kvflowcontrol.tokens.send.regular.available | 48 MiB
+ kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB
+ kvflowcontrol.tokens.send.regular.returned | 3.0 MiB
+ kvflowcontrol.tokens.send.regular.unaccounted | 0 B
+----
+----
+
+# vim:ft=sql
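A quick way to read testdata like the above outside the test harness is to compare each token counter's deducted and returned values directly. The sketch below is not part of the generated test output; it assumes a running cluster's SQL shell and only uses the crdb_internal.node_metrics vtable and kvflowcontrol.tokens.* metric names that the files above already assert on. A non-zero "outstanding" value means tokens are still held for work that has not yet been admitted below raft; in the quiesced end states shown above, deducted equals returned, so every counter would report 0 B.

-- Outstanding flow tokens per counter family (deducted minus returned).
-- Ad-hoc sketch, not generated test output; assumes a running cluster.
SELECT regexp_replace(name, '\.(deducted|returned)$', '') AS counter,
       crdb_internal.humanize_bytes(
         sum(CASE WHEN name LIKE '%.deducted'
                  THEN value::INT8 ELSE -(value::INT8) END)::INT8
       ) AS outstanding
  FROM crdb_internal.node_metrics
 WHERE name LIKE 'kvflowcontrol.tokens.%.deducted'
    OR name LIKE 'kvflowcontrol.tokens.%.returned'
 GROUP BY regexp_replace(name, '\.(deducted|returned)$', '')
 ORDER BY 1;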