tablemetadatacache: update progress of update tmj

Updates the job progress value of the
update table metadata job as it processes
batches of tables. Progress is not updated
on every successful batch; instead it is
updated every nth batch, where n is defined
by `batchesPerProgressUpdate`. Each batch
executes relatively quickly, so per-batch
progress updates would be needlessly
granular, and each one results in a write
to the database.

Part of: #130249
Epic: CRDB-37558
Release note: None
kyle-a-wong committed Sep 27, 2024
1 parent 946ebda commit 9d75b5b
Showing 6 changed files with 142 additions and 12 deletions.
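
For context, the core of the change is a progress callback that writes to the job record only every nth batch. The sketch below mirrors that gating logic as a standalone program, assuming tableBatchSize is 20 (its actual value is not shown in this diff); newProgressCallback and report are illustrative names, not identifiers from the codebase.

package main

import (
	"context"
	"fmt"
	"math"
)

const (
	// Assumed value; the real tableBatchSize lives in pkg/sql/tablemetadatacache
	// and is not shown in this diff.
	tableBatchSize           = 20
	batchesPerProgressUpdate = 10
)

// onBatchUpdateCallback matches the callback type added in table_metadata_updater.go.
type onBatchUpdateCallback func(ctx context.Context, totalRowsToUpdate, rowsUpdated int)

// newProgressCallback mirrors the gating in tableMetadataUpdateJobResumer.onBatchUpdate:
// report progress only every batchesPerProgressUpdate batches, and pin the final batch at 0.99.
func newProgressCallback(report func(ctx context.Context, fraction float32)) onBatchUpdateCallback {
	batchNum := 0
	return func(ctx context.Context, totalRowsToUpdate, rowsUpdated int) {
		batchNum++
		estimatedBatches := int(math.Ceil(float64(totalRowsToUpdate) / float64(tableBatchSize)))
		if batchNum == estimatedBatches {
			report(ctx, 0.99)
		} else if batchNum%batchesPerProgressUpdate == 0 && estimatedBatches > 0 {
			report(ctx, float32(rowsUpdated)/float32(totalRowsToUpdate))
		}
	}
}

func main() {
	ctx := context.Background()
	cb := newProgressCallback(func(_ context.Context, f float32) {
		fmt.Printf("progress written: %.2f\n", f)
	})
	// With 1,000 rows and batches of 20, progress is written on batches
	// 10, 20, 30, 40 (0.20, 0.40, 0.60, 0.80) and on the final batch 50 (0.99).
	const totalRows = 1000
	for updated := tableBatchSize; updated <= totalRows; updated += tableBatchSize {
		cb(ctx, totalRows, updated)
	}
}

Note that updateCache tolerates a nil callback (the data-driven updater test passes nil), so the progress write remains opt-in for callers that don't need it.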
1 change: 0 additions & 1 deletion pkg/server/api_v2_databases_metadata_test.go
@@ -753,7 +753,6 @@ func TestTriggerMetadataUpdateJob(t *testing.T) {
defer close(jobReadyChan)
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{

Knobs: base.TestingKnobs{
TableMetadata: &tablemetadatacache_util.TestingKnobs{
OnJobReady: func() {
1 change: 1 addition & 0 deletions pkg/sql/tablemetadatacache/BUILD.bazel
@@ -48,6 +48,7 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/isql",
32 changes: 29 additions & 3 deletions pkg/sql/tablemetadatacache/table_metadata_updater.go
@@ -18,12 +18,15 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

const pruneBatchSize = 512

type onBatchUpdateCallback func(ctx context.Context, totalRowsToUpdate int, rowsUpdated int)

// tableMetadataUpdater encapsulates the logic for updating the table metadata cache.
type tableMetadataUpdater struct {
ie isql.Executor
Expand All @@ -39,9 +42,14 @@ func newTableMetadataUpdater(ie isql.Executor) *tableMetadataUpdater {

// updateCache performs a full update of the system.table_metadata, returning
// the number of rows updated and the last error encountered.
func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, err error) {
func (u *tableMetadataUpdater) updateCache(
ctx context.Context, onUpdate onBatchUpdateCallback,
) (updated int, err error) {
it := newTableMetadataBatchIterator(u.ie)

estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx)
if err != nil {
log.Errorf(ctx, "failed to get estimated row count. err=%s", err.Error())
}
for {
more, err := it.fetchNextBatch(ctx, tableBatchSize)
if err != nil {
@@ -63,7 +71,11 @@ func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, er
log.Errorf(ctx, "failed to upsert batch of size: %d, err: %s", len(it.batchRows), err.Error())
continue
}

updated += count
if onUpdate != nil {
onUpdate(ctx, estimatedRowsToUpdate, updated)
}
}

return updated, err
@@ -111,7 +123,6 @@ func (u *tableMetadataUpdater) upsertBatch(
ctx context.Context, batch []tableMetadataIterRow,
) (int, error) {
u.upsertQuery.resetForBatch()

upsertBatchSize := 0
for _, row := range batch {
if err := u.upsertQuery.addRow(&row); err != nil {
@@ -135,6 +146,21 @@
)
}

func (u *tableMetadataUpdater) getRowsToUpdateCount(ctx context.Context) (int, error) {
query := `
SELECT count(*)::INT
FROM system.namespace t
JOIN system.namespace d ON t."parentID" = d.id
WHERE d."parentID" = 0 and t."parentSchemaID" != 0
`
row, err := u.ie.QueryRow(ctx, "get-table-metadata-row-count", nil, query)
if err != nil {
return 0, err
}

return int(tree.MustBeDInt(row[0])), nil
}

type tableMetadataBatchUpsertQuery struct {
stmt bytes.Buffer
args []interface{}
2 changes: 1 addition & 1 deletion pkg/sql/tablemetadatacache/table_metadata_updater_test.go
@@ -55,7 +55,7 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
return res
case "update-cache":
updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
updated, err := updater.updateCache(ctx)
updated, err := updater.updateCache(ctx, nil)
if err != nil {
return err.Error()
}
49 changes: 45 additions & 4 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -13,6 +13,7 @@ package tablemetadatacache
import (
"context"
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -26,9 +27,15 @@ import (
io_prometheus_client "github.com/prometheus/client_model/go"
)

const (
// batchesPerProgressUpdate is used to determine how many batches
// should be processed before updating the job progress
batchesPerProgressUpdate = 10
)

// updateJobExecFn specifies the function that is run on each iteration of the
// table metadata update job. It can be overriden in tests.
var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache
var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) error = updateTableMetadataCache

type tableMetadataUpdateJobResumer struct {
job *jobs.Job
@@ -106,7 +113,7 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
// Run table metadata update job.
metrics.NumRuns.Inc(1)
j.markAsRunning(ctx)
if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j); err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
}
j.markAsCompleted(ctx)
@@ -116,6 +123,12 @@
}
}

func (j *tableMetadataUpdateJobResumer) updateProgress(ctx context.Context, progress float32) {
if err := j.job.NoTxn().FractionProgressed(ctx, jobs.FractionUpdater(progress)); err != nil {
log.Errorf(ctx, "Error updating table metadata log progress. error: %s", err.Error())
}
}

// markAsRunning updates the last_start_time and status fields in the job's progress
// details and writes the job progress as a JSON string to the running status.
func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
@@ -126,6 +139,9 @@ func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
progress.RunningStatus = fmt.Sprintf("Job started at %s", now)
details.LastStartTime = &now
details.Status = jobspb.UpdateTableMetadataCacheProgress_RUNNING
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 0,
}
ju.UpdateProgress(progress)
return nil
}); err != nil {
@@ -143,6 +159,9 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
progress.RunningStatus = fmt.Sprintf("Job completed at %s", now)
details.LastCompletedTime = &now
details.Status = jobspb.UpdateTableMetadataCacheProgress_NOT_RUNNING
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 1.0,
}
ju.UpdateProgress(progress)
return nil
}); err != nil {
@@ -152,18 +171,40 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {

// updateTableMetadataCache performs a full update of system.table_metadata by collecting
// metadata from the system.namespace, system.descriptor tables and table span stats RPC.
func updateTableMetadataCache(ctx context.Context, ie isql.Executor) error {
func updateTableMetadataCache(
ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer,
) error {
updater := newTableMetadataUpdater(ie)
if _, err := updater.pruneCache(ctx); err != nil {
log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error())
}

resumer.updateProgress(ctx, .01)
	// We'll use the updated ret val in a follow-up to update metrics.
_, err := updater.updateCache(ctx)
_, err := updater.updateCache(ctx, resumer.onBatchUpdate())
return err
}

// onBatchUpdate returns an onBatchUpdateCallback func that updates the progress of the job.
// updateProgress is not called on every invocation; it is only called on every nth
// invocation, where n is defined by batchesPerProgressUpdate. Each batch update is
// expected to execute quickly, so writing progress on every batch would add a database
// write per batch without providing meaningfully more granular progress.
func (j *tableMetadataUpdateJobResumer) onBatchUpdate() onBatchUpdateCallback {
batchNum := 0
return func(ctx context.Context, totalRowsToUpdate int, rowsUpdated int) {
batchNum++
estimatedBatches := int(math.Ceil(float64(totalRowsToUpdate) / float64(tableBatchSize)))
if batchNum == estimatedBatches {
j.updateProgress(ctx, .99)
} else if batchNum%batchesPerProgressUpdate == 0 && estimatedBatches > 0 {
progress := float32(rowsUpdated) / float32(totalRowsToUpdate)
j.updateProgress(ctx, progress)
}
}
}

// OnFailOrCancel implements jobs.Resumer.
func (j *tableMetadataUpdateJobResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, jobErr error,
69 changes: 66 additions & 3 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -14,13 +14,15 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -77,20 +79,81 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ
return errors.New("job hasn't run yet")
}
row := conn.Query(t,
`SELECT running_status FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
`SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("last_run_time not updated")
}
var runningStatus string
require.NoError(t, row.Scan(&runningStatus))
var fractionCompleted float32
require.NoError(t, row.Scan(&runningStatus, &fractionCompleted))
if !strings.Contains(runningStatus, "Job completed at") {
return errors.New("running_status not updated")
}
if fractionCompleted != 1.0 {
return errors.New("fraction_completed not updated")
}
return nil
})
}

func TestUpdateTableMetadataCacheProgressUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

// Server setup.
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))

jobRegistry := s.JobRegistry().(*jobs.Registry)
jobId := jobRegistry.MakeJobID()

	// Create a new job so that we don't have to wait for the existing one to be claimed.
jr := jobs.Record{
JobID: jobId,
Description: jobspb.TypeUpdateTableMetadataCache.String(),
Details: jobspb.UpdateTableMetadataCacheDetails{},
Progress: jobspb.UpdateTableMetadataCacheProgress{},
CreatedBy: &jobs.CreatedByInfo{Name: username.NodeUser, ID: username.NodeUserID},
Username: username.NodeUserName(),
NonCancelable: true,
}
job, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobId, nil)
require.NoError(t, err)
resumer := tableMetadataUpdateJobResumer{job: job}

getJobProgress := func() float32 {
var fractionCompleted float32
conn.QueryRow(t, `SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`,
jobId).Scan(&fractionCompleted)
return fractionCompleted
}

totalRowsToUpdate := []int{50, 99, 1000, 2000, 20001}
for _, r := range totalRowsToUpdate {
resumer.updateProgress(ctx, 0)
cb := resumer.onBatchUpdate()
x := r
iterations := int(math.Ceil(float64(x) / float64(tableBatchSize)))
for i := 1; i <= iterations; i++ {
rowsUpdated := min(r, i*tableBatchSize)
preUpdateProgress := getJobProgress()
cb(ctx, x, rowsUpdated)
postUpdateProgress := getJobProgress()
if i == iterations {
require.Equal(t, float32(.99), postUpdateProgress)
} else if i%batchesPerProgressUpdate == 0 {
require.Greater(t, postUpdateProgress, preUpdateProgress)
} else {
require.Equal(t, preUpdateProgress, postUpdateProgress)
}
}
}

}

// TestUpdateTableMetadataCacheAutomaticUpdates tests that:
// 1. The update table metadata cache job does not run automatically by default.
// 2. The job runs automatically on the data validity interval when automatic
@@ -109,7 +172,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
mockMutex := syncutil.RWMutex{}
jobRunCh := make(chan struct{})
restoreUpdate := testutils.TestingHook(&updateJobExecFn,
func(ctx context.Context, ie isql.Executor) error {
func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) error {
mockMutex.Lock()
defer mockMutex.Unlock()
mockCalls = append(mockCalls, timeutil.Now())
