From 09d4c437ac54a7e86448e51087a99ada1610a120 Mon Sep 17 00:00:00 2001
From: Kyle Wong <37189875+kyle-a-wong@users.noreply.github.com>
Date: Thu, 12 Sep 2024 16:53:32 -0400
Subject: [PATCH] server: create api for table metadata job status

Adds a new API v2 endpoint, `GET /api/v2/table_metadata/updatejob`,
that returns the status of the table metadata update job.

The job status includes:
- current_status - whether the job is RUNNING or NOT_RUNNING
- progress - the fraction of the job that has completed
- last_start_time - the start time of the last job run
- last_completed_time - the time of the last completed job run
- last_updated_time - the time the job progress was last updated

Resolves: #128896
Epic: CRDB-37558
Release note: None
---
 pkg/jobs/jobspb/jobs.proto                    |  14 +-
 pkg/server/api_v2.go                          |   1 +
 pkg/server/api_v2_databases_metadata.go       | 144 +++++++++++++++++-
 pkg/server/api_v2_databases_metadata_test.go  |  38 +++++
 .../update_table_metadata_cache_job.go        |  38 +++--
 .../update_table_metadata_cache_job_test.go   |   4 +-
 6 files changed, 222 insertions(+), 17 deletions(-)

diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index b1953f4318ad..0e4754eadd73 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -1350,11 +1350,21 @@ message MVCCStatisticsJobProgress {
 message UpdateTableMetadataCacheDetails {}

 message UpdateTableMetadataCacheProgress {
+  enum Status {
+    NOT_RUNNING = 0;
+    RUNNING = 1;
+  }
   // The time at which the job last started a run.
-  google.protobuf.Timestamp last_run_time = 1 [
-    (gogoproto.nullable) = false,
+  google.protobuf.Timestamp last_start_time = 1 [
+    (gogoproto.nullable) = true,
     (gogoproto.stdtime) = true
   ];
+  // The time at which the job last completed a run.
+  google.protobuf.Timestamp last_completed_time = 2 [
+    (gogoproto.nullable) = true,
+    (gogoproto.stdtime) = true
+  ];
+  Status status = 3;
 }

 message ImportRollbackDetails {
diff --git a/pkg/server/api_v2.go b/pkg/server/api_v2.go
index f67626690bd3..4bf99cb089a4 100644
--- a/pkg/server/api_v2.go
+++ b/pkg/server/api_v2.go
@@ -189,6 +189,7 @@ func registerRoutes(
 		{"sql/", a.execSQL, true, authserver.RegularRole, true},
 		{"database_metadata/", a.GetDBMetadata, true, authserver.RegularRole, true},
 		{"table_metadata/", a.GetTableMetadata, true, authserver.RegularRole, true},
+		{"table_metadata/updatejob/", a.TableMetadataJob, true, authserver.RegularRole, true},
 	}

 	// For all routes requiring authentication, have the outer mux (a.mux)
diff --git a/pkg/server/api_v2_databases_metadata.go b/pkg/server/api_v2_databases_metadata.go
index fc3553568c81..943efbdbcfb0 100644
--- a/pkg/server/api_v2_databases_metadata.go
+++ b/pkg/server/api_v2_databases_metadata.go
@@ -17,6 +17,7 @@ import (
 	"strings"
 	"time"

+	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server/apiutil"
 	"github.com/cockroachdb/cockroach/pkg/server/authserver"
@@ -298,7 +299,7 @@ func (a *apiV2Server) getTableMetadata(

 	it, err := a.sqlServer.internalExecutor.QueryIteratorEx(
 		ctx, "get-table-metadata", nil, /* txn */
-		sessiondata.InternalExecutorOverride{},
+		sessiondata.NodeUserSessionDataOverride,
 		// We only want to show the grants on the database.
query.String(), query.QueryArguments()..., ) @@ -599,7 +600,7 @@ func (a *apiV2Server) getDBMetadata( it, err := a.admin.internalExecutor.QueryIteratorEx( ctx, "get-database-metadata", nil, /* txn */ - sessiondata.InternalExecutorOverride{}, + sessiondata.NodeUserSessionDataOverride, query.String(), query.QueryArguments()..., ) @@ -657,9 +658,138 @@ func (a *apiV2Server) getDBMetadata( return dbms, totalRowCount, nil } +// TableMetadataJob routes to the necessary receiver based on the http method of the request. Requires +// The user making the request must have the CONNECT database grant on at least one database or admin privilege. +// --- +// produces: +// - application/json +// +// responses: +// +// "200": +// description: A tmUpdateJobStatusResponse +// "404": +// description: Not found if the user doesn't have the correct authorizations +func (a *apiV2Server) TableMetadataJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + ctx = a.sqlServer.AnnotateCtx(ctx) + sqlUser := authserver.UserFromHTTPAuthInfoContext(ctx) + + authorized, err := a.updateTableMetadataJobAuthorized(ctx, sqlUser) + if err != nil { + srverrors.APIV2InternalError(ctx, err, w) + return + } + + if !authorized { + http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) + return + } + + var resp interface{} + switch r.Method { + case http.MethodGet: + resp, err = a.getTableMetadataUpdateJobStatus(ctx) + default: + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + if err != nil { + srverrors.APIV2InternalError(ctx, err, w) + return + } + + apiutil.WriteJSONResponse(ctx, w, 200, resp) +} + +// getTableMetadataUpdateJobStatus gets the status of the table metadata update job. The requesting user +// must have the CONNECT privilege to at least one database on the cluster, or the admin role. If the user +// doesn't have the necessary authorization, an "empty" response will be returned. 
+func (a *apiV2Server) getTableMetadataUpdateJobStatus( + ctx context.Context, +) (jobStatus tmUpdateJobStatusResponse, retErr error) { + query := safesql.NewQuery() + query.Append(` + SELECT + TIMESTAMPTZ 'epoch' + (progress->>'modifiedMicros' || ' microseconds')::interval as last_modified, + coalesce((progress->>'fractionCompleted')::FLOAT, 0) as fraction_completed, + (progress->'tableMetadataCache'->>'lastCompletedTime')::TIMESTAMPTZ as last_completed_time, + (progress->'tableMetadataCache'->>'lastStartTime')::TIMESTAMPTZ as last_start_time, + coalesce(progress->'tableMetadataCache'->>'status', 'NOT_RUNNING') as status + FROM ( + SELECT crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress) as progress + FROM crdb_internal.system_jobs + WHERE id = $ + ) +`, jobs.UpdateTableMetadataCacheJobID) + + row, colTypes, err := a.sqlServer.internalExecutor.QueryRowExWithCols( + ctx, "get-tableMetadataUpdateJob-status", nil, /* txn */ + sessiondata.NodeUserSessionDataOverride, + query.String(), query.QueryArguments()..., + ) + + if err != nil { + return jobStatus, err + } + scanner := makeResultScanner(colTypes) + if err = scanner.Scan(row, "fraction_completed", &jobStatus.Progress); err != nil { + return jobStatus, err + } + if err = scanner.Scan(row, "last_modified", &jobStatus.LastUpdatedTime); err != nil { + return jobStatus, err + } + if err = scanner.Scan(row, "last_start_time", &jobStatus.LastStartTime); err != nil { + return jobStatus, err + } + if err = scanner.Scan(row, "last_completed_time", &jobStatus.LastCompletedTime); err != nil { + return jobStatus, err + } + if err = scanner.Scan(row, "status", &jobStatus.CurrentStatus); err != nil { + return jobStatus, err + } + return jobStatus, nil +} + +func (a *apiV2Server) updateTableMetadataJobAuthorized( + ctx context.Context, sqlUser username.SQLUsername, +) (isAuthorized bool, retErr error) { + query := safesql.NewQuery() + sqlUserStr := sqlUser.Normalized() + query.Append(` + SELECT count(*) + 
FROM ( + SELECT 1 FROM system.role_members WHERE member = $ AND role = 'admin' + UNION + SELECT 1 + FROM "".crdb_internal.cluster_database_privileges cdp + WHERE cdp.grantee = $ + AND cdp.privilege_type = 'CONNECT' + ) +`, sqlUserStr, sqlUserStr) + + row, colTypes, err := a.sqlServer.internalExecutor.QueryRowExWithCols( + ctx, "check-updatejob-authorized", nil, /* txn */ + sessiondata.InternalExecutorOverride{}, + query.String(), query.QueryArguments()..., + ) + + if err != nil { + return false, err + } + + scanner := makeResultScanner(colTypes) + var count int64 + if err = scanner.Scan(row, "count", &count); err != nil { + return false, err + } + return count > 0, nil +} + type PaginatedResponse[T any] struct { Results T `json:"results"` - PaginationInfo paginationInfo `json:"paginationInfo"` + PaginationInfo paginationInfo `json:"pagination_info"` } type paginationInfo struct { @@ -694,3 +824,11 @@ type dbMetadata struct { StoreIds []int64 `json:"store_ids"` LastUpdated time.Time `json:"last_updated"` } + +type tmUpdateJobStatusResponse struct { + CurrentStatus string `json:"current_status"` + Progress float32 `json:"progress"` + LastStartTime *time.Time `json:"last_start_time"` + LastCompletedTime *time.Time `json:"last_completed_time"` + LastUpdatedTime *time.Time `json:"last_updated_time"` +} diff --git a/pkg/server/api_v2_databases_metadata_test.go b/pkg/server/api_v2_databases_metadata_test.go index 36f5fee0f7f4..e0ee0d7f337b 100644 --- a/pkg/server/api_v2_databases_metadata_test.go +++ b/pkg/server/api_v2_databases_metadata_test.go @@ -495,6 +495,44 @@ func TestGetDBMetadata(t *testing.T) { }) } +func TestGetTableMetadataUpdateJobStatus(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) + ctx := context.Background() + defer testCluster.Stopper().Stop(ctx) + conn := testCluster.ServerConn(0) + defer conn.Close() + + ts := testCluster.Server(0) + + 
+
+	t.Run("authorization", func(t *testing.T) {
+		uri := "/api/v2/table_metadata/updatejob/"
+		sessionUsername := username.TestUserName()
+		userClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(sessionUsername, false, 1)
+		require.NoError(t, err)
+
+		failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String())
+		require.Equal(t, http.StatusText(http.StatusNotFound), failed)
+
+		_, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE defaultdb TO %s", sessionUsername.Normalized()))
+		require.NoError(t, e)
+
+		mdResp := makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String())
+		require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus)
+
+		_, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE defaultdb FROM %s", sessionUsername.Normalized()))
+		require.NoError(t, e)
+		failed = makeApiRequest[string](t, userClient, ts.AdminURL().WithPath(uri).String())
+		require.Equal(t, http.StatusText(http.StatusNotFound), failed)
+
+		_, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized()))
+		require.NoError(t, e)
+		mdResp = makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String())
+		require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus)
+	})
+}
+
 func makeApiRequest[T any](t *testing.T, client http.Client, uri string) (mdResp T) {
 	req, err := http.NewRequest("GET", uri, nil)
 	require.NoError(t, err)
diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
index 6e527b57a8bf..12a69ffd119e 100644
--- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
+++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -95,24 +95,59 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int

 		// Run table metadata update job.
 		metrics.NumRuns.Inc(1)
-		j.updateLastRunTime(ctx)
+		j.markAsRunning(ctx)
 		if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
 			log.Errorf(ctx, "error running table metadata update job: %s", err)
 		}
