From 9d75b5be8b0ebb96b0d211c1e9e685911b1622e9 Mon Sep 17 00:00:00 2001 From: Kyle Wong <37189875+kyle-a-wong@users.noreply.github.com> Date: Thu, 26 Sep 2024 10:57:53 -0400 Subject: [PATCH] tablemetadatacache: update progress of update tmj updates the job progress value of the update table metadata job as it processes batches of tables. The progress will not be updated on every successful batch update, but will instead be updated every nth batch, where n is defined by `batchesPerProgressUpdate`. This is done because each batch executes relatively quickly, and it is unnecessary to provide such granular updates to the progress, each of which results in a write to the database. Part of: #130249 Epic: CRDB-37558 Release note: None --- pkg/server/api_v2_databases_metadata_test.go | 1 - pkg/sql/tablemetadatacache/BUILD.bazel | 1 + .../table_metadata_updater.go | 32 ++++++++- .../table_metadata_updater_test.go | 2 +- .../update_table_metadata_cache_job.go | 49 +++++++++++-- .../update_table_metadata_cache_job_test.go | 69 ++++++++++++++++++- 6 files changed, 142 insertions(+), 12 deletions(-) diff --git a/pkg/server/api_v2_databases_metadata_test.go b/pkg/server/api_v2_databases_metadata_test.go index 53816b68d227..7822a5198446 100644 --- a/pkg/server/api_v2_databases_metadata_test.go +++ b/pkg/server/api_v2_databases_metadata_test.go @@ -753,7 +753,6 @@ func TestTriggerMetadataUpdateJob(t *testing.T) { defer close(jobReadyChan) testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ TableMetadata: &tablemetadatacache_util.TestingKnobs{ OnJobReady: func() { diff --git a/pkg/sql/tablemetadatacache/BUILD.bazel b/pkg/sql/tablemetadatacache/BUILD.bazel index a19ae735faff..c6b78f6c4809 100644 --- a/pkg/sql/tablemetadatacache/BUILD.bazel +++ b/pkg/sql/tablemetadatacache/BUILD.bazel @@ -48,6 +48,7 @@ go_test( "//pkg/kv/kvserver", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", "//pkg/server/serverpb", "//pkg/sql/isql", diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater.go b/pkg/sql/tablemetadatacache/table_metadata_updater.go index 324cb83155e3..dd8c3ee1a491 100644 --- a/pkg/sql/tablemetadatacache/table_metadata_updater.go +++ b/pkg/sql/tablemetadatacache/table_metadata_updater.go @@ -18,12 +18,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/log" ) const pruneBatchSize = 512 +type onBatchUpdateCallback func(ctx context.Context, totalRowsToUpdate int, rowsUpdated int) + // tableMetadataUpdater encapsulates the logic for updating the table metadata cache. type tableMetadataUpdater struct { ie isql.Executor @@ -39,9 +42,14 @@ func newTableMetadataUpdater(ie isql.Executor) *tableMetadataUpdater { // updateCache performs a full update of the system.table_metadata, returning // the number of rows updated and the last error encountered. -func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, err error) { +func (u *tableMetadataUpdater) updateCache( + ctx context.Context, onUpdate onBatchUpdateCallback, +) (updated int, err error) { it := newTableMetadataBatchIterator(u.ie) - + estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx) + if err != nil { + log.Errorf(ctx, "failed to get estimated row count. err=%s", err.Error()) + } for { more, err := it.fetchNextBatch(ctx, tableBatchSize) if err != nil { @@ -63,7 +71,11 @@ func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, er log.Errorf(ctx, "failed to upsert batch of size: %d, err: %s", len(it.batchRows), err.Error()) continue } + updated += count + if onUpdate != nil { + onUpdate(ctx, estimatedRowsToUpdate, updated) + } } return updated, err @@ -111,7 +123,6 @@ func (u *tableMetadataUpdater) upsertBatch( ctx context.Context, batch []tableMetadataIterRow, ) (int, error) { u.upsertQuery.resetForBatch() - upsertBatchSize := 0 for _, row := range batch { if err := u.upsertQuery.addRow(&row); err != nil { @@ -135,6 +146,21 @@ func (u *tableMetadataUpdater) upsertBatch( ) } +func (u *tableMetadataUpdater) getRowsToUpdateCount(ctx context.Context) (int, error) { + query := ` +SELECT count(*)::INT +FROM system.namespace t +JOIN system.namespace d ON t."parentID" = d.id +WHERE d."parentID" = 0 and t."parentSchemaID" != 0 +` + row, err := u.ie.QueryRow(ctx, "get-table-metadata-row-count", nil, query) + if err != nil { + return 0, err + } + + return int(tree.MustBeDInt(row[0])), nil +} + type tableMetadataBatchUpsertQuery struct { stmt bytes.Buffer args []interface{} diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go index a330885e50bc..a8f6fcca8975 100644 --- a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go +++ b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go @@ -55,7 +55,7 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) { return res case "update-cache": updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor)) - updated, err := updater.updateCache(ctx) + updated, err := updater.updateCache(ctx, nil) if err != nil { return err.Error() } diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go index 04f050b4b4fb..8413e5ad195e 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go @@ -13,6 +13,7 @@ package tablemetadatacache import ( "context" "fmt" + "math" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -26,9 +27,15 @@ import ( io_prometheus_client "github.com/prometheus/client_model/go" ) +const ( + // batchesPerProgressUpdate is used to determine how many batches + // should be processed before updating the job progress + batchesPerProgressUpdate = 10 +) + // updateJobExecFn specifies the function that is run on each iteration of the // table metadata update job. It can be overriden in tests. -var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache +var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) error = updateTableMetadataCache type tableMetadataUpdateJobResumer struct { job *jobs.Job @@ -106,7 +113,7 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int // Run table metadata update job. metrics.NumRuns.Inc(1) j.markAsRunning(ctx) - if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil { + if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j); err != nil { log.Errorf(ctx, "error running table metadata update job: %s", err) } j.markAsCompleted(ctx) @@ -116,6 +123,12 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int } } +func (j *tableMetadataUpdateJobResumer) updateProgress(ctx context.Context, progress float32) { + if err := j.job.NoTxn().FractionProgressed(ctx, jobs.FractionUpdater(progress)); err != nil { + log.Errorf(ctx, "Error updating table metadata log progress. error: %s", err.Error()) + } +} + // markAsRunning updates the last_start_time and status fields in the job's progress // details and writes the job progress as a JSON string to the running status. func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) { @@ -126,6 +139,9 @@ func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) { progress.RunningStatus = fmt.Sprintf("Job started at %s", now) details.LastStartTime = &now details.Status = jobspb.UpdateTableMetadataCacheProgress_RUNNING + progress.Progress = &jobspb.Progress_FractionCompleted{ + FractionCompleted: 0, + } ju.UpdateProgress(progress) return nil }); err != nil { @@ -143,6 +159,9 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) { progress.RunningStatus = fmt.Sprintf("Job completed at %s", now) details.LastCompletedTime = &now details.Status = jobspb.UpdateTableMetadataCacheProgress_NOT_RUNNING + progress.Progress = &jobspb.Progress_FractionCompleted{ + FractionCompleted: 1.0, + } ju.UpdateProgress(progress) return nil }); err != nil { @@ -152,18 +171,40 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) { // updateTableMetadataCache performs a full update of system.table_metadata by collecting // metadata from the system.namespace, system.descriptor tables and table span stats RPC. -func updateTableMetadataCache(ctx context.Context, ie isql.Executor) error { +func updateTableMetadataCache( + ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer, +) error { updater := newTableMetadataUpdater(ie) if _, err := updater.pruneCache(ctx); err != nil { log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error()) } + resumer.updateProgress(ctx, .01) // We'll use the updated ret val in a follow-up to update metrics and // fractional job progress. - _, err := updater.updateCache(ctx) + _, err := updater.updateCache(ctx, resumer.onBatchUpdate()) return err } +// onBatchUpdate returns an onBatchUpdateCallback func that updates the progress of the job. +// The call to updateProgress doesn't happen on every invocation, and only happens every nth +// invocation, where n is defined by batchesPerProgressUpdate. This is done because each +// batch update is expected to execute quickly, and updating progress at a high velocity +// doesn't seem worth it. +func (j *tableMetadataUpdateJobResumer) onBatchUpdate() onBatchUpdateCallback { + batchNum := 0 + return func(ctx context.Context, totalRowsToUpdate int, rowsUpdated int) { + batchNum++ + estimatedBatches := int(math.Ceil(float64(totalRowsToUpdate) / float64(tableBatchSize))) + if batchNum == estimatedBatches { + j.updateProgress(ctx, .99) + } else if batchNum%batchesPerProgressUpdate == 0 && estimatedBatches > 0 { + progress := float32(rowsUpdated) / float32(totalRowsToUpdate) + j.updateProgress(ctx, progress) + } + } +} + // OnFailOrCancel implements jobs.Resumer. func (j *tableMetadataUpdateJobResumer) OnFailOrCancel( ctx context.Context, execCtx interface{}, jobErr error, diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go index 19e93570c106..022f9e9bfe24 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go @@ -14,6 +14,7 @@ import ( "context" "errors" "fmt" + "math" "strings" "testing" "time" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -77,20 +79,81 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ return errors.New("job hasn't run yet") } row := conn.Query(t, - `SELECT running_status FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`, + `SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`, jobs.UpdateTableMetadataCacheJobID) if !row.Next() { return errors.New("last_run_time not updated") } var runningStatus string - require.NoError(t, row.Scan(&runningStatus)) + var fractionCompleted float32 + require.NoError(t, row.Scan(&runningStatus, &fractionCompleted)) if !strings.Contains(runningStatus, "Job completed at") { return errors.New("running_status not updated") } + if fractionCompleted != 1.0 { + return errors.New("fraction_completed not updated") + } return nil }) } +func TestUpdateTableMetadataCacheProgressUpdate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + // Server setup. + s := serverutils.StartServerOnly(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t)) + + jobRegistry := s.JobRegistry().(*jobs.Registry) + jobId := jobRegistry.MakeJobID() + + // Create a new job so that we don't have to wait for the existing one be claimed + jr := jobs.Record{ + JobID: jobId, + Description: jobspb.TypeUpdateTableMetadataCache.String(), + Details: jobspb.UpdateTableMetadataCacheDetails{}, + Progress: jobspb.UpdateTableMetadataCacheProgress{}, + CreatedBy: &jobs.CreatedByInfo{Name: username.NodeUser, ID: username.NodeUserID}, + Username: username.NodeUserName(), + NonCancelable: true, + } + job, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobId, nil) + require.NoError(t, err) + resumer := tableMetadataUpdateJobResumer{job: job} + + getJobProgress := func() float32 { + var fractionCompleted float32 + conn.QueryRow(t, `SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`, + jobId).Scan(&fractionCompleted) + return fractionCompleted + } + + totalRowsToUpdate := []int{50, 99, 1000, 2000, 20001} + for _, r := range totalRowsToUpdate { + resumer.updateProgress(ctx, 0) + cb := resumer.onBatchUpdate() + x := r + iterations := int(math.Ceil(float64(x) / float64(tableBatchSize))) + for i := 1; i <= iterations; i++ { + rowsUpdated := min(r, i*tableBatchSize) + preUpdateProgress := getJobProgress() + cb(ctx, x, rowsUpdated) + postUpdateProgress := getJobProgress() + if i == iterations { + require.Equal(t, float32(.99), postUpdateProgress) + } else if i%batchesPerProgressUpdate == 0 { + require.Greater(t, postUpdateProgress, preUpdateProgress) + } else { + require.Equal(t, preUpdateProgress, postUpdateProgress) + } + } + } + +} + // TestUpdateTableMetadataCacheAutomaticUpdates tests that: // 1. The update table metadata cache job does not run automatically by default. // 2. The job runs automatically on the data validity interval when automatic @@ -109,7 +172,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { mockMutex := syncutil.RWMutex{} jobRunCh := make(chan struct{}) restoreUpdate := testutils.TestingHook(&updateJobExecFn, - func(ctx context.Context, ie isql.Executor) error { + func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) error { mockMutex.Lock() defer mockMutex.Unlock() mockCalls = append(mockCalls, timeutil.Now())