diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 0c9fc216260a..69ce4b7a9f16 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -2477,3 +2477,1240 @@ func (h *flowControlTestHelper) put( func (h *flowControlTestHelper) close(filename string) { echotest.Require(h.t, h.buf.String(), datapathutils.TestDataPath(h.t, "flow_control_integration", filename)) } + +// TestFlowControlBasicV2 runs a basic end-to-end test of the v2 kvflowcontrol +// machinery, replicating + admitting a single 1MiB regular write. +// The vmodule flags for running these tests are: +// +// --vmodule='replica_raft=1,kvflowcontroller=2,replica_proposal_buf=1, +// raft_transport=2,kvflowdispatch=1,kvadmission=1, +// kvflowhandle=1,work_queue=1,replica_flow_control=1, +// tracker=1,client_raft_helpers_test=1,range_controller=2, +// token_counter=2,token_tracker=2,processor=2' + +func TestFlowControlBasicV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "always-enqueue", func(t *testing.T, alwaysEnqueue bool) { + ctx := context.Background() + const numNodes = 3 + st := cluster.MakeTestingClusterSettings() + kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + RaftConfig: base.RaftConfig{ + // Suppress timeout-based elections. This test doesn't want to + // deal with leadership changing hands or followers unquiescing + // ranges by calling elections. 
+ RaftElectionTimeoutTicks: 1000000, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: alwaysEnqueue, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + for i := 0; i < numNodes; i++ { + si, err := tc.Server(i).GetStores().(*kvserver.Stores).GetStore(tc.Server(i).GetFirstStoreID()) + require.NoError(t, err) + tc.Servers[i].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(si.StoreID(), + &unreliableRaftHandler{ + rangeID: desc.RangeID, + IncomingRaftMessageHandler: si, + unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ + dropReq: func(req *kvserverpb.RaftMessageRequest) bool { + // Install a raft handler to get verbose raft logging. + return false + }, + }, + }) + } + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("basic") // this test behaves identically with or without the fast path + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- Flow token metrics, before issuing the regular 1MiB replicated write.`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Issuing + admitting a regular 1MiB, triply replicated write...)`) + h.log("sending put request") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Stream counts as seen by n1 post-write. We should see three {regular,elastic} +-- streams given there are three nodes and we're using a replication factor of +-- three. 
+`) + h.query(n1, ` + SELECT name, value + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%stream%' +ORDER BY name ASC; +`) + + h.comment(`-- Another view of the stream count, using /inspectz-backed vtables.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +HAVING count(*) = 3 +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(` +-- Flow token metrics from n1 after issuing the regular 1MiB replicated write, +-- and it being admitted on n1, n2 and n3. We should see 3*1MiB = 3MiB of +-- {regular,elastic} tokens deducted and returned, and {8*3=24MiB,16*3=48MiB} of +-- {regular,elastic} tokens available. Everything should be accounted for. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + }) +} + +// TestFlowControlRangeSplitMergeV2 walks through what happens to flow tokens +// when a range splits/merges. +func TestFlowControlRangeSplitMergeV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + const numNodes = 3 + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) 
+ + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("split_merge") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.log("sending put request to pre-split range") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request to pre-split range") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after issuing + admitting the regular 1MiB 3x +-- replicated write to the pre-split range. There should be 3MiB of +-- {regular,elastic} tokens {deducted,returned}. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Splitting range.)`) + left, right := tc.SplitRangeOrFatal(t, k.Next()) + h.waitForConnectedStreams(ctx, right.RangeID, 3) + // [T1,n1,s1,r63/1:/{Table/62-Max},*kvpb.AdminSplitRequest] initiating a split of this range at key /Table/Max [r64] (manual) + // [T1,n1,s1,r64/1:/{Table/Max-Max},raft] connected to stream: t1/s1 + // [T1,n1,s1,r64/1:/{Table/Max-Max},raft] connected to stream: t1/s2 + // [T1,n1,s1,r64/1:/{Table/Max-Max},raft] connected to stream: t1/s3 + + h.log("sending 2MiB put request to post-split LHS") + h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri) + h.log("sent 2MiB put request to post-split LHS") + + h.log("sending 3MiB put request to post-split RHS") + h.put(ctx, roachpb.Key(right.StartKey), 3<<20 /* 3MiB */, admissionpb.NormalPri) + h.log("sent 3MiB put request to post-split RHS") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- {deducted,returned}, which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 3MiB+15MiB=18MiB now. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the newly split off replica, with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Merging ranges.)`) + merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey()) + + // [T1,n1,s1,r64/1:{/Table/Max-\xfa\x00},*kvpb.AdminMergeRequest] initiating a merge of r65:{\xfa\x00-/Max} [(n1,s1):1, (n2,s2):2, (n3,s3):3, next=4, gen=6, sticky=9223372036.854775807,2147483647] into this range (manual) + // [T1,n1,s1,r64/1:{/Table/Max-\xfa\x00},raft] 380 removing replica r65/1 + // [T1,n2,s2,r64/2:{/Table/Max-\xfa\x00},raft] 385 removing replica r65/2 + // [T1,n3,s3,r64/3:{/Table/Max-\xfa\x00},raft] 384 removing replica r65/3 + // [T1,n1,s1,r65/1:{\xfa\x00-/Max},raft] disconnected stream: t1/s1 + // [T1,n1,s1,r65/1:{\xfa\x00-/Max},raft] disconnected stream: t1/s2 + // [T1,n1,s1,r65/1:{\xfa\x00-/Max},raft] disconnected stream: t1/s3 + + h.log("sending 4MiB put request to post-merge range") + h.put(ctx, roachpb.Key(merged.StartKey), 4<<20 /* 4MiB */, admissionpb.NormalPri) + h.log("sent 4MiB put request to post-merged range") + + h.waitForAllTokensReturned(ctx, 3) + h.comment(` +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens {deducted,returned}, +-- which comes from 4MiB*3=12MiB. So we stand at 18MiB+12MiB=30MiB now. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe only the merged replica with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") +} + +// TestFlowControlBlockedAdmissionV2 tests token tracking behavior by explicitly +// blocking below-raft admission. +func TestFlowControlBlockedAdmissionV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + const numNodes = 3 + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) 
+ + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("blocked_admission") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing regular 1MiB, 3x replicated write that's not admitted.)`) + h.log("sending put requests") + for i := 0; i < 5; i++ { + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + } + h.log("sent put requests") + + h.comment(` +-- Flow token metrics from n1 after issuing 5 regular 1MiB 3x replicated writes +-- that are yet to get admitted. We see 5*1MiB*3=15MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the total tracked tokens per-stream on n1.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- Observe the individual tracked tokens per-stream on the scratch range.`) + h.query(n1, ` + SELECT range_id, store_id, priority, crdb_internal.humanize_bytes(tokens::INT8) + FROM crdb_internal.kv_flow_token_deductions_v2 +`, "range_id", "store_id", "priority", "tokens") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(`-- Observe flow token dispatch metrics from n1.`) + h.query(n1, ` + SELECT name, value + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol.flow_token_dispatch.local_regular%' +ORDER BY name ASC; +`) + + h.comment(` +-- Observe flow token dispatch metrics from n2. 
+-- TODO(kvoli): We don't have flow token dispatch metrics in v2 (yet?). +-- Either add them to support this query or remove this query. + `) + h.query(n2, ` + SELECT name, value + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvadmission.flow_token_dispatch.remote_regular%' + ORDER BY name ASC; + `) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see 15MiB returns of +-- {regular,elastic} tokens, and the available capacities going back to what +-- they were. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlAdmissionPostSplitMergeV2 walks through what happens with flow +// tokens when a range undergoes splits/merges. It does this by blocking +// and later unblocking below-raft admission, verifying: +// - tokens for the RHS are released at the post-merge subsuming leaseholder, +// - admission for the RHS post-merge does not cause a double return of tokens, +// - admission for the LHS can happen post-merge, +// - admission for the LHS and RHS can happen post-split. 
+func TestFlowControlAdmissionPostSplitMergeV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + const numNodes = 3 + + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("admission_post_split_merge") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.log("sending put request to pre-split range") + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.put(ctx, k.Next(), 1<<20 /* 1MiB */, admissionpb.NormalPri) + h.log("sent put request to pre-split range") + + h.comment(` +-- Flow token metrics from n1 after issuing a regular 2*1MiB 3x replicated write +-- that are yet to get admitted. We see 2*3*1MiB=6MiB deductions of +-- {regular,elastic} tokens with no corresponding returns. The 2*1MiB writes +-- happened on what is soon going to be the LHS and RHS of a range being split. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Splitting range.)`) + left, right := tc.SplitRangeOrFatal(t, k.Next()) + h.waitForConnectedStreams(ctx, right.RangeID, 3) + + h.log("sending 2MiB put request to post-split LHS") + h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri) + h.log("sent 2MiB put request to post-split LHS") + + h.log("sending 3MiB put request to post-split RHS") + h.put(ctx, roachpb.Key(right.StartKey), 3<<20 /* 3MiB */, admissionpb.NormalPri) + h.log("sent 3MiB put request to post-split RHS") + + h.comment(` +-- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to +-- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens +-- deducted which comes from (2MiB+3MiB)*3=15MiB. So we stand at +-- 6MiB+15MiB=21MiB now. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe the newly split off replica, with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Merging ranges.)`) + merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey()) + + h.log("sending 4MiB put request to post-merge range") + h.put(ctx, roachpb.Key(merged.StartKey), 4<<20 /* 4MiB */, admissionpb.NormalPri) + h.log("sent 4MiB put request to post-merged range") + + h.comment(` +-- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to +-- the post-merged range. We should see 12MiB extra tokens deducted which comes +-- from 4MiB*3=12MiB. So we stand at 21MiB+12MiB=33MiB tokens deducted now. 
The +-- RHS of the range is gone now, and the previously 3*3MiB=9MiB of tokens +-- deducted for it are released at the subsuming LHS leaseholder. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- Observe only the merged replica with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. We see all outstanding +-- {regular,elastic} tokens returned, including those from: +-- - the LHS before the merge, and +-- - the LHS and RHS before the original split. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlRaftMembershipV2 tests flow token behavior when the raft +// membership changes. 
+func TestFlowControlRaftMembershipV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + + const numNodes = 5 + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("raft_membership") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Adding a voting replica on n4.)`) + tc.AddVotersOrFatal(t, k, tc.Target(3)) + h.waitForConnectedStreams(ctx, desc.RangeID, 4) + + h.comment(` +-- Observe the total tracked tokens per-stream on n1. 
s1-s3 should have 1MiB +-- tracked each, and s4 should have none.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Issuing 1x1MiB, 4x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s3 +-- should have 2MiB tracked (they've observed 2x1MiB writes), s4 should have +-- 1MiB. +`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Removing voting replica from n3.)`) + tc.RemoveVotersOrFatal(t, k, tc.Target(2)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Adding non-voting replica to n5.)`) + tc.AddNonVotersOrFatal(t, k, tc.Target(4)) + h.waitForConnectedStreams(ctx, desc.RangeID, 4) + + h.comment(`-- (Issuing 1x1MiB, 4x replicated write (w/ one non-voter) that's not admitted.`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Observe the individual tracked tokens per-stream on the scratch range. s1-s2 +-- should have 3MiB tracked (they've observed 3x1MiB writes), there should be +-- no s3 since it was removed, s4 and s5 should have 2MiB and 1MiB +-- respectively. 
+`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 5) + + h.comment(`-- Observe that there no tracked tokens across s1,s2,s4,s5.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned, including from when s3 was removed as a raft +-- member. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlRaftMembershipRemoveSelfV2 tests flow token behavior when the +// raft leader removes itself from the raft group. 
+func TestFlowControlRaftMembershipRemoveSelfV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "transfer-lease-first", func(t *testing.T, transferLeaseFirst bool) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + + const numNodes = 4 + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("raft_membership_remove_self") // this test behaves identically regardless of whether we transfer the lease first + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + + // Make sure the lease is on n1 and that we're triply connected. + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Replacing current raft leader on n1 in raft group with new n4 replica.)`) + testutils.SucceedsSoon(t, func() error { + // Relocate range from n1 -> n4. + if err := tc.Servers[2].DB(). + AdminRelocateRange( + context.Background(), desc.StartKey.AsRawKey(), + tc.Targets(1, 2, 3), nil, transferLeaseFirst); err != nil { + return err + } + leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil) + if err != nil { + return err + } + if leaseHolder.Equal(tc.Target(0)) { + return errors.Errorf("expected leaseholder to not be on n1") + } + return nil + }) + h.waitForAllTokensReturned(ctx, 3) + + h.comment(` +-- Flow token metrics from n1 after raft leader removed itself from raft group. +-- All {regular,elastic} tokens deducted are returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. Tokens were already +-- returned earlier, so there's no change. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + }) +} + +// TestFlowControlClassPrioritizationV2 shows how tokens are managed for both +// regular and elastic work. It does so by replicating + admitting a single +// 1MiB {regular,elastic} write. 
+ +func TestFlowControlClassPrioritizationV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + const numNodes = 5 + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("class_prioritization") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB elastic 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of elastic tokens with +-- no corresponding returns. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated regular write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of {regular,elastic} +-- tokens with no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) + + h.comment(` +-- Flow token metrics from n1 after work gets admitted. All {regular,elastic} +-- tokens deducted are returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; +`) +} + +// TestFlowControlTransferLeaseV2 tests flow control behavior when the range +// lease is transferred, and the raft leadership along with it. 
+func TestFlowControlTransferLeaseV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + const numNodes = 5 + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("transfer_lease") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. 
+`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Transferring range lease to n2 and allowing leadership to follow.)`) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + testutils.SucceedsSoon(t, func() error { + if leader := tc.GetRaftLeader(t, roachpb.RKey(k)); leader.NodeID() != tc.Target(1).NodeID { + return errors.Errorf("expected raft leadership to transfer to n2, found n%d", leader.NodeID()) + } + return nil + }) + h.waitForAllTokensReturned(ctx, 3) + + h.comment(` +-- Flow token metrics from n1 having lost the lease and raft leadership. All +-- deducted tokens are returned. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) +} + +// TestFlowControlLeaderNotLeaseholderV2 tests flow control behavior when the +// range leaseholder is not the raft leader. +func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + const numNodes = 5 + var disableWorkQueueGranting atomic.Bool + disableWorkQueueGranting.Store(true) + + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // Disable leader transfers during leaseholder changes so + // that we can easily create leader-not-leaseholder + // scenarios. 
+ DisableLeaderFollowsLeaseholder: true, + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + return disableWorkQueueGranting.Load() + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + + h := newRAC2TestHelper(t, tc) + h.init() + defer h.close("leader_not_leaseholder") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.waitForConnectedStreams(ctx, desc.RangeID, 3) + + h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Flow token metrics from n1 after issuing 1x1MiB regular 3x replicated write +-- that's not admitted. We see 1*1MiB*3=3MiB deductions of regular tokens with +-- no corresponding returns. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Transferring only range lease, not raft leadership, to n2.)`) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + require.Equal(t, tc.GetRaftLeader(t, roachpb.RKey(k)).NodeID(), tc.Target(0).NodeID) + + h.comment(` +-- Flow token metrics from n1 having lost the lease but retained raft +-- leadership. No deducted tokens are released. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(` +-- (Issuing another 1x1MiB, 3x replicated write that's not admitted while in +-- this leader != leaseholder state.) 
+`) + h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) + + h.comment(` +-- Looking at n1's flow token metrics, there's no change. No additional tokens +-- are deducted since the write is not being proposed here. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(` +-- Looking at n2's flow token metrics, there's no activity. n2 never acquired +-- the raft leadership. +`) + h.query(n2, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) + + h.comment(`-- (Allow below-raft admission to proceed.)`) + disableWorkQueueGranting.Store(false) + h.waitForAllTokensReturned(ctx, 3) // wait for admission + + h.comment(` +-- All deducted flow tokens are returned back to where the raft leader is. +`) + h.query(n1, ` + SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%regular%' +ORDER BY name ASC; +`) +} + +type rac2TestHelper struct { + t *testing.T + tc *testcluster.TestCluster + buf *strings.Builder + rng *rand.Rand +} + +func newRAC2TestHelper(t *testing.T, tc *testcluster.TestCluster) *rac2TestHelper { + rng, _ := randutil.NewPseudoRand() + buf := &strings.Builder{} + return &rac2TestHelper{ + t: t, + tc: tc, + buf: buf, + rng: rng, + } +} + +func (h *rac2TestHelper) init() { + // Reach into each server's cluster setting and override. This causes any + // registered change callbacks to run immediately, which is important since + // running them with some lag (which happens when using SQL and `SET CLUSTER + // SETTING`) interferes with the later activities in these tests. 
+ for _, s := range h.tc.Servers { + kvflowcontrol.Enabled.Override(context.Background(), &s.ClusterSettings().SV, true) + kvflowcontrol.Mode.Override(context.Background(), &s.ClusterSettings().SV, kvflowcontrol.ApplyToAll) + } +} + +func (h *rac2TestHelper) waitForAllTokensReturned(ctx context.Context, expStreamCount int) { + testutils.SucceedsSoon(h.t, func() error { + return h.checkAllTokensReturned(ctx, expStreamCount) + }) +} + +func (h *rac2TestHelper) checkAllTokensReturned(ctx context.Context, expStreamCount int) error { + streams := h.tc.GetFirstStoreFromServer(h.t, 0).GetStoreConfig().KVFlowStreamTokenProvider.Inspect(ctx) + if len(streams) != expStreamCount { + return fmt.Errorf("expected %d replication streams, got %d", expStreamCount, len(streams)) + } + for _, stream := range streams { + if stream.AvailableEvalRegularTokens != 16<<20 { + return fmt.Errorf("expected %s of regular flow tokens for %s, got %s", + humanize.IBytes(16<<20), + kvflowcontrol.Stream{ + TenantID: stream.TenantID, + StoreID: stream.StoreID, + }, + humanize.IBytes(uint64(stream.AvailableEvalRegularTokens)), + ) + } + if stream.AvailableEvalElasticTokens != 8<<20 { + return fmt.Errorf("expected %s of elastic flow tokens for %s, got %s", + humanize.IBytes(8<<20), + kvflowcontrol.Stream{ + TenantID: stream.TenantID, + StoreID: stream.StoreID, + }, + humanize.IBytes(uint64(stream.AvailableEvalElasticTokens)), + ) + } + } + return nil +} + +func (h *rac2TestHelper) waitForConnectedStreams( + ctx context.Context, rangeID roachpb.RangeID, expConnectedStreams int, +) { + testutils.SucceedsSoon(h.t, func() error { + state, found := kvserver.MakeStoresForRACv2( + h.tc.Server(0).GetStores().(*kvserver.Stores)).LookupInspect(rangeID) + if !found { + return fmt.Errorf("handle for %s not found", rangeID) + } + require.True(h.t, found) + if len(state.ConnectedStreams) != expConnectedStreams { + return fmt.Errorf("expected %d connected streams, got %d", + expConnectedStreams, 
len(state.ConnectedStreams)) + } + return nil + }) +} + +func (h *rac2TestHelper) waitForTotalTrackedTokens( + ctx context.Context, rangeID roachpb.RangeID, expTotalTrackedTokens int64, +) { + testutils.SucceedsSoon(h.t, func() error { + state, found := kvserver.MakeStoresForRACv2( + h.tc.Server(0).GetStores().(*kvserver.Stores)).LookupInspect(rangeID) + if !found { + return fmt.Errorf("handle for %s not found", rangeID) + } + require.True(h.t, found) + var totalTracked int64 + for _, stream := range state.ConnectedStreams { + for _, tracked := range stream.TrackedDeductions { + totalTracked += tracked.Tokens + } + } + if totalTracked != expTotalTrackedTokens { + return fmt.Errorf("expected to track %d tokens in aggregate, got %d", + kvflowcontrol.Tokens(expTotalTrackedTokens), kvflowcontrol.Tokens(totalTracked)) + } + return nil + }) +} + +func (h *rac2TestHelper) comment(comment string) { + if h.buf.Len() > 0 { + h.buf.WriteString("\n\n") + } + + comment = strings.TrimSpace(comment) + h.buf.WriteString(fmt.Sprintf("%s\n", comment)) + h.log(comment) +} + +func (h *rac2TestHelper) log(msg string) { + if log.ShowLogs() { + log.Infof(context.Background(), "%s", msg) + } +} + +func (h *rac2TestHelper) query(runner *sqlutils.SQLRunner, sql string, headers ...string) { + // NB: We update metric gauges here to ensure that periodically updated + // metrics (via the node metrics loop) are up-to-date. 
+ h.tc.GetFirstStoreFromServer(h.t, 0).GetStoreConfig().KVFlowStreamTokenProvider.UpdateMetricGauges() + sql = strings.TrimSpace(sql) + h.log(sql) + h.buf.WriteString(fmt.Sprintf("%s\n\n", sql)) + + rows := runner.Query(h.t, sql) + tbl := tablewriter.NewWriter(h.buf) + output, err := sqlutils.RowsToStrMatrix(rows) + require.NoError(h.t, err) + tbl.SetAlignment(tablewriter.ALIGN_LEFT) + tbl.AppendBulk(output) + tbl.SetBorder(false) + tbl.SetHeader(headers) + tbl.SetAutoFormatHeaders(false) + tbl.Render() +} + +func (h *rac2TestHelper) put( + ctx context.Context, key roachpb.Key, size int, pri admissionpb.WorkPriority, +) *kvpb.BatchRequest { + value := roachpb.MakeValueFromString(randutil.RandString(h.rng, size, randutil.PrintableKeyAlphabet)) + ba := &kvpb.BatchRequest{} + ba.Add(kvpb.NewPut(key, value)) + ba.AdmissionHeader.Priority = int32(pri) + ba.AdmissionHeader.Source = kvpb.AdmissionHeader_FROM_SQL + if _, pErr := h.tc.Server(0).DB().NonTransactionalSender().Send( + ctx, ba, + ); pErr != nil { + h.t.Fatal(pErr.GoError()) + } + return ba +} + +func (h *rac2TestHelper) close(filename string) { + echotest.Require(h.t, h.buf.String(), datapathutils.TestDataPath(h.t, "flow_control_integration_v2", filename)) +} diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go index 48cce8c531fb..bc8fa8e8f4b5 100644 --- a/pkg/kv/kvserver/flow_control_stores.go +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -71,8 +71,20 @@ func (sh *storesForFlowControl) LookupInspect( // interface. 
func (sh *storesForFlowControl) LookupReplicationAdmissionHandle( rangeID roachpb.RangeID, -) (kvflowcontrol.ReplicationAdmissionHandle, bool) { - return sh.Lookup(rangeID) +) (handle kvflowcontrol.ReplicationAdmissionHandle, found bool) { + ls := (*Stores)(sh) + if err := ls.VisitStores(func(s *Store) error { + if h, ok := makeStoreForFlowControl(s).LookupReplicationAdmissionHandle(rangeID); ok { + handle = h + found = true + } + return nil + }); err != nil { + ctx := ls.AnnotateCtx(context.Background()) + log.Errorf(ctx, "unexpected error: %s", err) + return nil, false + } + return handle, found } // Inspect is part of the StoresForFlowControl interface. diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 5df579399fbd..49239170414e 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/kv/kvpb", "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/kvflowcontrol/replica_rac2", "//pkg/kv/kvserver/raftlog", "//pkg/raft/raftpb", diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 16837b4276cd..eedb0f4f8995 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" @@ -353,7 +354,9 @@ func (n *controllerImpl) AdmitKVWork( // and the point of deduction. 
That's ok, there's no strong // synchronization needed between these two points. ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ - AdmissionPriority: int32(admissionInfo.Priority), + // TODO(kvoli): Only translate the priority to a raftpb.Priority if + // using v2. + AdmissionPriority: int32(rac2.AdmissionToRaftPriority(admissionInfo.Priority)), AdmissionCreateTime: admissionInfo.CreateTime, AdmissionOriginNode: n.nodeID.Get(), } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index bb5af1f7f49a..e300b91b958e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -157,6 +157,7 @@ func (c *Controller) Admit( // the wait duration metrics with CPU scheduling artifacts, causing // confusion. + panic("unreachable") if waitEndState == waitSuccess { const formatStr = "admitted request (pri=%s stream=%s wait-duration=%s mode=%s)" if waited { diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go index b37c70f0cb64..d1515c902b1f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go @@ -51,6 +51,24 @@ func (av AdmittedVector) Merge(other AdmittedVector) AdmittedVector { return av } +func (av AdmittedVector) String() string { + return redact.StringWithoutMarkers(av) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (av AdmittedVector) SafeFormat(w redact.SafePrinter, _ rune) { + var buf redact.StringBuilder + buf.Printf("term:%d, admitted:[", av.Term) + for pri, index := range av.Admitted { + if pri > 0 { + buf.Printf(",") + } + buf.Printf("%s:%d", raftpb.Priority(pri), index) + } + buf.Printf("]") + w.Printf("%v", buf) +} + // LogTracker tracks the durable and logically admitted state of a raft log. 
// // Writes to a raft log are ordered by LogMark (term, index) where term is the diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index c3079fdcd3aa..91f92422e395 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -257,6 +257,7 @@ var _ RangeController = &rangeController{} func NewRangeController( ctx context.Context, o RangeControllerOptions, init RangeControllerInitState, ) *rangeController { + log.VInfof(ctx, 1, "r%v creating range controller", o.RangeID) rc := &rangeController{ opts: o, leaseholder: init.Leaseholder, @@ -371,7 +372,10 @@ retry: } } } - rc.opts.EvalWaitMetrics.OnAdmitted(wc, rc.opts.Clock.PhysicalTime().Sub(start)) + waitDuration := rc.opts.Clock.PhysicalTime().Sub(start) + log.VEventf(ctx, 2, "admitted request (pri=%v wait-duration=%s wait-for-all=%v)", + pri, waitDuration, waitForAllReplicateHandles) + rc.opts.EvalWaitMetrics.OnAdmitted(wc, waitDuration) return true, nil } @@ -442,6 +446,7 @@ func (rc *rangeController) SetLeaseholderRaftMuLocked( if replica == rc.leaseholder { return } + log.VInfof(ctx, 1, "r%v setting range leaseholder replica_id=%v", rc.opts.RangeID, replica) rc.leaseholder = replica rc.updateWaiterSets() } @@ -450,6 +455,7 @@ func (rc *rangeController) SetLeaseholderRaftMuLocked( // // Requires replica.raftMu to be held. 
func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) { + log.VInfof(ctx, 1, "r%v closing range controller", rc.opts.RangeID) rc.mu.Lock() defer rc.mu.Unlock() @@ -637,13 +643,16 @@ func (rss *replicaSendStream) changeConnectedStateLocked(state connectedState, n } func (rss *replicaSendStream) admit(ctx context.Context, av AdmittedVector) { + log.VInfof(ctx, 2, "r%v:%v stream %v admit %v", + rss.parent.parent.opts.RangeID, rss.parent.desc, rss.parent.stream, av) rss.mu.Lock() defer rss.mu.Unlock() rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted)) } -func (rs *replicaState) createReplicaSendStream() { +func (rs *replicaState) createReplicaSendStream(ctx context.Context) { // Must be in StateReplicate on creation. + log.VEventf(ctx, 1, "creating send stream %v for replica %v", rs.stream, rs.desc) rs.sendStream = &replicaSendStream{ parent: rs, } @@ -744,7 +753,7 @@ func (rs *replicaState) handleReadyState( case tracker.StateReplicate: if rs.sendStream == nil { - rs.createReplicaSendStream() + rs.createReplicaSendStream(ctx) shouldWaitChange = true } else { shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx) @@ -760,6 +769,7 @@ func (rs *replicaState) handleReadyState( } func (rss *replicaState) closeSendStream(ctx context.Context) { + log.VEventf(ctx, 1, "closing send stream %v for replica %v", rss.stream, rss.desc) rss.sendStream.mu.Lock() defer rss.sendStream.mu.Unlock() diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go b/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go index ae1c334f4757..b7089b1f9a71 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go @@ -61,7 +61,7 @@ func (p *StreamTokenCounterProvider) Eval(stream kvflowcontrol.Stream) *tokenCou return t } t, _ := p.evalCounters.LoadOrStore(stream, newTokenCounter( - p.settings, p.clock, p.tokenMetrics.CounterMetrics[flowControlEvalMetricType])) + p.settings, p.clock, 
p.tokenMetrics.CounterMetrics[flowControlEvalMetricType], stream)) return t } @@ -71,7 +71,7 @@ func (p *StreamTokenCounterProvider) Send(stream kvflowcontrol.Stream) *tokenCou return t } t, _ := p.sendCounters.LoadOrStore(stream, newTokenCounter( - p.settings, p.clock, p.tokenMetrics.CounterMetrics[flowControlSendMetricType])) + p.settings, p.clock, p.tokenMetrics.CounterMetrics[flowControlSendMetricType], stream)) return t } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go index 3eee5cf9cc7e..c5dffff12832 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go @@ -176,6 +176,9 @@ type tokenCounter struct { settings *cluster.Settings clock *hlc.Clock metrics *TokenCounterMetrics + // stream is the stream for which tokens are being adjusted, it is only used + // in logging. + stream kvflowcontrol.Stream mu struct { syncutil.RWMutex @@ -186,12 +189,16 @@ type tokenCounter struct { // newTokenCounter creates a new TokenCounter. 
func newTokenCounter( - settings *cluster.Settings, clock *hlc.Clock, metrics *TokenCounterMetrics, + settings *cluster.Settings, + clock *hlc.Clock, + metrics *TokenCounterMetrics, + stream kvflowcontrol.Stream, ) *tokenCounter { t := &tokenCounter{ settings: settings, clock: clock, metrics: metrics, + stream: stream, } limit := tokensPerWorkClass{ regular: kvflowcontrol.Tokens(kvflowcontrol.RegularTokensPerStream.Get(&settings.SV)), @@ -470,10 +477,21 @@ func (t *tokenCounter) adjust( ctx context.Context, class admissionpb.WorkClass, delta kvflowcontrol.Tokens, ) { now := t.clock.PhysicalTime() - t.mu.Lock() - defer t.mu.Unlock() + func() { + t.mu.Lock() + defer t.mu.Unlock() + t.adjustLocked(ctx, class, delta, now) + }() + + if log.V(2) { + func() { + t.mu.RLock() + defer t.mu.RUnlock() - t.adjustLocked(ctx, class, delta, now) + log.Infof(ctx, "adjusted flow tokens (wc=%v stream=%v delta=%v): regular=%v elastic=%v", + class, t.stream, delta, t.tokensLocked(regular), t.tokensLocked(elastic)) + }() + } } func (t *tokenCounter) adjustLocked( diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go index d5d695aa306b..033b1ce3f1a3 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go @@ -203,6 +203,7 @@ func TestTokenCounter(t *testing.T) { settings, hlc.NewClockForTesting(nil), newTokenCounterMetrics(flowControlEvalMetricType), + kvflowcontrol.Stream{}, ) assertStateReset := func(t *testing.T) { @@ -378,6 +379,7 @@ func (ts *evalTestState) getOrCreateTC(stream string) *namedTokenCounter { ts.settings, hlc.NewClockForTesting(nil), newTokenCounterMetrics(flowControlEvalMetricType), + kvflowcontrol.Stream{}, ), stream: stream, } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go index 29322476cae9..38f6a1a5b1a8 100644 --- 
a/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go @@ -72,6 +72,10 @@ func (t *Tracker) Track( term: term, }) + if log.V(1) { + log.Infof(ctx, "tracking %v flow control tokens for pri=%s stream=%s log-position=%d/%d", + tokens, pri, t.stream, term, index) + } return true } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index abf1a42bc96d..60346c018068 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -889,6 +889,22 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2 if err != nil { panic(errors.Wrap(err, "unable to decode raft command admission data: %v")) } + + if log.V(1) { + log.Infof(ctx, "decoded raft admission meta below-raft: pri=%s create-time=%d proposer=n%s receiver=[n%d,s%s] tenant=t%d tokens≈%d sideloaded=%t raft-entry=%d/%d", + admissionpb.WorkPriority(meta.AdmissionPriority), + meta.AdmissionCreateTime, + meta.AdmissionOriginNode, + p.opts.NodeID, + p.opts.StoreID, + p.desc.tenantID.ToUint64(), + kvflowcontrol.Tokens(len(entry.Data)), + typ.IsSideloaded(), + entry.Term, + entry.Index, + ) + } + mark := rac2.LogMark{Term: e.Term, Index: entry.Index} var raftPri raftpb.Priority if isV2Encoding { diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 7561bee5ad17..ca389c971046 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2528,7 +2528,7 @@ func racV2EnabledWhenLeaderLevel( ctx context.Context, st *cluster.Settings, ) replica_rac2.EnabledWhenLeaderLevel { // TODO(sumeer): implement fully, once all the dependencies are implemented. 
- return replica_rac2.NotEnabledWhenLeader + return replica_rac2.EnabledWhenLeaderV2Encoding } // maybeEnqueueProblemRange will enqueue the replica for processing into the diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 4fc8cf52224e..f4065b88f213 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -234,6 +234,7 @@ func newUninitializedReplicaWithoutRaftGroup( RaftScheduler: r.store.scheduler, AdmittedPiggybacker: r.store.cfg.KVFlowAdmittedPiggybacker, ACWorkQueue: r.store.cfg.KVAdmissionController, + Settings: r.store.cfg.Settings, EvalWaitMetrics: r.store.cfg.KVFlowEvalWaitMetrics, RangeControllerFactory: r.store.kvflowRangeControllerFactory, EnabledWhenLeaderLevel: r.raftMu.flowControlLevel, diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index e38bc9f0b8bc..f954de6b6d1d 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -923,7 +923,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // Even if we don't have a Ready, or entries in Ready, // replica_rac2.Processor may need to do some work. raftEvent := rac2.RaftEventFromMsgStorageAppend(msgStorageAppend) - r.flowControlV2.HandleRaftReadyRaftMuLocked(ctx, raftEvent) + if r.store.cfg.TestingKnobs.FlowControlTestingKnobs == nil || + !r.store.cfg.TestingKnobs.FlowControlTestingKnobs.UseOnlyForScratchRanges || + r.IsScratchRange() { + r.flowControlV2.HandleRaftReadyRaftMuLocked(ctx, raftEvent) + } if !hasReady { // We must update the proposal quota even if we don't have a ready. 
// Consider the case when our quota is of size 1 and two out of three diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge b/pkg/kv/kvserver/testdata/flow_control_integration_v2/admission_post_split_merge new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic b/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic new file mode 100644 index 000000000000..3b277ef34cb7 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/basic @@ -0,0 +1,89 @@ +echo +---- +---- +-- Flow token metrics, before issuing the regular 1MiB replicated write. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 0 B + kvflowcontrol.tokens.eval.elastic.returned | 0 B + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 0 B + kvflowcontrol.tokens.eval.regular.returned | 0 B + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 0 B + kvflowcontrol.tokens.send.elastic.returned | 0 B + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 0 B + kvflowcontrol.tokens.send.regular.returned | 0 B + kvflowcontrol.tokens.send.regular.unaccounted | 0 B + + +-- (Issuing + admitting a regular 1MiB, triply replicated write...) + + +-- Stream counts as seen by n1 post-write. We should see three {regular,elastic} +-- streams given there are three nodes and we're using a replication factor of +-- three. 
+SELECT name, value + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%stream%' +ORDER BY name ASC; + + kvflowcontrol.streams.eval.elastic.blocked_count | 0 + kvflowcontrol.streams.eval.elastic.total_count | 3 + kvflowcontrol.streams.eval.regular.blocked_count | 0 + kvflowcontrol.streams.eval.regular.total_count | 3 + kvflowcontrol.streams.send.elastic.blocked_count | 0 + kvflowcontrol.streams.send.elastic.total_count | 3 + kvflowcontrol.streams.send.regular.blocked_count | 0 + kvflowcontrol.streams.send.regular.total_count | 3 + + +-- Another view of the stream count, using /inspectz-backed vtables. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +HAVING count(*) = 3 +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 70 | 3 + + +-- Flow token metrics from n1 after issuing the regular 1MiB replicated write, +-- and it being admitted on n1, n2 and n3. We should see 3*1MiB = 3MiB of +-- {regular,elastic} tokens deducted and returned, and {8*3=24MiB,16*3=48MiB} of +-- {regular,elastic} tokens available. Everything should be accounted for. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%tokens%' +ORDER BY name ASC; + + kvflowcontrol.tokens.eval.elastic.available | 24 MiB + kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B + kvflowcontrol.tokens.eval.regular.available | 48 MiB + kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB + kvflowcontrol.tokens.eval.regular.unaccounted | 0 B + kvflowcontrol.tokens.send.elastic.available | 24 MiB + kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB + kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB + kvflowcontrol.tokens.send.elastic.unaccounted | 0 B + kvflowcontrol.tokens.send.regular.available | 48 MiB + kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB + kvflowcontrol.tokens.send.regular.returned | 3.0 MiB + kvflowcontrol.tokens.send.regular.unaccounted | 0 B +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission b/pkg/kv/kvserver/testdata/flow_control_integration_v2/blocked_admission new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization b/pkg/kv/kvserver/testdata/flow_control_integration_v2/class_prioritization new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder b/pkg/kv/kvserver/testdata/flow_control_integration_v2/leader_not_leaseholder new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership new file mode 100644 index 000000000000..e69de29bb2d1 diff --git 
a/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self b/pkg/kv/kvserver/testdata/flow_control_integration_v2/raft_membership_remove_self new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge b/pkg/kv/kvserver/testdata/flow_control_integration_v2/split_merge new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease b/pkg/kv/kvserver/testdata/flow_control_integration_v2/transfer_lease new file mode 100644 index 000000000000..e69de29bb2d1