Skip to content

Commit

Permalink
server: create api for table metadata job status
Browse files Browse the repository at this point in the history
adds a new API v2 endpoint,
`GET /api/v2/table_metadata/updatejob`.

This new API enables the ability to retrieve the status
of the table metadata update job. the job status includes:
 - current_status
 - progress
 - last_start_time
   - the start time of the last job run
 - last_completed_time
   - the time of the last completed job run
 - last_updated_time
   - the time the job progress was last updated

Resolves: #128896
Epic: CRDB-37558
Release note: None
  • Loading branch information
kyle-a-wong committed Sep 13, 2024
1 parent 66edff7 commit 09d4c43
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 17 deletions.
14 changes: 12 additions & 2 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1350,11 +1350,21 @@ message MVCCStatisticsJobProgress {

message UpdateTableMetadataCacheDetails {}
message UpdateTableMetadataCacheProgress {
enum Status {
NOT_RUNNING = 0;
RUNNING = 1;
}
// The time at which the job last started a run.
google.protobuf.Timestamp last_run_time = 1 [
(gogoproto.nullable) = false,
google.protobuf.Timestamp last_start_time = 1 [
(gogoproto.nullable) = true,
(gogoproto.stdtime) = true
];
// The time at which the job last completed a run.
google.protobuf.Timestamp last_completed_time = 2 [
(gogoproto.nullable) = true,
(gogoproto.stdtime) = true
];
Status status = 3;
}

message ImportRollbackDetails {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/api_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func registerRoutes(
{"sql/", a.execSQL, true, authserver.RegularRole, true},
{"database_metadata/", a.GetDBMetadata, true, authserver.RegularRole, true},
{"table_metadata/", a.GetTableMetadata, true, authserver.RegularRole, true},
{"table_metadata/updatejob/", a.TableMetadataJob, true, authserver.RegularRole, true},
}

// For all routes requiring authentication, have the outer mux (a.mux)
Expand Down
144 changes: 141 additions & 3 deletions pkg/server/api_v2_databases_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/apiutil"
"github.com/cockroachdb/cockroach/pkg/server/authserver"
Expand Down Expand Up @@ -298,7 +299,7 @@ func (a *apiV2Server) getTableMetadata(

it, err := a.sqlServer.internalExecutor.QueryIteratorEx(
ctx, "get-table-metadata", nil, /* txn */
sessiondata.InternalExecutorOverride{},
sessiondata.NodeUserSessionDataOverride,
// We only want to show the grants on the database.
query.String(), query.QueryArguments()...,
)
Expand Down Expand Up @@ -599,7 +600,7 @@ func (a *apiV2Server) getDBMetadata(

it, err := a.admin.internalExecutor.QueryIteratorEx(
ctx, "get-database-metadata", nil, /* txn */
sessiondata.InternalExecutorOverride{},
sessiondata.NodeUserSessionDataOverride,
query.String(), query.QueryArguments()...,
)

Expand Down Expand Up @@ -657,9 +658,138 @@ func (a *apiV2Server) getDBMetadata(
return dbms, totalRowCount, nil
}

// TableMetadataJob routes to the necessary receiver based on the http method of the request. Requires
// The user making the request must have the CONNECT database grant on at least one database or admin privilege.
// ---
// produces:
// - application/json
//
// responses:
//
// "200":
// description: A tmUpdateJobStatusResponse
// "404":
// description: Not found if the user doesn't have the correct authorizations
func (a *apiV2Server) TableMetadataJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx = a.sqlServer.AnnotateCtx(ctx)
sqlUser := authserver.UserFromHTTPAuthInfoContext(ctx)

authorized, err := a.updateTableMetadataJobAuthorized(ctx, sqlUser)
if err != nil {
srverrors.APIV2InternalError(ctx, err, w)
return
}

if !authorized {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}

var resp interface{}
switch r.Method {
case http.MethodGet:
resp, err = a.getTableMetadataUpdateJobStatus(ctx)
default:
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}

if err != nil {
srverrors.APIV2InternalError(ctx, err, w)
return
}

apiutil.WriteJSONResponse(ctx, w, 200, resp)
}

// getTableMetadataUpdateJobStatus gets the status of the table metadata update job. The requesting user
// must have the CONNECT privilege to at least one database on the cluster, or the admin role. If the user
// doesn't have the necessary authorization, an "empty" response will be returned.
func (a *apiV2Server) getTableMetadataUpdateJobStatus(
ctx context.Context,
) (jobStatus tmUpdateJobStatusResponse, retErr error) {
query := safesql.NewQuery()
query.Append(`
SELECT
TIMESTAMPTZ 'epoch' + (progress->>'modifiedMicros' || ' microseconds')::interval as last_modified,
coalesce((progress->>'fractionCompleted')::FLOAT, 0) as fraction_completed,
(progress->'tableMetadataCache'->>'lastCompletedTime')::TIMESTAMPTZ as last_completed_time,
(progress->'tableMetadataCache'->>'lastStartTime')::TIMESTAMPTZ as last_start_time,
coalesce(progress->'tableMetadataCache'->>'status', 'NOT_RUNNING') as status
FROM (
SELECT crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress) as progress
FROM crdb_internal.system_jobs
WHERE id = $
)
`, jobs.UpdateTableMetadataCacheJobID)

row, colTypes, err := a.sqlServer.internalExecutor.QueryRowExWithCols(
ctx, "get-tableMetadataUpdateJob-status", nil, /* txn */
sessiondata.NodeUserSessionDataOverride,
query.String(), query.QueryArguments()...,
)

if err != nil {
return jobStatus, err
}
scanner := makeResultScanner(colTypes)
if err = scanner.Scan(row, "fraction_completed", &jobStatus.Progress); err != nil {
return jobStatus, err
}
if err = scanner.Scan(row, "last_modified", &jobStatus.LastUpdatedTime); err != nil {
return jobStatus, err
}
if err = scanner.Scan(row, "last_start_time", &jobStatus.LastStartTime); err != nil {
return jobStatus, err
}
if err = scanner.Scan(row, "last_completed_time", &jobStatus.LastCompletedTime); err != nil {
return jobStatus, err
}
if err = scanner.Scan(row, "status", &jobStatus.CurrentStatus); err != nil {
return jobStatus, err
}
return jobStatus, nil
}

func (a *apiV2Server) updateTableMetadataJobAuthorized(
ctx context.Context, sqlUser username.SQLUsername,
) (isAuthorized bool, retErr error) {
query := safesql.NewQuery()
sqlUserStr := sqlUser.Normalized()
query.Append(`
SELECT count(*)
FROM (
SELECT 1 FROM system.role_members WHERE member = $ AND role = 'admin'
UNION
SELECT 1
FROM "".crdb_internal.cluster_database_privileges cdp
WHERE cdp.grantee = $
AND cdp.privilege_type = 'CONNECT'
)
`, sqlUserStr, sqlUserStr)

row, colTypes, err := a.sqlServer.internalExecutor.QueryRowExWithCols(
ctx, "check-updatejob-authorized", nil, /* txn */
sessiondata.InternalExecutorOverride{},
query.String(), query.QueryArguments()...,
)

if err != nil {
return false, err
}

scanner := makeResultScanner(colTypes)
var count int64
if err = scanner.Scan(row, "count", &count); err != nil {
return false, err
}
return count > 0, nil
}

type PaginatedResponse[T any] struct {
Results T `json:"results"`
PaginationInfo paginationInfo `json:"paginationInfo"`
PaginationInfo paginationInfo `json:"pagination_info"`
}

type paginationInfo struct {
Expand Down Expand Up @@ -694,3 +824,11 @@ type dbMetadata struct {
StoreIds []int64 `json:"store_ids"`
LastUpdated time.Time `json:"last_updated"`
}

type tmUpdateJobStatusResponse struct {
CurrentStatus string `json:"current_status"`
Progress float32 `json:"progress"`
LastStartTime *time.Time `json:"last_start_time"`
LastCompletedTime *time.Time `json:"last_completed_time"`
LastUpdatedTime *time.Time `json:"last_updated_time"`
}
38 changes: 38 additions & 0 deletions pkg/server/api_v2_databases_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,44 @@ func TestGetDBMetadata(t *testing.T) {
})
}

func TestGetTableMetadataUpdateJobStatus(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
ctx := context.Background()
defer testCluster.Stopper().Stop(ctx)
conn := testCluster.ServerConn(0)
defer conn.Close()

ts := testCluster.Server(0)

t.Run("authorization", func(t *testing.T) {
uri := "/api/v2/table_metadata/updatejob/"
sessionUsername := username.TestUserName()
userClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(sessionUsername, false, 1)
require.NoError(t, err)

failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String())
require.Equal(t, http.StatusText(http.StatusNotFound), failed)

_, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE defaultdb TO %s", sessionUsername.Normalized()))
require.NoError(t, e)

mdResp := makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String())
require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus)

_, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE defaultdb FROM %s", sessionUsername.Normalized()))
require.NoError(t, e)
failed = makeApiRequest[string](t, userClient, ts.AdminURL().WithPath(uri).String())
require.Equal(t, http.StatusText(http.StatusNotFound), failed)

_, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized()))
require.NoError(t, e)
mdResp = makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String())
require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus)
})
}

