From 2d49f18737ed19db8dc6f534bbe1ee7c952442c2 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 25 Sep 2024 10:56:32 +0200 Subject: [PATCH 1/4] storage: test combined failOnMoreRecent and uncertainty error Epic: none Release note: None --- .../testdata/mvcc_histories/range_tombstone_scans | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/storage/testdata/mvcc_histories/range_tombstone_scans b/pkg/storage/testdata/mvcc_histories/range_tombstone_scans index b71e1858b7b6..2922ec48e4e0 100644 --- a/pkg/storage/testdata/mvcc_histories/range_tombstone_scans +++ b/pkg/storage/testdata/mvcc_histories/range_tombstone_scans @@ -477,6 +477,18 @@ scan k=b end=d ts=3 globalUncertaintyLimit=4 scan: "b"-"d" -> error: (*kvpb.ReadWithinUncertaintyIntervalError:) ReadWithinUncertaintyIntervalError: read at time 3.000000000,0 encountered previous write with future timestamp 4.000000000,0 within uncertainty interval `t <= (local=0,0, global=0,0)`; observed timestamps: [] +run error +scan k=b end=d ts=3 globalUncertaintyLimit=4 failOnMoreRecent +---- +scan: "b"-"d" -> +error: (*kvpb.ReadWithinUncertaintyIntervalError:) ReadWithinUncertaintyIntervalError: read at time 3.000000000,0 encountered previous write with future timestamp 4.000000000,0 within uncertainty interval `t <= (local=0,0, global=0,0)`; observed timestamps: [] + +run error +get k=c ts=3 globalUncertaintyLimit=4 failOnMoreRecent +---- +get: "c" -> +error: (*kvpb.ReadWithinUncertaintyIntervalError:) ReadWithinUncertaintyIntervalError: read at time 3.000000000,0 encountered previous write with future timestamp 4.000000000,0 within uncertainty interval `t <= (local=0,0, global=0,0)`; observed timestamps: [] + run ok scan k=b end=d ts=4 globalUncertaintyLimit=5 ---- From 10664ba1e34fe859ced742c26ff4a9c4050976bc Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 20 Sep 2024 16:18:07 +0200 Subject: [PATCH 2/4] storage: disable checkUncertainty on failOnMoreRecent in scanner It was possible for 
reads with failOnMoreRecent to hit a ReadWithinUncertaintyIntervalError instead of the desired WriteTooOldError. This commit disables uncertainty checks when failOnMoreRecent is active, as the latter is a stronger check anyway. Fixes #119681. Fixes #131005. Epic: none Release note: None --- pkg/storage/pebble_mvcc_scanner.go | 8 +++++++- pkg/storage/testdata/mvcc_histories/range_tombstone_scans | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/storage/pebble_mvcc_scanner.go b/pkg/storage/pebble_mvcc_scanner.go index 10732c8116f9..3711c3428b80 100644 --- a/pkg/storage/pebble_mvcc_scanner.go +++ b/pkg/storage/pebble_mvcc_scanner.go @@ -551,7 +551,13 @@ func (p *pebbleMVCCScanner) init( // because the local uncertainty limit cannot be applied to values with // future-time timestamps with earlier local timestamps. We are only able // to skip uncertainty checks if p.ts >= global_uncertainty_limit. - p.checkUncertainty = p.ts.Less(p.uncertainty.GlobalLimit) + // + // We disable checkUncertainty when the scanner is configured with failOnMoreRecent. + // This avoids cases in which a scan would have failed with a WriteTooOldError + // but instead gets an unexpected ReadWithinUncertaintyIntervalError. + // See: + // https://github.com/cockroachdb/cockroach/issues/119681 + p.checkUncertainty = p.ts.Less(p.uncertainty.GlobalLimit) && !p.failOnMoreRecent } // get seeks to the start key exactly once and adds one KV to the result set. 
diff --git a/pkg/storage/testdata/mvcc_histories/range_tombstone_scans b/pkg/storage/testdata/mvcc_histories/range_tombstone_scans index 2922ec48e4e0..d8037e8202e9 100644 --- a/pkg/storage/testdata/mvcc_histories/range_tombstone_scans +++ b/pkg/storage/testdata/mvcc_histories/range_tombstone_scans @@ -481,13 +481,13 @@ run error scan k=b end=d ts=3 globalUncertaintyLimit=4 failOnMoreRecent ---- scan: "b"-"d" -> -error: (*kvpb.ReadWithinUncertaintyIntervalError:) ReadWithinUncertaintyIntervalError: read at time 3.000000000,0 encountered previous write with future timestamp 4.000000000,0 within uncertainty interval `t <= (local=0,0, global=0,0)`; observed timestamps: [] +error: (*kvpb.WriteTooOldError:) WriteTooOldError: write for key "c" at timestamp 3.000000000,0 too old; must write at or above 4.000000000,1 run error get k=c ts=3 globalUncertaintyLimit=4 failOnMoreRecent ---- get: "c" -> -error: (*kvpb.ReadWithinUncertaintyIntervalError:) ReadWithinUncertaintyIntervalError: read at time 3.000000000,0 encountered previous write with future timestamp 4.000000000,0 within uncertainty interval `t <= (local=0,0, global=0,0)`; observed timestamps: [] +error: (*kvpb.WriteTooOldError:) WriteTooOldError: write for key "c" at timestamp 3.000000000,0 too old; must write at or above 4.000000000,1 run ok scan k=b end=d ts=4 globalUncertaintyLimit=5 From 2957335b32275c6e47cba07095de886a5e5eaa43 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Wed, 25 Sep 2024 19:19:28 -0400 Subject: [PATCH 3/4] crosscluster/producer: modify lastEmitWait and lastProduceWait computation This patch modifies the lastEmitWait and lastProduceWait in the crdb_internal.cluster_replication_node_streams vtable to report the current wait if the event stream is currently waiting in the corresponding state, and the previous wait otherwise. 
Epic: none Release note: none --- pkg/ccl/crosscluster/producer/event_stream.go | 16 +++---- pkg/cli/zip_table_registry.go | 1 + pkg/repstream/streampb/empty.go | 22 +++++++++- pkg/sql/crdb_internal.go | 42 ++++++++++++------- .../testdata/logic_test/crdb_internal_catalog | 2 +- 5 files changed, 57 insertions(+), 26 deletions(-) diff --git a/pkg/ccl/crosscluster/producer/event_stream.go b/pkg/ccl/crosscluster/producer/event_stream.go index bd99e2c08bb1..a157ac1172a8 100644 --- a/pkg/ccl/crosscluster/producer/event_stream.go +++ b/pkg/ccl/crosscluster/producer/event_stream.go @@ -69,8 +69,6 @@ type eventStream struct { lastCheckpointTime time.Time lastCheckpointLen int - lastPolled time.Time - debug streampb.DebugProducerStatus } @@ -113,7 +111,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) { return errors.AssertionFailedf("expected to be started once") } - s.lastPolled = timeutil.Now() + s.debug.State.Store(int64(streampb.Emitting)) + s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro()) sourceTenantID, err := s.validateProducerJobAndSpec(ctx) if err != nil { @@ -227,11 +226,11 @@ func (s *eventStream) setErr(err error) bool { // Next implements eval.ValueGenerator interface. 
func (s *eventStream) Next(ctx context.Context) (bool, error) { - emitWait := int64(timeutil.Since(s.lastPolled)) - + s.debug.State.Store(int64(streampb.Producing)) + emitWait := (timeutil.Now().UnixMicro() - s.debug.LastPolledMicros.Load()) * 1000 s.debug.Flushes.LastEmitWaitNanos.Store(emitWait) s.debug.Flushes.EmitWaitNanos.Add(emitWait) - s.lastPolled = timeutil.Now() + s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro()) select { case <-ctx.Done(): @@ -244,10 +243,11 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) { case err := <-s.errCh: return false, err default: - produceWait := int64(timeutil.Since(s.lastPolled)) + s.debug.State.Store(int64(streampb.Emitting)) + produceWait := (timeutil.Now().UnixMicro() - s.debug.LastPolledMicros.Load()) * 1000 s.debug.Flushes.ProduceWaitNanos.Add(produceWait) s.debug.Flushes.LastProduceWaitNanos.Store(produceWait) - s.lastPolled = timeutil.Now() + s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro()) return true, nil } } diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go index a2bfb15c86da..470288c68726 100644 --- a/pkg/cli/zip_table_registry.go +++ b/pkg/cli/zip_table_registry.go @@ -1056,6 +1056,7 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{ "spans", "initial_ts", "prev_ts", + "state", "batches", "checkpoints", "megabytes", diff --git a/pkg/repstream/streampb/empty.go b/pkg/repstream/streampb/empty.go index fe77fb69a63a..3115a475c715 100644 --- a/pkg/repstream/streampb/empty.go +++ b/pkg/repstream/streampb/empty.go @@ -22,7 +22,8 @@ type DebugProducerStatus struct { // Identification info. StreamID StreamID // Properties. 
- Spec StreamPartitionSpec + Spec StreamPartitionSpec + State atomic.Int64 RF struct { Checkpoints, Advances atomic.Int64 @@ -37,6 +38,25 @@ type DebugProducerStatus struct { Micros atomic.Int64 Spans atomic.Value } + LastPolledMicros atomic.Int64 +} + +type ProducerState int64 + +const ( + Producing ProducerState = iota + Emitting +) + +func (p ProducerState) String() string { + switch p { + case Producing: + return "produce" + case Emitting: + return "emit" + default: + return "unknown" + } } // TODO(dt): this really should be per server instead of process-global, i.e. if diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index ef1da3a35844..6478354a78e8 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -9214,6 +9214,7 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams ( spans INT, initial_ts DECIMAL, prev_ts DECIMAL, + state STRING, batches INT, checkpoints INT, @@ -9261,6 +9262,10 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams ( return d } + dur := func(nanos int64) tree.Datum { + return tree.NewDInterval(duration.MakeDuration(nanos, 0, 0), types.DefaultIntervalTypeMetadata) + } + for _, s := range sm.DebugGetProducerStatuses(ctx) { resolved := time.UnixMicro(s.RF.ResolvedMicros.Load()) resolvedDatum := tree.DNull @@ -9268,34 +9273,39 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams ( resolvedDatum = shortenLogical(eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()})) } + curState := streampb.ProducerState(s.State.Load()) + currentWait := duration.Age(now, time.UnixMicro(s.LastPolledMicros.Load())).Nanos() + + curOrLast := func(currentNanos int64, lastNanos int64, statePredicate streampb.ProducerState) tree.Datum { + if curState == statePredicate { + return dur(currentNanos) + } + return dur(lastNanos) + } + currentWaitWithState := func(statePredicate streampb.ProducerState) int64 { + if curState == statePredicate { + return currentWait + } + return 0 + } + if 
err := addRow( tree.NewDInt(tree.DInt(s.StreamID)), tree.NewDString(fmt.Sprintf("%d[%d]", s.Spec.ConsumerNode, s.Spec.ConsumerProc)), tree.NewDInt(tree.DInt(len(s.Spec.Spans))), shortenLogical(eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp)), shortenLogical(eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp)), + tree.NewDString(curState.String()), tree.NewDInt(tree.DInt(s.Flushes.Batches.Load())), tree.NewDInt(tree.DInt(s.Flushes.Checkpoints.Load())), tree.NewDFloat(tree.DFloat(math.Round(float64(s.Flushes.Bytes.Load())/float64(1<<18))/4)), age(time.UnixMicro(s.LastCheckpoint.Micros.Load())), - tree.NewDInterval( - duration.MakeDuration(s.Flushes.ProduceWaitNanos.Load(), 0, 0), - types.DefaultIntervalTypeMetadata, - ), - tree.NewDInterval( - duration.MakeDuration(s.Flushes.EmitWaitNanos.Load(), 0, 0), - types.DefaultIntervalTypeMetadata, - ), - tree.NewDInterval( - duration.MakeDuration(s.Flushes.LastProduceWaitNanos.Load(), 0, 0), - types.DefaultIntervalTypeMetadata, - ), - tree.NewDInterval( - duration.MakeDuration(s.Flushes.LastEmitWaitNanos.Load(), 0, 0), - types.DefaultIntervalTypeMetadata, - ), + dur(s.Flushes.ProduceWaitNanos.Load()+currentWaitWithState(streampb.Producing)), + dur(s.Flushes.EmitWaitNanos.Load()+currentWaitWithState(streampb.Emitting)), + curOrLast(currentWait, s.Flushes.LastProduceWaitNanos.Load(), streampb.Producing), + curOrLast(currentWait, s.Flushes.LastEmitWaitNanos.Load(), streampb.Emitting), tree.NewDInt(tree.DInt(s.RF.Checkpoints.Load())), tree.NewDInt(tree.DInt(s.RF.Advances.Load())), diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog index 9c51c1b13703..9606dcd6c14b 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog @@ -403,7 +403,7 @@ SELECT id, strip_volatile(descriptor) FROM crdb_internal.kv_catalog_descriptor O 4294967188 {"table": 
{"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "state", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "recv_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 5, "name": "last_recv_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 6, "name": "ingest_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 7, "name": "flush_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 8, "name": "flush_count", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "flush_kvs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 10, "name": "flush_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 11, "name": "flush_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "last_flush_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "last_kvs_done", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 14, "name": "last_kvs_todo", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 15, "name": "last_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "last_slowest", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 17, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": 
{}, "oid": 1186}}, {"id": 18, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 19, "name": "retry_size", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 20, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967188, "name": "logical_replication_node_processors", "nextColumnId": 21, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967189 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "span_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "span_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967189, "name": "cluster_replication_node_stream_checkpoints", "nextColumnId": 7, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, 
"unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967190 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "span_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "span_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967190, "name": "cluster_replication_node_stream_spans", "nextColumnId": 5, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} -4294967191 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "spans", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "initial_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 5, "name": "prev_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "megabytes", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 9, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 10, 
"name": "produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 11, "name": "emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 12, "name": "last_produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "last_emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 14, "name": "rf_checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 15, "name": "rf_advances", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "rf_last_advance", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 17, "name": "rf_resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 18, "name": "rf_resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967191, "name": "cluster_replication_node_streams", "nextColumnId": 19, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} +4294967191 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "spans", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "initial_ts", 
"nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 5, "name": "prev_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "state", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 7, "name": "batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "megabytes", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 10, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 11, "name": "produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 12, "name": "emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "last_produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 14, "name": "last_emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 15, "name": "rf_checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "rf_advances", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 17, "name": "rf_last_advance", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 18, "name": "rf_resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 19, "name": "rf_resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967191, "name": "cluster_replication_node_streams", "nextColumnId": 20, "nextConstraintId": 2, 
"nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967192 {"table": {"columns": [{"id": 1, "name": "job_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "start_key", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "end_key", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 5, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967192, "name": "cluster_replication_spans", "nextColumnId": 6, "nextConstraintId": 1, "nextMutationId": 1, "primaryIndex": {"foreignKey": {}, "geoConfig": {}, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1", "viewQuery": "WITH spans AS (SELECT j.id AS job_id, jsonb_array_elements(((crdb_internal.pb_to_json('progress', i.value)->'streamIngest')->'checkpoint')->'resolvedSpans') AS s FROM system.jobs AS j LEFT JOIN system.job_info AS i ON (j.id = i.job_id) AND (i.info_key = 'legacy_progress') WHERE j.job_type = 'REPLICATION STREAM INGESTION') SELECT job_id, crdb_internal.pretty_key(decode((s->'span')->>'key', 'base64'), 0) AS start_key, crdb_internal.pretty_key(decode((s->'span')->>'endKey', 'base64'), 0) AS end_key, ((((s->'timestamp')->>'wallTime') || '.') || COALESCE(((s->'timestamp')->'logical'), '0'))::DECIMAL AS resolved, 
date_trunc('second', ((cluster_logical_timestamp() - ((s->'timestamp')->>'wallTime')::INT8) / 1e9)::INTERVAL) AS resolved_age FROM spans"}} 4294967193 {"table": {"columns": [{"id": 1, "name": "desc_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "version", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 3, "name": "sql_instance_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "session_id", "type": {"family": "BytesFamily", "oid": 17}}, {"id": 5, "name": "crdb_region", "type": {"family": "BytesFamily", "oid": 17}}], "formatVersion": 3, "id": 4294967193, "name": "kv_session_based_leases", "nextColumnId": 6, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967194 {"table": {"columns": [{"id": 1, "name": "id", "type": {"family": "UuidFamily", "oid": 2950}}, {"id": 2, "name": "ts", "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 3, "name": "meta_type", "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "meta", "nullable": true, "type": {"family": "BytesFamily", "oid": 17}}, {"id": 5, "name": "num_spans", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "spans", "type": {"family": "BytesFamily", "oid": 17}}, {"id": 7, "name": "verified", "type": {"oid": 16}}, {"id": 8, "name": "target", "nullable": true, "type": {"family": "BytesFamily", "oid": 17}}, {"id": 9, "name": "decoded_meta", "nullable": true, "type": {"family": "JsonFamily", "oid": 3802}}, {"id": 10, "name": "decoded_target", "nullable": true, "type": {"family": "JsonFamily", "oid": 
3802}}, {"id": 11, "name": "internal_meta", "nullable": true, "type": {"family": "JsonFamily", "oid": 3802}}, {"id": 12, "name": "num_ranges", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 13, "name": "last_updated", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}], "formatVersion": 3, "id": 4294967194, "name": "kv_protected_ts_records", "nextColumnId": 14, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} From 58e396c44d6d01b479aa7acd576d081287e2c952 Mon Sep 17 00:00:00 2001 From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> Date: Wed, 25 Sep 2024 17:15:22 -0400 Subject: [PATCH 4/4] roachtest: admission-control/disk-bandwidth-limiter test improvements This patch fixes a few things in this test: - Runs the first step longer to have a fuller LSM to induce block and page cache misses to have some disk reads. - Reduces the throughput of the foreground workload since it was causing saturation on its own. - Assert on total bandwidth since the disk bandwidth limiter should be accounting for reads when determining tokens. Fixes #129534. 
Release note: None --- ...mission_control_disk_bandwidth_overload.go | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go b/pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go index e9eca325cc80..68151c3dce8a 100644 --- a/pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go +++ b/pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go @@ -42,7 +42,7 @@ func registerDiskBandwidthOverload(r registry.Registry) { r.Add(registry.TestSpec{ Name: "admission-control/disk-bandwidth-limiter", Owner: registry.OwnerAdmissionControl, - Timeout: time.Hour, + Timeout: 3 * time.Hour, Benchmark: true, CompatibleClouds: registry.AllExceptAzure, // TODO(aaditya): change to weekly once the test stabilizes. @@ -92,21 +92,21 @@ func registerDiskBandwidthOverload(r registry.Registry) { c.Run(ctx, option.WithNodes(c.WorkloadNode()), "./cockroach workload init kv --drop --insert-count=400 "+ - "--max-block-bytes=4096 --min-block-bytes=4096"+foregroundDB+url) + "--max-block-bytes=1024 --min-block-bytes=1024"+foregroundDB+url) c.Run(ctx, option.WithNodes(c.WorkloadNode()), "./cockroach workload init kv --drop --insert-count=400 "+ "--max-block-bytes=4096 --min-block-bytes=4096"+backgroundDB+url) // Run foreground kv workload, QoS="regular". 
- duration := 40 * time.Minute + duration := 90 * time.Minute m := c.NewMonitor(ctx, c.CRDBNodes()) m.Go(func(ctx context.Context) error { t.Status(fmt.Sprintf("starting foreground kv workload thread (<%s)", time.Minute)) dur := " --duration=" + duration.String() url := fmt.Sprintf(" {pgurl%s}", c.CRDBNodes()) cmd := "./cockroach workload run kv --histograms=perf/stats.json --concurrency=2 " + - "--splits=1000 --read-percent=50 --min-block-bytes=4096 --max-block-bytes=4096 " + + "--splits=1000 --read-percent=50 --min-block-bytes=1024 --max-block-bytes=1024 " + "--txn-qos='regular' --tolerate-errors" + foregroundDB + dur + url c.Run(ctx, option.WithNodes(c.WorkloadNode()), cmd) return nil @@ -124,8 +124,8 @@ func registerDiskBandwidthOverload(r registry.Registry) { return nil }) - t.Status(fmt.Sprintf("waiting for workload to start and ramp up (<%s)", 10*time.Minute)) - time.Sleep(10 * time.Minute) + t.Status(fmt.Sprintf("waiting for workload to start and ramp up (<%s)", 30*time.Minute)) + time.Sleep(60 * time.Minute) db := c.Conn(ctx, t.L(), len(c.CRDBNodes())) defer db.Close() @@ -139,11 +139,12 @@ func registerDiskBandwidthOverload(r registry.Registry) { } t.Status(fmt.Sprintf("setting bandwidth limit, and waiting for it to take effect. 
(<%s)", 2*time.Minute)) - time.Sleep(2 * time.Minute) + time.Sleep(5 * time.Minute) m.Go(func(ctx context.Context) error { t.Status(fmt.Sprintf("starting monitoring thread (<%s)", time.Minute)) writeBWMetric := divQuery("rate(sys_host_disk_write_bytes[1m])", 1<<20 /* 1MiB */) + readBWMetric := divQuery("rate(sys_host_disk_read_bytes[1m])", 1<<20 /* 1MiB */) getMetricVal := func(query string, label string) (float64, error) { point, err := statCollector.CollectPoint(ctx, t.L(), timeutil.Now(), query) if err != nil { @@ -174,13 +175,19 @@ func registerDiskBandwidthOverload(r registry.Registry) { numSuccesses := 0 for i := 0; i < numIterations; i++ { time.Sleep(collectionIntervalSeconds * time.Second) - val, err := getMetricVal(writeBWMetric, "node") + writeVal, err := getMetricVal(writeBWMetric, "node") if err != nil { numErrors++ continue } - if val > bandwidthThreshold { - t.Fatalf("write bandwidth %f over last exceeded threshold", val) + readVal, err := getMetricVal(readBWMetric, "node") + if err != nil { + numErrors++ + continue + } + totalBW := writeVal + readVal + if totalBW > bandwidthThreshold { + t.Fatalf("write + read bandwidth %f (%f + %f) exceeded threshold of %f", totalBW, writeVal, readVal, bandwidthThreshold) } numSuccesses++ }