diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 3f0d347fa423..b652026dd461 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -1458,7 +1458,10 @@
APPLICATION | logical_replication.replicated_time_seconds | The replicated time of the logical replication stream in seconds since the unix epoch. | Seconds | GAUGE | SECONDS | AVG | NONE |
APPLICATION | logical_replication.retry_queue_bytes | The replicated time of the logical replication stream in seconds since the unix epoch. | Bytes | GAUGE | BYTES | AVG | NONE |
APPLICATION | logical_replication.retry_queue_events | The replicated time of the logical replication stream in seconds since the unix epoch. | Events | GAUGE | COUNT | AVG | NONE |
+APPLICATION | obs.tablemetadata.update_job.duration | Time spent running the update table metadata job. | Duration | HISTOGRAM | NANOSECONDS | AVG | NONE |
+APPLICATION | obs.tablemetadata.update_job.errors | The total number of errors that have been emitted from the update table metadata job. | Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | obs.tablemetadata.update_job.runs | The total number of runs of the update table metadata job. | Executions | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | obs.tablemetadata.update_job.table_updates | The total number of rows that have been updated in system.table_metadata. | Rows Updated | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | physical_replication.admit_latency | Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | physical_replication.commit_latency | Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | physical_replication.cutover_progress | The number of ranges left to revert in order to complete an inflight cutover | Ranges | GAUGE | COUNT | AVG | NONE |
diff --git a/pkg/sql/tablemetadatacache/BUILD.bazel b/pkg/sql/tablemetadatacache/BUILD.bazel
index c6b78f6c4809..aa08429cea9c 100644
--- a/pkg/sql/tablemetadatacache/BUILD.bazel
+++ b/pkg/sql/tablemetadatacache/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache",
visibility = ["//visibility:public"],
deps = [
+ "//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater.go b/pkg/sql/tablemetadatacache/table_metadata_updater.go
index dd8c3ee1a491..40a52717da57 100644
--- a/pkg/sql/tablemetadatacache/table_metadata_updater.go
+++ b/pkg/sql/tablemetadatacache/table_metadata_updater.go
@@ -43,7 +43,7 @@ func newTableMetadataUpdater(ie isql.Executor) *tableMetadataUpdater {
// updateCache performs a full update of the system.table_metadata, returning
// the number of rows updated and the last error encountered.
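+// The onErr callback, if non-nil, is invoked for each batch-level error
+// encountered along the way (e.g. a failed batch fetch or upsert).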
func (u *tableMetadataUpdater) updateCache(
- ctx context.Context, onUpdate onBatchUpdateCallback,
+ ctx context.Context, onUpdate onBatchUpdateCallback, onErr func(ctx context.Context, e error),
) (updated int, err error) {
it := newTableMetadataBatchIterator(u.ie)
estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx)
@@ -57,6 +57,9 @@ func (u *tableMetadataUpdater) updateCache(
// https://github.com/cockroachdb/cockroach/issues/130040. For now,
// we can't continue because the page key is in an invalid state.
log.Errorf(ctx, "failed to fetch next batch: %s", err.Error())
+ if onErr != nil {
+ onErr(ctx, err)
+ }
return updated, err
}
@@ -69,6 +72,9 @@ func (u *tableMetadataUpdater) updateCache(
if err != nil {
// If an upsert fails, let's just continue to the next batch for now.
log.Errorf(ctx, "failed to upsert batch of size: %d, err: %s", len(it.batchRows), err.Error())
+ if onErr != nil {
+ onErr(ctx, err)
+ }
continue
}
@@ -180,7 +186,11 @@ func newTableMetadataBatchUpsertQuery(batchLen int) *tableMetadataBatchUpsertQue
// another batch.
func (q *tableMetadataBatchUpsertQuery) resetForBatch() {
q.stmt.Reset()
- q.stmt.WriteString(`
+ q.stmt.WriteString(upsertQuery)
+ q.args = q.args[:0]
+}
+
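+// upsertQuery is the prefix of the batch upsert statement for
+// system.table_metadata. It is a package-level variable so that tests can
+// swap it out (e.g. via testutils.TestingHook) to inject upsert errors.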
+var upsertQuery = `
UPSERT INTO system.table_metadata (
db_id,
table_id,
@@ -198,9 +208,7 @@ UPSERT INTO system.table_metadata (
perc_live_data,
last_updated
) VALUES
-`)
- q.args = q.args[:0]
-}
+`
// addRow adds a tableMetadataIterRow to the batch upsert query.
func (q *tableMetadataBatchUpsertQuery) addRow(row *tableMetadataIterRow) error {
diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
index a8f6fcca8975..c4df5be50299 100644
--- a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
+++ b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
@@ -55,7 +55,7 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
return res
case "update-cache":
updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
- updated, err := updater.updateCache(ctx, nil)
+ updated, err := updater.updateCache(ctx, nil, nil)
if err != nil {
return err.Error()
}
diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
index 8413e5ad195e..7b6f0c4301ee 100644
--- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
+++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -15,6 +15,7 @@ import (
"fmt"
"math"
+ "github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -35,21 +36,25 @@ const (
// updateJobExecFn specifies the function that is run on each iteration of the
// table metadata update job. It can be overridden in tests.
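+// It returns the number of rows updated and any error encountered.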
-var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) error = updateTableMetadataCache
+var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) (int, error) = updateTableMetadataCache
type tableMetadataUpdateJobResumer struct {
- job *jobs.Job
+ job *jobs.Job
+ metrics *TableMetadataUpdateJobMetrics
}
var _ jobs.Resumer = (*tableMetadataUpdateJobResumer)(nil)
// Resume is part of the jobs.Resumer interface.
func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI interface{}) error {
+ // This job is a forever-running background job, and it is always safe to
+ // terminate the SQL pod while the job is running, so mark it as idle.
j.job.MarkIdle(true)
execCtx := execCtxI.(sql.JobExecContext)
metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct().
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
+ j.metrics = &metrics
var onJobStartKnob, onJobCompleteKnob, onJobReady func()
if execCtx.ExecCfg().TableMetadataKnobs != nil {
onJobStartKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobStart
@@ -111,15 +116,22 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
onJobStartKnob()
}
// Run table metadata update job.
- metrics.NumRuns.Inc(1)
+ j.metrics.NumRuns.Inc(1)
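+ // Time the entire update run so its duration can be recorded in the
+ // duration histogram once the run finishes.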
+ sw := timeutil.NewStopWatch()
+ sw.Start()
j.markAsRunning(ctx)
- if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j); err != nil {
+ rowsUpdated, err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j)
+ if err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
+ j.metrics.Errors.Inc(1)
}
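+ // The run is still marked as completed when it hit an error; the job keeps
+ // looping and will execute again on its next trigger.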
j.markAsCompleted(ctx)
if onJobCompleteKnob != nil {
onJobCompleteKnob()
}
+ sw.Stop()
+ j.metrics.Duration.RecordValue(sw.Elapsed().Nanoseconds())
+ j.metrics.UpdatedTables.Inc(int64(rowsUpdated))
}
}
@@ -173,17 +185,18 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
// metadata from the system.namespace, system.descriptor tables and table span stats RPC.
func updateTableMetadataCache(
ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer,
-) error {
+) (int, error) {
updater := newTableMetadataUpdater(ie)
if _, err := updater.pruneCache(ctx); err != nil {
log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error())
}
resumer.updateProgress(ctx, .01)
- // We'll use the updated ret val in a follow-up to update metrics and
- // fractional job progress.
- _, err := updater.updateCache(ctx, resumer.onBatchUpdate())
- return err
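+ // Count every batch-level error so that partial failures still show up in
+ // the job's error metric.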
+ return updater.updateCache(ctx, resumer.onBatchUpdate(), func(ctx context.Context, e error) {
+ if resumer.metrics != nil {
+ resumer.metrics.Errors.Inc(1)
+ }
+ })
}
// onBatchUpdate returns an onBatchUpdateCallback func that updates the progress of the job.
@@ -226,7 +239,10 @@ func (j *tableMetadataUpdateJobResumer) CollectProfile(
}
type TableMetadataUpdateJobMetrics struct {
- NumRuns *metric.Counter
+ NumRuns *metric.Counter
+ UpdatedTables *metric.Counter
+ Errors *metric.Counter
+ Duration metric.IHistogram
}
func (m TableMetadataUpdateJobMetrics) MetricStruct() {}
@@ -240,6 +256,30 @@ func newTableMetadataUpdateJobMetrics() metric.Struct {
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
}),
+ UpdatedTables: metric.NewCounter(metric.Metadata{
+ Name: "obs.tablemetadata.update_job.table_updates",
+ Help: "The total number of rows that have been updated in system.table_metadata",
+ Measurement: "Rows Updated",
+ Unit: metric.Unit_COUNT,
+ MetricType: io_prometheus_client.MetricType_COUNTER,
+ }),
+ Errors: metric.NewCounter(metric.Metadata{
+ Name: "obs.tablemetadata.update_job.errors",
+ Help: "The total number of errors that have been emitted from the update table metadata job.",
+ Measurement: "Errors",
+ Unit: metric.Unit_COUNT,
+ MetricType: io_prometheus_client.MetricType_COUNTER,
+ }),
+ Duration: metric.NewHistogram(metric.HistogramOptions{
+ Metadata: metric.Metadata{
+ Name: "obs.tablemetadata.update_job.duration",
+ Help: "Time spent running the update table metadata job.",
+ Measurement: "Duration",
+ Unit: metric.Unit_NANOSECONDS},
+ Duration: base.DefaultHistogramWindowInterval(),
+ BucketConfig: metric.IOLatencyBuckets,
+ Mode: metric.HistogramModePrometheus,
+ }),
}
}
diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
index 022f9e9bfe24..9f3a150b31f0 100644
--- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
+++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -45,9 +45,34 @@ func TestUpdateTableMetadataCacheJobRunsOnRPCTrigger(t *testing.T) {
ctx := context.Background()
tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
- defer tc.Stopper().Stop(context.Background())
+ defer tc.Stopper().Stop(context.Background())
+ metrics := tc.Server(0).JobRegistry().(*jobs.Registry).MetricsStruct().
+ JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
conn := sqlutils.MakeSQLRunner(tc.ServerConn(0))
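+ // testJobComplete waits until the job has run runNum times and its
+ // running status and fraction completed reflect a completed run.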
+ testJobComplete := func(runNum int64) {
+ testutils.SucceedsSoon(t, func() error {
+ if metrics.NumRuns.Count() != runNum {
+ return errors.New("job hasn't run yet")
+ }
+ row := conn.Query(t,
+ `SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
+ jobs.UpdateTableMetadataCacheJobID)
+ if !row.Next() {
+ return errors.New("last_run_time not updated")
+ }
+ var runningStatus string
+ var fractionCompleted float32
+ require.NoError(t, row.Scan(&runningStatus, &fractionCompleted))
+ if !strings.Contains(runningStatus, "Job completed at") {
+ return errors.New("running_status not updated")
+ }
+ if fractionCompleted != 1.0 {
+ return errors.New("fraction_completed not updated")
+ }
+ return nil
+ })
+ }
// Get the node id that claimed the update job. We'll issue the
// RPC to a node that doesn't own the job to test that the RPC can
@@ -60,41 +85,39 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ
if !row.Next() {
return errors.New("no node has claimed the job")
}
- require.NoError(t, row.Scan(&nodeID))
-
- rpcGatewayNode := (nodeID + 1) % 3
- _, err := tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
- &serverpb.UpdateTableMetadataCacheRequest{Local: false})
- if err != nil {
+ if err := row.Scan(&nodeID); err != nil {
return err
}
- // The job shouldn't be busy.
return nil
})
- metrics := tc.Server(0).JobRegistry().(*jobs.Registry).MetricsStruct().
- JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
- testutils.SucceedsSoon(t, func() error {
- if metrics.NumRuns.Count() != 1 {
- return errors.New("job hasn't run yet")
- }
- row := conn.Query(t,
- `SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
- jobs.UpdateTableMetadataCacheJobID)
- if !row.Next() {
- return errors.New("last_run_time not updated")
- }
- var runningStatus string
- var fractionCompleted float32
- require.NoError(t, row.Scan(&runningStatus, &fractionCompleted))
- if !strings.Contains(runningStatus, "Job completed at") {
- return errors.New("running_status not updated")
- }
- if fractionCompleted != 1.0 {
- return errors.New("fraction_completed not updated")
- }
- return nil
- })
+ rpcGatewayNode := (nodeID + 1) % 3
+ _, err := tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
+ &serverpb.UpdateTableMetadataCacheRequest{Local: false})
+ require.NoError(t, err)
+ testJobComplete(1)
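+ // After the first run, at least one table row should have been updated and
+ // a non-zero duration should have been recorded.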
+ count := metrics.UpdatedTables.Count()
+ meanDuration := metrics.Duration.CumulativeSnapshot().Mean()
+ require.Greater(t, count, int64(0))
+ require.Greater(t, meanDuration, float64(0))
+
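+ // Run the job again and verify that the cumulative metrics advance.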
+ _, err = tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
+ &serverpb.UpdateTableMetadataCacheRequest{Local: false})
+ require.NoError(t, err)
+ testJobComplete(2)
+ require.Greater(t, metrics.UpdatedTables.Count(), count)
+ require.NotEqual(t, metrics.Duration.CumulativeSnapshot().Mean(), meanDuration)
+ meanDuration = metrics.Duration.CumulativeSnapshot().Mean()
+ count = metrics.UpdatedTables.Count()
+
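+ // Break the upsert query so the next run updates no rows; the
+ // updated-tables counter should not advance even though the job still
+ // runs and records a duration.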
+ defer testutils.TestingHook(&upsertQuery, "UPSERT INTO error")()
+ _, err = tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
+ &serverpb.UpdateTableMetadataCacheRequest{Local: false})
+ require.NoError(t, err)
+ testJobComplete(3)
+ require.Equal(t, count, metrics.UpdatedTables.Count())
+ require.NotEqual(t, metrics.Duration.CumulativeSnapshot().Mean(), meanDuration)
}
func TestUpdateTableMetadataCacheProgressUpdate(t *testing.T) {
@@ -172,7 +195,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
mockMutex := syncutil.RWMutex{}
jobRunCh := make(chan struct{})
restoreUpdate := testutils.TestingHook(&updateJobExecFn,
- func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) error {
+ func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) (int, error) {
mockMutex.Lock()
defer mockMutex.Unlock()
mockCalls = append(mockCalls, timeutil.Now())
@@ -180,7 +203,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
case jobRunCh <- struct{}{}:
default:
}
- return nil
+ return 1, nil
})
defer restoreUpdate()