From afcf872a43c2ed6fa0effec9f6dff11c2c95c419 Mon Sep 17 00:00:00 2001 From: Kyle Wong <37189875+kyle-a-wong@users.noreply.github.com> Date: Fri, 13 Sep 2024 17:24:27 -0400 Subject: [PATCH] server: create api to trigger table metadata update job adds a new API v2 endpoint, `POST /api/v2/table_metadata/updatejob`. This new API will trigger the table metadata update job to run and refresh the system.table_metadata table. This will only trigger the job if it isn't already running. If the `?onlyIfStale` query param is provided, the job won't be triggered unless deemed stale, determined by the following setting: `obs.tablemetadata.data_valid_duration` note that `onlyIfStale` with any value other then `false` will be treated as true. Resolves: #128897 Epic: CRDB-37558 Release note: None --- pkg/BUILD.bazel | 1 + pkg/base/testing_knobs.go | 1 + pkg/server/BUILD.bazel | 4 + pkg/server/api_v2_databases_metadata.go | 49 ++++- pkg/server/api_v2_databases_metadata_test.go | 188 +++++++++++++++--- pkg/server/server_sql.go | 5 + pkg/server/status.go | 2 +- pkg/sql/BUILD.bazel | 1 + pkg/sql/exec_util.go | 2 + .../tablemetadatacache/cluster_settings.go | 2 +- .../update_table_metadata_cache_job.go | 16 +- .../update_table_metadata_cache_job_test.go | 4 +- pkg/sql/tablemetadatacache/util/BUILD.bazel | 8 + pkg/sql/tablemetadatacache/util/test_utils.go | 22 ++ 14 files changed, 264 insertions(+), 41 deletions(-) create mode 100644 pkg/sql/tablemetadatacache/util/BUILD.bazel create mode 100644 pkg/sql/tablemetadatacache/util/test_utils.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 4d35a347fb96..0e3d9ac65dc7 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -2289,6 +2289,7 @@ GO_TARGETS = [ "//pkg/sql/syntheticprivilege:syntheticprivilege", "//pkg/sql/syntheticprivilege:syntheticprivilege_test", "//pkg/sql/syntheticprivilegecache:syntheticprivilegecache", + "//pkg/sql/tablemetadatacache/util:util", "//pkg/sql/tablemetadatacache:tablemetadatacache", "//pkg/sql/tablemetadatacache:tablemetadatacache_test", "//pkg/sql/tests:tests", diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index f145195e9cc7..3390f64755cb 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -60,4 +60,5 @@ type TestingKnobs struct { TenantCapabilitiesTestingKnobs ModuleTestingKnobs TableStatsKnobs ModuleTestingKnobs Insights ModuleTestingKnobs + TableMetadata ModuleTestingKnobs } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 176ce1967609..a19f70bcdd33 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -279,6 +279,8 @@ go_library( "//pkg/sql/stats", "//pkg/sql/stmtdiagnostics", "//pkg/sql/syntheticprivilegecache", + "//pkg/sql/tablemetadatacache", + "//pkg/sql/tablemetadatacache/util", "//pkg/sql/ttl/ttljob", "//pkg/sql/ttl/ttlschedule", "//pkg/storage", @@ -530,6 +532,8 @@ go_test( "//pkg/sql/sessiondata", "//pkg/sql/sqlstats", "//pkg/sql/sqlstats/insights", + "//pkg/sql/tablemetadatacache", + "//pkg/sql/tablemetadatacache/util", "//pkg/storage", "//pkg/storage/disk", "//pkg/storage/fs", diff --git a/pkg/server/api_v2_databases_metadata.go b/pkg/server/api_v2_databases_metadata.go index e4dfdd62bcc0..3364f174d56b 100644 --- a/pkg/server/api_v2_databases_metadata.go +++ b/pkg/server/api_v2_databases_metadata.go @@ -21,11 +21,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/apiutil" "github.com/cockroachdb/cockroach/pkg/server/authserver" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/srverrors" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache" "github.com/cockroachdb/cockroach/pkg/util/safesql" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -36,6 +41,7 @@ const ( pageNumKey = "pageNum" pageSizeKey = "pageSize" storeIdKey = "storeId" + onlyIfStaleKey = "onlyIfStale" defaultPageSize = 10 defaultPageNum = 1 ) @@ -668,14 +674,13 @@ func (a *apiV2Server) getDBMetadata( // responses: // // "200": -// description: A tmUpdateJobStatusResponse +// description: A tmUpdateJobStatusResponse for GET requests and tmJobTriggeredResponse for POST requests. // "404": // description: Not found if the user doesn't have the correct authorizations func (a *apiV2Server) TableMetadataJob(w http.ResponseWriter, r *http.Request) { ctx := r.Context() ctx = a.sqlServer.AnnotateCtx(ctx) sqlUser := authserver.UserFromHTTPAuthInfoContext(ctx) - authorized, err := a.updateTableMetadataJobAuthorized(ctx, sqlUser) if err != nil { srverrors.APIV2InternalError(ctx, err, w) @@ -691,6 +696,13 @@ func (a *apiV2Server) TableMetadataJob(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: resp, err = a.getTableMetadataUpdateJobStatus(ctx) + case http.MethodPost: + // onlyIfStale will be true if the query param exists and has any value other than "false" + var onlyIfStale bool + if r.URL.Query().Has(onlyIfStaleKey) { + onlyIfStale = r.URL.Query().Get(onlyIfStaleKey) != "false" + } + resp, err = a.triggerTableMetadataUpdateJob(ctx, onlyIfStale) default: http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return @@ -753,6 +765,34 @@ func (a *apiV2Server) getTableMetadataUpdateJobStatus( return jobStatus, nil } +// triggerTableMetadataUpdateJob will trigger the table metadata update job if it isn't currently running and if it +// is stale, if onlyIfStale is true. +func (a *apiV2Server) triggerTableMetadataUpdateJob( + ctx context.Context, onlyIfStale bool, +) (tmJobTriggeredResponse, error) { + jobStatus, err := a.getTableMetadataUpdateJobStatus(ctx) + if err != nil { + return tmJobTriggeredResponse{}, err + } + + stalenessDuration := tablemetadatacache.DataValidDurationSetting.Get(&a.sqlServer.execCfg.Settings.SV) + if onlyIfStale && jobStatus.LastCompletedTime != nil && timeutil.Since(*jobStatus.LastCompletedTime) < stalenessDuration { + return tmJobTriggeredResponse{JobTriggered: false, Message: "Not enough time has elapsed since last job run"}, nil + } + + _, err = a.status.UpdateTableMetadataCache(ctx, &serverpb.UpdateTableMetadataCacheRequest{Local: false}) + if err != nil { + st, ok := status.FromError(err) + if ok { + if st.Code() == codes.Aborted { + return tmJobTriggeredResponse{JobTriggered: false, Message: "Job is already running"}, nil + } + } + return tmJobTriggeredResponse{}, err + } + return tmJobTriggeredResponse{JobTriggered: true, Message: "Job triggered successfully"}, nil +} + func (a *apiV2Server) updateTableMetadataJobAuthorized( ctx context.Context, sqlUser username.SQLUsername, ) (isAuthorized bool, retErr error) { @@ -833,3 +873,8 @@ type tmUpdateJobStatusResponse struct { LastCompletedTime *time.Time `json:"last_completed_time"` LastUpdatedTime *time.Time `json:"last_updated_time"` } + +type tmJobTriggeredResponse struct { + JobTriggered bool `json:"job_triggered"` + Message string `json:"message"` +} diff --git a/pkg/server/api_v2_databases_metadata_test.go b/pkg/server/api_v2_databases_metadata_test.go index fa4582e04b6d..9a4a86e5bcd1 100644 --- a/pkg/server/api_v2_databases_metadata_test.go +++ b/pkg/server/api_v2_databases_metadata_test.go @@ -21,12 +21,20 @@ import ( "slices" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache" + tablemetadatacache_util "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -45,7 +53,7 @@ func descendingComparator[T any](comparator func(first, second T) int) func(firs func TestGetTableMetadata(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) + testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{}) ctx := context.Background() defer testCluster.Stopper().Stop(ctx) conn := testCluster.ServerConn(0) @@ -73,7 +81,7 @@ func TestGetTableMetadata(t *testing.T) { require.Contains(t, string(respBytes), "Method Not Allowed") }) t.Run("unknown db id", func(t *testing.T) { - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath("/api/v2/table_metadata/?dbId=1000").String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath("/api/v2/table_metadata/?dbId=1000").String(), http.MethodGet) require.Len(t, mdResp.Results, 0) require.Equal(t, int64(0), mdResp.PaginationInfo.TotalResults) }) @@ -84,14 +92,14 @@ func TestGetTableMetadata(t *testing.T) { // Assert that the test user gets an empty response for db 1 uri1 := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d", db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) // Assert that the test user gets an empty response for db 2 uri2 := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d", db2Id) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -99,13 +107,13 @@ func TestGetTableMetadata(t *testing.T) { // Grant connect access to DB 1 _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE %s TO %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user now see results for db1 require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) // Assert that the test user gets an empty response for db 2 - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -113,7 +121,7 @@ func TestGetTableMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user no longer sees results from db1 require.Empty(t, mdResp.Results) @@ -122,13 +130,13 @@ func TestGetTableMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user now see results for db1 require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) // Assert that user now see results for db1 - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) @@ -211,7 +219,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range sortTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/%s&dbId=%d", tt.queryString, tt.dbId) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, tt.comparator)) for _, tbmd := range mdResp.Results { @@ -242,7 +250,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range tableNameTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&name=%s", tt.dbId, tt.nameFilter) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(tt.expectedCount), mdResp.PaginationInfo.TotalResults) }) @@ -263,7 +271,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range pageTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/%s&dbId=%d", tt.queryString, db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.LessOrEqual(t, len(mdResp.Results), tt.expectedPageSize) require.Equal(t, tt.expectedPageSize, mdResp.PaginationInfo.PageSize) @@ -273,14 +281,14 @@ func TestGetTableMetadata(t *testing.T) { t.Run("large page num", func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&pageSize=1&pageNum=100", db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Empty(t, mdResp.Results) }) }) t.Run("filter store id", func(t *testing.T) { storeIds := []int64{1, 2} uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&storeId=%d&storeId=%d", db1Id, storeIds[0], storeIds[1]) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) for _, tmdr := range mdResp.Results { require.Condition(t, func() (success bool) { @@ -313,7 +321,7 @@ func TestGetTableMetadata(t *testing.T) { }) t.Run("no views", func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&name=%s", db1Id, "view") - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(0), mdResp.PaginationInfo.TotalResults) }) @@ -322,7 +330,7 @@ func TestGetTableMetadata(t *testing.T) { func TestGetDBMetadata(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) + testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{}) ctx := context.Background() defer testCluster.Stopper().Stop(ctx) conn := testCluster.ServerConn(0) @@ -373,7 +381,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range sortTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/%s", tt.queryString) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) isSorted := slices.IsSortedFunc(mdResp.Results, tt.comparator) require.True(t, isSorted) @@ -388,7 +396,7 @@ func TestGetDBMetadata(t *testing.T) { // Assert that the test user gets an empty response for db 1 uri := "/api/v2/database_metadata/" - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -396,7 +404,7 @@ func TestGetDBMetadata(t *testing.T) { // Grant connect access to DB 1 _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE %s TO %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user now see results for db1 require.Len(t, mdResp.Results, 1) @@ -406,7 +414,7 @@ func TestGetDBMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user no longer sees results from db1 require.Empty(t, mdResp.Results) @@ -414,7 +422,7 @@ func TestGetDBMetadata(t *testing.T) { // Make user admin _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user now see results for all dbs require.Len(t, mdResp.Results, 2) @@ -436,7 +444,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range pageTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/%s", tt.queryString) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.LessOrEqual(t, len(mdResp.Results), tt.expectedPageSize) require.Equal(t, tt.expectedPageSize, mdResp.PaginationInfo.PageSize) @@ -461,7 +469,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range dbtableNameTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/?name=%s", tt.nameFilter) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(tt.expectedCount), mdResp.PaginationInfo.TotalResults) }) @@ -470,7 +478,7 @@ func TestGetDBMetadata(t *testing.T) { t.Run("filter store id", func(t *testing.T) { storeIds := []int64{8, 9} uri := fmt.Sprintf("/api/v2/database_metadata/?storeId=%d&storeId=%d", storeIds[0], storeIds[1]) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) for _, dmdr := range mdResp.Results { require.Condition(t, func() (success bool) { return slices.Contains(dmdr.StoreIds, storeIds[0]) || slices.Contains(dmdr.StoreIds, storeIds[1]) @@ -501,7 +509,7 @@ func TestGetDBMetadata(t *testing.T) { }) t.Run("table count only includes tables", func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/?name=%s", db1Name) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(1), mdResp.PaginationInfo.TotalResults) // This count should not include views, materialized views, or sequences @@ -512,7 +520,7 @@ func TestGetDBMetadata(t *testing.T) { func TestGetTableMetadataUpdateJobStatus(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) + testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{}) ctx := context.Background() defer testCluster.Stopper().Stop(ctx) conn := testCluster.ServerConn(0) @@ -526,29 +534,135 @@ func TestGetTableMetadataUpdateJobStatus(t *testing.T) { userClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(sessionUsername, false, 1) require.NoError(t, err) - failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String()) + failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, http.StatusText(http.StatusNotFound), failed) _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE defaultdb TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp := makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus) _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE defaultdb FROM %s", sessionUsername.Normalized())) require.NoError(t, e) - failed = makeApiRequest[string](t, userClient, ts.AdminURL().WithPath(uri).String()) + failed = makeApiRequest[string](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, http.StatusText(http.StatusNotFound), failed) _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus) }) } -func makeApiRequest[T any](t *testing.T, client http.Client, uri string) (mdResp T) { - req, err := http.NewRequest("GET", uri, nil) +func TestTriggerMetadataUpdateJob(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + jobCompletedChan := make(chan interface{}) + defer close(jobCompletedChan) + skip.UnderStress(t, "too slow under stress") + testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + + Knobs: base.TestingKnobs{ + TableMetadata: &tablemetadatacache_util.TestingKnobs{ + OnJobComplete: func() { + jobCompletedChan <- struct{}{} + }, + }, + }, + }, + }) + ctx := context.Background() + defer testCluster.Stopper().Stop(ctx) + conn := testCluster.ServerConn(0) + defer conn.Close() + ts := testCluster.Server(0) + + testutils.SucceedsSoon(t, func() error { + row, err := conn.Query(` +SELECT claim_instance_id FROM system.jobs +WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJobID) + require.NoError(t, err) + if !row.Next() { + return errors.New("no node has claimed the job") + } + + _, err = testCluster.Server(0).GetStatusClient(t).UpdateTableMetadataCache(ctx, + &serverpb.UpdateTableMetadataCacheRequest{Local: false}) + if err != nil { + return err + } + <-jobCompletedChan + return nil + }) + + client, err := ts.GetAdminHTTPClient() + require.NoError(t, err) + uri := "/api/v2/table_metadata/updatejob/" + url := ts.AdminURL().WithPath(uri).String() + t.Run("job triggered", func(t *testing.T) { + assertJobTriggered(t, client, url, jobCompletedChan) + }) + + t.Run("authorization", func(t *testing.T) { + sessionUsername := username.TestUserName() + userClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(sessionUsername, false, 1) + require.NoError(t, err) + + // User isn't authorized and will receive a 404 response + failed := makeApiRequest[interface{}](t, userClient, url, http.MethodPost) + require.Equal(t, http.StatusText(http.StatusNotFound), failed) + + _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE defaultdb TO %s", sessionUsername.Normalized())) + require.NoError(t, e) + + // User is now authorized and will trigger job + assertJobTriggered(t, client, url, jobCompletedChan) + + _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE defaultdb FROM %s", sessionUsername.Normalized())) + require.NoError(t, e) + failed = makeApiRequest[interface{}](t, userClient, url, http.MethodPost) + require.Equal(t, http.StatusText(http.StatusNotFound), failed) + + _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) + require.NoError(t, e) + assertJobTriggered(t, client, url, jobCompletedChan) + }) + + t.Run("staleness", func(t *testing.T) { + assertJobTriggered(t, client, url, jobCompletedChan) + // Trigger again should succeed + assertJobTriggered(t, client, url, jobCompletedChan) + + tablemetadatacache.DataValidDurationSetting.Override(ctx, &ts.ClusterSettings().SV, time.Minute) + // call trigger job api with onlyIfStale flag. This shouldn't trigger the job again since a minute hasn't passed + resp := makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale").String(), http.MethodPost) + require.Contains(t, resp.Message, "Not enough time has elapsed since last job run") + require.False(t, resp.JobTriggered) + + // onlyIfStale=false won't check DataValidDurationSetting value + assertJobTriggered(t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale=false").String(), jobCompletedChan) + + // onlyIfStale with non "false" value will check DataValidDurationSetting value + resp = makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale=somevalue").String(), http.MethodPost) + require.Contains(t, resp.Message, "Not enough time has elapsed since last job run") + require.False(t, resp.JobTriggered) + + // set data_valid_duration to 1ms + tablemetadatacache.DataValidDurationSetting.Override(ctx, &ts.ClusterSettings().SV, time.Millisecond) + // call trigger job api with onlyIfStale flag. This should trigger the job again since 1ms has passed since last + // completion + assertJobTriggered(t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale").String(), jobCompletedChan) + }) +} + +func makeApiRequest[T any]( + t *testing.T, client http.Client, uri string, httpMethod string, +) (mdResp T) { + req, err := http.NewRequest(httpMethod, uri, nil) require.NoError(t, err) resp, err := client.Do(req) require.NoError(t, err) @@ -560,10 +674,18 @@ func makeApiRequest[T any](t *testing.T, client http.Client, uri string) (mdResp if strings.Contains(contentType, "text/plain") { data = []byte(fmt.Sprintf(`"%s"`, strings.TrimSpace(string(data)))) } - require.NoError(t, json.Unmarshal(data, &mdResp)) + err = json.Unmarshal(data, &mdResp) + require.NoError(t, err) return mdResp } +func assertJobTriggered(t *testing.T, client http.Client, url string, c chan interface{}) { + resp := makeApiRequest[tmJobTriggeredResponse](t, client, url, http.MethodPost) + require.Contains(t, resp.Message, "Job triggered successfully") + require.True(t, resp.JobTriggered) + <-c +} + func setupTest(t *testing.T, conn *gosql.DB, db1 string, db2 string) (dbId1 int, dbId2 int) { _, err := conn.Exec(`CREATE DATABASE IF NOT EXISTS ` + db1) require.NoError(t, err) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 61eaa163a765..122088bc6142 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -113,6 +113,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache" + tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/ts" @@ -1151,6 +1152,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { tableStatsTestingKnobs = tableStatsKnobs.(*stats.TableStatsTestingKnobs) } + if tableMetadataKnobs := cfg.TestingKnobs.TableMetadata; tableMetadataKnobs != nil { + execCfg.TableMetadataKnobs = tableMetadataKnobs.(*tablemetadatacacheutil.TestingKnobs) + + } // Set up internal memory metrics for use by internal SQL executors. // Don't add them to the registry now because it will be added as part of pgServer metrics. sqlMemMetrics := sql.MakeMemMetrics("sql", cfg.HistogramWindowInterval()) diff --git a/pkg/server/status.go b/pkg/server/status.go index 169e12f430ed..b4b13d7c988b 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -4211,7 +4211,7 @@ func (s *statusServer) localUpdateTableMetadataCache() ( select { case s.updateTableMetadataJobSignal <- struct{}{}: default: - return nil, status.Errorf(codes.Unavailable, "update table metadata cache job is not ready to start execution") + return nil, status.Errorf(codes.Aborted, "update table metadata cache job is not ready to start execution") } return &serverpb.UpdateTableMetadataCacheResponse{}, nil } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index b99479dacb10..b053698c57fa 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -529,6 +529,7 @@ go_library( "//pkg/sql/storageparam/tablestorageparam", "//pkg/sql/syntheticprivilege", "//pkg/sql/syntheticprivilegecache", + "//pkg/sql/tablemetadatacache/util", "//pkg/sql/ttl/ttlbase", "//pkg/sql/types", "//pkg/sql/vtable", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index a1b90e201594..57e2591e8245 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -108,6 +108,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache" + tablemetadatacache_util "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/upgrade" @@ -1306,6 +1307,7 @@ type ExecutorConfig struct { ExternalConnectionTestingKnobs *externalconn.TestingKnobs EventLogTestingKnobs *EventLogTestingKnobs InsightsTestingKnobs *insights.TestingKnobs + TableMetadataKnobs *tablemetadatacache_util.TestingKnobs // HistogramWindowInterval is (server.Config).HistogramWindowInterval. HistogramWindowInterval time.Duration diff --git a/pkg/sql/tablemetadatacache/cluster_settings.go b/pkg/sql/tablemetadatacache/cluster_settings.go index 532b621f0ddc..97dfe723bc89 100644 --- a/pkg/sql/tablemetadatacache/cluster_settings.go +++ b/pkg/sql/tablemetadatacache/cluster_settings.go @@ -26,7 +26,7 @@ var tableMetadataCacheAutoUpdatesEnabled = settings.RegisterBoolSetting( false, settings.WithPublic) -var tableMetadataCacheValidDuration = settings.RegisterDurationSetting( +var DataValidDurationSetting = settings.RegisterDurationSetting( settings.ApplicationLevel, "obs.tablemetadata.data_valid_duration", "the duration for which the data in system.table_metadata is considered valid", diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go index 12a69ffd119e..7a4fe86bbb37 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go @@ -44,7 +44,13 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int execCtx := execCtxI.(sql.JobExecContext) metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct(). JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics) + var onJobStartKnob func() + var onJobCompleteKnob func() + if execCtx.ExecCfg().TableMetadataKnobs != nil { + onJobStartKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobStart + onJobCompleteKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobComplete + } // We must reset the job's num runs to 0 so that it doesn't get // delayed by the job system's exponential backoff strategy. if err := j.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { @@ -68,7 +74,7 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int default: } }) - tableMetadataCacheValidDuration.SetOnChange(&settings.SV, func(_ context.Context) { + DataValidDurationSetting.SetOnChange(&settings.SV, func(_ context.Context) { select { case scheduleSettingsCh <- struct{}{}: default: @@ -78,7 +84,7 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int var timer timeutil.Timer for { if tableMetadataCacheAutoUpdatesEnabled.Get(&settings.SV) { - timer.Reset(tableMetadataCacheValidDuration.Get(&settings.SV)) + timer.Reset(DataValidDurationSetting.Get(&settings.SV)) } select { case <-scheduleSettingsCh: @@ -93,6 +99,9 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int return ctx.Err() } + if onJobStartKnob != nil { + onJobStartKnob() + } // Run table metadata update job. metrics.NumRuns.Inc(1) j.markAsRunning(ctx) @@ -100,6 +109,9 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int log.Errorf(ctx, "error running table metadata update job: %s", err) } j.markAsCompleted(ctx) + if onJobCompleteKnob != nil { + onJobCompleteKnob() + } } } diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go index f57552afa96e..19e93570c106 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go @@ -163,7 +163,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { t.Run("AutomaticUpdatesEnabled", func(t *testing.T) { conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`) - tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) + DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) err := waitForJobRuns(3, 10*time.Second) require.NoError(t, err, "Job should have run at least 3 times") mockCallsCount := getMockCallCount() @@ -185,7 +185,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { t.Run("AutomaticUpdatesDisabled", func(t *testing.T) { conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = f`) - tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) + DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) initialCount := getMockCallCount() err := waitForJobRuns(1, 200*time.Millisecond) require.Error(t, err, "Job should not run after being disabled") diff --git a/pkg/sql/tablemetadatacache/util/BUILD.bazel b/pkg/sql/tablemetadatacache/util/BUILD.bazel new file mode 100644 index 000000000000..e1c5e7377bde --- /dev/null +++ b/pkg/sql/tablemetadatacache/util/BUILD.bazel @@ -0,0 +1,8 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "util", + srcs = ["test_utils.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util", + visibility = ["//visibility:public"], +) diff --git a/pkg/sql/tablemetadatacache/util/test_utils.go b/pkg/sql/tablemetadatacache/util/test_utils.go new file mode 100644 index 000000000000..014e51b395a7 --- /dev/null +++ b/pkg/sql/tablemetadatacache/util/test_utils.go @@ -0,0 +1,22 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tablemetadatacacheutil + +// TestingKnobs provides hooks into the table metadata cache job +type TestingKnobs struct { + // onJobStart is called when the job starts + OnJobStart func() + // onJobComplete is called when the job completes + OnJobComplete func() +} + +// ModuleTestingKnobs implements base.ModuleTestingKnobs interface. +func (*TestingKnobs) ModuleTestingKnobs() {}