Skip to content

Commit

Permalink
kvserver: add basic rac2 integration test
Browse files Browse the repository at this point in the history
Part of: #130187
Release note: None
  • Loading branch information
kvoli committed Sep 16, 2024
1 parent 8e686f4 commit 369ac1f
Show file tree
Hide file tree
Showing 24 changed files with 1,430 additions and 14 deletions.
1,237 changes: 1,237 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,20 @@ func (sh *storesForFlowControl) LookupInspect(
// interface.
func (sh *storesForFlowControl) LookupReplicationAdmissionHandle(
rangeID roachpb.RangeID,
) (kvflowcontrol.ReplicationAdmissionHandle, bool) {
return sh.Lookup(rangeID)
) (handle kvflowcontrol.ReplicationAdmissionHandle, found bool) {
ls := (*Stores)(sh)
if err := ls.VisitStores(func(s *Store) error {
if h, ok := makeStoreForFlowControl(s).LookupReplicationAdmissionHandle(rangeID); ok {
handle = h
found = true
}
return nil
}); err != nil {
ctx := ls.AnnotateCtx(context.Background())
log.Errorf(ctx, "unexpected error: %s", err)
return nil, false
}
return handle, found
}

// Inspect is part of the StoresForFlowControl interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -353,7 +354,9 @@ func (n *controllerImpl) AdmitKVWork(
// and the point of deduction. That's ok, there's no strong
// synchronization needed between these two points.
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
AdmissionPriority: int32(admissionInfo.Priority),
// TODO(kvoli): Only translate the priority to a raftpb.Priority if
// using v2.
AdmissionPriority: int32(rac2.AdmissionToRaftPriority(admissionInfo.Priority)),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (c *Controller) Admit(
// the wait duration metrics with CPU scheduling artifacts, causing
// confusion.

panic("unreachable")
if waitEndState == waitSuccess {
const formatStr = "admitted request (pri=%s stream=%s wait-duration=%s mode=%s)"
if waited {
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ func (av AdmittedVector) Merge(other AdmittedVector) AdmittedVector {
return av
}

func (av AdmittedVector) String() string {
return redact.StringWithoutMarkers(av)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (av AdmittedVector) SafeFormat(w redact.SafePrinter, _ rune) {
var buf redact.StringBuilder
buf.Printf("term:%d, admitted:[", av.Term)
for pri, index := range av.Admitted {
if pri > 0 {
buf.Printf(",")
}
buf.Printf("%s:%d", raftpb.Priority(pri), index)
}
buf.Printf("]")
w.Printf("%v", buf)
}

// LogTracker tracks the durable and logically admitted state of a raft log.
//
// Writes to a raft log are ordered by LogMark (term, index) where term is the
Expand Down
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ var _ RangeController = &rangeController{}
func NewRangeController(
ctx context.Context, o RangeControllerOptions, init RangeControllerInitState,
) *rangeController {
log.VInfof(ctx, 1, "r%v creating range controller", o.RangeID)
rc := &rangeController{
opts: o,
leaseholder: init.Leaseholder,
Expand Down Expand Up @@ -371,7 +372,10 @@ retry:
}
}
}
rc.opts.EvalWaitMetrics.OnAdmitted(wc, rc.opts.Clock.PhysicalTime().Sub(start))
waitDuration := rc.opts.Clock.PhysicalTime().Sub(start)
log.VEventf(ctx, 2, "admitted request (pri=%v wait-duration=%s wait-for-all=%v)",
pri, waitDuration, waitForAllReplicateHandles)
rc.opts.EvalWaitMetrics.OnAdmitted(wc, waitDuration)
return true, nil
}

Expand Down Expand Up @@ -442,6 +446,7 @@ func (rc *rangeController) SetLeaseholderRaftMuLocked(
if replica == rc.leaseholder {
return
}
log.VInfof(ctx, 1, "r%v setting range leaseholder replica_id=%v", rc.opts.RangeID, replica)
rc.leaseholder = replica
rc.updateWaiterSets()
}
Expand All @@ -450,6 +455,7 @@ func (rc *rangeController) SetLeaseholderRaftMuLocked(
//
// Requires replica.raftMu to be held.
func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) {
log.VInfof(ctx, 1, "r%v closing range controller", rc.opts.RangeID)
rc.mu.Lock()
defer rc.mu.Unlock()

Expand Down Expand Up @@ -637,13 +643,16 @@ func (rss *replicaSendStream) changeConnectedStateLocked(state connectedState, n
}

func (rss *replicaSendStream) admit(ctx context.Context, av AdmittedVector) {
log.VInfof(ctx, 2, "r%v:%v stream %v admit %v",
rss.parent.parent.opts.RangeID, rss.parent.desc, rss.parent.stream, av)
rss.mu.Lock()
defer rss.mu.Unlock()
rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted))
}

func (rs *replicaState) createReplicaSendStream() {
func (rs *replicaState) createReplicaSendStream(ctx context.Context) {
// Must be in StateReplicate on creation.
log.VEventf(ctx, 1, "creating send stream %v for replica %v", rs.stream, rs.desc)
rs.sendStream = &replicaSendStream{
parent: rs,
}
Expand Down Expand Up @@ -744,7 +753,7 @@ func (rs *replicaState) handleReadyState(

case tracker.StateReplicate:
if rs.sendStream == nil {
rs.createReplicaSendStream()
rs.createReplicaSendStream(ctx)
shouldWaitChange = true
} else {
shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx)
Expand All @@ -760,6 +769,7 @@ func (rs *replicaState) handleReadyState(
}

func (rss *replicaState) closeSendStream(ctx context.Context) {
log.VEventf(ctx, 1, "closing send stream %v for replica %v", rss.stream, rss.desc)
rss.sendStream.mu.Lock()
defer rss.sendStream.mu.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *StreamTokenCounterProvider) Eval(stream kvflowcontrol.Stream) *tokenCou
return t
}
t, _ := p.evalCounters.LoadOrStore(stream, newTokenCounter(
p.settings, p.clock, p.tokenMetrics.CounterMetrics[flowControlEvalMetricType]))
p.settings, p.clock, p.tokenMetrics.CounterMetrics[flowControlEvalMetricType], stream))
return t
}

Expand All @@ -71,7 +71,7 @@ func (p *StreamTokenCounterProvider) Send(stream kvflowcontrol.Stream) *tokenCou
return t
}
t, _ := p.sendCounters.LoadOrStore(stream, newTokenCounter(
p.settings, p.clock, p.tokenMetrics.CounterMetrics[flowControlSendMetricType]))
p.settings, p.clock, p.tokenMetrics.CounterMetrics[flowControlSendMetricType], stream))
return t
}

