tablemetadatacache: update progress of update tmj
Release note: None
kyle-a-wong committed Sep 26, 2024
1 parent f5beced commit d396826
Showing 6 changed files with 200 additions and 12 deletions.
1 change: 0 additions & 1 deletion pkg/server/api_v2_databases_metadata_test.go
@@ -662,7 +662,6 @@ func TestTriggerMetadataUpdateJob(t *testing.T) {
defer close(jobReadyChan)
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{

Knobs: base.TestingKnobs{
TableMetadata: &tablemetadatacache_util.TestingKnobs{
OnJobReady: func() {
1 change: 1 addition & 0 deletions pkg/sql/tablemetadatacache/BUILD.bazel
@@ -51,6 +51,7 @@ go_test(
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/isql",
"//pkg/sql/tablemetadatacache/util",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
33 changes: 30 additions & 3 deletions pkg/sql/tablemetadatacache/table_metadata_updater.go
@@ -18,12 +18,15 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

const pruneBatchSize = 512

type onBatchUpdateCallback func(ctx context.Context, totalRowsToUpdate int, rowsUpdated int)

// tableMetadataUpdater encapsulates the logic for updating the table metadata cache.
type tableMetadataUpdater struct {
ie isql.Executor
@@ -39,9 +42,14 @@ func newTableMetadataUpdater(ie isql.Executor) *tableMetadataUpdater {

// updateCache performs a full update of the system.table_metadata, returning
// the number of rows updated and the last error encountered.
func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, err error) {
func (u *tableMetadataUpdater) updateCache(
ctx context.Context, cb onBatchUpdateCallback,
) (updated int, err error) {
it := newTableMetadataBatchIterator(u.ie)

estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx)
if err != nil {
log.Errorf(ctx, "failed to get estimated row count. err=%s", err.Error())
}
for {
more, err := it.fetchNextBatch(ctx, tableBatchSize)
if err != nil {
@@ -63,6 +71,10 @@ func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, er
log.Errorf(ctx, "failed to upsert batch of size: %d, err: %s", len(it.batchRows), err.Error())
continue
}

if cb != nil {
cb(ctx, estimatedRowsToUpdate, count)
}
updated += count
}

@@ -111,7 +123,6 @@ func (u *tableMetadataUpdater) upsertBatch(
ctx context.Context, batch []tableMetadataIterRow,
) (int, error) {
u.upsertQuery.resetForBatch()

upsertBatchSize := 0
for _, row := range batch {
if err := u.upsertQuery.addRow(&row); err != nil {
@@ -135,6 +146,22 @@ func (u *tableMetadataUpdater) upsertBatch(
)
}

func (u *tableMetadataUpdater) getRowsToUpdateCount(ctx context.Context) (int, error) {
query := `
SELECT sum(count)::INT FROM (
SELECT count(*) FROM "".information_schema.tables WHERE table_type != 'SYSTEM VIEW'
UNION
SELECT count(*) FROM "".information_schema.sequences
) t;
`
row, err := u.ie.QueryRow(ctx, "get-table-metadata-row-count", nil, query)
if err != nil {
return 0, err
}

return int(tree.MustBeDInt(row[0])), nil
}

type tableMetadataBatchUpsertQuery struct {
stmt bytes.Buffer
args []interface{}
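For context, here is a minimal sketch (not part of this commit) of how a caller inside this package could use the new updateCache signature; the helper name and the logging callback are hypothetical, and it assumes an isql.Executor is already available:

// Hypothetical helper, sketched for illustration only; assumes it lives in
// package tablemetadatacache so the unexported types are in scope.
func exampleUpdateWithLogging(ctx context.Context, ie isql.Executor) error {
	updater := newTableMetadataUpdater(ie)
	// Log per-batch progress instead of writing job progress.
	logBatch := func(ctx context.Context, totalRowsToUpdate int, rowsUpdated int) {
		log.Infof(ctx, "upserted %d rows this batch (estimated total rows: %d)", rowsUpdated, totalRowsToUpdate)
	}
	_, err := updater.updateCache(ctx, logBatch)
	return err
}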
2 changes: 1 addition & 1 deletion pkg/sql/tablemetadatacache/table_metadata_updater_test.go
@@ -55,7 +55,7 @@ func TestDataDrivenTableMetadataCacheUpdater(t *testing.T) {
return res
case "update-cache":
updater := newTableMetadataUpdater(s.InternalExecutor().(isql.Executor))
updated, err := updater.updateCache(ctx)
updated, err := updater.updateCache(ctx, nil)
if err != nil {
return err.Error()
}
52 changes: 48 additions & 4 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -13,6 +13,7 @@ package tablemetadatacache
import (
"context"
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -26,9 +27,18 @@
io_prometheus_client "github.com/prometheus/client_model/go"
)

const (
// updateJobProgressBatchThreshold is the minimum number of expected batches
// required before job progress is updated incrementally.
updateJobProgressBatchThreshold = 10
// batchesPerProgressUpdate determines how many batches are processed
// between job progress updates.
batchesPerProgressUpdate = 5
)

// updateJobExecFn specifies the function that is run on each iteration of the
// table metadata update job. It can be overridden in tests.
var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache
var updateJobExecFn func(context.Context, isql.Executor, *tableMetadataUpdateJobResumer) error = updateTableMetadataCache

type tableMetadataUpdateJobResumer struct {
job *jobs.Job
@@ -106,7 +116,7 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
// Run table metadata update job.
metrics.NumRuns.Inc(1)
j.markAsRunning(ctx)
if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor(), j); err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
}
j.markAsCompleted(ctx)
@@ -116,6 +126,12 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
}
}

func (j *tableMetadataUpdateJobResumer) updateProgress(ctx context.Context, progress float32) {
if err := j.job.NoTxn().FractionProgressed(ctx, jobs.FractionUpdater(progress)); err != nil {
log.Errorf(ctx, "Error updating table metadata log progress. error: %s", err.Error())
}
}

// markAsRunning updates the last_start_time and status fields in the job's progress
// details and writes the job progress as a JSON string to the running status.
func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
@@ -126,6 +142,9 @@ func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
progress.RunningStatus = fmt.Sprintf("Job started at %s", now)
details.LastStartTime = &now
details.Status = jobspb.UpdateTableMetadataCacheProgress_RUNNING
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 0,
}
ju.UpdateProgress(progress)
return nil
}); err != nil {
@@ -143,6 +162,9 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
progress.RunningStatus = fmt.Sprintf("Job completed at %s", now)
details.LastCompletedTime = &now
details.Status = jobspb.UpdateTableMetadataCacheProgress_NOT_RUNNING
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 1.0,
}
ju.UpdateProgress(progress)
return nil
}); err != nil {
@@ -152,18 +174,40 @@ func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {

// updateTableMetadataCache performs a full update of system.table_metadata by collecting
// metadata from the system.namespace, system.descriptor tables and table span stats RPC.
func updateTableMetadataCache(ctx context.Context, ie isql.Executor) error {
func updateTableMetadataCache(
ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer,
) error {
updater := newTableMetadataUpdater(ie)
if _, err := updater.pruneCache(ctx); err != nil {
log.Errorf(ctx, "failed to prune table metadata cache: %s", err.Error())
}

resumer.updateProgress(ctx, .01)
// We'll use the updated ret val in a follow-up to update metrics and
// fractional job progress.
_, err := updater.updateCache(ctx)
_, err := updater.updateCache(ctx, updateCacheCallbackFn(resumer))
return err
}

// updateCacheCallbackFn creates an onBatchUpdateCallback closure with access to the tableMetadataUpdateJobResumer.
var updateCacheCallbackFn func(resumer *tableMetadataUpdateJobResumer) onBatchUpdateCallback = func(resumer *tableMetadataUpdateJobResumer) onBatchUpdateCallback {
batchNum := 0
return func(ctx context.Context, totalRowsToUpdate int, rowsUpdated int) {
batchNum++
// Only update progress incrementally if the update is expected to span at least updateJobProgressBatchThreshold batches.
estimatedBatches := int(math.Ceil(float64(totalRowsToUpdate) / float64(tableBatchSize)))
if batchNum == estimatedBatches {
resumer.updateProgress(ctx, .99)
} else if estimatedBatches < updateJobProgressBatchThreshold {
// If fewer than updateJobProgressBatchThreshold batches are expected, don't update progress until the end.
return
} else if batchNum%batchesPerProgressUpdate == 0 && estimatedBatches > 0 {
progress := float32(batchNum) / float32(estimatedBatches)
resumer.updateProgress(ctx, progress)
}
}
}

// OnFailOrCancel implements jobs.Resumer.
func (j *tableMetadataUpdateJobResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, jobErr error,
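To make the progress cadence concrete, the following self-contained sketch mirrors the callback's branching; tableBatchSize is not shown in this diff, so the value of 20 below is only an assumption:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Assumed values; the real tableBatchSize is defined elsewhere in the package.
	const (
		tableBatchSize                  = 20
		updateJobProgressBatchThreshold = 10
		batchesPerProgressUpdate        = 5
	)
	totalRowsToUpdate := 500
	estimatedBatches := int(math.Ceil(float64(totalRowsToUpdate) / float64(tableBatchSize)))
	for batchNum := 1; batchNum <= estimatedBatches; batchNum++ {
		switch {
		case batchNum == estimatedBatches:
			fmt.Printf("batch %d: fraction_completed -> 0.99\n", batchNum)
		case estimatedBatches < updateJobProgressBatchThreshold:
			// Too few batches: skip incremental updates and only report at the end.
		case batchNum%batchesPerProgressUpdate == 0:
			fmt.Printf("batch %d: fraction_completed -> %.2f\n", batchNum, float32(batchNum)/float32(estimatedBatches))
		}
	}
}

With these assumed numbers there are 25 batches, so fraction_completed is written at batches 5, 10, 15 and 20, and set to 0.99 at the final batch; markAsCompleted then moves it to 1.0.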
123 changes: 120 additions & 3 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -14,6 +14,7 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"testing"
"time"
@@ -23,6 +24,7 @@
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -77,20 +79,135 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJobID)
return errors.New("job hasn't run yet")
}
row := conn.Query(t,
`SELECT running_status FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
`SELECT running_status, fraction_completed FROM crdb_internal.jobs WHERE job_id = $1 AND running_status IS NOT NULL`,
jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("last_run_time not updated")
}
var runningStatus string
require.NoError(t, row.Scan(&runningStatus))
var fractionCompleted float32
require.NoError(t, row.Scan(&runningStatus, &fractionCompleted))
if !strings.Contains(runningStatus, "Job completed at") {
return errors.New("running_status not updated")
}
if fractionCompleted != 1.0 {
return errors.New("fraction_completed not updated")
}
return nil
})
}

func TestUpdateTableMetadataCacheProgressUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t, "too slow under stress")
ctx := context.Background()

var conn *sqlutils.SQLRunner
jobCompleteCh := make(chan interface{})
updateCacheCh := make(chan interface{})
defer close(jobCompleteCh)
defer close(updateCacheCh)

oldUpdateCacheCallbackFn := updateCacheCallbackFn

getJobProgress := func() float32 {
var fractionCompleted float32
conn.QueryRow(t, `SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`,
jobs.UpdateTableMetadataCacheJobID).Scan(&fractionCompleted)
return fractionCompleted
}

var preUpdateProgress float32
var postUpdateProgress float32
defer testutils.TestingHook(&updateCacheCallbackFn, func(resumer *tableMetadataUpdateJobResumer) onBatchUpdateCallback {
cb := oldUpdateCacheCallbackFn(resumer)
return func(ctx context.Context, totalRowsToUpdate int, rowsUpdated int) {
preUpdateProgress = getJobProgress()
cb(ctx, totalRowsToUpdate, rowsUpdated)
postUpdateProgress = getJobProgress()
updateCacheCh <- struct{}{}
}
})()

// waitForJob returns once a signal has been sent on jobCompleteCh. Before returning, it asserts that
// the progress was updated at least the expected number of times, by comparing the progress
// before and after each batch update completed in the update table metadata cache job.
// We don't care so much about the exact progress value, just that it increased.
waitForJob := func(expectedProgressUpdates int, timeout time.Duration) error {
ctxWithCancel, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
updateProgressCount := 0
for {
select {
case <-jobCompleteCh:
require.GreaterOrEqual(t, updateProgressCount, expectedProgressUpdates)
require.Equal(t, float32(1.0), getJobProgress())
return nil
case <-updateCacheCh:
if postUpdateProgress > preUpdateProgress {
updateProgressCount++
}
case <-ctxWithCancel.Done():
return fmt.Errorf("timed out waiting for job run ")
}
}
}

// Server setup.
s := serverutils.StartServerOnly(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
TableMetadata: &tablemetadatacacheutil.TestingKnobs{
OnJobComplete: func() {
jobCompleteCh <- struct{}{}
},
},
}})
defer s.Stopper().Stop(ctx)

conn = sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))

// Wait for the job to be claimed by a node.
testutils.SucceedsSoon(t, func() error {
row := conn.Query(t, `
SELECT claim_instance_id, status FROM system.jobs
WHERE id = $1 AND claim_instance_id IS NOT NULL
AND status = 'running'`,
jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("no node has claimed the job")
}
return nil
})

// Trigger update table metadata cache job with the base system tables.
_, err := s.GetStatusClient(t).UpdateTableMetadataCache(ctx,
&serverpb.UpdateTableMetadataCacheRequest{Local: false})
require.NoError(t, err)
// The job is only expected to update the progress once, since the number of tables is under the
// threshold required to start updating progress incrementally.
require.NoError(t, waitForJob(1, 5*time.Second))

randomTableCount := 500
conn.Exec(t, fmt.Sprintf(`
SELECT crdb_internal.generate_test_objects('{
"names": "random_pattern",
"counts": [%d],
"randomize_columns": true
}'::JSONB);
`, randomTableCount))

var systemTablesCount int
conn.QueryRow(t, `SELECT count(*)::INT from [show tables from system]`).Scan(&systemTablesCount)

expectedProgressUpdates := int(math.Ceil(float64(randomTableCount+systemTablesCount)/float64(tableBatchSize)) / float64(batchesPerProgressUpdate))
_, err = s.GetStatusClient(t).UpdateTableMetadataCache(ctx,
&serverpb.UpdateTableMetadataCacheRequest{Local: false})
require.NoError(t, err)
require.NoError(t, waitForJob(expectedProgressUpdates, 5*time.Second))

}

// TestUpdateTableMetadataCacheAutomaticUpdates tests that:
// 1. The update table metadata cache job does not run automatically by default.
// 2. The job runs automatically on the data validity interval when automatic
@@ -109,7 +226,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
mockMutex := syncutil.RWMutex{}
jobRunCh := make(chan struct{})
restoreUpdate := testutils.TestingHook(&updateJobExecFn,
func(ctx context.Context, ie isql.Executor) error {
func(ctx context.Context, ie isql.Executor, resumer *tableMetadataUpdateJobResumer) error {
mockMutex.Lock()
defer mockMutex.Unlock()
mockCalls = append(mockCalls, timeutil.Now())
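As a rough worked check of the expectedProgressUpdates arithmetic in the new test (the system table count and tableBatchSize below are assumptions, not values taken from this diff):

estimatedBatches        = ceil((500 + 40) / 20) = 27
expectedProgressUpdates = int(27 / 5)           = 5

With those numbers, waitForJob requires at least 5 observed increases in fraction_completed before the job-complete signal arrives.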
