From 8771ecda789f9b595915d028fcb8fcb8402b59a5 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Tue, 17 Sep 2024 18:50:18 -0400 Subject: [PATCH] kvserver: add rac2 v1 integration tests Introduce several tests in `flow_control_integration_test.go`, mirroring the existing tests but applied to the replication flow control v2 machinery. The tests largely follow an identical pattern to the existing v1 tests, swapping in rac2 metric and vtables. The following tests are added: ``` TestFlowControlBasic TestFlowControlRangeSplitMerge TestFlowControlBlockedAdmission TestFlowControlAdmissionPostSplitMerge TestFlowControlCrashedNode TestFlowControlRaftSnapshot TestFlowControlRaftMembership TestFlowControlRaftMembershipRemoveSelf TestFlowControlClassPrioritization TestFlowControlTransferLease TestFlowControlGranterAdmitOneByOne ``` Another two tests are ommitted, `TestFlowControlRaftTransportBreak` and `TestFlowControlRaftTransportCulled`, because they behave identically to `TestFlowControlCrashedNode` as rac2 is less tightly coupled to the raft transport. One test, `TestFlowControlLeaderNotLeaseholder` has divering pre-evaluation admit behavior and is therefore delayed to a subsequent commit. Lastly, two tests, `TestFlowControlQuiescedRange` and `TestFlowControlUnquiescedRange` are dependent on #129581, and delayed to a subsequent commit as well. Part of: #130187 Release note: None --- .../kvserver/flow_control_integration_test.go | 1726 +++++++++++++++++ .../admission_post_split_merge | 138 ++ .../flow_control_integration_v2/basic | 89 + .../blocked_admission | 97 + .../class_prioritization | 91 + .../flow_control_integration_v2/crashed_node | 82 + .../granter_admit_one_by_one | 74 + .../leader_not_leaseholder | 4 + .../raft_membership | 124 ++ .../raft_membership_remove_self | 106 + .../flow_control_integration_v2/raft_snapshot | 209 ++ .../flow_control_integration_v2/split_merge | 113 ++ .../transfer_lease | 46 + 13 files changed, 2899 insertions(+) create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/basic create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/crashed_node create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/granter_admit_one_by_one create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_snapshot create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 98d1d36eefce..efdf51d7abb1 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -2487,3 +2487,1729 @@ func (h *flowControlTestHelper) put( func (h *flowControlTestHelper) close(filename string) { echotest.Require(h.t, h.buf.String(), datapathutils.TestDataPath(h.t, "flow_control_integration", filename)) } + +// TOOD(kvoli): The following tests are a port of the v2 flow control tests from +// above. We aim to port as many as possible. The tests are: +// +// - [x] TestFlowControlBasic +// - [x] TestFlowControlRangeSplitMerge +// - [x] TestFlowControlBlockedAdmission +// - [x] TestFlowControlAdmissionPostSplitMerge +// - [x] TestFlowControlCrashedNode +// - [x] TestFlowControlRaftSnapshot +// - [-] TestFlowControlRaftTransportBreak +// - This test is identical to TestFlowControlCrashedNode in rac2, as the +// implementation is not coupled to the raft transport, crashing the node has +// the same effect. +// - [-] TestFlowControlRaftTransportCulled +// - Similar to above, the token dispatch is coupled to the raft transport +// more closely in V1. +// - [x] TestFlowControlRaftMembership +// - [x] TestFlowControlRaftMembershipRemoveSelf +// - [x] TestFlowControlClassPrioritization +// - [-] TestFlowControlQuiescedRange +// - Delaying until #129581 is done. +// - [-] TestFlowControlUnquiescedRange +// - Delaying until #129581 is done. +// - [x] TestFlowControlTransferLease +// - [-] TestFlowControlLeaderNotLeaseholder +// - V2 diverges here, in that it still requires the leaseholder (who isn't +// the leader) to admit via the StoreWorkQueue. V1 does not because the +// replica handle is gone. +// - [x] TestFlowControlGranterAdmitOneByOne + +// TestFlowControlBasicV2 runs a basic end-to-end test of the v2 kvflowcontrol +// machinery, replicating + admitting a single 1MiB regular write. The vmodule +// flags for running these tests with full logging are: +// +// --vmodule='replica_raft=1,replica_proposal_buf=1,raft_transport=2, +// kvadmission=1,work_queue=1,replica_flow_control=1, +// tracker=1,client_raft_helpers_test=1,range_controller=2, +// token_counter=2,token_tracker=2,processor=2' +func TestFlowControlBasicV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "always-enqueue", func(t *testing.T, alwaysEnqueue bool) { + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: alwaysEnqueue, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + // Setup the test state with 3 voters, one on each of the three + // node/stores. + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("basic") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc) + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- Flow token metrics, before issuing the regular 1MiB replicated write.`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Issuing + admitting a regular 1MiB, triply replicated write...)`) + h.log("sending put request") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Stream counts as seen by n1 post-write. We should see three {regular,elastic} +-- streams given there are three nodes and we're using a replication factor of +-- three. +`) + h.query(n1, ` + SELECT name, value + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%stream%' +ORDER BY name ASC; +`) + + h.comment(`-- Another view of the stream count, using /inspectz-backed vtables.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +HAVING count(*) = 3 +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(` +-- Flow token metrics from n1 after issuing the regular 1MiB replicated write, +-- and it being admitted on n1, n2 and n3. We should see 3*1MiB = 3MiB of +-- {regular,elastic} tokens deducted and returned, and {8*3=24MiB,16*3=48MiB} of +-- {regular,elastic} tokens available. Everything should be accounted for. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + }) +} + +// TestFlowControlRangeSplitMergeV2 walks through what happens to flow tokens +// when a range splits/merges. +func TestFlowControlRangeSplitMergeV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("split_merge") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.log("sending put request to pre-split range") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request to pre-split range") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after issuing + admitting the regular 1MiB 3x +-- replicated write to the pre-split range. There should be 3MiB of +-- {regular,elastic} tokens {deducted,returned}. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Splitting range.)`) + left, right := tc.SplitRangeOrFatal(t, k.Next()) + h.waitForConnectedStreams(ctx, right.RangeID, 3) + + h.log("sending 2MiB put request to post-split LHS") + h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri) + h.log("sent 2MiB put request to post-split LHS") + + h.log("sending 3MiB put request to post-split RHS") + h.put(ctx, roachpb.Key(right.StartKey), 3<<20 /* 3MiB */, admissionpb.NormalPri) + h.log("sent 3MiB put request to post-split RHS") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- {deducted,returned}, which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 3MiB+15MiB=18MiB now. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the newly split off replica, with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Merging ranges.)`) + merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey()) + + h.log("sending 4MiB put request to post-merge range") + h.put(ctx, roachpb.Key(merged.StartKey), 4<<20 /* 4MiB */, admissionpb.NormalPri) + h.log("sent 4MiB put request to post-merged range") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens {deducted,returned}, +-- which comes from 4MiB*3=12MiB. So we stand at 18MiB+12MiB=30MiB now. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe only the merged replica with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") +} + +// TestFlowControlBlockedAdmissionV2 tests token tracking behavior by explicitly +// blocking below-raft admission. +func TestFlowControlBlockedAdmissionV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("blocked_admission") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 5 regular 1MiB, 3x replicated write that's not admitted.)`) + h.log("sending put requests") + for i := 0; i < 5; i++ { + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + } + h.log("sent put requests") + + h.comment(` +-- Flow token metrics from n1 after issuing 5 regular 1MiB 3x replicated writes +-- that are yet to get admitted. We see 5*1MiB*3=15MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the total tracked tokens per-stream on n1.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- Observe the individual tracked tokens per-stream on the scratch range.`) + h.query(n1, ` + SELECT range_id, store_id, priority, crdb_internal.humanize_bytes(tokens::INT8) + FROM crdb_internal.kv_flow_token_deductions_v2 +`, "range_id", "store_id", "priority", "tokens") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see 15MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlAdmissionPostSplitMergeV2 walks through what happens with flow +// tokens when a range after undergoes splits/merges. It does this by blocking +// and later unblocking below-raft admission, verifying: +// - tokens for the RHS are released at the post-merge subsuming leaseholder, +// - admission for the RHS post-merge does not cause a double return of tokens, +// - admission for the LHS can happen post-merge, +// - admission for the LHS and RHS can happen post-split. +func TestFlowControlAdmissionPostSplitMergeV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("admission_post_split_merge") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.log("sending put request to pre-split range") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.put(ctx, k.Next(), 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request to pre-split range") + + h.comment(` +-- Flow token metrics from n1 after issuing a regular 2*1MiB 3x replicated write +-- that are yet to get admitted. We see 2*3*1MiB=6MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. The 2*1MiB writes +-- happened on what is soon going to be the LHS and RHS of a range being split. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Splitting range.)`) + left, right := tc.SplitRangeOrFatal(t, k.Next()) + h.waitForConnectedStreams(ctx, right.RangeID, 3) + + h.log("sending 2MiB put request to post-split LHS") + h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri) + h.log("sent 2MiB put request to post-split LHS") + + h.log("sending 3MiB put request to post-split RHS") + h.put(ctx, roachpb.Key(right.StartKey), 3<<20 /* 3MiB */, admissionpb.NormalPri) + h.log("sent 3MiB put request to post-split RHS") + + h.comment(` +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- deducted which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 6MiB+15MiB=21MiB now. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the newly split off replica, with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Merging ranges.)`) + merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey()) + + h.log("sending 4MiB put request to post-merge range") + h.put(ctx, roachpb.Key(merged.StartKey), 4<<20 /* 4MiB */, admissionpb.NormalPri) + h.log("sent 4MiB put request to post-merged range") + + h.comment(` +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens deducted which comes +-- from 4MiB*3=12MiB. So we stand at 21MiB+12MiB=33MiB tokens deducted now. The +-- RHS of the range is gone now, and the previously 3*3MiB=9MiB of tokens +-- deducted for it are released at the subsuming LHS leaseholder. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe only the merged replica with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see all outstanding +-- {regular,elastic} tokens returned, including those from: +-- - the LHS before the merge, and +-- - the LHS and RHS before the original split. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlCrashedNodeV2 tests flow token behavior in the presence of +// crashed nodes. +func TestFlowControlCrashedNodeV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + kvserver.ExpirationLeasesOnly.Override(ctx, &settings.SV, true) + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + RaftConfig: base.RaftConfig{ + // Suppress timeout-based elections. This test doesn't want to + // deal with leadership changing hands. + RaftElectionTimeoutTicks: 1000000, + // Reduce the RangeLeaseDuration to speeds up failure detection + // below. + RangeLeaseDuration: time.Second, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return true + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("crashed_node") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 2) + + h.comment(`-- (Issuing regular 5x1MiB, 2x replicated writes that are not admitted.)`) + h.log("sending put requests") + for i := 0; i < 5; i++ { + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + } + h.log("sent put requests") + + h.comment(` +-- Flow token metrics from n1 after issuing 5 regular 1MiB 2x replicated writes +-- that are yet to get admitted. We see 5*1MiB*2=10MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + h.comment(`-- Observe the per-stream tracked tokens on n1, before n2 is crashed.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Crashing n2)`) + tc.StopServer(1) + h.waitForConnectedStreams(ctx, desc.RangeID, 1) + + h.comment(` +-- Observe the per-stream tracked tokens on n1, after n2 crashed. We're no +-- longer tracking the 5MiB held by n2. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(` +-- Flow token metrics from n1 after n2 crashed. Observe that we've returned the +-- 5MiB previously held by n2. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlRaftSnapshotV2 tests flow token behavior when one replica +// needs to be caught up via raft snapshot. +func TestFlowControlRaftSnapshotV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const numServers int = 5 + stickyServerArgs := make(map[int]base.TestServerArgs) + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + for i := 0; i < numServers; i++ { + stickyServerArgs[i] = base.TestServerArgs{ + Settings: settings, + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), + }, + }, + RaftConfig: base.RaftConfig{ + // Suppress timeout-based elections. This test doesn't want to + // deal with leadership changing hands. + RaftElectionTimeoutTicks: 1000000, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyVFSRegistry: fs.NewStickyRegistry(), + }, + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func() kvflowcontrol.Tokens { + // This test makes use of (small) increment + // requests, but wants to see large token + // deductions/returns. + return kvflowcontrol.Tokens(1 << 20 /* 1MiB */) + }, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + RaftTransport: &kvserver.RaftTransportTestingKnobs{ + OverrideIdleTimeout: func() time.Duration { + // Effectively disable token returns due to underlying + // raft transport streams disconnecting due to + // inactivity. + return time.Hour + }, + }, + }, + } + } + + tc := testcluster.StartTestCluster(t, numServers, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: stickyServerArgs, + }) + defer tc.Stopper().Stop(ctx) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("raft_snapshot") + + store := tc.GetFirstStoreFromServer(t, 0) + + incA := int64(5) + incB := int64(7) + incAB := incA + incB + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + tc.AddVotersOrFatal(t, k, tc.Targets(3, 4)...) + repl := store.LookupReplica(roachpb.RKey(k)) + require.NotNil(t, repl) + h.waitForConnectedStreams(ctx, repl.RangeID, 5) + + // Set up a key to replicate across the cluster. We're going to modify this + // key and truncate the raft logs from that command after killing one of the + // nodes to check that it gets the new value after it comes up. + incArgs := incrementArgs(k, incA) + if _, err := kv.SendWrappedWithAdmission(ctx, tc.Server(0).DB().NonTransactionalSender(), kvpb.Header{}, kvpb.AdmissionHeader{ + Priority: int32(admissionpb.HighPri), + Source: kvpb.AdmissionHeader_FROM_SQL, + }, incArgs); err != nil { + t.Fatal(err) + } + + h.comment(` +-- Flow token metrics from n1 after issuing 1 regular 1MiB 5x replicated write +-- that's not admitted. Since this test is ignoring crashed nodes for token +-- deduction purposes, we see a deduction of 5MiB {regular,elastic} tokens. + `) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + `) + h.comment(` +-- Observe the total tracked tokens per-stream on n1. 1MiB is tracked for n1-n5. + `) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + `, "range_id", "store_id", "total_tracked_tokens") + + tc.WaitForValues(t, k, []int64{incA, incA, incA, incA, incA}) + + h.comment(` +-- (Killing n2 and n3, but preventing their tokens from being returned + +-- artificially allowing tokens to get deducted.)`) + + // Kill stores 1 + 2, increment the key on the other stores and truncate + // their logs to make sure that when store 1 + 2 comes back up they will + // require a snapshot from Raft. + tc.StopServer(1) + tc.StopServer(2) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1. 1MiB is (still) tracked +-- for n1-n5, because they are not in StateSnapshot yet and it have likely been +-- in StateProbe for less than the close timer. + `) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + `, "range_id", "store_id", "total_tracked_tokens") + + h.comment(` +-- (Issuing another 1MiB of 5x replicated writes while n2 and n3 are down and +-- below-raft admission is paused.) +`) + incArgs = incrementArgs(k, incB) + if _, err := kv.SendWrappedWithAdmission(ctx, tc.Server(0).DB().NonTransactionalSender(), kvpb.Header{}, kvpb.AdmissionHeader{ + Priority: int32(admissionpb.HighPri), + Source: kvpb.AdmissionHeader_FROM_SQL, + }, incArgs); err != nil { + t.Fatal(err) + } + + h.comment(` +-- Flow token metrics from n1 after issuing 1 regular 1MiB 5x replicated write +-- that's not admitted. We'll have deducted another 5*1MiB=5MiB worth of tokens. + `) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + `) + h.comment(` +-- Observe the total tracked tokens per-stream on n1. 2MiB is tracked for n1-n5; +-- see last comment for an explanation why we're still deducting for n2, n3. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + `, "range_id", "store_id", "total_tracked_tokens") + + tc.WaitForValues(t, k, []int64{incAB, 0 /* stopped */, 0 /* stopped */, incAB, incAB}) + + index := repl.GetLastIndex() + h.comment(`-- (Truncating raft log.)`) + + // Truncate the log at index+1 (log entries < N are removed, so this + // includes the increment). + truncArgs := truncateLogArgs(index+1, repl.GetRangeID()) + if _, err := kv.SendWrappedWithAdmission(ctx, tc.Server(0).DB().NonTransactionalSender(), kvpb.Header{}, kvpb.AdmissionHeader{ + Priority: int32(admissionpb.HighPri), + Source: kvpb.AdmissionHeader_FROM_SQL, + }, truncArgs); err != nil { + t.Fatal(err) + } + + h.comment(`-- (Restarting n2 and n3.)`) + require.NoError(t, tc.RestartServer(1)) + require.NoError(t, tc.RestartServer(2)) + + tc.WaitForValues(t, k, []int64{incAB, incAB, incAB, incAB, incAB}) + + h.comment(` +-- Flow token metrics from n1 after restarting n2 and n3. We've returned the +-- 2MiB previously held by those nodes (2MiB each). We're reacting to it's raft +-- progress state, noting that since we've truncated our log, we need to catch +-- it up via snapshot. So we release all held tokens. + `) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1. There's nothing tracked +-- for n2 and n3 anymore. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + WHERE total_tracked_tokens > 0 +`, "range_id", "store_id", "total_tracked_tokens") + + h.waitForConnectedStreams(ctx, repl.RangeID, 5) + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + + h.waitForAllTokensReturned(ctx, 5) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see the remaining +-- 6MiB of {regular,elastic} tokens returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1; there should be nothing. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- Another view of tokens, using /inspectz-backed vtables.`) + h.query(n1, ` +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; +`, "range_id", "eval_regular_available", "eval_elastic_available") +} + +// TestFlowControlRaftMembershipV2 tests flow token behavior when the raft +// membership changes. +func TestFlowControlRaftMembershipV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("raft_membership") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Adding a voting replica on n4.)`) + tc.AddVotersOrFatal(t, k, tc.Target(3)) + h.waitForConnectedStreams(ctx, desc.RangeID, 4) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1. s1-s3 should have 1MiB +-- tracked each, and s4 should have none.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Issuing 1x1MiB, 4x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s3 +-- should have 2MiB tracked (they've observed 2x1MiB writes), s4 should have +-- 1MiB. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Removing voting replica from n3.)`) + tc.RemoveVotersOrFatal(t, k, tc.Target(2)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Adding non-voting replica to n5.)`) + tc.AddNonVotersOrFatal(t, k, tc.Target(4)) + h.waitForConnectedStreams(ctx, desc.RangeID, 4) + + h.comment(`-- (Issuing 1x1MiB, 4x replicated write (w/ one non-voter) that's not admitted.`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s2 +-- should have 3MiB tracked (they've observed 3x1MiB writes), there should be +-- no s3 since it was removed, s4 and s5 should have 2MiB and 1MiB +-- respectively. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 5) + + h.comment(`-- Observe that there no tracked tokens across s1,s2,s4,s5.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned, including from when s3 was removed as a raft +-- member. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlRaftMembershipRemoveSelf tests flow token behavior when the +// raft leader removes itself from the raft group. +func TestFlowControlRaftMembershipRemoveSelfV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "transfer-lease-first", func(t *testing.T, transferLeaseFirst bool) { + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + // Note this test behaves identically independent of we transfer the lease + // first. + defer h.close("raft_membership_remove_self") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + // Make sure the lease is on n1 and that we're triply connected. + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Replacing current raft leader on n1 in raft group with new n4 replica.)`) + testutils.SucceedsSoon(t, func() error { + // Relocate range from n1 -> n4. + if err := tc.Servers[2].DB(). + AdminRelocateRange( + context.Background(), desc.StartKey.AsRawKey(), + tc.Targets(1, 2, 3), nil, transferLeaseFirst); err != nil { + return err + } + leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil) + if err != nil { + return err + } + if leaseHolder.Equal(tc.Target(0)) { + return errors.Errorf("expected leaseholder to not be on n1") + } + return nil + }) + h.waitForAllTokensReturned(ctx, 4) + + h.comment(` +-- Flow token metrics from n1 after raft leader removed itself from raft group. +-- All {regular,elastic} tokens deducted are returned. Note that the available +-- tokens increases, as n1 has seen 4 replication streams, s1,s2,s3,s4. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(` +-- n1 should have no connected streams now after transferring the lease to n2. +-- While, n2 should have 3 connected streams to s2,s3,s4. Query the stream count +-- on n1, then on n2. +`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- Observe the stream count on n2.`) + h.query(n2, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 4) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. Tokens were already +-- returned earlier, so there's no change. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + }) +} + +// TestFlowControlClassPrioritizationV2 shows how tokens are managed for both +// regular and elastic work. It does so by replicating + admitting a single +// 1MiB {regular,elastic} write. +func TestFlowControlClassPrioritizationV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("class_prioritization") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB elastic 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of elastic tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated regular write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of {regular,elastic} +-- tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlTransferLeaseV2 tests flow control behavior when the range +// lease is transferred, and the raft leadership along with it. +func TestFlowControlTransferLeaseV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("transfer_lease") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Transferring range lease to n2 and allowing leadership to follow.)`) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + testutils.SucceedsSoon(t, func() error { + if leader := tc.GetRaftLeader(t, roachpb.RKey(k)); leader.NodeID() != tc.Target(1).NodeID { + return errors.Errorf("expected raft leadership to transfer to n1, found n%d", leader.NodeID()) + } + return nil + }) + h.waitForAllTokensReturned(ctx, 3) + + h.comment(` +-- Flow token metrics from n1 having lost the lease and raft leadership. All +-- deducted tokens are returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) +} + +// TestFlowControlLeaderNotLeaseholderV2 tests flow control behavior when the +// range leaseholder is not the raft leader. +func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &settings.SV, true) + + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // Disable leader transfers during leaseholder changes so + // that we can easily create leader-not-leaseholder + // scenarios. + DisableLeaderFollowsLeaseholder: true, + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("leader_not_leaseholder") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Transferring only range lease, not raft leadership, to n2.)`) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + require.Equal(t, tc.GetRaftLeader(t, roachpb.RKey(k)).NodeID(), tc.Target(0).NodeID) + + h.comment(` +-- Flow token metrics from n1 having lost the lease but retained raft +-- leadership. No deducted tokens are released. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(` +-- (Issuing another 1x1MiB, 3x replicated write that's not admitted while in +-- this leader != leaseholder state.) +`) + // TODO(kvoli): This write blocks at the store work queue, since granting is + // disabled and synchronous. Why does the v1 test not encounter the same + // problem? The put must be bypassing kvadmission with the handle being + // closed or something. + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Looking at n1's flow token metrics, there's no change. No additional tokens +-- are deducted since the write is not being proposed here. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(` +-- Looking at n2's flow token metrics, there's no activity. n2 never acquired +-- the raft leadership. +`) + h.query(n2, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- All deducted flow tokens are returned back to where the raft leader is. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) +} + +// TestFlowControlGranterAdmitOneByOneV2 is a reproduction for #105185. +// Internal admission code that relied on admitting at most one waiting request +// was in fact admitting more than one, and doing so recursively with call +// stacks as deep as the admit chain. This triggered panics (and is also just +// undesirable, design-wise). This test intentionally queues a 1000+ small +// requests, to that end. +func TestFlowControlGranterAdmitOneByOneV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + settings := cluster.MakeTestingClusterSettings() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func() kvflowcontrol.Tokens { + // This test asserts on the exact values of tracked + // tokens. In non-test code, the tokens deducted are + // a few bytes off (give or take) from the size of + // the proposals. We don't care about such + // differences. + return kvflowcontrol.Tokens(1 << 10 /* 1KiB */) + }, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + AlwaysTryGrantWhenAdmitted: true, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc, settings) + h.init() + defer h.close("granter_admit_one_by_one") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing regular 1024*1KiB, 3x replicated writes that are not admitted.)`) + h.log("sending put requests") + for i := 0; i < 1024; i++ { + h.put(ctx, k, 1<<10 /* 1KiB */, admissionpb.NormalPri) + } + h.log("sent put requests") + + h.comment(` +-- Flow token metrics from n1 after issuing 1024KiB, i.e. 1MiB 3x replicated writes +-- that are yet to get admitted. We see 3*1MiB=3MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the total tracked tokens per-stream on n1.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see 3MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. In #105185, by now we would've observed panics. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// rac2TestHelper is a helper for tests that exercise the RACv2 machinery. It +// aims to follow the same pattern as the flowControlTestHelper and is +// therefore useful when comparing behavior between v1 and v2 flow control. +type rac2TestHelper struct { + t *testing.T + tc *testcluster.TestCluster + st *cluster.Settings + buf *strings.Builder + rng *rand.Rand +} + +func newRAC2TestHelper( + t *testing.T, tc *testcluster.TestCluster, st *cluster.Settings, +) *rac2TestHelper { + rng, _ := randutil.NewPseudoRand() + buf := &strings.Builder{} + return &rac2TestHelper{ + t: t, + tc: tc, + buf: buf, + rng: rng, + st: st, + } +} + +func (h *rac2TestHelper) init() { + // Reach into each server's cluster setting and override. This causes any + // registered change callbacks to run immediately, which is important since + // running them with some lag (which happens when using SQL and `SET CLUSTER + // SETTING`) interferes with the later activities in these tests. + for _, s := range h.tc.Servers { + kvflowcontrol.Enabled.Override(context.Background(), &s.ClusterSettings().SV, true) + kvflowcontrol.Mode.Override(context.Background(), &s.ClusterSettings().SV, kvflowcontrol.ApplyToAll) + } +} + +// waitForAllTokensReturned waits for all tokens to be returned across all +// streams. The expected number of streams is passed in as an argument. +func (h *rac2TestHelper) waitForAllTokensReturned(ctx context.Context, expStreamCount int) { + testutils.SucceedsSoon(h.t, func() error { + return h.checkAllTokensReturned(ctx, expStreamCount) + }) +} + +// checkAllTokensReturned checks that all tokens have been returned across all +// streams. It also checks that the expected number of streams are present. The default +func (h *rac2TestHelper) checkAllTokensReturned(ctx context.Context, expStreamCount int) error { + elasticTokensPerStream := kvflowcontrol.ElasticTokensPerStream.Get(&h.st.SV) + regularTokensPerStream := kvflowcontrol.RegularTokensPerStream.Get(&h.st.SV) + streams := h.tc.GetFirstStoreFromServer(h.t, 0).GetStoreConfig().KVFlowStreamTokenProvider.Inspect(ctx) + if len(streams) != expStreamCount { + return fmt.Errorf("expected %d replication streams, got %d [%+v]", expStreamCount, len(streams), streams) + } + + checkTokens := func( + expTokens, actualTokens int64, + stream kvflowcontrol.Stream, + typName string, + ) error { + if actualTokens != expTokens { + return fmt.Errorf("expected %v of %s flow tokens for %v, got %v", + humanize.IBytes(uint64(expTokens)), typName, stream, + humanize.IBytes(uint64(actualTokens)), + ) + } + return nil + } + + for _, stream := range streams { + s := kvflowcontrol.Stream{ + TenantID: stream.TenantID, + StoreID: stream.StoreID, + } + if err := checkTokens( + regularTokensPerStream, stream.AvailableEvalRegularTokens, s, "regular eval", + ); err != nil { + return err + } + if err := checkTokens( + elasticTokensPerStream, stream.AvailableEvalElasticTokens, s, "elastic eval", + ); err != nil { + return err + } + if err := checkTokens( + regularTokensPerStream, stream.AvailableSendRegularTokens, s, "regular send", + ); err != nil { + return err + } + if err := checkTokens( + elasticTokensPerStream, stream.AvailableSendElasticTokens, s, "elastic send", + ); err != nil { + return err + } + } + return nil +} + +// waitForConnectedStreams waits for the given range to have the expected +// number of connected streams (replicas with a send stream in +// rac2.RangeController). +func (h *rac2TestHelper) waitForConnectedStreams( + ctx context.Context, rangeID roachpb.RangeID, expConnectedStreams int, +) { + testutils.SucceedsSoon(h.t, func() error { + state, found := kvserver.MakeStoresForRACv2( + h.tc.Server(0).GetStores().(*kvserver.Stores)).LookupInspect(rangeID) + if !found { + return fmt.Errorf("handle for %s not found", rangeID) + } + require.True(h.t, found) + if len(state.ConnectedStreams) != expConnectedStreams { + return fmt.Errorf("expected %d connected streams, got %d [%+v]", + expConnectedStreams, len(state.ConnectedStreams), state.ConnectedStreams) + } + return nil + }) +} + +// waitForTotalTrackedTokens waits for the total tracked tokens across all +// streams for the given range to reach the expected value. It is agnostic of +// priority, the total tracked tokens is the aggregate across all priorities. +func (h *rac2TestHelper) waitForTotalTrackedTokens( + ctx context.Context, rangeID roachpb.RangeID, expTotalTrackedTokens int64, +) { + testutils.SucceedsSoon(h.t, func() error { + state, found := kvserver.MakeStoresForRACv2( + h.tc.Server(0).GetStores().(*kvserver.Stores)).LookupInspect(rangeID) + if !found { + return fmt.Errorf("handle for %s not found", rangeID) + } + require.True(h.t, found) + var totalTracked int64 + for _, stream := range state.ConnectedStreams { + for _, tracked := range stream.TrackedDeductions { + totalTracked += tracked.Tokens + } + } + if totalTracked != expTotalTrackedTokens { + return fmt.Errorf("expected to track %d tokens in aggregate, got %d", + kvflowcontrol.Tokens(expTotalTrackedTokens), kvflowcontrol.Tokens(totalTracked)) + } + return nil + }) +} + +// comment appends the comment string to the testdata file buffer. +func (h *rac2TestHelper) comment(comment string) { + if h.buf.Len() > 0 { + h.buf.WriteString("\n\n") + } + + comment = strings.TrimSpace(comment) + h.buf.WriteString(fmt.Sprintf("%s\n", comment)) + h.log(comment) +} + +// log logs the given message if logging is enabled.Vg +func (h *rac2TestHelper) log(msg string) { + if log.ShowLogs() { + log.Infof(context.Background(), "%s", msg) + } +} + +// query runs the given SQL query against the given SQLRunner, and appends the +// output to the testdata file buffer. +func (h *rac2TestHelper) query(runner *sqlutils.SQLRunner, sql string, headers ...string) { + // NB: We update metric gauges here to ensure that periodically updated + // metrics (via the node metrics loop) are up-to-date. + h.tc.GetFirstStoreFromServer(h.t, 0).GetStoreConfig().KVFlowStreamTokenProvider.UpdateMetricGauges() + sql = strings.TrimSpace(sql) + h.log(sql) + h.buf.WriteString(fmt.Sprintf("%s\n\n", sql)) + + rows := runner.Query(h.t, sql) + tbl := tablewriter.NewWriter(h.buf) + output, err := sqlutils.RowsToStrMatrix(rows) + require.NoError(h.t, err) + tbl.SetAlignment(tablewriter.ALIGN_LEFT) + tbl.AppendBulk(output) + tbl.SetBorder(false) + tbl.SetHeader(headers) + tbl.SetAutoFormatHeaders(false) + tbl.Render() +} + +// put issues a put request for the given key at the prioritity specified, +// against the first server in the cluster. +func (h *rac2TestHelper) put( + ctx context.Context, key roachpb.Key, size int, pri admissionpb.WorkPriority, +) *kvpb.BatchRequest { + value := roachpb.MakeValueFromString(randutil.RandString(h.rng, size, randutil.PrintableKeyAlphabet)) + ba := &kvpb.BatchRequest{} + ba.Add(kvpb.NewPut(key, value)) + ba.AdmissionHeader.Priority = int32(pri) + ba.AdmissionHeader.Source = kvpb.AdmissionHeader_FROM_SQL + if _, pErr := h.tc.Server(0).DB().NonTransactionalSender().Send( + ctx, ba, + ); pErr != nil { + h.t.Fatal(pErr.GoError()) + } + return ba +} + +// close writes the buffer to a file in the testdata directory and compares it +// against the expected output. +func (h *rac2TestHelper) close(filename string) { + echotest.Require(h.t, h.buf.String(), datapathutils.TestDataPath(h.t, "flow_control_integration_v2", filename)) +} + +// enableVerboseRaftMsgLoggingForRange installs a raft handler on each node, +// which in turn enables verbose message logging. +func (h *rac2TestHelper) enableVerboseRaftMsgLoggingForRange(desc roachpb.RangeDescriptor) { + for i := 0; i < len(h.tc.Servers); i++ { + si, err := h.tc.Server(i).GetStores().(*kvserver.Stores).GetStore(h.tc.Server(i).GetFirstStoreID()) + require.NoError(h.t, err) + h.tc.Servers[i].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(si.StoreID(), + &unreliableRaftHandler{ + rangeID: desc.RangeID, + IncomingRaftMessageHandler: si, + unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ + dropReq: func(req *kvserverpb.RaftMessageRequest) bool { + // Install a raft handler to get verbose raft logging. + return false + }, + }, + }) + } +} diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge b/pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge new file mode 100644 index 000000000000..ce55bf39eb03 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge @@ -0,0 +1,138 @@ +echo +---- +---- +-- Flow token metrics from n1 after issuing a regular 2*1MiB 3x replicated write +-- that are yet to get admitted. We see 2*3*1MiB=6MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. The 2*1MiB writes +-- happened on what is soon going to be the LHS and RHS of a range being split. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 18 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 42 MiB + kvflowcontrol.tokens.eval.regular.deducted | 6.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 18 MiB + kvflowcontrol.tokens.send.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 42 MiB + kvflowcontrol.tokens.send.regular.deducted | 6.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Splitting range.) + + +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- deducted which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 6MiB+15MiB=21MiB now. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 21 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 27 MiB + kvflowcontrol.tokens.eval.regular.deducted | 21 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 3.0 MiB + kvflowcontrol.tokens.send.elastic.deducted | 21 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 27 MiB + kvflowcontrol.tokens.send.regular.deducted | 21 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the newly split off replica, with its own three streams. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + 71 | 3 + + +-- (Merging ranges.) + + +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens deducted which comes +-- from 4MiB*3=12MiB. So we stand at 21MiB+12MiB=33MiB tokens deducted now. The +-- RHS of the range is gone now, and the previously 3*3MiB=9MiB of tokens +-- deducted for it are released at the subsuming LHS leaseholder. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.regular.available | 24 MiB + kvflowcontrol.tokens.eval.regular.deducted | 33 MiB + kvflowcontrol.tokens.eval.regular.returned | 9.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 24 MiB + kvflowcontrol.tokens.send.regular.deducted | 33 MiB + kvflowcontrol.tokens.send.regular.returned | 9.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe only the merged replica with its own three streams. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. We see all outstanding +-- {regular,elastic} tokens returned, including those from: +-- - the LHS before the merge, and +-- - the LHS and RHS before the original split. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 33 MiB + kvflowcontrol.tokens.eval.elastic.returned | 33 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 33 MiB + kvflowcontrol.tokens.eval.regular.returned | 33 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 33 MiB + kvflowcontrol.tokens.send.elastic.returned | 33 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 33 MiB + kvflowcontrol.tokens.send.regular.returned | 33 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic b/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic new file mode 100644 index 000000000000..3b277ef34cb7 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic @@ -0,0 +1,89 @@ +echo +---- +---- +-- Flow token metrics, before issuing the regular 1MiB replicated write. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 0 B + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 0 B + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 0 B + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 0 B + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Issuing + admitting a regular 1MiB, triply replicated write...) + + +-- Stream counts as seen by n1 post-write. We should see three {regular,elastic} +-- streams given there are three nodes and we're using a replication factor of +-- three. +SELECT name, value + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%stream%' +ORDER BY name ASC; + + kvflowcontrol.streams.eval.elastic.blocked_count | 0 + kvflowcontrol.streams.eval.elastic.total_count | 3 + kvflowcontrol.streams.eval.regular.blocked_count | 0 + kvflowcontrol.streams.eval.regular.total_count | 3 + kvflowcontrol.streams.send.elastic.blocked_count | 0 + kvflowcontrol.streams.send.elastic.total_count | 3 + kvflowcontrol.streams.send.regular.blocked_count | 0 + kvflowcontrol.streams.send.regular.total_count | 3 + + +-- Another view of the stream count, using /inspectz-backed vtables. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +HAVING count(*) = 3 +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + + +-- Flow token metrics from n1 after issuing the regular 1MiB replicated write, +-- and it being admitted on n1, n2 and n3. We should see 3*1MiB = 3MiB of +-- {regular,elastic} tokens deducted and returned, and {8*3=24MiB,16*3=48MiB} of +-- {regular,elastic} tokens available. Everything should be accounted for. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission b/pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission new file mode 100644 index 000000000000..0063556ca7fa --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission @@ -0,0 +1,97 @@ +echo +---- +---- +-- (Issuing 5 regular 1MiB, 3x replicated write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 5 regular 1MiB 3x replicated writes +-- that are yet to get admitted. We see 5*1MiB*3=15MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 9.0 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 15 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 33 MiB + kvflowcontrol.tokens.eval.regular.deducted | 15 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 9.0 MiB + kvflowcontrol.tokens.send.elastic.deducted | 15 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 33 MiB + kvflowcontrol.tokens.send.regular.deducted | 15 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 5.0 MiB + 70 | 2 | 5.0 MiB + 70 | 3 | 5.0 MiB + + +-- Observe the individual tracked tokens per-stream on the scratch range. +SELECT range_id, store_id, priority, crdb_internal.humanize_bytes(tokens::INT8) + FROM crdb_internal.kv_flow_token_deductions_v2 + + range_id | store_id | priority | tokens +-----------+----------+------------+---------- + 70 | 1 | normal-pri | 1.0 MiB + 70 | 1 | normal-pri | 1.0 MiB + 70 | 1 | normal-pri | 1.0 MiB + 70 | 1 | normal-pri | 1.0 MiB + 70 | 1 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 2 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + 70 | 3 | normal-pri | 1.0 MiB + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. We see 15MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 15 MiB + kvflowcontrol.tokens.eval.elastic.returned | 15 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 15 MiB + kvflowcontrol.tokens.eval.regular.returned | 15 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 15 MiB + kvflowcontrol.tokens.send.elastic.returned | 15 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 15 MiB + kvflowcontrol.tokens.send.regular.returned | 15 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization b/pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization new file mode 100644 index 000000000000..dcab380ca055 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization @@ -0,0 +1,91 @@ +echo +---- +---- +-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB elastic 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of elastic tokens with +-- no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 21 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 0 B + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 21 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 0 B + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Issuing 1x1MiB, 3x replicated regular write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of {regular,elastic} +-- tokens with no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 18 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 18 MiB + kvflowcontrol.tokens.send.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 6.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 6.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/crashed_node b/pkg/kv/kvserver/testdata/flow_control_integration_v2/crashed_node new file mode 100644 index 000000000000..aed49b390bf9 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/crashed_node @@ -0,0 +1,82 @@ +echo +---- +---- +-- (Issuing regular 5x1MiB, 2x replicated writes that are not admitted.) + + +-- Flow token metrics from n1 after issuing 5 regular 1MiB 2x replicated writes +-- that are yet to get admitted. We see 5*1MiB*2=10MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 6.0 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 22 MiB + kvflowcontrol.tokens.eval.regular.deducted | 10 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 6.0 MiB + kvflowcontrol.tokens.send.elastic.deducted | 10 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 22 MiB + kvflowcontrol.tokens.send.regular.deducted | 10 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the per-stream tracked tokens on n1, before n2 is crashed. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 5.0 MiB + 70 | 2 | 5.0 MiB + + +-- (Crashing n2) + + +-- Observe the per-stream tracked tokens on n1, after n2 crashed. We're no +-- longer tracking the 5MiB held by n2. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 5.0 MiB + + +-- Flow token metrics from n1 after n2 crashed. Observe that we've returned the +-- 5MiB previously held by n2. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 11 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB + kvflowcontrol.tokens.eval.elastic.returned | 5.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 27 MiB + kvflowcontrol.tokens.eval.regular.deducted | 10 MiB + kvflowcontrol.tokens.eval.regular.returned | 5.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 11 MiB + kvflowcontrol.tokens.send.elastic.deducted | 10 MiB + kvflowcontrol.tokens.send.elastic.returned | 5.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 27 MiB + kvflowcontrol.tokens.send.regular.deducted | 10 MiB + kvflowcontrol.tokens.send.regular.returned | 5.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/granter_admit_one_by_one b/pkg/kv/kvserver/testdata/flow_control_integration_v2/granter_admit_one_by_one new file mode 100644 index 000000000000..b3a79a59a080 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/granter_admit_one_by_one @@ -0,0 +1,74 @@ +echo +---- +---- +-- (Issuing regular 1024*1KiB, 3x replicated writes that are not admitted.) + + +-- Flow token metrics from n1 after issuing 1024KiB, i.e. 1MiB 3x replicated writes +-- that are yet to get admitted. We see 3*1MiB=3MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 21 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 21 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 1.0 MiB + 70 | 2 | 1.0 MiB + 70 | 3 | 1.0 MiB + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. We see 3MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. In #105185, by now we would've observed panics. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder b/pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder new file mode 100644 index 000000000000..ee6c4277ee47 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder @@ -0,0 +1,4 @@ +echo +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership new file mode 100644 index 000000000000..0612133c1655 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership @@ -0,0 +1,124 @@ +echo +---- +---- +-- (Issuing 1x1MiB, 3x replicated write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Adding a voting replica on n4.) + + +-- Observe the total tracked tokens per-stream on n1. s1-s3 should have 1MiB +-- tracked each, and s4 should have none. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 1.0 MiB + 70 | 2 | 1.0 MiB + 70 | 3 | 1.0 MiB + 70 | 4 | 0 B + + +-- (Issuing 1x1MiB, 4x replicated write that's not admitted.) + + +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s3 +-- should have 2MiB tracked (they've observed 2x1MiB writes), s4 should have +-- 1MiB. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 2.0 MiB + 70 | 2 | 2.0 MiB + 70 | 3 | 2.0 MiB + 70 | 4 | 1.0 MiB + + +-- (Removing voting replica from n3.) + + +-- (Adding non-voting replica to n5.) + + +-- (Issuing 1x1MiB, 4x replicated write (w/ one non-voter) that's not admitted. + + +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s2 +-- should have 3MiB tracked (they've observed 3x1MiB writes), there should be +-- no s3 since it was removed, s4 and s5 should have 2MiB and 1MiB +-- respectively. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 3.0 MiB + 70 | 2 | 3.0 MiB + 70 | 4 | 2.0 MiB + 70 | 5 | 1.0 MiB + + +-- (Allow below-raft admission to proceed.) + + +-- Observe that there no tracked tokens across s1,s2,s4,s5. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 0 B + 70 | 2 | 0 B + 70 | 4 | 0 B + 70 | 5 | 0 B + + +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned, including from when s3 was removed as a raft +-- member. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 40 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 11 MiB + kvflowcontrol.tokens.eval.elastic.returned | 11 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 80 MiB + kvflowcontrol.tokens.eval.regular.deducted | 11 MiB + kvflowcontrol.tokens.eval.regular.returned | 11 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 40 MiB + kvflowcontrol.tokens.send.elastic.deducted | 11 MiB + kvflowcontrol.tokens.send.elastic.returned | 11 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 80 MiB + kvflowcontrol.tokens.send.regular.deducted | 11 MiB + kvflowcontrol.tokens.send.regular.returned | 11 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self new file mode 100644 index 000000000000..70f727643eb7 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self @@ -0,0 +1,106 @@ +echo +---- +---- +-- (Issuing 1x1MiB, 3x replicated write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Replacing current raft leader on n1 in raft group with new n4 replica.) + + +-- Flow token metrics from n1 after raft leader removed itself from raft group. +-- All {regular,elastic} tokens deducted are returned. Note that the available +-- tokens increases, as n1 has seen 4 replication streams, s1,s2,s3,s4. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 32 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 64 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 32 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 64 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- n1 should have no connected streams now after transferring the lease to n2. +-- While, n2 should have 3 connected streams to s2,s3,s4. Query the stream count +-- on n1, then on n2. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + + +-- Observe the stream count on n2. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. Tokens were already +-- returned earlier, so there's no change. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 32 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 64 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 32 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 64 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_snapshot b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_snapshot new file mode 100644 index 000000000000..80a25cb5e6cd --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_snapshot @@ -0,0 +1,209 @@ +echo +---- +---- +-- Flow token metrics from n1 after issuing 1 regular 1MiB 5x replicated write +-- that's not admitted. Since this test is ignoring crashed nodes for token +-- deduction purposes, we see a deduction of 5MiB {regular,elastic} tokens. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 35 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 5.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 75 MiB + kvflowcontrol.tokens.eval.regular.deducted | 5.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 35 MiB + kvflowcontrol.tokens.send.elastic.deducted | 5.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 75 MiB + kvflowcontrol.tokens.send.regular.deducted | 5.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1. 1MiB is tracked for n1-n5. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 1.0 MiB + 70 | 2 | 1.0 MiB + 70 | 3 | 1.0 MiB + 70 | 4 | 1.0 MiB + 70 | 5 | 1.0 MiB + + +-- (Killing n2 and n3, but preventing their tokens from being returned + +-- artificially allowing tokens to get deducted.) + + +-- Observe the total tracked tokens per-stream on n1. 1MiB is (still) tracked +-- for n1-n5, because they are not in StateSnapshot yet and it have likely been +-- in StateProbe for less than the close timer. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 1.0 MiB + 70 | 2 | 1.0 MiB + 70 | 3 | 1.0 MiB + 70 | 4 | 1.0 MiB + 70 | 5 | 1.0 MiB + + +-- (Issuing another 1MiB of 5x replicated writes while n2 and n3 are down and +-- below-raft admission is paused.) + + +-- Flow token metrics from n1 after issuing 1 regular 1MiB 5x replicated write +-- that's not admitted. We'll have deducted another 5*1MiB=5MiB worth of tokens. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 30 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 70 MiB + kvflowcontrol.tokens.eval.regular.deducted | 10 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 30 MiB + kvflowcontrol.tokens.send.elastic.deducted | 10 MiB + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 70 MiB + kvflowcontrol.tokens.send.regular.deducted | 10 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1. 2MiB is tracked for n1-n5; +-- see last comment for an explanation why we're still deducting for n2, n3. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 2.0 MiB + 70 | 2 | 2.0 MiB + 70 | 3 | 2.0 MiB + 70 | 4 | 2.0 MiB + 70 | 5 | 2.0 MiB + + +-- (Truncating raft log.) + + +-- (Restarting n2 and n3.) + + +-- Flow token metrics from n1 after restarting n2 and n3. We've returned the +-- 2MiB previously held by those nodes (2MiB each). We're reacting to it's raft +-- progress state, noting that since we've truncated our log, we need to catch +-- it up via snapshot. So we release all held tokens. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 34 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB + kvflowcontrol.tokens.eval.elastic.returned | 4.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 74 MiB + kvflowcontrol.tokens.eval.regular.deducted | 10 MiB + kvflowcontrol.tokens.eval.regular.returned | 4.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 34 MiB + kvflowcontrol.tokens.send.elastic.deducted | 10 MiB + kvflowcontrol.tokens.send.elastic.returned | 4.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 74 MiB + kvflowcontrol.tokens.send.regular.deducted | 10 MiB + kvflowcontrol.tokens.send.regular.returned | 4.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1. There's nothing tracked +-- for n2 and n3 anymore. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + WHERE total_tracked_tokens > 0 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 2.0 MiB + 70 | 4 | 2.0 MiB + 70 | 5 | 2.0 MiB + + +-- (Allow below-raft admission to proceed.) + + +-- Flow token metrics from n1 after work gets admitted. We see the remaining +-- 6MiB of {regular,elastic} tokens returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 40 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 10 MiB + kvflowcontrol.tokens.eval.elastic.returned | 10 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 80 MiB + kvflowcontrol.tokens.eval.regular.deducted | 10 MiB + kvflowcontrol.tokens.eval.regular.returned | 10 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 40 MiB + kvflowcontrol.tokens.send.elastic.deducted | 10 MiB + kvflowcontrol.tokens.send.elastic.returned | 10 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 80 MiB + kvflowcontrol.tokens.send.regular.deducted | 10 MiB + kvflowcontrol.tokens.send.regular.returned | 10 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the total tracked tokens per-stream on n1; there should be nothing. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 70 | 1 | 0 B + 70 | 2 | 0 B + 70 | 3 | 0 B + 70 | 4 | 0 B + 70 | 5 | 0 B + + +-- Another view of tokens, using /inspectz-backed vtables. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + range_id | eval_regular_available | eval_elastic_available +-----------+------------------------+------------------------- + 1 | 16 MiB | 8.0 MiB + 2 | 16 MiB | 8.0 MiB + 3 | 16 MiB | 8.0 MiB + 4 | 16 MiB | 8.0 MiB + 5 | 16 MiB | 8.0 MiB +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge b/pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge new file mode 100644 index 000000000000..472cb19a1b20 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge @@ -0,0 +1,113 @@ +echo +---- +---- +-- Flow token metrics from n1 after issuing + admitting the regular 1MiB 3x +-- replicated write to the pre-split range. There should be 3MiB of +-- {regular,elastic} tokens {deducted,returned}. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Splitting range.) + + +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- {deducted,returned}, which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 3MiB+15MiB=18MiB now. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 18 MiB + kvflowcontrol.tokens.eval.elastic.returned | 18 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 18 MiB + kvflowcontrol.tokens.eval.regular.returned | 18 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 18 MiB + kvflowcontrol.tokens.send.elastic.returned | 18 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 18 MiB + kvflowcontrol.tokens.send.regular.returned | 18 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe the newly split off replica, with its own three streams. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + 71 | 3 + + +-- (Merging ranges.) + + +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens {deducted,returned}, +-- which comes from 4MiB*3=12MiB. So we stand at 18MiB+12MiB=30MiB now. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 30 MiB + kvflowcontrol.tokens.eval.elastic.returned | 30 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 30 MiB + kvflowcontrol.tokens.eval.regular.returned | 30 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 30 MiB + kvflowcontrol.tokens.send.elastic.returned | 30 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 30 MiB + kvflowcontrol.tokens.send.regular.returned | 30 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- Observe only the merged replica with its own three streams. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease b/pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease new file mode 100644 index 000000000000..6cc3ce5cdba6 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease @@ -0,0 +1,46 @@ +echo +---- +---- +-- (Issuing 1x1MiB, 3x replicated write that's not admitted.) + + +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.regular.available | 45 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 45 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Transferring range lease to n2 and allowing leadership to follow.) + + +-- Flow token metrics from n1 having lost the lease and raft leadership. All +-- deducted tokens are returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql