Skip to content

Commit

Permalink
tablemetadatacache: add metrics to tmj
Browse files Browse the repository at this point in the history
Adds additional metrics to the update table metadata
job:
  * UpdatedTables - The total number of table rows
                    written to system.table_metadata
  * Errors        - The total number of errors emitted
                    from job runs
  * Duration      - The time spent executing the job

Part of: #130249
Epic: CRDB-37558
Release note: None
  • Loading branch information
kyle-a-wong committed Sep 27, 2024
1 parent 9d75b5b commit bc58ef4
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 49 deletions.
1 change: 1 addition & 0 deletions pkg/sql/tablemetadatacache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
Expand Down
18 changes: 13 additions & 5 deletions pkg/sql/tablemetadatacache/table_metadata_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newTableMetadataUpdater(ie isql.Executor) *tableMetadataUpdater {
// updateCache performs a full update of the system.table_metadata, returning
// the number of rows updated and the last error encountered.
func (u *tableMetadataUpdater) updateCache(
ctx context.Context, onUpdate onBatchUpdateCallback,
ctx context.Context, onUpdate onBatchUpdateCallback, onErr func(ctx context.Context, e error),
) (updated int, err error) {
it := newTableMetadataBatchIterator(u.ie)
estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx)
Expand All @@ -57,6 +57,9 @@ func (u *tableMetadataUpdater) updateCache(
// https://github.com/cockroachdb/cockroach/issues/130040. For now,
// we can't continue because the page key is in an invalid state.
log.Errorf(ctx, "failed to fetch next batch: %s", err.Error())
if onErr != nil {
onErr(ctx, err)
}
return updated, err
}

Expand All @@ -69,6 +72,9 @@ func (u *tableMetadataUpdater) updateCache(
if err != nil {
// If an upsert fails, let's just continue to the next batch for now.
log.Errorf(ctx, "failed to upsert batch of size: %d, err: %s", len(it.batchRows), err.Error())
if onErr != nil {
onErr(ctx, err)
}
continue
}

Expand Down Expand Up @@ -180,7 +186,11 @@ func newTableMetadataBatchUpsertQuery(batchLen int) *tableMetadataBatchUpsertQue
// another batch.
func (q *tableMetadataBatchUpsertQuery) resetForBatch() {
q.stmt.Reset()
q.stmt.WriteString(`
q.stmt.WriteString(upsertQuery)
q.args = q.args[:0]
}

var upsertQuery = `
UPSERT INTO system.table_metadata (
db_id,
table_id,
Expand All @@ -198,9 +208,7 @@ UPSERT INTO system.table_metadata (
perc_live_data,
last_updated
) VALUES
`)
q.args = q.args[:0]
}
`

// addRow adds a tableMetadataIterRow to the batch upsert query.
func (q *tableMetadataBatchUpsertQuery) addRow(row *tableMetadataIterRow) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/tablemetadatacache/table_metadata_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
return res
case "update-cache":
updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
updated, err := updater.updateCache(ctx, nil)
updated, err := updater.updateCache(ctx, nil, nil)
if err != nil {
return err.Error()
}
Expand Down
60 changes: 50 additions & 10 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -35,21 +36,25 @@ const (

// updateJobExecFn specifies the function that is run on each iteration of the
// table metadata update job. It can be overriden in tests.
var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) error = updateTableMetadataCache
var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) (int, error) = updateTableMetadataCache

type tableMetadataUpdateJobResumer struct {
job *jobs.Job
job *jobs.Job
metrics *TableMetadataUpdateJobMetrics
}

var _ jobs.Resumer = (*tableMetadataUpdateJobResumer)(nil)

// Resume is part of the jobs.Resumer interface.
func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI interface{}) error {
// This job is a forever running background job, and it is always safe to
// terminate the SQL pod whenever the job is running, so mark it as idle.
j.job.MarkIdle(true)

execCtx := execCtxI.(sql.JobExecContext)
metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct().
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
j.metrics = &metrics
var onJobStartKnob, onJobCompleteKnob, onJobReady func()
if execCtx.ExecCfg().TableMetadataKnobs != nil {
onJobStartKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobStart
Expand Down Expand Up @@ -111,15 +116,22 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
onJobStartKnob()
}
// Run table metadata update job.
metrics.NumRuns.Inc(1)
j.metrics.NumRuns.Inc(1)
sw := timeutil.NewStopWatch()
sw.Start()
j.markAsRunning(ctx)
if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j); err != nil {
rowsUpdated, err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j)
if err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
j.metrics.Errors.Inc(1)
}
j.markAsCompleted(ctx)
if onJobCompleteKnob != nil {
onJobCompleteKnob()
}
sw.Stop()
j.metrics.Duration.RecordValue(sw.Elapsed().Nanoseconds())
j.metrics.UpdatedTables.Inc(int64(rowsUpdated))
}
}

Expand Down Expand Up @@ -173,17 +185,18 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
// metadata from the system.namespace, system.descriptor tables and table span stats RPC.
func updateTableMetadataCache(
ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer,
) error {
) (int, error) {
updater := newTableMetadataUpdater(ie)
if _, err := updater.pruneCache(ctx); err != nil {
log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error())
}

resumer.updateProgress(ctx, .01)
// We'll use the updated ret val in a follow-up to update metrics and
// fractional job progress.
_, err := updater.updateCache(ctx, resumer.onBatchUpdate())
return err
return updater.updateCache(ctx, resumer.onBatchUpdate(), func(ctx context.Context, e error) {
if resumer.metrics != nil {
resumer.metrics.Errors.Inc(1)
}
})
}

