131093: storage: disable checkUncertainty on failOnMoreRecent in scanner r=tbg a=tbg

It was possible for reads with failOnMoreRecent to hit a
ReadWithinUncertaintyIntervalError instead of the desired
WriteTooOldError. This commit disables uncertainty checks when
failOnMoreRecent is active, as the latter is a stronger check anyway.

Fixes #119681.
Fixes #131005.

Epic: none
Release note: None
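As a minimal, self-contained Go sketch of the check ordering described above — not the actual pkg/storage scanner code; the function name, plain int64 timestamps, and error strings are simplified placeholders — a more-recent value is now handled by the failOnMoreRecent check alone, and the uncertainty check is skipped:

```go
package main

import "fmt"

// checkValueTimestamp is a simplified stand-in for the scanner's per-value
// timestamp checks. Timestamps are plain int64 nanos for illustration only.
func checkValueTimestamp(readTS, valueTS, globalUncertaintyLimit int64, failOnMoreRecent bool) error {
	if valueTS <= readTS {
		// The value is visible to the read; nothing to do.
		return nil
	}
	if failOnMoreRecent {
		// A more recent value always fails the request (WriteTooOldError in
		// CockroachDB terms). The uncertainty check below is skipped, since
		// this is the stronger of the two checks.
		return fmt.Errorf("write too old: read at %d, value at %d", readTS, valueTS)
	}
	if valueTS <= globalUncertaintyLimit {
		// Without failOnMoreRecent, a value inside the uncertainty window
		// forces the read to retry at a higher timestamp.
		return fmt.Errorf("read within uncertainty interval: read at %d, value at %d", readTS, valueTS)
	}
	return nil
}

func main() {
	// With failOnMoreRecent set, a value inside the uncertainty window now
	// produces the write-too-old error rather than the uncertainty error.
	fmt.Println(checkValueTimestamp(10, 15, 20, true))
	fmt.Println(checkValueTimestamp(10, 15, 20, false))
}
```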

131384: roachtest: admission-control/disk-bandwidth-limiter test improvements r=sumeerbhola a=aadityasondhi

This patch fixes a few things in this test:
- Runs the first step longer to build a fuller LSM, inducing block and page cache misses and therefore some disk reads.
- Reduces the throughput of the foreground workload, since it was causing saturation on its own.
- Asserts on total (read + write) bandwidth, since the disk bandwidth limiter should account for reads when determining tokens.

Fixes #129534.

Release note: None
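The diff below contains the real roachtest change; the following self-contained Go sketch only restates the new assertion, with the metric getter, threshold value, and helper names as stand-ins for the test's statCollector plumbing:

```go
package main

import (
	"fmt"
	"time"
)

// fetchBandwidthMiB is a stand-in for querying the per-node metrics
// rate(sys_host_disk_{read,write}_bytes[1m]) and converting to MiB/s.
func fetchBandwidthMiB(metric string) (float64, error) {
	// The real test calls statCollector.CollectPoint; stubbed out here.
	return 0, nil
}

// assertTotalBandwidth mirrors the monitoring loop: it now sums read and
// write bandwidth before comparing against the threshold, instead of
// checking writes alone.
func assertTotalBandwidth(bandwidthThreshold float64, numIterations int, interval time.Duration) error {
	numErrors, numSuccesses := 0, 0
	for i := 0; i < numIterations; i++ {
		time.Sleep(interval)
		writeVal, err := fetchBandwidthMiB("disk_write")
		if err != nil {
			numErrors++
			continue
		}
		readVal, err := fetchBandwidthMiB("disk_read")
		if err != nil {
			numErrors++
			continue
		}
		// Reads count against the limiter's tokens, so the assertion is on
		// the combined bandwidth.
		if total := writeVal + readVal; total > bandwidthThreshold {
			return fmt.Errorf("write + read bandwidth %f (%f + %f) exceeded threshold of %f",
				total, writeVal, readVal, bandwidthThreshold)
		}
		numSuccesses++
	}
	// The real test does further accounting with these counters outside the
	// hunk shown below; here they are just reported.
	fmt.Printf("bandwidth assertion done: %d successes, %d metric errors\n", numSuccesses, numErrors)
	return nil
}

func main() {
	// Threshold and iteration count are placeholders, not the test's values.
	if err := assertTotalBandwidth(128, 3, time.Second); err != nil {
		fmt.Println(err)
	}
}
```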

131395: crosscluster/producer: modify lastEmitWait and lastProduceWait computation r=dt a=msbutler

This patch modifies lastEmitWait and lastProduceWait in the crdb_internal.cluster_replication_node_streams vtable to report the current wait if the event stream is currently waiting in that state, and the previous wait otherwise.

Epic: none

Release note: none
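As a rough, self-contained Go sketch of the reporting rule described above (the real implementation is in the crdb_internal.go hunk below; the names here are illustrative only): while the stream is in the state a column reports on, the column shows the in-progress wait, otherwise it shows the last completed wait.

```go
package main

import (
	"fmt"
	"time"
)

type producerState int

const (
	producing producerState = iota
	emitting
)

// waitToReport mirrors the "current or last" rule: while the stream is in the
// state the column describes, report how long it has been waiting so far;
// otherwise report the duration of the most recent completed wait.
func waitToReport(cur, col producerState, lastPolled time.Time, lastCompletedWait time.Duration) time.Duration {
	if cur == col {
		return time.Since(lastPolled)
	}
	return lastCompletedWait
}

func main() {
	lastPolled := time.Now().Add(-3 * time.Second)
	// The stream has been producing for ~3s; lastProduceWait reports that
	// in-progress wait, while lastEmitWait falls back to the previous value.
	fmt.Println(waitToReport(producing, producing, lastPolled, 500*time.Millisecond))
	fmt.Println(waitToReport(producing, emitting, lastPolled, 500*time.Millisecond))
}
```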

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Aaditya Sondhi <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
4 people committed Sep 26, 2024
4 parents b5e8f81 + 10664ba + 58e396c + 2957335 commit b4c58a0
Showing 8 changed files with 93 additions and 37 deletions.
16 changes: 8 additions & 8 deletions pkg/ccl/crosscluster/producer/event_stream.go
@@ -69,8 +69,6 @@ type eventStream struct {
lastCheckpointTime time.Time
lastCheckpointLen int

lastPolled time.Time

debug streampb.DebugProducerStatus
}

@@ -113,7 +111,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
return errors.AssertionFailedf("expected to be started once")
}

s.lastPolled = timeutil.Now()
s.debug.State.Store(int64(streampb.Emitting))
s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro())

sourceTenantID, err := s.validateProducerJobAndSpec(ctx)
if err != nil {
@@ -227,11 +226,11 @@ func (s *eventStream) setErr(err error) bool {

// Next implements eval.ValueGenerator interface.
func (s *eventStream) Next(ctx context.Context) (bool, error) {
emitWait := int64(timeutil.Since(s.lastPolled))

s.debug.State.Store(int64(streampb.Producing))
emitWait := (timeutil.Now().UnixMicro() - s.debug.LastPolledMicros.Load()) * 1000
s.debug.Flushes.LastEmitWaitNanos.Store(emitWait)
s.debug.Flushes.EmitWaitNanos.Add(emitWait)
s.lastPolled = timeutil.Now()
s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro())

select {
case <-ctx.Done():
@@ -244,10 +243,11 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) {
case err := <-s.errCh:
return false, err
default:
produceWait := int64(timeutil.Since(s.lastPolled))
s.debug.State.Store(int64(streampb.Emitting))
produceWait := (timeutil.Now().UnixMicro() - s.debug.LastPolledMicros.Load()) * 1000
s.debug.Flushes.ProduceWaitNanos.Add(produceWait)
s.debug.Flushes.LastProduceWaitNanos.Store(produceWait)
s.lastPolled = timeutil.Now()
s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro())
return true, nil
}
}
1 change: 1 addition & 0 deletions pkg/cli/zip_table_registry.go
@@ -1056,6 +1056,7 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{
"spans",
"initial_ts",
"prev_ts",
"state",
"batches",
"checkpoints",
"megabytes",
@@ -42,7 +42,7 @@ func registerDiskBandwidthOverload(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "admission-control/disk-bandwidth-limiter",
Owner: registry.OwnerAdmissionControl,
Timeout: time.Hour,
Timeout: 3 * time.Hour,
Benchmark: true,
CompatibleClouds: registry.AllExceptAzure,
// TODO(aaditya): change to weekly once the test stabilizes.
@@ -92,21 +92,21 @@ func registerDiskBandwidthOverload(r registry.Registry) {

c.Run(ctx, option.WithNodes(c.WorkloadNode()),
"./cockroach workload init kv --drop --insert-count=400 "+
"--max-block-bytes=4096 --min-block-bytes=4096"+foregroundDB+url)
"--max-block-bytes=1024 --min-block-bytes=1024"+foregroundDB+url)

c.Run(ctx, option.WithNodes(c.WorkloadNode()),
"./cockroach workload init kv --drop --insert-count=400 "+
"--max-block-bytes=4096 --min-block-bytes=4096"+backgroundDB+url)

// Run foreground kv workload, QoS="regular".
duration := 40 * time.Minute
duration := 90 * time.Minute
m := c.NewMonitor(ctx, c.CRDBNodes())
m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("starting foreground kv workload thread (<%s)", time.Minute))
dur := " --duration=" + duration.String()
url := fmt.Sprintf(" {pgurl%s}", c.CRDBNodes())
cmd := "./cockroach workload run kv --histograms=perf/stats.json --concurrency=2 " +
"--splits=1000 --read-percent=50 --min-block-bytes=4096 --max-block-bytes=4096 " +
"--splits=1000 --read-percent=50 --min-block-bytes=1024 --max-block-bytes=1024 " +
"--txn-qos='regular' --tolerate-errors" + foregroundDB + dur + url
c.Run(ctx, option.WithNodes(c.WorkloadNode()), cmd)
return nil
@@ -124,8 +124,8 @@ func registerDiskBandwidthOverload(r registry.Registry) {
return nil
})

t.Status(fmt.Sprintf("waiting for workload to start and ramp up (<%s)", 10*time.Minute))
time.Sleep(10 * time.Minute)
t.Status(fmt.Sprintf("waiting for workload to start and ramp up (<%s)", 30*time.Minute))
time.Sleep(60 * time.Minute)

db := c.Conn(ctx, t.L(), len(c.CRDBNodes()))
defer db.Close()
@@ -139,11 +139,12 @@ func registerDiskBandwidthOverload(r registry.Registry) {
}

t.Status(fmt.Sprintf("setting bandwidth limit, and waiting for it to take effect. (<%s)", 2*time.Minute))
time.Sleep(2 * time.Minute)
time.Sleep(5 * time.Minute)

m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("starting monitoring thread (<%s)", time.Minute))
writeBWMetric := divQuery("rate(sys_host_disk_write_bytes[1m])", 1<<20 /* 1MiB */)
readBWMetric := divQuery("rate(sys_host_disk_read_bytes[1m])", 1<<20 /* 1MiB */)
getMetricVal := func(query string, label string) (float64, error) {
point, err := statCollector.CollectPoint(ctx, t.L(), timeutil.Now(), query)
if err != nil {
@@ -174,13 +175,19 @@ func registerDiskBandwidthOverload(r registry.Registry) {
numSuccesses := 0
for i := 0; i < numIterations; i++ {
time.Sleep(collectionIntervalSeconds * time.Second)
val, err := getMetricVal(writeBWMetric, "node")
writeVal, err := getMetricVal(writeBWMetric, "node")
if err != nil {
numErrors++
continue
}
if val > bandwidthThreshold {
t.Fatalf("write bandwidth %f over last exceeded threshold", val)
readVal, err := getMetricVal(readBWMetric, "node")
if err != nil {
numErrors++
continue
}
totalBW := writeVal + readVal
if totalBW > bandwidthThreshold {
t.Fatalf("write + read bandwidth %f (%f + %f) exceeded threshold of %f", totalBW, writeVal, readVal, bandwidthThreshold)
}
numSuccesses++
}
22 changes: 21 additions & 1 deletion pkg/repstream/streampb/empty.go
@@ -22,7 +22,8 @@ type DebugProducerStatus struct {
// Identification info.
StreamID StreamID
// Properties.
Spec StreamPartitionSpec
Spec StreamPartitionSpec
State atomic.Int64

RF struct {
Checkpoints, Advances atomic.Int64
@@ -37,6 +38,25 @@ type DebugProducerStatus struct {
Micros atomic.Int64
Spans atomic.Value
}
LastPolledMicros atomic.Int64
}

type ProducerState int64

const (
Producing ProducerState = iota
Emitting
)

func (p ProducerState) String() string {
switch p {
case Producing:
return "produce"
case Emitting:
return "emit"
default:
return "unknown"
}
}

// TODO(dt): this really should be per server instead of process-global, i.e. if
42 changes: 26 additions & 16 deletions pkg/sql/crdb_internal.go
@@ -9214,6 +9214,7 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
spans INT,
initial_ts DECIMAL,
prev_ts DECIMAL,
state STRING,
batches INT,
checkpoints INT,
@@ -9261,41 +9262,50 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
return d
}

dur := func(nanos int64) tree.Datum {
return tree.NewDInterval(duration.MakeDuration(nanos, 0, 0), types.DefaultIntervalTypeMetadata)
}

for _, s := range sm.DebugGetProducerStatuses(ctx) {
resolved := time.UnixMicro(s.RF.ResolvedMicros.Load())
resolvedDatum := tree.DNull
if resolved.Unix() != 0 {
resolvedDatum = shortenLogical(eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()}))
}

curState := streampb.ProducerState(s.State.Load())
currentWait := duration.Age(now, time.UnixMicro(s.LastPolledMicros.Load())).Nanos()

curOrLast := func(currentNanos int64, lastNanos int64, statePredicate streampb.ProducerState) tree.Datum {
if curState == statePredicate {
return dur(currentNanos)
}
return dur(lastNanos)
}
currentWaitWithState := func(statePredicate streampb.ProducerState) int64 {
if curState == statePredicate {
return currentWait
}
return 0
}

if err := addRow(
tree.NewDInt(tree.DInt(s.StreamID)),
tree.NewDString(fmt.Sprintf("%d[%d]", s.Spec.ConsumerNode, s.Spec.ConsumerProc)),
tree.NewDInt(tree.DInt(len(s.Spec.Spans))),
shortenLogical(eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp)),
shortenLogical(eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp)),
tree.NewDString(curState.String()),

tree.NewDInt(tree.DInt(s.Flushes.Batches.Load())),
tree.NewDInt(tree.DInt(s.Flushes.Checkpoints.Load())),
tree.NewDFloat(tree.DFloat(math.Round(float64(s.Flushes.Bytes.Load())/float64(1<<18))/4)),
age(time.UnixMicro(s.LastCheckpoint.Micros.Load())),

tree.NewDInterval(
duration.MakeDuration(s.Flushes.ProduceWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.EmitWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.LastProduceWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.LastEmitWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
dur(s.Flushes.ProduceWaitNanos.Load()+currentWaitWithState(streampb.Producing)),
dur(s.Flushes.EmitWaitNanos.Load()+currentWaitWithState(streampb.Emitting)),
curOrLast(currentWait, s.Flushes.LastProduceWaitNanos.Load(), streampb.Producing),
curOrLast(currentWait, s.Flushes.LastEmitWaitNanos.Load(), streampb.Emitting),

tree.NewDInt(tree.DInt(s.RF.Checkpoints.Load())),
tree.NewDInt(tree.DInt(s.RF.Advances.Load())),