func makeApiRequest[T any](t *testing.T, client http.Client, uri string) (mdResp T) {
req, err := http.NewRequest("GET", uri, nil)
require.NoError(t, err)
Expand Down
38 changes: 28 additions & 10 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,42 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int

// Run table metadata update job.
metrics.NumRuns.Inc(1)
j.updateLastRunTime(ctx)
j.markAsRunning(ctx)
if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
}
j.markAsCompleted(ctx)
}
}

// updateLastRunTime updates the last_run_time field in the job's progress
// markAsRunning updates the last_start_time and status fields in the job's progress
// details and writes the job progress as a JSON string to the running status.
func (j *tableMetadataUpdateJobResumer) updateLastRunTime(ctx context.Context) {
func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
if err := j.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
lrt := timeutil.Now()
ju.UpdateProgress(&jobspb.Progress{
RunningStatus: fmt.Sprintf("last metadata update at %s", lrt),
Details: &jobspb.Progress_TableMetadataCache{
TableMetadataCache: &jobspb.UpdateTableMetadataCacheProgress{LastRunTime: lrt},
},
})
progress := md.Progress
details := progress.Details.(*jobspb.Progress_TableMetadataCache).TableMetadataCache
now := timeutil.Now()
progress.RunningStatus = fmt.Sprintf("Job started at %s", now)
details.LastStartTime = &now
details.Status = jobspb.UpdateTableMetadataCacheProgress_RUNNING
ju.UpdateProgress(progress)
return nil
}); err != nil {
log.Errorf(ctx, "%s", err.Error())
}
}

// markAsCompleted updates the last_completed_time and status fields in the job's progress
// details and writes the job progress as a JSON string to the running status.
func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
if err := j.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
details := progress.Details.(*jobspb.Progress_TableMetadataCache).TableMetadataCache
now := timeutil.Now()
progress.RunningStatus = fmt.Sprintf("Job completed at %s", now)
details.LastCompletedTime = &now
details.Status = jobspb.UpdateTableMetadataCacheProgress_NOT_RUNNING
ju.UpdateProgress(progress)
return nil
}); err != nil {
log.Errorf(ctx, "%s", err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ
}
var runningStatus string
require.NoError(t, row.Scan(&runningStatus))
if !strings.Contains(runningStatus, "last metadata update at") {
return errors.New("last run time not updated")
if !strings.Contains(runningStatus, "Job completed at") {
return errors.New("running_status not updated")
}
return nil
})
Expand Down

0 comments on commit 09d4c43

Please sign in to comment.