Expand Down
26 changes: 22 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ type tokenCounter struct {
settings *cluster.Settings
clock *hlc.Clock
metrics *TokenCounterMetrics
// stream is the stream for which tokens are being adjusted, it is only used
// in logging.
stream kvflowcontrol.Stream

mu struct {
syncutil.RWMutex
Expand All @@ -186,12 +189,16 @@ type tokenCounter struct {

// newTokenCounter creates a new TokenCounter.
func newTokenCounter(
settings *cluster.Settings, clock *hlc.Clock, metrics *TokenCounterMetrics,
settings *cluster.Settings,
clock *hlc.Clock,
metrics *TokenCounterMetrics,
stream kvflowcontrol.Stream,
) *tokenCounter {
t := &tokenCounter{
settings: settings,
clock: clock,
metrics: metrics,
stream: stream,
}
limit := tokensPerWorkClass{
regular: kvflowcontrol.Tokens(kvflowcontrol.RegularTokensPerStream.Get(&settings.SV)),
Expand Down Expand Up @@ -470,10 +477,21 @@ func (t *tokenCounter) adjust(
ctx context.Context, class admissionpb.WorkClass, delta kvflowcontrol.Tokens,
) {
now := t.clock.PhysicalTime()
t.mu.Lock()
defer t.mu.Unlock()
func() {
t.mu.Lock()
defer t.mu.Unlock()
t.adjustLocked(ctx, class, delta, now)
}()

if log.V(2) {
func() {
t.mu.RLock()
defer t.mu.RUnlock()

t.adjustLocked(ctx, class, delta, now)
log.Infof(ctx, "adjusted flow tokens (wc=%v stream=%v delta=%v): regular=%v elastic=%v",
class, t.stream, delta, t.tokensLocked(regular), t.tokensLocked(elastic))
}()
}
}

func (t *tokenCounter) adjustLocked(
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestTokenCounter(t *testing.T) {
settings,
hlc.NewClockForTesting(nil),
newTokenCounterMetrics(flowControlEvalMetricType),
kvflowcontrol.Stream{},
)

assertStateReset := func(t *testing.T) {
Expand Down Expand Up @@ -378,6 +379,7 @@ func (ts *evalTestState) getOrCreateTC(stream string) *namedTokenCounter {
ts.settings,
hlc.NewClockForTesting(nil),
newTokenCounterMetrics(flowControlEvalMetricType),
kvflowcontrol.Stream{},
),
stream: stream,
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (t *Tracker) Track(
term: term,
})

if log.V(1) {
log.Infof(ctx, "tracking %v flow control tokens for pri=%s stream=%s log-position=%d/%d",
tokens, pri, t.stream, term, index)
}
return true
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,22 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
if err != nil {
panic(errors.Wrap(err, "unable to decode raft command admission data: %v"))
}

if log.V(1) {
log.Infof(ctx, "decoded raft admission meta below-raft: pri=%s create-time=%d proposer=n%s receiver=[n%d,s%s] tenant=t%d tokens≈%d sideloaded=%t raft-entry=%d/%d",
admissionpb.WorkPriority(meta.AdmissionPriority),
meta.AdmissionCreateTime,
meta.AdmissionOriginNode,
p.opts.NodeID,
p.opts.StoreID,
p.desc.tenantID.ToUint64(),
kvflowcontrol.Tokens(len(entry.Data)),
typ.IsSideloaded(),
entry.Term,
entry.Index,
)
}

mark := rac2.LogMark{Term: e.Term, Index: entry.Index}
var raftPri raftpb.Priority
if isV2Encoding {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2528,7 +2528,7 @@ func racV2EnabledWhenLeaderLevel(
ctx context.Context, st *cluster.Settings,
) replica_rac2.EnabledWhenLeaderLevel {
// TODO(sumeer): implement fully, once all the dependencies are implemented.
return replica_rac2.NotEnabledWhenLeader
return replica_rac2.EnabledWhenLeaderV2Encoding
}

// maybeEnqueueProblemRange will enqueue the replica for processing into the
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func newUninitializedReplicaWithoutRaftGroup(
RaftScheduler: r.store.scheduler,
AdmittedPiggybacker: r.store.cfg.KVFlowAdmittedPiggybacker,
ACWorkQueue: r.store.cfg.KVAdmissionController,
Settings: r.store.cfg.Settings,
EvalWaitMetrics: r.store.cfg.KVFlowEvalWaitMetrics,
RangeControllerFactory: r.store.kvflowRangeControllerFactory,
EnabledWhenLeaderLevel: r.raftMu.flowControlLevel,
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Even if we don't have a Ready, or entries in Ready,
// replica_rac2.Processor may need to do some work.
raftEvent := rac2.RaftEventFromMsgStorageAppend(msgStorageAppend)
r.flowControlV2.HandleRaftReadyRaftMuLocked(ctx, raftEvent)
if r.store.cfg.TestingKnobs.FlowControlTestingKnobs == nil ||
!r.store.cfg.TestingKnobs.FlowControlTestingKnobs.UseOnlyForScratchRanges ||
r.IsScratchRange() {
r.flowControlV2.HandleRaftReadyRaftMuLocked(ctx, raftEvent)
}
if !hasReady {
// We must update the proposal quota even if we don't have a ready.
// Consider the case when our quota is of size 1 and two out of three
Expand Down
Empty file.
89 changes: 89 additions & 0 deletions pkg/kv/kvserver/testdata/flow_control_integration_v2/basic
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
echo
----
----
-- Flow token metrics, before issuing the regular 1MiB replicated write.
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%tokens%'
ORDER BY name ASC;

kvflowcontrol.tokens.eval.elastic.available | 24 MiB
kvflowcontrol.tokens.eval.elastic.deducted | 0 B
kvflowcontrol.tokens.eval.elastic.returned | 0 B
kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
kvflowcontrol.tokens.eval.regular.available | 48 MiB
kvflowcontrol.tokens.eval.regular.deducted | 0 B
kvflowcontrol.tokens.eval.regular.returned | 0 B
kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
kvflowcontrol.tokens.send.elastic.available | 24 MiB
kvflowcontrol.tokens.send.elastic.deducted | 0 B
kvflowcontrol.tokens.send.elastic.returned | 0 B
kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
kvflowcontrol.tokens.send.regular.available | 48 MiB
kvflowcontrol.tokens.send.regular.deducted | 0 B
kvflowcontrol.tokens.send.regular.returned | 0 B
kvflowcontrol.tokens.send.regular.unaccounted | 0 B


-- (Issuing + admitting a regular 1MiB, triply replicated write...)


-- Stream counts as seen by n1 post-write. We should see three {regular,elastic}
-- streams given there are three nodes and we're using a replication factor of
-- three.
SELECT name, value
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%stream%'
ORDER BY name ASC;

kvflowcontrol.streams.eval.elastic.blocked_count | 0
kvflowcontrol.streams.eval.elastic.total_count | 3
kvflowcontrol.streams.eval.regular.blocked_count | 0
kvflowcontrol.streams.eval.regular.total_count | 3
kvflowcontrol.streams.send.elastic.blocked_count | 0
kvflowcontrol.streams.send.elastic.total_count | 3
kvflowcontrol.streams.send.regular.blocked_count | 0
kvflowcontrol.streams.send.regular.total_count | 3


-- Another view of the stream count, using /inspectz-backed vtables.
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles_v2
GROUP BY (range_id)
HAVING count(*) = 3
ORDER BY streams DESC;

range_id | stream_count
-----------+---------------
70 | 3


-- Flow token metrics from n1 after issuing the regular 1MiB replicated write,
-- and it being admitted on n1, n2 and n3. We should see 3*1MiB = 3MiB of
-- {regular,elastic} tokens deducted and returned, and {8*3=24MiB,16*3=48MiB} of
-- {regular,elastic} tokens available. Everything should be accounted for.
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%tokens%'
ORDER BY name ASC;

kvflowcontrol.tokens.eval.elastic.available | 24 MiB
kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB
kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB
kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
kvflowcontrol.tokens.eval.regular.available | 48 MiB
kvflowcontrol.tokens.eval.regular.deducted | 3.0 MiB
kvflowcontrol.tokens.eval.regular.returned | 3.0 MiB
kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
kvflowcontrol.tokens.send.elastic.available | 24 MiB
kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB
kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB
kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
kvflowcontrol.tokens.send.regular.available | 48 MiB
kvflowcontrol.tokens.send.regular.deducted | 3.0 MiB
kvflowcontrol.tokens.send.regular.returned | 3.0 MiB
kvflowcontrol.tokens.send.regular.unaccounted | 0 B
----
----

# vim:ft=sql
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.

0 comments on commit 369ac1f

Please sign in to comment.