tablemetadatacache: add metrics to tmj
Adds additional metrics to the update table metadata
job:
  * UpdatedTables - The total number of table rows
                    written to system.table_metadata
  * Errors        - The total number of errors emitted
                    from job runs
  * Duration      - The time spent executing the job

Part of: #130249
Epic: CRDB-37558
Release note: None
kyle-a-wong committed Sep 30, 2024
1 parent 9d75b5b commit 1a7b2c8
Showing 6 changed files with 124 additions and 49 deletions.
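Context for the diffs below: job-specific metric structs like the one extended here reach the job system through the resumer's registration hook, and Resume() later pulls the struct back out of JobRegistry.MetricsStruct(). A rough sketch of that registration as it would look in this package — the RegisterConstructor options shown (DisablesTenantCostControl, WithJobMetrics) are assumed from the jobs package and are not part of this diff:

package tablemetadatacache

import (
	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// Sketch: register the resumer together with its metric.Struct. The registry
// then exposes the struct via MetricsStruct().JobSpecificMetrics, which is
// how Resume() looks it up again in the diff below.
func init() {
	jobs.RegisterConstructor(
		jobspb.TypeUpdateTableMetadataCache,
		func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
			return &tableMetadataUpdateJobResumer{job: job}
		},
		jobs.DisablesTenantCostControl,
		jobs.WithJobMetrics(newTableMetadataUpdateJobMetrics()),
	)
}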
3 changes: 3 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -1458,7 +1458,10 @@
<tr><td>APPLICATION</td><td>logical_replication.replicated_time_seconds</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Seconds</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.retry_queue_bytes</td><td>The size in bytes of the retry queue of the logical replication stream.</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.retry_queue_events</td><td>The number of events in the retry queue of the logical replication stream.</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>obs.tablemetadata.update_job.duration</td><td>Time spent running the update table metadata job.</td><td>Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>obs.tablemetadata.update_job.errors</td><td>The total number of errors that have been emitted from the update table metadata job.</td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>obs.tablemetadata.update_job.runs</td><td>The total number of runs of the update table metadata job.</td><td>Executions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>obs.tablemetadata.update_job.table_updates</td><td>The total number of rows that have been updated in system.table_metadata</td><td>Rows Updated</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.admit_latency</td><td>Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.commit_latency</td><td>Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.cutover_progress</td><td>The number of ranges left to revert in order to complete an inflight cutover</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
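Note that metrics.html is generated documentation: the duration, errors, and table_updates rows above follow mechanically from the metric definitions added in update_table_metadata_cache_job.go below, while the runs row predates this commit.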
1 change: 1 addition & 0 deletions pkg/sql/tablemetadatacache/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
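The new //pkg/base dependency supplies base.DefaultHistogramWindowInterval(), used below when constructing the Duration histogram.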
18 changes: 13 additions & 5 deletions pkg/sql/tablemetadatacache/table_metadata_updater.go
@@ -43,7 +43,7 @@ func newTableMetadataUpdater(ie isql.Executor) *tableMetadataUpdater {
// updateCache performs a full update of the system.table_metadata, returning
// the number of rows updated and the last error encountered.
func (u *tableMetadataUpdater) updateCache(
-	ctx context.Context, onUpdate onBatchUpdateCallback,
+	ctx context.Context, onUpdate onBatchUpdateCallback, onErr func(ctx context.Context, e error),
) (updated int, err error) {
it := newTableMetadataBatchIterator(u.ie)
estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx)
@@ -57,6 +57,9 @@ func (u *tableMetadataUpdater) updateCache(
// https://github.com/cockroachdb/cockroach/issues/130040. For now,
// we can't continue because the page key is in an invalid state.
log.Errorf(ctx, "failed to fetch next batch: %s", err.Error())
+			if onErr != nil {
+				onErr(ctx, err)
+			}
return updated, err
}

@@ -69,6 +72,9 @@ func (u *tableMetadataUpdater) updateCache(
if err != nil {
// If an upsert fails, let's just continue to the next batch for now.
log.Errorf(ctx, "failed to upsert batch of size: %d, err: %s", len(it.batchRows), err.Error())
+			if onErr != nil {
+				onErr(ctx, err)
+			}
continue
}

@@ -180,7 +186,11 @@ func newTableMetadataBatchUpsertQuery(batchLen int) *tableMetadataBatchUpsertQuery
// another batch.
func (q *tableMetadataBatchUpsertQuery) resetForBatch() {
q.stmt.Reset()
-	q.stmt.WriteString(`
+	q.stmt.WriteString(upsertQuery)
+	q.args = q.args[:0]
+}
+
+var upsertQuery = `
UPSERT INTO system.table_metadata (
db_id,
table_id,
@@ -198,9 +208,7 @@ UPSERT INTO system.table_metadata (
perc_live_data,
last_updated
) VALUES
-`)
-	q.args = q.args[:0]
-}
+`

// addRow adds a tableMetadataIterRow to the batch upsert query.
func (q *tableMetadataBatchUpsertQuery) addRow(row *tableMetadataIterRow) error {
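The shape of the change above: updateCache now threads an optional onErr callback into its batch loop, so per-batch failures can be counted by the caller while the refresh keeps going. A distilled, self-contained sketch of the pattern — all names here are illustrative, not the actual cockroach code:

package main

import (
	"context"
	"errors"
	"fmt"
)

// processBatches distills the updateCache pattern above: each batch failure
// is reported to an optional onErr callback and the loop continues, so the
// caller can count errors without aborting the whole run.
func processBatches(
	ctx context.Context,
	batches [][]int,
	upsert func(context.Context, []int) error,
	onErr func(context.Context, error),
) (updated int) {
	for _, batch := range batches {
		if err := upsert(ctx, batch); err != nil {
			if onErr != nil {
				onErr(ctx, err) // report, then move on to the next batch
			}
			continue
		}
		updated += len(batch)
	}
	return updated
}

func main() {
	var errCount int
	updated := processBatches(context.Background(),
		[][]int{{1, 2}, {3}, {4, 5}},
		func(_ context.Context, b []int) error {
			if len(b) == 1 {
				return errors.New("transient upsert failure")
			}
			return nil
		},
		func(context.Context, error) { errCount++ },
	)
	fmt.Printf("updated=%d errors=%d\n", updated, errCount) // updated=4 errors=1
}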
2 changes: 1 addition & 1 deletion pkg/sql/tablemetadatacache/table_metadata_updater_test.go
@@ -55,7 +55,7 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
return res
case "update-cache":
updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
-			updated, err := updater.updateCache(ctx, nil)
+			updated, err := updater.updateCache(ctx, nil, nil)
if err != nil {
return err.Error()
}
60 changes: 50 additions & 10 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -15,6 +15,7 @@ import (
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -35,21 +36,25 @@ const (

// updateJobExecFn specifies the function that is run on each iteration of the
// table metadata update job. It can be overridden in tests.
-var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) error = updateTableMetadataCache
+var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) (int, error) = updateTableMetadataCache

type tableMetadataUpdateJobResumer struct {
-	job *jobs.Job
+	job     *jobs.Job
+	metrics *TableMetadataUpdateJobMetrics
}

var _ jobs.Resumer = (*tableMetadataUpdateJobResumer)(nil)

// Resume is part of the jobs.Resumer interface.
func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI interface{}) error {
// This job is a forever running background job, and it is always safe to
// terminate the SQL pod whenever the job is running, so mark it as idle.
j.job.MarkIdle(true)

execCtx := execCtxI.(sql.JobExecContext)
+	metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct().
+		JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
+	j.metrics = &metrics
var onJobStartKnob, onJobCompleteKnob, onJobReady func()
if execCtx.ExecCfg().TableMetadataKnobs != nil {
onJobStartKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobStart
@@ -111,15 +116,22 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI interface{}) error {
onJobStartKnob()
}
// Run table metadata update job.
-		metrics.NumRuns.Inc(1)
+		j.metrics.NumRuns.Inc(1)
+		sw := timeutil.NewStopWatch()
+		sw.Start()
j.markAsRunning(ctx)
-		if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j); err != nil {
+		rowsUpdated, err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j)
+		if err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
+			j.metrics.Errors.Inc(1)
}
j.markAsCompleted(ctx)
if onJobCompleteKnob != nil {
onJobCompleteKnob()
}
+		sw.Stop()
+		j.metrics.Duration.RecordValue(sw.Elapsed().Nanoseconds())
+		j.metrics.UpdatedTables.Inc(int64(rowsUpdated))
}
}

@@ -173,17 +185,18 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
// metadata from the system.namespace, system.descriptor tables and table span stats RPC.
func updateTableMetadataCache(
ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer,
-) error {
+) (int, error) {
updater := newTableMetadataUpdater(ie)
if _, err := updater.pruneCache(ctx); err != nil {
log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error())
}

resumer.updateProgress(ctx, .01)
-	// We'll use the updated ret val in a follow-up to update metrics and
-	// fractional job progress.
-	_, err := updater.updateCache(ctx, resumer.onBatchUpdate())
-	return err
+	return updater.updateCache(ctx, resumer.onBatchUpdate(), func(ctx context.Context, e error) {
+		if resumer.metrics != nil {
+			resumer.metrics.Errors.Inc(1)
+		}
+	})
}

// onBatchUpdate returns an onBatchUpdateCallback func that updates the progress of the job.
@@ -226,7 +239,10 @@ func (j *tableMetadataUpdateJobResumer) CollectProfile(
}

type TableMetadataUpdateJobMetrics struct {
-	NumRuns *metric.Counter
+	NumRuns       *metric.Counter
+	UpdatedTables *metric.Counter
+	Errors        *metric.Counter
+	Duration      metric.IHistogram
}

func (m TableMetadataUpdateJobMetrics) MetricStruct() {}
@@ -240,6 +256,30 @@ func newTableMetadataUpdateJobMetrics() metric.Struct {
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
}),
+		UpdatedTables: metric.NewCounter(metric.Metadata{
+			Name:        "obs.tablemetadata.update_job.table_updates",
+			Help:        "The total number of rows that have been updated in system.table_metadata",
+			Measurement: "Rows Updated",
+			Unit:        metric.Unit_COUNT,
+			MetricType:  io_prometheus_client.MetricType_COUNTER,
+		}),
+		Errors: metric.NewCounter(metric.Metadata{
+			Name:        "obs.tablemetadata.update_job.errors",
+			Help:        "The total number of errors that have been emitted from the update table metadata job.",
+			Measurement: "Errors",
+			Unit:        metric.Unit_COUNT,
+			MetricType:  io_prometheus_client.MetricType_COUNTER,
+		}),
+		Duration: metric.NewHistogram(metric.HistogramOptions{
+			Metadata: metric.Metadata{
+				Name:        "obs.tablemetadata.update_job.duration",
+				Help:        "Time spent running the update table metadata job.",
+				Measurement: "Duration",
+				Unit:        metric.Unit_NANOSECONDS},
+			Duration:     base.DefaultHistogramWindowInterval(),
+			BucketConfig: metric.IOLatencyBuckets,
+			Mode:         metric.HistogramModePrometheus,
+		}),
}
}

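Once registered, these metrics are exported with the rest of the node's metrics at the Prometheus endpoint; dots in metric names are rewritten to underscores on export, so obs.tablemetadata.update_job.runs is served as obs_tablemetadata_update_job_runs. A quick way to eyeball the new series — this sketch assumes a local insecure node with its HTTP port on 8080, so adjust to your deployment:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

// Scrape a node's Prometheus endpoint and print the update job's metrics.
func main() {
	resp, err := http.Get("http://localhost:8080/_status/vars")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		// Matches runs, errors, table_updates, and the duration histogram
		// series (_bucket, _sum, _count).
		if strings.Contains(sc.Text(), "obs_tablemetadata_update_job") {
			fmt.Println(sc.Text())
		}
	}
	if err := sc.Err(); err != nil {
		panic(err)
	}
}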
89 changes: 56 additions & 33 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -45,9 +45,34 @@ func TestUpdateTableMetadataCacheJobRunsOnRPCTrigger(t *testing.T) {

ctx := context.Background()
tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
-	defer tc.Stopper().Stop(context.Background())

+	defer tc.Stopper().Stop(context.Background())
+	metrics := tc.Server(0).JobRegistry().(*jobs.Registry).MetricsStruct().
+		JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
+	conn := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	testJobComplete := func(runNum int64) {
+		testutils.SucceedsSoon(t, func() error {
+			if metrics.NumRuns.Count() != runNum {
+				return errors.New("job hasn't run yet")
+			}
+			row := conn.Query(t,
+				`SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
+				jobs.UpdateTableMetadataCacheJobID)
+			if !row.Next() {
+				return errors.New("last_run_time not updated")
+			}
+			var runningStatus string
+			var fractionCompleted float32
+			require.NoError(t, row.Scan(&runningStatus, &fractionCompleted))
+			if !strings.Contains(runningStatus, "Job completed at") {
+				return errors.New("running_status not updated")
+			}
+			if fractionCompleted != 1.0 {
+				return errors.New("fraction_completed not updated")
+			}
+			return nil
+		})
+	}

// Get the node id that claimed the update job. We'll issue the
// RPC to a node that doesn't own the job to test that the RPC can
@@ -60,41 +85,39 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("no node has claimed the job")
}
-		require.NoError(t, row.Scan(&nodeID))
-
-		rpcGatewayNode := (nodeID + 1) % 3
-		_, err := tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
-			&serverpb.UpdateTableMetadataCacheRequest{Local: false})
-		if err != nil {
+		if err := row.Scan(&nodeID); err != nil {
return err
}
// The job shouldn't be busy.
return nil
})

-	metrics := tc.Server(0).JobRegistry().(*jobs.Registry).MetricsStruct().
-		JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
-	testutils.SucceedsSoon(t, func() error {
-		if metrics.NumRuns.Count() != 1 {
-			return errors.New("job hasn't run yet")
-		}
-		row := conn.Query(t,
-			`SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
-			jobs.UpdateTableMetadataCacheJobID)
-		if !row.Next() {
-			return errors.New("last_run_time not updated")
-		}
-		var runningStatus string
-		var fractionCompleted float32
-		require.NoError(t, row.Scan(&runningStatus, &fractionCompleted))
-		if !strings.Contains(runningStatus, "Job completed at") {
-			return errors.New("running_status not updated")
-		}
-		if fractionCompleted != 1.0 {
-			return errors.New("fraction_completed not updated")
-		}
-		return nil
-	})
+	rpcGatewayNode := (nodeID + 1) % 3
+	_, err := tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
+		&serverpb.UpdateTableMetadataCacheRequest{Local: false})
+	require.NoError(t, err)
+	testJobComplete(1)
+	count := metrics.UpdatedTables.Count()
+	meanDuration := metrics.Duration.CumulativeSnapshot().Mean()
+	require.Greater(t, count, int64(0))
+	require.Greater(t, meanDuration, float64(0))
+
+	_, err = tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
+		&serverpb.UpdateTableMetadataCacheRequest{Local: false})
+	require.NoError(t, err)
+	testJobComplete(2)
+	require.Greater(t, metrics.UpdatedTables.Count(), count)
+	require.NotEqual(t, metrics.Duration.CumulativeSnapshot().Mean(), meanDuration)
+	meanDuration = metrics.Duration.CumulativeSnapshot().Mean()
+	count = metrics.UpdatedTables.Count()
+
+	defer testutils.TestingHook(&upsertQuery, "UPSERT INTO error")()
+	_, err = tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
+		&serverpb.UpdateTableMetadataCacheRequest{Local: false})
+	require.NoError(t, err)
+	testJobComplete(3)
+	require.Equal(t, count, metrics.UpdatedTables.Count())
+	require.NotEqual(t, metrics.Duration.CumulativeSnapshot().Mean(), meanDuration)
+
}

func TestUpdateTableMetadataCacheProgressUpdate(t *testing.T) {
Expand Down Expand Up @@ -172,15 +195,15 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
mockMutex := syncutil.RWMutex{}
jobRunCh := make(chan struct{})
restoreUpdate := testutils.TestingHook(&updateJobExecFn,
-		func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) error {
+		func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) (int, error) {
mockMutex.Lock()
defer mockMutex.Unlock()
mockCalls = append(mockCalls, timeutil.Now())
select {
case jobRunCh <- struct{}{}:
default:
}
-			return nil
+			return 1, nil
})
defer restoreUpdate()

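The tests above swap out upsertQuery and updateJobExecFn via testutils.TestingHook, which replaces a package-level variable and returns a closure that restores it. CockroachDB's helper is reflection-based; a minimal generics-based stand-in for illustration:

package main

import "fmt"

// testingHook swaps a package-level variable for the duration of a test and
// restores it with the returned closure.
func testingHook[T any](ptr *T, val T) func() {
	orig := *ptr
	*ptr = val
	return func() { *ptr = orig }
}

// upsertQuery stands in for the package variable hooked in the test above:
// pointing it at invalid SQL forces every batch upsert to fail, so the
// Errors counter increments while UpdatedTables stays flat.
var upsertQuery = "UPSERT INTO system.table_metadata (...) VALUES"

func main() {
	restore := testingHook(&upsertQuery, "UPSERT INTO error")
	fmt.Println(upsertQuery) // UPSERT INTO error
	restore()
	fmt.Println(upsertQuery) // original query restored
}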
