diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 8686b3769bd3..8b311413b705 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -877,6 +877,7 @@ GO_TARGETS = [ "//pkg/ccl/changefeedccl/schemafeed/schematestutils:schematestutils", "//pkg/ccl/changefeedccl/schemafeed:schemafeed", "//pkg/ccl/changefeedccl/schemafeed:schemafeed_test", + "//pkg/ccl/changefeedccl/timers:timers", "//pkg/ccl/changefeedccl:changefeedccl", "//pkg/ccl/changefeedccl:changefeedccl_test", "//pkg/ccl/cliccl/cliflagsccl:cliflagsccl", diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 118726d3518f..25daac9561b6 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -60,6 +60,7 @@ go_library( "//pkg/ccl/changefeedccl/kvevent", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/changefeedccl/schemafeed", + "//pkg/ccl/changefeedccl/timers", "//pkg/ccl/kvccl/kvfollowerreadsccl", "//pkg/ccl/utilccl", "//pkg/cloud", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 2b5d435f6552..6585cee096d1 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -497,6 +497,7 @@ func (ca *changeAggregator) makeKVFeedCfg( SchemaChangePolicy: schemaChange.Policy, SchemaFeed: sf, Knobs: ca.knobs.FeedKnobs, + ScopedTimers: ca.sliMetrics.Timers, MonitoringCfg: monitoringCfg, }, nil } @@ -1232,12 +1233,14 @@ func (cf *changeFrontier) Start(ctx context.Context) { // but the oracle is only used when emitting row updates. var nilOracle timestampLowerBoundOracle var err error - sli, err := cf.metrics.getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]) + scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope] + sli, err := cf.metrics.getSLIMetrics(scope) if err != nil { cf.MoveToDraining(err) return } cf.sliMetrics = sli + cf.sink, err = getResolvedTimestampSink(ctx, cf.FlowCtx.Cfg, cf.spec.Feed, nilOracle, cf.spec.User(), cf.spec.JobID, sli) @@ -1644,6 +1647,8 @@ func (cf *changeFrontier) maybeCheckpointJob( func (cf *changeFrontier) checkpointJobProgress( frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint, ) (bool, error) { + defer cf.sliMetrics.Timers.CheckpointJobProgress.Start()() + if cf.knobs.RaiseRetryableError != nil { if err := cf.knobs.RaiseRetryableError(); err != nil { return false, changefeedbase.MarkRetryableError( diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 4759554fad21..3cd9d6187da6 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -421,6 +421,7 @@ func (c *kvEventToRowConsumer) encodeAndEmit( } } + stop := c.metrics.Timers.Encode.Start() if c.encodingOpts.Format == changefeedbase.OptFormatParquet { return c.encodeForParquet( ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, @@ -444,10 +445,14 @@ func (c *kvEventToRowConsumer) encodeAndEmit( // Since we're done processing/converting this event, and will not use much more // than len(key)+len(bytes) worth of resources, adjust allocation to match. 
alloc.AdjustBytesToTarget(ctx, int64(len(keyCopy)+len(valueCopy))) + stop() - if err := c.sink.EmitRow( - ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc, - ); err != nil { + c.metrics.Timers.EmitRow.Time(func() { + err = c.sink.EmitRow( + ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc, + ) + }) + if err != nil { return err } if log.V(3) { diff --git a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel index 88a0f7a0d276..d978b5ca4354 100644 --- a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/changefeedccl/kvevent", "//pkg/ccl/changefeedccl/schemafeed", + "//pkg/ccl/changefeedccl/timers", "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", @@ -54,6 +55,7 @@ go_test( "//pkg/ccl/changefeedccl/kvevent", "//pkg/ccl/changefeedccl/schemafeed", "//pkg/ccl/changefeedccl/schemafeed/schematestutils", + "//pkg/ccl/changefeedccl/timers", "//pkg/ccl/storageccl", "//pkg/jobs/jobspb", "//pkg/keys", diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 6a4f25927359..2686239260a5 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -93,6 +94,8 @@ type Config struct { // Knobs are kvfeed testing knobs. Knobs TestingKnobs + + ScopedTimers *timers.ScopedTimers } // Run will run the kvfeed. The feed runs synchronously and returns an @@ -126,7 +129,7 @@ func Run(ctx context.Context, cfg Config) error { cfg.InitialHighWater, cfg.EndTime, cfg.Codec, cfg.SchemaFeed, - sc, pff, bf, cfg.Targets, cfg.Knobs) + sc, pff, bf, cfg.Targets, cfg.ScopedTimers, cfg.Knobs) f.onBackfillCallback = cfg.MonitoringCfg.OnBackfillCallback f.rangeObserver = startLaggingRangesObserver(g, cfg.MonitoringCfg.LaggingRangesCallback, cfg.MonitoringCfg.LaggingRangesPollingInterval, cfg.MonitoringCfg.LaggingRangesThreshold) @@ -259,6 +262,7 @@ type kvFeed struct { schemaChangePolicy changefeedbase.SchemaChangePolicy targets changefeedbase.Targets + timers *timers.ScopedTimers // These dependencies are made available for test injection. bufferFactory func() kvevent.Buffer @@ -285,6 +289,7 @@ func newKVFeed( pff physicalFeedFactory, bf func() kvevent.Buffer, targets changefeedbase.Targets, + ts *timers.ScopedTimers, knobs TestingKnobs, ) *kvFeed { return &kvFeed{ @@ -305,6 +310,7 @@ func newKVFeed( physicalFeed: pff, bufferFactory: bf, targets: targets, + timers: ts, knobs: knobs, } } @@ -579,6 +585,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro WithDiff: f.withDiff, WithFiltering: f.withFiltering, Knobs: f.knobs, + Timers: f.timers, RangeObserver: f.rangeObserver, } @@ -590,7 +597,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro // until a table event (i.e. a column is added/dropped) has occurred, which // signals another possible scan. 
g.GoCtx(func(ctx context.Context) error { - return copyFromSourceToDestUntilTableEvent(ctx, f.writer, memBuf, resumeFrontier, f.tableFeed, f.endTime, f.knobs) + return copyFromSourceToDestUntilTableEvent(ctx, f.writer, memBuf, resumeFrontier, f.tableFeed, f.endTime, f.knobs, f.timers) }) g.GoCtx(func(ctx context.Context) error { return f.physicalFeed.Run(ctx, memBuf, physicalCfg) @@ -673,6 +680,7 @@ func copyFromSourceToDestUntilTableEvent( schemaFeed schemafeed.SchemaFeed, endTime hlc.Timestamp, knobs TestingKnobs, + st *timers.ScopedTimers, ) error { // Initially, the only copy boundary is the end time if one is specified. // Once we discover a table event (which is before the end time), that will @@ -689,6 +697,7 @@ func copyFromSourceToDestUntilTableEvent( // from rangefeed) and checks if a table event was encountered at or before // said timestamp. If so, it replaces the copy boundary with the table event. checkForTableEvent = func(ts hlc.Timestamp) error { + defer st.KVFeedWaitForTableEvent.Start()() // There's no need to check for table events again if we already found one // since that should already be the earliest one. if _, ok := boundary.(*errTableEventReached); ok { @@ -782,6 +791,8 @@ func copyFromSourceToDestUntilTableEvent( // writeToDest writes an event to the dest. writeToDest = func(e kvevent.Event) error { + defer st.KVFeedBuffer.Start()() + switch e.Type() { case kvevent.TypeKV, kvevent.TypeFlush: return dest.Add(ctx, e) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index 9530a96b0a5e..a4769c9eff73 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" @@ -142,6 +143,7 @@ func TestKVFeed(t *testing.T) { }) ref := rawEventFeed(tc.events) tf := newRawTableFeed(tc.descs, tc.initialHighWater) + st := timers.New(time.Minute).GetOrCreateScopedTimers("") f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{}, tc.schemaChangeEvents, tc.schemaChangePolicy, tc.needsInitialScan, tc.withDiff, true, /* withFiltering */ @@ -149,7 +151,7 @@ func TestKVFeed(t *testing.T) { codec, tf, sf, rangefeedFactory(ref.run), bufferFactory, changefeedbase.Targets{}, - TestingKnobs{}) + st, TestingKnobs{}) ctx, cancel := context.WithCancel(context.Background()) g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) error { @@ -637,7 +639,7 @@ func TestCopyFromSourceToDestUntilTableEvent(t *testing.T) { schemaFeed := &testSchemaFeed{tableEvents: tc.tableEvents} endTime := tc.endTime - err = copyFromSourceToDestUntilTableEvent(ctx, dest, src, frontier, schemaFeed, endTime, TestingKnobs{}) + err = copyFromSourceToDestUntilTableEvent(ctx, dest, src, frontier, schemaFeed, endTime, TestingKnobs{}, timers.New(1*time.Second).GetOrCreateScopedTimers("")) require.Equal(t, tc.expectedErr, err) require.Empty(t, src.events) require.Equal(t, tc.expectedEvents, dest.events) diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 4d6f49e0fd4b..afc4ccecf90c 100644 --- 
a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -12,6 +12,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -35,6 +36,7 @@ type rangeFeedConfig struct { WithFiltering bool RangeObserver kvcoord.RangeObserver Knobs TestingKnobs + Timers *timers.ScopedTimers } // rangefeedFactory is a function that creates and runs a rangefeed. @@ -55,6 +57,7 @@ type rangefeed struct { // that the rangefeed uses to send event messages to. eventCh <-chan kvcoord.RangeFeedMessage knobs TestingKnobs + st *timers.ScopedTimers } // Run implements the physicalFeedFactory interface. @@ -82,6 +85,7 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang cfg: cfg, eventCh: eventCh, knobs: cfg.Knobs, + st: cfg.Timers, } g := ctxgroup.WithContext(ctx) g.GoCtx(feed.addEventsToBuffer) @@ -118,11 +122,13 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { return err } } + stop := p.st.RangefeedBufferValue.Start() if err := p.memBuf.Add( ctx, kvevent.MakeKVEvent(e.RangeFeedEvent), ); err != nil { return err } + stop() case *kvpb.RangeFeedCheckpoint: if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) { // RangeFeed happily forwards any closed timestamps it receives as @@ -133,11 +139,13 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { if p.knobs.ShouldSkipCheckpoint != nil && p.knobs.ShouldSkipCheckpoint(t) { continue } + stop := p.st.RangefeedBufferCheckpoint.Start() if err := p.memBuf.Add( ctx, kvevent.MakeResolvedEvent(e.RangeFeedEvent, jobspb.ResolvedSpan_NONE), ); err != nil { return err } + stop() case *kvpb.RangeFeedSSTable: // For now, we just error on SST ingestion, since we currently don't // expect SST ingestion into spans with active changefeeds. diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index e2ed8684e004..1c24026615b7 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -85,6 +86,8 @@ type AggMetrics struct { CloudstorageBufferedBytes *aggmetric.AggGauge KafkaThrottlingNanos *aggmetric.AggHistogram + Timers *timers.Timers + // There is always at least 1 sliMetrics created for defaultSLI scope. 
mu struct { syncutil.Mutex @@ -163,6 +166,8 @@ type sliMetrics struct { CloudstorageBufferedBytes *aggmetric.Gauge KafkaThrottlingNanos *aggmetric.Histogram + Timers *timers.ScopedTimers + mu struct { syncutil.Mutex id int64 @@ -1064,6 +1069,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag SigFigs: 2, BucketConfig: metric.BatchProcessLatencyBuckets, }), + Timers: timers.New(histogramWindow), NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"), } a.mu.sliMetrics = make(map[string]*sliMetrics) @@ -1133,6 +1139,9 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { TotalRanges: a.TotalRanges.AddChild(scope), CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope), KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope), + + Timers: a.Timers.GetOrCreateScopedTimers(scope), + // TODO(#130358): Again, this doesn't belong here, but it's the most // convenient way to feed this metric to changefeeds. NetMetrics: a.NetMetrics, diff --git a/pkg/ccl/changefeedccl/timers/BUILD.bazel b/pkg/ccl/changefeedccl/timers/BUILD.bazel new file mode 100644 index 000000000000..88e68e778d99 --- /dev/null +++ b/pkg/ccl/changefeedccl/timers/BUILD.bazel @@ -0,0 +1,14 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "timers", + srcs = ["timers.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/metric", + "//pkg/util/metric/aggmetric", + "//pkg/util/timeutil", + "@com_github_prometheus_client_golang//prometheus", + ], +) diff --git a/pkg/ccl/changefeedccl/timers/timers.go b/pkg/ccl/changefeedccl/timers/timers.go new file mode 100644 index 000000000000..1d909b26c870 --- /dev/null +++ b/pkg/ccl/changefeedccl/timers/timers.go @@ -0,0 +1,100 @@ +// Copyright 2024 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package timers + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/prometheus/client_golang/prometheus" +) + +type Timers struct { + CheckpointJobProgress *aggmetric.AggHistogram + Encode *aggmetric.AggHistogram + EmitRow *aggmetric.AggHistogram + KVFeedWaitForTableEvent *aggmetric.AggHistogram + KVFeedBuffer *aggmetric.AggHistogram + RangefeedBufferValue *aggmetric.AggHistogram + RangefeedBufferCheckpoint *aggmetric.AggHistogram +} + +func New(histogramWindow time.Duration) *Timers { + histogramOptsFor := func(name, desc string) metric.HistogramOptions { + return metric.HistogramOptions{ + Metadata: metric.Metadata{ + Name: name, + Help: desc, + Unit: metric.Unit_NANOSECONDS, + Measurement: "Latency", + }, + Duration: histogramWindow, + Buckets: prometheus.ExponentialBucketsRange(float64(1*time.Microsecond), float64(1*time.Hour), 60), + Mode: metric.HistogramModePrometheus, + } + } + + b := aggmetric.MakeBuilder("scope") + return &Timers{ + CheckpointJobProgress: b.Histogram(histogramOptsFor("changefeed.stage.checkpoint_job_progress.latency", "Latency of the changefeed stage: checkpointing job progress")), + Encode: b.Histogram(histogramOptsFor("changefeed.stage.encode.latency", "Latency of the changefeed stage: encoding data")), + EmitRow: b.Histogram(histogramOptsFor("changefeed.stage.emit_row.latency", "Latency of the changefeed stage: emitting row to sink")), + KVFeedWaitForTableEvent: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_wait_for_table_event.latency", "Latency of the changefeed stage: waiting for a table schema event to join to the kv event")), + KVFeedBuffer: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_buffer.latency", "Latency of the changefeed stage: waiting to buffer kv events")), + RangefeedBufferValue: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_value.latency", "Latency of the changefeed stage: buffering rangefeed value events")), + RangefeedBufferCheckpoint: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_checkpoint.latency", "Latency of the changefeed stage: buffering rangefeed checkpoint events")), + } +} + +func (ts *Timers) GetOrCreateScopedTimers(scope string) *ScopedTimers { + return &ScopedTimers{ + CheckpointJobProgress: &timer{ts.CheckpointJobProgress.AddChild(scope)}, + Encode: &timer{ts.Encode.AddChild(scope)}, + EmitRow: &timer{ts.EmitRow.AddChild(scope)}, + KVFeedWaitForTableEvent: &timer{ts.KVFeedWaitForTableEvent.AddChild(scope)}, + KVFeedBuffer: &timer{ts.KVFeedBuffer.AddChild(scope)}, + RangefeedBufferValue: &timer{ts.RangefeedBufferValue.AddChild(scope)}, + RangefeedBufferCheckpoint: &timer{ts.RangefeedBufferCheckpoint.AddChild(scope)}, + } +} + +type ScopedTimers struct { + CheckpointJobProgress *timer + Encode *timer + EmitRow *timer + KVFeedWaitForTableEvent *timer + KVFeedBuffer *timer + RangefeedBufferValue *timer + RangefeedBufferCheckpoint *timer +} + +func (ts *ScopedTimers) StartTimer(stage *aggmetric.Histogram) func() { + start := timeutil.Now() + return func() { + stage.RecordValue(timeutil.Since(start).Nanoseconds()) + } +} + +type timer struct { + hist *aggmetric.Histogram +} + +func (t *timer) Start() (end func()) { + start := timeutil.Now() + return func() { + 
t.hist.RecordValue(timeutil.Since(start).Nanoseconds()) + } +} + +func (t *timer) Time(cb func()) { + defer t.Start()() + cb() +} diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index 7b16148467aa..d29aa6f0e537 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -1455,6 +1455,7 @@ func TestFlushErrorHandling(t *testing.T) { lrw.purgatory.flush = lrw.flushBuffer lrw.purgatory.bytesGauge = lrw.metrics.RetryQueueBytes lrw.purgatory.eventsGauge = lrw.metrics.RetryQueueEvents + lrw.purgatory.debug = &streampb.DebugLogicalConsumerStatus{} lrw.bh = []BatchHandler{(mockBatchHandler(true))} diff --git a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go index 9c2c3a492409..0700d977e3fa 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go @@ -231,6 +231,7 @@ func newLogicalReplicationWriterProcessor( checkpoint: lrw.checkpoint, bytesGauge: lrw.metrics.RetryQueueBytes, eventsGauge: lrw.metrics.RetryQueueEvents, + debug: &lrw.debug, } if err := lrw.Init(ctx, lrw, post, logicalReplicationWriterResultType, flowCtx, processorID, nil, /* memMonitor */ @@ -319,6 +320,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { lrw.subscription = sub lrw.workerGroup.GoCtx(func(_ context.Context) error { if err := sub.Subscribe(subscriptionCtx); err != nil { + log.Infof(lrw.Ctx(), "subscription completed. Error: %s", err) lrw.sendError(errors.Wrap(err, "subscription")) } return nil @@ -326,6 +328,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { lrw.workerGroup.GoCtx(func(ctx context.Context) error { defer close(lrw.checkpointCh) if err := lrw.consumeEvents(ctx); err != nil { + log.Infof(lrw.Ctx(), "consumer completed. Error: %s", err) lrw.sendError(errors.Wrap(err, "consume events")) } return nil @@ -391,10 +394,10 @@ func (lrw *logicalReplicationWriterProcessor) ConsumerClosed() { func (lrw *logicalReplicationWriterProcessor) close() { streampb.UnregisterActiveLogicalConsumerStatus(&lrw.debug) - if lrw.Closed { return } + log.Infof(lrw.Ctx(), "logical replication writer processor closing") defer lrw.frontier.Release() if lrw.streamPartitionClient != nil { @@ -423,6 +426,7 @@ func (lrw *logicalReplicationWriterProcessor) close() { lrw.purgatory.bytesGauge.Dec(lrw.purgatory.bytes) for _, i := range lrw.purgatory.levels { lrw.purgatory.eventsGauge.Dec(int64(len(i.events))) + lrw.purgatory.debug.RecordPurgatory(-int64(len(i.events))) } lrw.InternalClose() @@ -442,13 +446,20 @@ func (lrw *logicalReplicationWriterProcessor) sendError(err error) { // consumeEvents handles processing events on the event queue and returns once // the event channel has closed. 
func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context) error { - before := timeutil.Now() + lastLog := timeutil.Now() + lrw.debug.RecordRecvStart() for event := range lrw.subscription.Events() { - lrw.debug.RecordRecv(timeutil.Since(before)) - before = timeutil.Now() + lrw.debug.RecordRecv() if err := lrw.handleEvent(ctx, event); err != nil { return err } + if timeutil.Since(lastLog) > 5*time.Minute { + lastLog = timeutil.Now() + if !lrw.frontier.Frontier().GoTime().After(timeutil.Now().Add(-5 * time.Minute)) { + log.Infof(lrw.Ctx(), "lagging frontier: %s with span %s", lrw.frontier.Frontier(), lrw.frontier.PeekFrontierSpan()) + } + } + lrw.debug.RecordRecvStart() } return lrw.subscription.Err() } @@ -534,6 +545,7 @@ func (lrw *logicalReplicationWriterProcessor) checkpoint( return nil } lrw.metrics.CheckpointEvents.Inc(1) + lrw.debug.RecordCheckpoint(lrw.frontier.Frontier().GoTime()) return nil } diff --git a/pkg/ccl/crosscluster/logical/purgatory.go b/pkg/ccl/crosscluster/logical/purgatory.go index 4b992dd96f92..a9d7fa7475a0 100644 --- a/pkg/ccl/crosscluster/logical/purgatory.go +++ b/pkg/ccl/crosscluster/logical/purgatory.go @@ -58,6 +58,7 @@ type purgatory struct { bytes int64 levels []purgatoryLevel eventsGauge, bytesGauge *metric.Gauge + debug *streampb.DebugLogicalConsumerStatus } type purgatoryLevel struct { @@ -95,6 +96,7 @@ func (p *purgatory) Store( p.bytes += byteSize p.bytesGauge.Inc(byteSize) p.eventsGauge.Inc(int64(len(events))) + p.debug.RecordPurgatory(int64(len(events))) return nil } @@ -133,7 +135,9 @@ func (p *purgatory) Drain(ctx context.Context) error { } p.bytesGauge.Dec(levelBytes - p.levels[i].bytes) - p.eventsGauge.Dec(int64(levelCount - len(remaining))) + flushedEventCount := int64(levelCount - len(remaining)) + p.eventsGauge.Dec(flushedEventCount) + p.debug.RecordPurgatory(-flushedEventCount) // If we have resolved every prior level and all events in this level were // handled, we can resolve this level and emit its checkpoint, if any. 
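A minimal sketch (not part of this patch) of how the new pkg/ccl/changefeedccl/timers package introduced earlier in this diff is meant to be used by callers such as the change frontier and the event consumer. The histogram window and the scope name "default" below are illustrative placeholders; production code passes the metrics histogram window and the changefeed's OptMetricsScope value, as shown in metrics.go above.

    package main

    import (
        "time"

        "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
    )

    func main() {
        // One Timers struct holds the per-stage aggregate histograms; the patch
        // creates it once per metrics registry in newAggregateMetrics.
        ts := timers.New(time.Minute /* histogramWindow, illustrative */)

        // Each changefeed metrics scope gets its own child histograms.
        scoped := ts.GetOrCreateScopedTimers("default")

        // Pattern 1: defer style, as in checkpointJobProgress above. Start()
        // returns a stop func that records the elapsed time when invoked.
        func() {
            defer scoped.CheckpointJobProgress.Start()()
            // ... checkpoint work being timed ...
        }()

        // Pattern 2: callback style, as in encodeAndEmit above. Time() wraps
        // the callback and records its duration in the EmitRow histogram.
        scoped.EmitRow.Time(func() {
            // ... emit the row to the sink ...
        })
    }

Both call sites in the patch follow one of these two patterns, so adding a new timed stage amounts to adding a histogram field to Timers/ScopedTimers and wrapping the stage with Start() or Time().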
diff --git a/pkg/ccl/crosscluster/logical/purgatory_test.go b/pkg/ccl/crosscluster/logical/purgatory_test.go index 820e49d249b5..d287e5bc7256 100644 --- a/pkg/ccl/crosscluster/logical/purgatory_test.go +++ b/pkg/ccl/crosscluster/logical/purgatory_test.go @@ -60,6 +60,7 @@ func TestPurgatory(t *testing.T) { resolved = int(sp[0].Timestamp.WallTime) return nil }, + debug: &streampb.DebugLogicalConsumerStatus{}, } sz := int64((&streampb.StreamEvent_KV{KeyValue: roachpb.KeyValue{Key: roachpb.Key("a")}}).Size()) diff --git a/pkg/ccl/utilccl/BUILD.bazel b/pkg/ccl/utilccl/BUILD.bazel index 40571896bcc8..b03364997f47 100644 --- a/pkg/ccl/utilccl/BUILD.bazel +++ b/pkg/ccl/utilccl/BUILD.bazel @@ -45,6 +45,7 @@ go_test( "//pkg/server/license", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/util/envutil", diff --git a/pkg/ccl/utilccl/license_check_test.go b/pkg/ccl/utilccl/license_check_test.go index f29ea2d636f8..e884acf6f48c 100644 --- a/pkg/ccl/utilccl/license_check_test.go +++ b/pkg/ccl/utilccl/license_check_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/license" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -293,7 +294,7 @@ func TestRefreshLicenseEnforcerOnLicenseChange(t *testing.T) { // Test to ensure that the state is correctly registered on startup before // changing the license. - enforcer := license.GetEnforcerInstance() + enforcer := srv.SystemLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer require.Equal(t, false, enforcer.GetHasLicense()) gracePeriodTS, hasGracePeriod := enforcer.GetGracePeriodEndTS() require.True(t, hasGracePeriod) diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go index 1250aa6575a2..a2bfb15c86da 100644 --- a/pkg/cli/zip_table_registry.go +++ b/pkg/cli/zip_table_registry.go @@ -1089,22 +1089,23 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{ nonSensitiveCols: NonSensitiveColumns{ "stream_id", "consumer", - "recv_wait", - "last_recv_wait", - "flush_count", + "state", + "recv_time", + "last_recv_time", + "ingest_time", "flush_time", + "flush_count", "flush_kvs", "flush_bytes", "flush_batches", - "last_time", - "last_kvs", - "last_bytes", + "last_flush_time", + "last_kvs_done", + "last_kvs_todo", + "last_batches", "last_slowest", - "cur_time", - "cur_kvs_done", - "cur_kvs_todo", - "cur_batches", - "cur_slowest", + "checkpoints", + "retry_size", + "resolved_age", }, }, } diff --git a/pkg/repstream/streampb/BUILD.bazel b/pkg/repstream/streampb/BUILD.bazel index 066054ac985d..5435089ad2a4 100644 --- a/pkg/repstream/streampb/BUILD.bazel +++ b/pkg/repstream/streampb/BUILD.bazel @@ -46,5 +46,8 @@ go_library( embed = [":streampb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/repstream/streampb", visibility = ["//visibility:public"], - deps = ["//pkg/util/syncutil"], + deps = [ + "//pkg/util/syncutil", + "//pkg/util/timeutil", + ], ) diff --git a/pkg/repstream/streampb/empty.go b/pkg/repstream/streampb/empty.go index f79d1b76fdd9..fe77fb69a63a 100644 --- a/pkg/repstream/streampb/empty.go +++ b/pkg/repstream/streampb/empty.go @@ -15,6 +15,7 @@ import ( time "time" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + 
"github.com/cockroachdb/cockroach/pkg/util/timeutil" ) type DebugProducerStatus struct { @@ -129,23 +130,58 @@ type DebugLogicalConsumerStatus struct { } } +type LogicalConsumerState int + +const ( + Other LogicalConsumerState = iota + Flushing + Waiting +) + +func (s LogicalConsumerState) String() string { + switch s { + case Other: + return "other" + case Flushing: + return "flush" + case Waiting: + return "receive" + default: + return "unknown" + } +} + type DebugLogicalConsumerStats struct { Recv struct { + CurReceiveStart time.Time LastWaitNanos, TotalWaitNanos int64 } + Ingest struct { + // TotalIngestNanos is the total time spent not waiting for data. + CurIngestStart time.Time + TotalIngestNanos int64 + } + Flushes struct { Count, Nanos, KVs, Bytes, Batches int64 - Current struct { - StartedUnixMicros, ProcessedKVs, TotalKVs, Batches, SlowestBatchNanos int64 + Last struct { + CurFlushStart time.Time + LastFlushNanos, ProcessedKVs, TotalKVs, Batches, SlowestBatchNanos int64 // TODO(dt): BatchErrors atomic.Int64 // TODO(dt): LastBatchErr atomic.Value } - Last struct { - Nanos, KVs, Bytes, Batches, SlowestBatchNanos int64 - // TODO(dt): Errors atomic.Int64 - } + } + Checkpoints struct { + Count int64 + LastCheckpoint, Resolved time.Time + } + + CurrentState LogicalConsumerState + + Purgatory struct { + CurrentCount int64 } } @@ -154,62 +190,86 @@ func (d *DebugLogicalConsumerStatus) GetStats() DebugLogicalConsumerStats { defer d.mu.Unlock() return d.mu.stats } +func (d *DebugLogicalConsumerStatus) RecordRecvStart() { + d.mu.Lock() + defer d.mu.Unlock() + d.mu.stats.Recv.CurReceiveStart = timeutil.Now() + d.mu.stats.CurrentState = Waiting + if !d.mu.stats.Ingest.CurIngestStart.IsZero() { + d.mu.stats.Ingest.TotalIngestNanos += timeutil.Since(d.mu.stats.Ingest.CurIngestStart).Nanoseconds() + } + d.mu.stats.Recv.LastWaitNanos = 0 + d.mu.stats.Ingest.CurIngestStart = time.Time{} +} -func (d *DebugLogicalConsumerStatus) RecordRecv(wait time.Duration) { - nanos := wait.Nanoseconds() +func (d *DebugLogicalConsumerStatus) RecordRecv() { d.mu.Lock() + defer d.mu.Unlock() + nanos := timeutil.Since(d.mu.stats.Recv.CurReceiveStart).Nanoseconds() + d.mu.stats.CurrentState = Other d.mu.stats.Recv.LastWaitNanos = nanos d.mu.stats.Recv.TotalWaitNanos += nanos - d.mu.Unlock() + + d.mu.stats.Ingest.CurIngestStart = timeutil.Now() + d.mu.stats.Recv.CurReceiveStart = time.Time{} } func (d *DebugLogicalConsumerStatus) SetInjectedFailurePercent(percent uint32) { d.mu.Lock() d.mu.injectFailurePercent = percent - d.mu.Unlock() + d.mu.Unlock() // nolint:deferunlockcheck } func (d *DebugLogicalConsumerStatus) RecordFlushStart(start time.Time, keyCount int64) uint32 { - micros := start.UnixMicro() d.mu.Lock() - d.mu.stats.Flushes.Current.TotalKVs = keyCount - d.mu.stats.Flushes.Current.StartedUnixMicros = micros + defer d.mu.Unlock() + // Reset the incremental flush stats. 
+ d.mu.stats.Flushes.Last.SlowestBatchNanos = 0 + d.mu.stats.Flushes.Last.Batches = 0 + d.mu.stats.Flushes.Last.ProcessedKVs = 0 + d.mu.stats.Flushes.Last.LastFlushNanos = 0 + + d.mu.stats.CurrentState = Flushing + d.mu.stats.Flushes.Last.CurFlushStart = start + d.mu.stats.Flushes.Last.TotalKVs = keyCount failPercent := d.mu.injectFailurePercent - d.mu.Unlock() return failPercent } func (d *DebugLogicalConsumerStatus) RecordBatchApplied(t time.Duration, keyCount int64) { nanos := t.Nanoseconds() d.mu.Lock() - d.mu.stats.Flushes.Current.Batches++ - d.mu.stats.Flushes.Current.ProcessedKVs += keyCount - if d.mu.stats.Flushes.Current.SlowestBatchNanos < nanos { // nolint:deferunlockcheck - d.mu.stats.Flushes.Current.SlowestBatchNanos = nanos + d.mu.stats.Flushes.Last.Batches++ + d.mu.stats.Flushes.Last.ProcessedKVs += keyCount + if d.mu.stats.Flushes.Last.SlowestBatchNanos < nanos { // nolint:deferunlockcheck + d.mu.stats.Flushes.Last.SlowestBatchNanos = nanos } d.mu.Unlock() // nolint:deferunlockcheck } func (d *DebugLogicalConsumerStatus) RecordFlushComplete(totalNanos, keyCount, byteSize int64) { d.mu.Lock() - + defer d.mu.Unlock() + d.mu.stats.CurrentState = Other d.mu.stats.Flushes.Count++ d.mu.stats.Flushes.Nanos += totalNanos d.mu.stats.Flushes.KVs += keyCount d.mu.stats.Flushes.Bytes += byteSize - d.mu.stats.Flushes.Batches += d.mu.stats.Flushes.Current.Batches - - d.mu.stats.Flushes.Last.Nanos = totalNanos - d.mu.stats.Flushes.Last.Batches = d.mu.stats.Flushes.Current.Batches - d.mu.stats.Flushes.Last.SlowestBatchNanos = d.mu.stats.Flushes.Current.SlowestBatchNanos - d.mu.stats.Flushes.Last.KVs = keyCount - d.mu.stats.Flushes.Last.Bytes = byteSize + d.mu.stats.Flushes.Batches += d.mu.stats.Flushes.Last.Batches + d.mu.stats.Flushes.Last.LastFlushNanos = totalNanos + d.mu.stats.Flushes.Last.CurFlushStart = time.Time{} +} - d.mu.stats.Flushes.Current.StartedUnixMicros = 0 - d.mu.stats.Flushes.Current.SlowestBatchNanos = 0 - d.mu.stats.Flushes.Current.Batches = 0 - d.mu.stats.Flushes.Current.TotalKVs = 0 - d.mu.stats.Flushes.Current.ProcessedKVs = 0 +func (d *DebugLogicalConsumerStatus) RecordCheckpoint(resolved time.Time) { + d.mu.Lock() + d.mu.stats.Checkpoints.Count++ + d.mu.stats.Checkpoints.Resolved = resolved + d.mu.stats.Checkpoints.LastCheckpoint = timeutil.Now() // nolint:deferunlockcheck + d.mu.Unlock() +} - d.mu.Unlock() // nolint:deferunlockcheck +func (d *DebugLogicalConsumerStatus) RecordPurgatory(netEvents int64) { + d.mu.Lock() + d.mu.stats.Purgatory.CurrentCount += netEvents + d.mu.Unlock() } diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index 89c171bc63f6..b788a22a7742 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -217,8 +217,8 @@ func scpWithRetry( scpCtx, cancel := context.WithTimeout(ctx, scpTimeout) defer cancel() - return runWithMaybeRetry(ctx, l, DefaultRetryOpt, defaultSCPShouldRetryFn, - func(ctx context.Context) (*RunResultDetails, error) { return scp(scpCtx, l, src, dest) }) + return runWithMaybeRetry(scpCtx, l, DefaultRetryOpt, defaultSCPShouldRetryFn, + func(ctx context.Context) (*RunResultDetails, error) { return scp(ctx, l, src, dest) }) } // Host returns the public IP of a node. @@ -2801,6 +2801,7 @@ func scp(ctx context.Context, l *logger.Logger, src, dest string) (*RunResultDet args = append(args, sshAuthArgs()...) args = append(args, src, dest) cmd := exec.CommandContext(ctx, args[0], args[1:]...) 
+ cmd.WaitDelay = time.Second // make sure the call below returns when the context is canceled out, err := cmd.CombinedOutput() if err != nil { diff --git a/pkg/server/config.go b/pkg/server/config.go index d48a6b809949..cfc737d41477 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server/license" "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" @@ -542,6 +543,9 @@ type SQLConfig struct { // NodeMetricsRecorder is the node's MetricRecorder; the tenant's metrics will // be recorded with it. Nil if this is not a shared-process tenant. NodeMetricsRecorder *status.MetricsRecorder + + // LicenseEnforcer is used to enforce license policies. + LicenseEnforcer *license.Enforcer } // LocalKVServerInfo is used to group information about the local KV server @@ -575,6 +579,7 @@ func (sqlCfg *SQLConfig) SetDefaults(tempStorageCfg base.TempStorageConfig) { sqlCfg.TableStatCacheSize = defaultSQLTableStatCacheSize sqlCfg.QueryCacheSize = defaultSQLQueryCacheSize sqlCfg.TempStorageConfig = tempStorageCfg + sqlCfg.LicenseEnforcer = license.NewEnforcer(nil) } // setOpenFileLimit sets the soft limit for open file descriptors to the hard diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index 438a5d8f3b40..9be7d6372746 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -167,6 +167,9 @@ func TestReadEnvironmentVariables(t *testing.T) { // Temp storage disk monitors will have slightly different names, so we // override them to point to the same one. cfgExpected.TempStorageConfig.Mon = cfg.TempStorageConfig.Mon + // The LicenseEnforcer initializes a start time, which can vary between runs, + // so we ensure they are the same for comparison. + cfgExpected.LicenseEnforcer = cfg.LicenseEnforcer require.Equal(t, cfgExpected, cfg) // Set all the environment variables to valid values and ensure they are set diff --git a/pkg/server/license/BUILD.bazel b/pkg/server/license/BUILD.bazel index 68c41e26adc4..4af3addeb4d4 100644 --- a/pkg/server/license/BUILD.bazel +++ b/pkg/server/license/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "cclbridge.go", "enforcer.go", + "opts.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/server/license", visibility = ["//visibility:public"], diff --git a/pkg/server/license/enforcer.go b/pkg/server/license/enforcer.go index ff14e17fee8a..d801fdd8098f 100644 --- a/pkg/server/license/enforcer.go +++ b/pkg/server/license/enforcer.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "strings" - "sync" "sync/atomic" "time" @@ -41,10 +40,14 @@ const ( type Enforcer struct { mu struct { syncutil.Mutex - // testingKnobs are used to control the behavior of the enforcer for testing. - testingKnobs *TestingKnobs + + // setupComplete ensures that Start() is called only once. + setupComplete bool } + // testingKnobs are used to control the behavior of the enforcer for testing. + testingKnobs *TestingKnobs + // telemetryStatusReporter is an interface for getting the timestamp of the // last successful ping to the telemetry server. For some licenses, sending // telemetry data is required to avoid throttling. 
@@ -126,26 +129,17 @@ type TelemetryStatusReporter interface { GetLastSuccessfulTelemetryPing() time.Time } -var instance *Enforcer -var once sync.Once - -// GetEnforcerInstance returns the singleton instance of the Enforcer. The -// Enforcer is responsible for license enforcement policies. -func GetEnforcerInstance() *Enforcer { - once.Do( - func() { - instance = newEnforcer() - }) - return instance -} - -// newEnforcer creates a new Enforcer object. -func newEnforcer() *Enforcer { +// NewEnforcer creates a new Enforcer object. +func NewEnforcer(tk *TestingKnobs) *Enforcer { e := &Enforcer{ startTime: timeutil.Now(), throttleLogger: log.Every(5 * time.Minute), + testingKnobs: tk, + } + // Start is disabled by default unless overridden by testing knobs. + if tk == nil || !tk.Enable { + e.isDisabled.Store(true) } - e.isDisabled.Store(true) // Start disabled until Start() is called return e } @@ -154,23 +148,30 @@ func (e *Enforcer) SetTelemetryStatusReporter(reporter TelemetryStatusReporter) e.telemetryStatusReporter.Store(&reporter) } -// SetTesting Knobs will set the pointer to the testing knobs. -func (e *Enforcer) SetTestingKnobs(k *TestingKnobs) { - e.mu.Lock() - defer e.mu.Unlock() - e.mu.testingKnobs = k +func (e *Enforcer) GetTestingKnobs() *TestingKnobs { + return e.testingKnobs } -func (e *Enforcer) GetTestingKnobs() *TestingKnobs { +// Start will load the necessary metadata for the enforcer. If called for the +// system tenant, it reads from the KV license metadata and will populate any +// missing data as needed. +func (e *Enforcer) Start(ctx context.Context, st *cluster.Settings, opts ...Option) error { + options := options{} + for _, o := range opts { + o.apply(&options) + } + e.mu.Lock() defer e.mu.Unlock() - return e.mu.testingKnobs -} + if e.mu.setupComplete { + return nil + } + + if options.testingKnobs != nil { + e.testingKnobs = options.testingKnobs + } + e.telemetryStatusReporter.Store(&options.telemetryStatusReporter) -// Start will load the necessary metadata for the enforcer. It reads from the -// KV license metadata and will populate any missing data as needed. The DB -// passed in must have access to the system tenant. -func (e *Enforcer) Start(ctx context.Context, st *cluster.Settings, db isql.DB) error { // We always start disabled. If an error occurs, the enforcer setup will be // incomplete, but the server will continue to start. To ensure stability in // that case, we leave throttling disabled. @@ -180,7 +181,7 @@ func (e *Enforcer) Start(ctx context.Context, st *cluster.Settings, db isql.DB) e.maybeLogActiveOverrides(ctx) if !startDisabled { - if err := e.maybeWriteClusterInitGracePeriodTS(ctx, db); err != nil { + if err := e.maybeWriteClusterInitGracePeriodTS(ctx, options); err != nil { return err } } @@ -198,14 +199,31 @@ func (e *Enforcer) Start(ctx context.Context, st *cluster.Settings, db isql.DB) // This should be the final step after all error checks are completed. e.isDisabled.Store(startDisabled) + e.mu.setupComplete = true return nil } // maybeWriteClusterInitGracePeriodTS checks if the cluster init grace period // timestamp needs to be written to the KV layer and writes it if needed. 
-func (e *Enforcer) maybeWriteClusterInitGracePeriodTS(ctx context.Context, db isql.DB) error { - return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { +func (e *Enforcer) maybeWriteClusterInitGracePeriodTS(ctx context.Context, options options) error { + // Secondary tenants do not have access to the system keyspace where + // the cluster init grace period is stored. As a fallback, we apply a 7-day + // grace period from the tenant's start time, which is used only when no + // license is installed. This logic applies specifically when secondary + // tenants are started in a separate process from the system tenant. If they + // are not, a shared enforcer will have access to the system keyspace and + // handle the grace period. + // TODO(spilchen): Change to give the secondary tenant read access to the + // system keyspace KV. + if !options.isSystemTenant { + gracePeriodLength := e.getGracePeriodDuration(7 * 24 * time.Hour) + end := e.getStartTime().Add(gracePeriodLength) + e.clusterInitGracePeriodEndTS.Store(end.Unix()) + return nil + } + + return options.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { // We could use a conditional put for this logic. However, we want to read // and cache the value, and the common case is that the value will be read. // Only during the initialization of the first node in the cluster will we diff --git a/pkg/server/license/enforcer_test.go b/pkg/server/license/enforcer_test.go index e70c9417b35d..469417ac8aae 100644 --- a/pkg/server/license/enforcer_test.go +++ b/pkg/server/license/enforcer_test.go @@ -47,7 +47,8 @@ func TestClusterInitGracePeriod_NoOverwrite(t *testing.T) { // This is the timestamp that we'll override the grace period init timestamp with. // This will be set when bringing up the server. ts1 := timeutil.Unix(1724329716, 0) - ts1End := ts1.Add(30 * 24 * time.Hour) // Calculate the end of the grace period based on ts1 + ts1_30d := ts1.Add(30 * 24 * time.Hour) + ts1_7d := ts1.Add(7 * 24 * time.Hour) ctx := context.Background() srv := serverutils.StartServerOnly(t, base.TestServerArgs{ @@ -64,26 +65,35 @@ func TestClusterInitGracePeriod_NoOverwrite(t *testing.T) { // Create a new enforcer, to test that it won't overwrite the grace period init // timestamp that was already setup. - enforcer := &license.Enforcer{} ts2 := ts1.Add(1) ts2End := ts2.Add(7 * 24 * time.Hour) // Calculate the end of the grace period - enforcer.SetTestingKnobs(&license.TestingKnobs{ - Enable: true, - OverrideStartTime: &ts2, - }) + enforcer := license.NewEnforcer( + &license.TestingKnobs{ + Enable: true, + OverrideStartTime: &ts2, + }) // Ensure request for the grace period init ts1 before start just returns the start // time used when the enforcer was created. require.Equal(t, ts2End, enforcer.GetClusterInitGracePeriodEndTS()) // Start the enforcer to read the timestamp from the KV. 
- enforcer.SetTelemetryStatusReporter(&mockTelemetryStatusReporter{lastPingTime: ts1}) - err := enforcer.Start(ctx, srv.ClusterSettings(), srv.SystemLayer().InternalDB().(descs.DB)) + err := enforcer.Start(ctx, srv.ClusterSettings(), + license.WithDB(srv.SystemLayer().InternalDB().(descs.DB)), + license.WithSystemTenant(true), + license.WithTelemetryStatusReporter(&mockTelemetryStatusReporter{lastPingTime: ts1}), + ) require.NoError(t, err) - require.Equal(t, ts1End, enforcer.GetClusterInitGracePeriodEndTS()) + require.Equal(t, ts1_30d, enforcer.GetClusterInitGracePeriodEndTS()) // Access the enforcer that is cached in the executor config to make sure they // work for the system tenant and secondary tenant. - require.Equal(t, ts1End, srv.SystemLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer.GetClusterInitGracePeriodEndTS()) - require.Equal(t, ts1End, srv.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer.GetClusterInitGracePeriodEndTS()) + require.Equal(t, ts1_30d, srv.SystemLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer.GetClusterInitGracePeriodEndTS()) + // TODO(spilchen): Until the secondary tenant can read from the KV, it will + // guess the ending grace period to be 7-days after start. This will be fixed + // in CRDB-42309. Depending on how the test was initialized, it will be either + // the shared process secondary tenant (ts1_30d) or the separate process + // secondary tenant (ts1_7d). + require.Contains(t, []time.Time{ts1_30d, ts1_7d}, + srv.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer.GetClusterInitGracePeriodEndTS()) } func TestClusterInitGracePeriod_NewClusterEstimation(t *testing.T) { @@ -117,12 +127,12 @@ func TestClusterInitGracePeriod_NewClusterEstimation(t *testing.T) { {"init-1h1min-ago", ts1.Add(-61 * time.Minute), ts1.Add(30 * 24 * time.Hour)}, } { t.Run(tc.desc, func(t *testing.T) { - enforcer := &license.Enforcer{} - enforcer.SetTestingKnobs(&license.TestingKnobs{ - Enable: true, - OverrideStartTime: &ts1, - OverwriteClusterInitGracePeriodTS: true, - }) + enforcer := license.NewEnforcer( + &license.TestingKnobs{ + Enable: true, + OverrideStartTime: &ts1, + OverwriteClusterInitGracePeriodTS: true, + }) // Set up the min time in system.migrations. This is used by the enforcer // to figure out if the cluster is new or old. 
The grace period length is @@ -136,7 +146,15 @@ func TestClusterInitGracePeriod_NewClusterEstimation(t *testing.T) { }) require.NoError(t, err) - err = enforcer.Start(ctx, srv.ClusterSettings(), srv.SystemLayer().InternalDB().(descs.DB)) + err = enforcer.Start(ctx, srv.ClusterSettings(), + license.WithDB(db), + license.WithSystemTenant(true), + license.WithTestingKnobs(&license.TestingKnobs{ + Enable: true, + OverrideStartTime: &ts1, + OverwriteClusterInitGracePeriodTS: true, + }), + ) require.NoError(t, err) require.Equal(t, tc.expGracePeriodEndTS, enforcer.GetClusterInitGracePeriodEndTS()) }) @@ -210,11 +228,12 @@ func TestThrottle(t *testing.T) { {OverTxnThreshold, license.LicTypeEvaluation, t0, t0, t15d, t46d, "License expired", ""}, } { t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) { - e := license.Enforcer{} - e.SetTestingKnobs(&license.TestingKnobs{ - OverrideStartTime: &tc.gracePeriodInit, - OverrideThrottleCheckTime: &tc.checkTs, - }) + e := license.NewEnforcer( + &license.TestingKnobs{ + Enable: true, + OverrideStartTime: &tc.gracePeriodInit, + OverrideThrottleCheckTime: &tc.checkTs, + }) e.SetTelemetryStatusReporter(&mockTelemetryStatusReporter{ lastPingTime: tc.lastTelemetryPingTime, }) @@ -296,7 +315,7 @@ func TestThrottleErrorMsg(t *testing.T) { defer srv.Stopper().Stop(ctx) // Set up a free license that will expire in 30 days - licenseEnforcer := srv.SystemLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer + licenseEnforcer := srv.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer licenseEnforcer.RefreshForLicenseChange(ctx, license.LicTypeFree, t30d) for _, tc := range []struct { diff --git a/pkg/server/license/opts.go b/pkg/server/license/opts.go new file mode 100644 index 000000000000..7fd26d23c237 --- /dev/null +++ b/pkg/server/license/opts.go @@ -0,0 +1,56 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package license + +import "github.com/cockroachdb/cockroach/pkg/sql/isql" + +type options struct { + db isql.DB + isSystemTenant bool + testingKnobs *TestingKnobs + telemetryStatusReporter TelemetryStatusReporter +} + +type Option interface { + apply(*options) +} + +type optionFunc func(*options) + +func (f optionFunc) apply(o *options) { + f(o) +} + +func WithDB(db isql.DB) Option { + return optionFunc(func(o *options) { + o.db = db + }) +} + +func WithSystemTenant(v bool) Option { + return optionFunc(func(o *options) { + o.isSystemTenant = v + }) +} + +func WithTestingKnobs(tk *TestingKnobs) Option { + return optionFunc(func(o *options) { + if tk != nil { + o.testingKnobs = tk + } + }) +} + +func WithTelemetryStatusReporter(r TelemetryStatusReporter) Option { + return optionFunc(func(o *options) { + o.telemetryStatusReporter = r + }) +} diff --git a/pkg/server/server_controller_new_server.go b/pkg/server/server_controller_new_server.go index 9475eaf57b5d..027804f79ca9 100644 --- a/pkg/server/server_controller_new_server.go +++ b/pkg/server/server_controller_new_server.go @@ -352,6 +352,7 @@ func makeSharedProcessTenantServerConfig( sqlCfg.LocalKVServerInfo = &kvServerInfo sqlCfg.NodeMetricsRecorder = nodeMetricsRecorder + sqlCfg.LicenseEnforcer = kvServerCfg.SQLConfig.LicenseEnforcer return baseCfg, sqlCfg, nil } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 1d8b25ac6cfc..2a49de0d5b7c 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1061,7 +1061,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { NodeDescs: cfg.nodeDescs, TenantCapabilitiesReader: cfg.tenantCapabilitiesReader, CidrLookup: cfg.BaseConfig.CidrLookup, - LicenseEnforcer: license.GetEnforcerInstance(), + LicenseEnforcer: cfg.SQLConfig.LicenseEnforcer, } if codec.ForSystemTenant() { @@ -1923,23 +1923,23 @@ func (s *SQLServer) startLicenseEnforcer(ctx context.Context, knobs base.Testing // Start the license enforcer. This is only started for the system tenant since // it requires access to the system keyspace. For secondary tenants, this struct // is shared to provide access to the values cached from the KV read. - if s.execCfg.Codec.ForSystemTenant() { - licenseEnforcer := s.execCfg.LicenseEnforcer - if knobs.Server != nil { - s.execCfg.LicenseEnforcer.SetTestingKnobs(&knobs.Server.(*TestingKnobs).LicenseTestingKnobs) - } - licenseEnforcer.SetTelemetryStatusReporter(s.diagnosticsReporter) - // TODO(spilchen): we need to tell the license enforcer about the - // diagnostics reporter. This will be handled in CRDB-39991 - err := startup.RunIdempotentWithRetry(ctx, s.stopper.ShouldQuiesce(), "license enforcer start", - func(ctx context.Context) error { - return licenseEnforcer.Start(ctx, s.cfg.Settings, s.internalDB) - }) - // This is not a critical component. If it fails to start, we log a warning - // rather than prevent the entire server from starting. 
- if err != nil { - log.Warningf(ctx, "failed to start the license enforcer: %v", err) - } + licenseEnforcer := s.execCfg.LicenseEnforcer + opts := []license.Option{ + license.WithDB(s.internalDB), + license.WithSystemTenant(s.execCfg.Codec.ForSystemTenant()), + license.WithTelemetryStatusReporter(s.diagnosticsReporter), + } + if knobs.Server != nil { + opts = append(opts, license.WithTestingKnobs(&knobs.Server.(*TestingKnobs).LicenseTestingKnobs)) + } + err := startup.RunIdempotentWithRetry(ctx, s.stopper.ShouldQuiesce(), "license enforcer start", + func(ctx context.Context) error { + return licenseEnforcer.Start(ctx, s.cfg.Settings, opts...) + }) + // This is not a critical component. If it fails to start, we log a warning + // rather than prevent the entire server from starting. + if err != nil { + log.Warningf(ctx, "failed to start the license enforcer: %v", err) } } diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 357c2e45b9ba..8d9fc59c59bf 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -625,6 +625,7 @@ func (ts *testServer) startDefaultTestTenant( if ts.params.Knobs.Server != nil { params.TestingKnobs.Server.(*TestingKnobs).DiagnosticsTestingKnobs = ts.params.Knobs.Server.(*TestingKnobs).DiagnosticsTestingKnobs + params.TestingKnobs.Server.(*TestingKnobs).LicenseTestingKnobs = ts.params.Knobs.Server.(*TestingKnobs).LicenseTestingKnobs } return ts.StartTenant(ctx, params) } @@ -641,6 +642,7 @@ func (ts *testServer) getSharedProcessDefaultTenantArgs() base.TestSharedProcess args.Knobs.Server = &TestingKnobs{} if ts.params.Knobs.Server != nil { args.Knobs.Server.(*TestingKnobs).DiagnosticsTestingKnobs = ts.params.Knobs.Server.(*TestingKnobs).DiagnosticsTestingKnobs + args.Knobs.Server.(*TestingKnobs).LicenseTestingKnobs = ts.params.Knobs.Server.(*TestingKnobs).LicenseTestingKnobs } return args } diff --git a/pkg/sql/catalog/lease/descriptor_state.go b/pkg/sql/catalog/lease/descriptor_state.go index 8c334ef5dea2..2b11d9d4d3a6 100644 --- a/pkg/sql/catalog/lease/descriptor_state.go +++ b/pkg/sql/catalog/lease/descriptor_state.go @@ -185,7 +185,9 @@ func (t *descriptorState) upsertLeaseLocked( // is subsumed we have nothing to delete. In dual-write mode clearing // this guarantees only the old expiry based lease is cleaned up. In // Session only clearing this means the release is a no-op. 
- toRelease.sessionID = nil + if toRelease != nil { + toRelease.sessionID = nil + } return nil, toRelease, nil } diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 34d0ee75614b..02f58ea07261 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -347,7 +347,7 @@ func startConnExecutor( StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, st), HistogramWindowInterval: base.DefaultHistogramWindowInterval(), CollectionFactory: collectionFactory, - LicenseEnforcer: license.GetEnforcerInstance(), + LicenseEnforcer: license.NewEnforcer(nil), } s := NewServer(cfg, pool) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index a55cca42148f..ef1da3a35844 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -9393,22 +9394,24 @@ var crdbInternalLDRProcessorTable = virtualSchemaTable{ CREATE TABLE crdb_internal.logical_replication_node_processors ( stream_id INT, consumer STRING, - recv_wait INTERVAL, - last_recv_wait INTERVAL, - flush_count INT, + state STRING, + recv_time INTERVAL, + last_recv_time INTERVAL, + ingest_time INTERVAL, flush_time INTERVAL, + flush_count INT, flush_kvs INT, flush_bytes INT, flush_batches INT, - last_time INTERVAL, - last_kvs INT, - last_bytes INT, + last_flush_time INTERVAL, + last_kvs_done INT, + last_kvs_todo INT, + last_batches INT, last_slowest INTERVAL, - cur_time INTERVAL, - cur_kvs_done INT, - cur_kvs_todo INT, - cur_batches INT, - cur_slowest INTERVAL + last_checkpoint INTERVAL, + checkpoints INT, + retry_size INT, + resolved_age INTERVAL );`, populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { sm, err := p.EvalContext().StreamManagerFactory.GetReplicationStreamManager(ctx) @@ -9420,43 +9423,51 @@ CREATE TABLE crdb_internal.logical_replication_node_processors ( return err } now := p.EvalContext().GetStmtTimestamp() + dur := func(nanos int64) tree.Datum { + return tree.NewDInterval(duration.MakeDuration(nanos, 0, 0), types.DefaultIntervalTypeMetadata) + } + nanosSince := func(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return now.Sub(t).Nanoseconds() + } age := func(t time.Time) tree.Datum { if t.Unix() == 0 { return tree.DNull } return tree.NewDInterval(duration.Age(now, t), types.DefaultIntervalTypeMetadata) } - dur := func(nanos int64) tree.Datum { - return tree.NewDInterval(duration.MakeDuration(nanos, 0, 0), types.DefaultIntervalTypeMetadata) - } for _, container := range sm.DebugGetLogicalConsumerStatuses(ctx) { status := container.GetStats() - nullCur := func(x tree.Datum) tree.Datum { - if status.Flushes.Current.StartedUnixMicros == 0 { - return tree.DNull + curOrLast := func(currentNanos int64, lastNanos int64, currentState streampb.LogicalConsumerState) tree.Datum { + if status.CurrentState == currentState { + return dur(currentNanos) } - return x + return dur(lastNanos) } if err := addRow( tree.NewDInt(tree.DInt(container.StreamID)), tree.NewDString(fmt.Sprintf("%d[%d]", 
p.extendedEvalCtx.ExecCfg.JobRegistry.ID(), container.ProcessorID)), - dur(status.Recv.TotalWaitNanos), - dur(status.Recv.LastWaitNanos), - tree.NewDInt(tree.DInt(status.Flushes.Count)), - dur(status.Flushes.Nanos), - tree.NewDInt(tree.DInt(status.Flushes.KVs)), - tree.NewDInt(tree.DInt(status.Flushes.Bytes)), - tree.NewDInt(tree.DInt(status.Flushes.Batches)), - dur(status.Flushes.Last.Nanos), - tree.NewDInt(tree.DInt(status.Flushes.Last.KVs)), - tree.NewDInt(tree.DInt(status.Flushes.Last.Bytes)), - dur(status.Flushes.Last.SlowestBatchNanos), - nullCur(age(time.UnixMicro(status.Flushes.Current.StartedUnixMicros))), - nullCur(tree.NewDInt(tree.DInt(status.Flushes.Current.TotalKVs))), - nullCur(tree.NewDInt(tree.DInt(status.Flushes.Current.ProcessedKVs))), - nullCur(tree.NewDInt(tree.DInt(status.Flushes.Current.Batches))), - nullCur(dur(status.Flushes.Current.SlowestBatchNanos)), + tree.NewDString(status.CurrentState.String()), // current_state + dur(status.Recv.TotalWaitNanos+nanosSince(status.Recv.CurReceiveStart)), // recv_time + curOrLast(nanosSince(status.Recv.CurReceiveStart), status.Recv.LastWaitNanos, streampb.Waiting), // last_recv_time + dur(status.Ingest.TotalIngestNanos+nanosSince(status.Ingest.CurIngestStart)), // ingest_time + dur(status.Flushes.Nanos+nanosSince(status.Flushes.Last.CurFlushStart)), // flush_time + tree.NewDInt(tree.DInt(status.Flushes.Count)), // flush_count + tree.NewDInt(tree.DInt(status.Flushes.KVs)), // flush_kvs + tree.NewDInt(tree.DInt(status.Flushes.Bytes)), // flush_bytes + tree.NewDInt(tree.DInt(status.Flushes.Batches)), // flush_batches + curOrLast(nanosSince(status.Flushes.Last.CurFlushStart), status.Flushes.Last.LastFlushNanos, streampb.Flushing), // last_flush_time + tree.NewDInt(tree.DInt(status.Flushes.Last.TotalKVs)), // last_kvs_done + tree.NewDInt(tree.DInt(status.Flushes.Last.ProcessedKVs)), // last_kvs_todo + tree.NewDInt(tree.DInt(status.Flushes.Last.Batches)), // last_batches + dur(status.Flushes.Last.SlowestBatchNanos), // last_slowest + age(status.Checkpoints.LastCheckpoint), // last_checkpoint + tree.NewDInt(tree.DInt(status.Checkpoints.Count)), // checkpoints + tree.NewDInt(tree.DInt(status.Purgatory.CurrentCount)), // retry_size + age(status.Checkpoints.Resolved), // resolved_age ); err != nil { return err } diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog index d1ba7e37c1a1..9c51c1b13703 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog @@ -400,7 +400,7 @@ SELECT id, strip_volatile(descriptor) FROM crdb_internal.kv_catalog_descriptor O 4294967185 {"table": {"columns": [{"id": 1, "name": "grantee", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "role_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "is_grantable", "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967185, "name": "administrable_role_authorizations", "nextColumnId": 4, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967186, "version": "1"}} 4294967186 {"schema": {"defaultPrivileges": {"type": "SCHEMA"}, "id": 
4294967186, "name": "information_schema", "privileges": {"ownerProto": "node", "users": [{"privileges": "512", "userProto": "public"}], "version": 3}, "version": "1"}} 4294967187 {"table": {"columns": [{"id": 1, "name": "object_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "schema_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 3, "name": "database_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "object_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "schema_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 6, "name": "database_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 7, "name": "fq_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967187, "name": "fully_qualified_names", "nextColumnId": 8, "nextConstraintId": 1, "nextMutationId": 1, "primaryIndex": {"foreignKey": {}, "geoConfig": {}, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1", "viewQuery": "SELECT t.id, sc.id, db.id, t.name, sc.name, db.name, (((quote_ident(db.name) || '.') || quote_ident(sc.name)) || '.') || quote_ident(t.name) FROM system.namespace AS t JOIN system.namespace AS sc ON t.\"parentSchemaID\" = sc.id JOIN system.namespace AS db ON t.\"parentID\" = db.id WHERE (db.\"parentID\" = 0) AND pg_catalog.has_database_privilege(db.name, 'CONNECT')"}} -4294967188 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "recv_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 4, "name": "last_recv_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 5, "name": "flush_count", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "flush_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 7, "name": "flush_kvs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "flush_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "flush_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 10, "name": "last_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 11, "name": "last_kvs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "last_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 13, "name": "last_slowest", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 14, "name": "cur_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 15, "name": "cur_kvs_done", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": 
"cur_kvs_todo", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 17, "name": "cur_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 18, "name": "cur_slowest", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967188, "name": "logical_replication_node_processors", "nextColumnId": 19, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} +4294967188 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "state", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "recv_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 5, "name": "last_recv_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 6, "name": "ingest_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 7, "name": "flush_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 8, "name": "flush_count", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "flush_kvs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 10, "name": "flush_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 11, "name": "flush_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "last_flush_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "last_kvs_done", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 14, "name": "last_kvs_todo", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 15, "name": "last_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "last_slowest", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 17, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 18, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 19, "name": "retry_size", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 20, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967188, "name": "logical_replication_node_processors", "nextColumnId": 21, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": 
{"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967189 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "span_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "span_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967189, "name": "cluster_replication_node_stream_checkpoints", "nextColumnId": 7, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967190 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "span_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "span_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967190, "name": "cluster_replication_node_stream_spans", "nextColumnId": 5, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967191 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "spans", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "initial_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 5, "name": "prev_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "megabytes", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 9, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 10, "name": "produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 11, "name": "emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 12, 
"name": "last_produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "last_emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 14, "name": "rf_checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 15, "name": "rf_advances", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "rf_last_advance", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 17, "name": "rf_resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 18, "name": "rf_resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967191, "name": "cluster_replication_node_streams", "nextColumnId": 19, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} diff --git a/pkg/sql/logictest/testdata/logic_test/synthetic_privileges b/pkg/sql/logictest/testdata/logic_test/synthetic_privileges index 1da2f8d0be71..b01a2c61ee11 100644 --- a/pkg/sql/logictest/testdata/logic_test/synthetic_privileges +++ b/pkg/sql/logictest/testdata/logic_test/synthetic_privileges @@ -291,7 +291,7 @@ CREATE USER testuser4 statement ok REVOKE SELECT ON crdb_internal.tables FROM public -query B +query B retry SELECT has_table_privilege('testuser4', 'crdb_internal.tables', 'SELECT') ---- false