// onBatchUpdate returns an onBatchUpdateCallback func that updates the progress of the job.
Expand Down Expand Up @@ -226,7 +239,10 @@ func (j *tableMetadataUpdateJobResumer) CollectProfile(
}

type TableMetadataUpdateJobMetrics struct {
NumRuns *metric.Counter
NumRuns *metric.Counter
UpdatedTables *metric.Counter
Errors *metric.Counter
Duration metric.IHistogram
}

func (m TableMetadataUpdateJobMetrics) MetricStruct() {}
Expand All @@ -240,6 +256,30 @@ func newTableMetadataUpdateJobMetrics() metric.Struct {
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
}),
UpdatedTables: metric.NewCounter(metric.Metadata{
Name: "obs.tablemetadata.update_job.table_updates",
Help: "The total number of rows that have been updated in system.table_metadata",
Measurement: "Rows Updated",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
}),
Errors: metric.NewCounter(metric.Metadata{
Name: "obs.tablemetadata.update_job.errors",
Help: "The total number of errors that have been emitted from the update table metadata job.",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
}),
Duration: metric.NewHistogram(metric.HistogramOptions{
Metadata: metric.Metadata{
Name: "obs.tablemetadata.update_job.duration",
Help: "Time spent running the update table metadata job.",
Measurement: "Duration",
Unit: metric.Unit_NANOSECONDS},
Duration: base.DefaultHistogramWindowInterval(),
BucketConfig: metric.IOLatencyBuckets,
Mode: metric.HistogramModePrometheus,
}),
}
}

Expand Down
89 changes: 56 additions & 33 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,34 @@ func TestUpdateTableMetadataCacheJobRunsOnRPCTrigger(t *testing.T) {

ctx := context.Background()
tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(context.Background())

defer tc.Stopper().Stop(context.Background())
metrics := tc.Server(0).JobRegistry().(*jobs.Registry).MetricsStruct().
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
conn := sqlutils.MakeSQLRunner(tc.ServerConn(0))
testJobComplete := func(runNum int64) {
testutils.SucceedsSoon(t, func() error {
if metrics.NumRuns.Count() != runNum {
return errors.New("job hasn't run yet")
}
row := conn.Query(t,
`SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("last_run_time not updated")
}
var runningStatus string
var fractionCompleted float32
require.NoError(t, row.Scan(&runningStatus, &fractionCompleted))
if !strings.Contains(runningStatus, "Job completed at") {
return errors.New("running_status not updated")
}
if fractionCompleted != 1.0 {
return errors.New("fraction_completed not updated")
}
return nil
})
}

// Get the node id that claimed the update job. We'll issue the
// RPC to a node that doesn't own the job to test that the RPC can
Expand All @@ -60,41 +85,39 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ
if !row.Next() {
return errors.New("no node has claimed the job")
}
require.NoError(t, row.Scan(&nodeID))

rpcGatewayNode := (nodeID + 1) % 3
_, err := tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
&serverpb.UpdateTableMetadataCacheRequest{Local: false})
if err != nil {
if err := row.Scan(&nodeID); err != nil {
return err
}
// The job shouldn't be busy.
return nil
})

metrics := tc.Server(0).JobRegistry().(*jobs.Registry).MetricsStruct().
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
testutils.SucceedsSoon(t, func() error {
if metrics.NumRuns.Count() != 1 {
return errors.New("job hasn't run yet")
}
row := conn.Query(t,
`SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("last_run_time not updated")
}
var runningStatus string
var fractionCompleted float32
require.NoError(t, row.Scan(&runningStatus, &fractionCompleted))
if !strings.Contains(runningStatus, "Job completed at") {
return errors.New("running_status not updated")
}
if fractionCompleted != 1.0 {
return errors.New("fraction_completed not updated")
}
return nil
})
rpcGatewayNode := (nodeID + 1) % 3
_, err := tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
&serverpb.UpdateTableMetadataCacheRequest{Local: false})
require.NoError(t, err)
testJobComplete(1)
count := metrics.UpdatedTables.Count()
meanDuration := metrics.Duration.CumulativeSnapshot().Mean()
require.Greater(t, count, int64(0))
require.Greater(t, meanDuration, float64(0))

_, err = tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
&serverpb.UpdateTableMetadataCacheRequest{Local: false})
require.NoError(t, err)
testJobComplete(2)
require.Greater(t, metrics.UpdatedTables.Count(), count)
require.NotEqual(t, metrics.Duration.CumulativeSnapshot().Mean(), meanDuration)
meanDuration = metrics.Duration.CumulativeSnapshot().Mean()
count = metrics.UpdatedTables.Count()

defer testutils.TestingHook(&upsertQuery, "UPSERT INTO error")()
_, err = tc.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx,
&serverpb.UpdateTableMetadataCacheRequest{Local: false})
require.NoError(t, err)
testJobComplete(3)
require.Equal(t, count, metrics.UpdatedTables.Count())
require.NotEqual(t, metrics.Duration.CumulativeSnapshot().Mean(), meanDuration)

}

func TestUpdateTableMetadataCacheProgressUpdate(t *testing.T) {
Expand Down Expand Up @@ -172,15 +195,15 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
mockMutex := syncutil.RWMutex{}
jobRunCh := make(chan struct{})
restoreUpdate := testutils.TestingHook(&updateJobExecFn,
func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) error {
func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) (int, error) {
mockMutex.Lock()
defer mockMutex.Unlock()
mockCalls = append(mockCalls, timeutil.Now())
select {
case jobRunCh <- struct{}{}:
default:
}
return nil
return 1, nil
})
defer restoreUpdate()

Expand Down

0 comments on commit bc58ef4

Please sign in to comment.