diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index a0fc506ddfab..230ac0c9a9c6 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -830,6 +830,7 @@
APPLICATION | changefeed.sink_batch_hist_nanos | Time spent batched in the sink buffer before being flushed and acknowledged | Changefeeds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.sink_io_inflight | The number of keys currently inflight as IO requests being sent to the sink | Messages | GAUGE | COUNT | AVG | NONE |
APPLICATION | changefeed.size_based_flushes | Total size based flushes across all feeds | Flushes | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | changefeed.total_ranges | The total number of ranges being watched by changefeed aggregators | Ranges | GAUGE | COUNT | AVG | NONE |
APPLICATION | changefeed.usage.error_count | Count of errors encountered while generating usage metrics for changefeeds | Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | changefeed.usage.query_duration | Time taken by the queries used to generate usage metrics for changefeeds | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.usage.table_bytes | Aggregated number of bytes of data per table watched by changefeeds | Storage | GAUGE | BYTES | AVG | NONE |
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 63c35daf27f8..4141ae7bfd8d 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -1364,6 +1364,60 @@ func TestChangefeedLaggingRangesMetrics(t *testing.T) {
cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks)
}
+// TestChangefeedTotalRangesMetric checks that the TotalRanges gauge in the
+// default SLI scope follows the ranges watched by changefeeds as feeds are
+// created, paused, resumed, and closed.
+func TestChangefeedTotalRangesMetric(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		jobRegistry := s.Server.JobRegistry().(*jobs.Registry)
+		cfMetrics := jobRegistry.MetricsStruct().Changefeed.(*Metrics)
+		sli, err := cfMetrics.getSLIMetrics(defaultSLIScope)
+		require.NoError(t, err)
+		gauge := sli.TotalRanges
+
+		// Nothing is running yet, so the gauge is expected to read zero.
+		require.Zero(t, gauge.Value())
+
+		// waitForTotalRanges retries via SucceedsSoon until the gauge reads want.
+		// The feeds below are created with a 1s polling interval, so the gauge
+		// is not expected to change immediately.
+		waitForTotalRanges := func(want int64) {
+			testutils.SucceedsSoon(t, func() error {
+				got := gauge.Value()
+				if want == got {
+					return nil
+				}
+				return errors.Newf("expected total ranges to be %d, but got %d", want, got)
+			})
+		}
+
+		runner := sqlutils.MakeSQLRunner(s.DB)
+		runner.Exec(t, "CREATE TABLE foo (x int)")
+
+		// One changefeed on a single table is expected to report one range.
+		firstFeed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH lagging_ranges_polling_interval='1s'")
+		waitForTotalRanges(1)
+
+		// Pausing the changefeed is expected to bring the gauge back to zero.
+		pausable := firstFeed.(cdctest.EnterpriseTestFeed)
+		require.NoError(t, pausable.Pause())
+		waitForTotalRanges(0)
+
+		// Resuming the changefeed is expected to report one range again.
+		require.NoError(t, pausable.Resume())
+		waitForTotalRanges(1)
+
+		// A second changefeed on the same table is expected to bring the total
+		// to two ranges.
+		secondFeed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH lagging_ranges_polling_interval='1s'")
+		waitForTotalRanges(2)
+
+		// Closing the first changefeed is expected to leave one range.
+		require.NoError(t, firstFeed.Close())
+		waitForTotalRanges(1)
+
+		// Closing the remaining changefeed is expected to leave no ranges.
+		require.NoError(t, secondFeed.Close())
+		waitForTotalRanges(0)
+	}
+
+	cdcTest(t, testFn, feedTestEnterpriseSinks)
+}
+
func TestChangefeedBackfillObservability(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index 4080d61f21fc..facbc970b578 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -39,10 +39,10 @@ import (
// the caller with information about the state of the kvfeed.
type MonitoringConfig struct {
// LaggingRangesCallback is called periodically with the number of lagging ranges
- // in the kvfeed.
- LaggingRangesCallback func(int64)
+ // and total ranges watched by the kvfeed.
+ LaggingRangesCallback func(lagging int64, total int64)
// LaggingRangesPollingInterval is how often the kv feed will poll for
- // lagging ranges.
+ // lagging ranges and total ranges.
LaggingRangesPollingInterval time.Duration
// LaggingRangesThreshold is how far behind a range must be to be considered
// lagging.
@@ -176,15 +176,15 @@ func Run(ctx context.Context, cfg Config) error {
func startLaggingRangesObserver(
g ctxgroup.Group,
- updateLaggingRanges func(int64),
+ updateLaggingRanges func(lagging int64, total int64),
pollingInterval time.Duration,
threshold time.Duration,
-) func(fn kvcoord.ForEachRangeFn) {
+) kvcoord.RangeObserver {
return func(fn kvcoord.ForEachRangeFn) {
g.GoCtx(func(ctx context.Context) error {
// Reset metrics on shutdown.
defer func() {
- updateLaggingRanges(0)
+ updateLaggingRanges(0 /* lagging */, 0 /* total */)
}()
var timer timeutil.Timer
@@ -198,9 +198,11 @@ func startLaggingRangesObserver(
case <-timer.C:
timer.Read = true
- count := int64(0)
+ var laggingCount, totalCount int64
thresholdTS := timeutil.Now().Add(-1 * threshold)
err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error {
+ totalCount += 1
+
// The resolved timestamp of a range determines the timestamp which is caught up to.
// However, during catchup scans, this is not set. For catchup scans, we consider the
// time the partial rangefeed was created to be its resolved ts. Note that a range can
@@ -212,14 +214,14 @@ func startLaggingRangesObserver(
}
if ts.Less(hlc.Timestamp{WallTime: thresholdTS.UnixNano()}) {
- count += 1
+ laggingCount += 1
}
return nil
})
if err != nil {
return err
}
- updateLaggingRanges(count)
+ updateLaggingRanges(laggingCount, totalCount)
timer.Reset(pollingInterval)
}
}
@@ -251,7 +253,7 @@ type kvFeed struct {
codec keys.SQLCodec
onBackfillCallback func() func()
- rangeObserver func(fn kvcoord.ForEachRangeFn)
+ rangeObserver kvcoord.RangeObserver
schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy
diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
index 93b8b5d175ab..4d6f49e0fd4b 100644
--- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -33,7 +33,7 @@ type rangeFeedConfig struct {
Spans []kvcoord.SpanTimePair
WithDiff bool
WithFiltering bool
- RangeObserver func(fn kvcoord.ForEachRangeFn)
+ RangeObserver kvcoord.RangeObserver
Knobs TestingKnobs
}
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 3662e085edb6..e2ed8684e004 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -81,6 +81,7 @@ type AggMetrics struct {
AggregatorProgress *aggmetric.AggGauge
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
+ TotalRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram
@@ -158,6 +159,7 @@ type sliMetrics struct {
AggregatorProgress *aggmetric.Gauge
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
+ TotalRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram
@@ -937,12 +939,18 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Unix Timestamp Nanoseconds",
Unit: metric.Unit_TIMESTAMP_NS,
}
- metaLaggingRangePercentage := metric.Metadata{
+ metaLaggingRanges := metric.Metadata{
Name: "changefeed.lagging_ranges",
Help: "The number of ranges considered to be lagging behind",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
+ metaTotalRanges := metric.Metadata{
+ Name: "changefeed.total_ranges",
+ Help: "The total number of ranges being watched by changefeed aggregators",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
metaCloudstorageBufferedBytes := metric.Metadata{
Name: "changefeed.cloudstorage_buffered_bytes",
Help: "The number of bytes buffered in cloudstorage sink files which have not been emitted yet",
@@ -1046,7 +1054,8 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations),
AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn),
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
- LaggingRanges: b.Gauge(metaLaggingRangePercentage),
+ LaggingRanges: b.Gauge(metaLaggingRanges),
+ TotalRanges: b.Gauge(metaTotalRanges),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedKafkaThrottlingNanos,
@@ -1121,6 +1130,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
+ TotalRanges: a.TotalRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
// TODO(#130358): Again, this doesn't belong here, but it's the most
@@ -1158,7 +1168,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
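-// getLaggingRangesCallback returns a function which can be called to update the
-// lagging ranges metric. It should be called with the current number of lagging
-// ranges.
+// getLaggingRangesCallback returns a function which can be called to update the
+// lagging ranges and total ranges metrics. It should be called with the current
+// numbers of lagging ranges and total ranges.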
-func (s *sliMetrics) getLaggingRangesCallback() func(int64) {
+func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
// Because this gauge is shared between changefeeds in the same metrics scope,
// we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to
// ensure values written by others are not overwritten. The code below is used
@@ -1175,13 +1185,18 @@ func (s *sliMetrics) getLaggingRangesCallback() func(int64) {
-	// If 1 lagging range is deleted, last=7,i=10: X.Dec(11-10) = X.Dec(1)
+	// If 1 lagging range is deleted, last=11,i=10: X.Dec(11-10) = X.Dec(1)
last := struct {
syncutil.Mutex
- v int64
+ lagging int64
+ total int64
}{}
- return func(i int64) {
+ return func(lagging int64, total int64) {
last.Lock()
defer last.Unlock()
- s.LaggingRanges.Dec(last.v - i)
- last.v = i
+
+ s.LaggingRanges.Dec(last.lagging - lagging)
+ last.lagging = lagging
+
+ s.TotalRanges.Dec(last.total - total)
+ last.total = total
}
}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index eb27238a22b1..32481a029679 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -63,13 +63,17 @@ var catchupStartupRate = settings.RegisterIntSetting(
// ForEachRangeFn is used to execute `fn` over each range in a rangefeed.
type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error
+// A RangeObserver is a function that observes the ranges in a rangefeed
+// by polling fn. It is installed with WithRangeObserver and receives fn, a
+// ForEachRangeFn that the observer can call to iterate over the ranges in
+// the rangefeed.
+type RangeObserver func(fn ForEachRangeFn)
+
type rangeFeedConfig struct {
overSystemTable bool
withDiff bool
withFiltering bool
withMetadata bool
withMatchingOriginIDs []uint32
- rangeObserver func(ForEachRangeFn)
+ rangeObserver RangeObserver
knobs struct {
// onRangefeedEvent invoked on each rangefeed event.
@@ -129,7 +133,7 @@ func WithMatchingOriginIDs(originIDs ...uint32) RangeFeedOption {
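-// WithRangeObserver is called when the rangefeed starts with a function that
-// can be used to iterate over all the ranges.
+// WithRangeObserver installs an observer that is called when the rangefeed
+// starts with a function that can be used to iterate over all the ranges.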
-func WithRangeObserver(observer func(ForEachRangeFn)) RangeFeedOption {
+func WithRangeObserver(observer RangeObserver) RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.rangeObserver = observer
})