+		j.markAsCompleted(ctx)
 	}
 }

-// updateLastRunTime updates the last_run_time field in the job's progress
-// details and writes the job progress as a JSON string to the running status.
-func (j *tableMetadataUpdateJobResumer) updateLastRunTime(ctx context.Context) {
+// tableMetadataCacheProgress returns the table metadata cache details stored in
+// progress. If progress does not carry such details yet, empty details are
+// attached to it first, so callers can always update the returned value.
+func tableMetadataCacheProgress(
+	progress *jobspb.Progress,
+) *jobspb.UpdateTableMetadataCacheProgress {
+	d, ok := progress.Details.(*jobspb.Progress_TableMetadataCache)
+	if ok && d.TableMetadataCache != nil {
+		return d.TableMetadataCache
+	}
+	details := &jobspb.UpdateTableMetadataCacheProgress{}
+	progress.Details = &jobspb.Progress_TableMetadataCache{TableMetadataCache: details}
+	return details
+}
+
+// markAsRunning records the start of a run in the job's progress: it sets the
+// last_start_time and status fields of the progress details and writes a
+// "Job started at <time>" message to the running status.
+func (j *tableMetadataUpdateJobResumer) markAsRunning(ctx context.Context) {
 	if err := j.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
-		lrt := timeutil.Now()
-		ju.UpdateProgress(&jobspb.Progress{
-			RunningStatus: fmt.Sprintf("last metadata update at %s", lrt),
-			Details: &jobspb.Progress_TableMetadataCache{
-				TableMetadataCache: &jobspb.UpdateTableMetadataCacheProgress{LastRunTime: lrt},
-			},
-		})
+		progress := md.Progress
+		details := tableMetadataCacheProgress(progress)
+		now := timeutil.Now()
+		progress.RunningStatus = fmt.Sprintf("Job started at %s", now)
+		details.LastStartTime = &now
+		details.Status = jobspb.UpdateTableMetadataCacheProgress_RUNNING
+		ju.UpdateProgress(progress)
+		return nil
+	}); err != nil {
+		log.Errorf(ctx, "%s", err.Error())
+	}
+}
+
+// markAsCompleted records the end of a run in the job's progress: it sets the
+// last_completed_time and status fields of the progress details and writes a
+// "Job completed at <time>" message to the running status.
+func (j *tableMetadataUpdateJobResumer) markAsCompleted(ctx context.Context) {
+	if err := j.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
+		progress := md.Progress
+		details := tableMetadataCacheProgress(progress)
+		now := timeutil.Now()
+		progress.RunningStatus = fmt.Sprintf("Job completed at %s", now)
+		details.LastCompletedTime = &now
+		details.Status = jobspb.UpdateTableMetadataCacheProgress_NOT_RUNNING
+		ju.UpdateProgress(progress)
 		return nil
 	}); err != nil {
 		log.Errorf(ctx, "%s", err.Error())
diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
index f90730c833e6..f57552afa96e 100644
--- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
+++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -84,8 +84,8 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ
 		}
 		var runningStatus string
 		require.NoError(t, row.Scan(&runningStatus))
-		if !strings.Contains(runningStatus, "last metadata update at") {
-			return errors.New("last run time not updated")
+		if !strings.Contains(runningStatus, "Job completed at") {
+			return errors.New("running_status not updated")
 		}
 		return nil
 	})