diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 2538c5a031a8..d91dd23f77db 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -2288,6 +2288,7 @@ GO_TARGETS = [ "//pkg/sql/syntheticprivilege:syntheticprivilege", "//pkg/sql/syntheticprivilege:syntheticprivilege_test", "//pkg/sql/syntheticprivilegecache:syntheticprivilegecache", + "//pkg/sql/tablemetadatacache/util:util", "//pkg/sql/tablemetadatacache:tablemetadatacache", "//pkg/sql/tablemetadatacache:tablemetadatacache_test", "//pkg/sql/tests:tests", diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index f145195e9cc7..3390f64755cb 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -60,4 +60,5 @@ type TestingKnobs struct { TenantCapabilitiesTestingKnobs ModuleTestingKnobs TableStatsKnobs ModuleTestingKnobs Insights ModuleTestingKnobs + TableMetadata ModuleTestingKnobs } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 176ce1967609..a19f70bcdd33 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -279,6 +279,8 @@ go_library( "//pkg/sql/stats", "//pkg/sql/stmtdiagnostics", "//pkg/sql/syntheticprivilegecache", + "//pkg/sql/tablemetadatacache", + "//pkg/sql/tablemetadatacache/util", "//pkg/sql/ttl/ttljob", "//pkg/sql/ttl/ttlschedule", "//pkg/storage", @@ -530,6 +532,8 @@ go_test( "//pkg/sql/sessiondata", "//pkg/sql/sqlstats", "//pkg/sql/sqlstats/insights", + "//pkg/sql/tablemetadatacache", + "//pkg/sql/tablemetadatacache/util", "//pkg/storage", "//pkg/storage/disk", "//pkg/storage/fs", diff --git a/pkg/server/api_v2_databases_metadata.go b/pkg/server/api_v2_databases_metadata.go index f7bcf0b7027f..8e3075e5f126 100644 --- a/pkg/server/api_v2_databases_metadata.go +++ b/pkg/server/api_v2_databases_metadata.go @@ -21,11 +21,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/apiutil" "github.com/cockroachdb/cockroach/pkg/server/authserver" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/srverrors" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache" "github.com/cockroachdb/cockroach/pkg/util/safesql" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -36,10 +41,19 @@ const ( pageNumKey = "pageNum" pageSizeKey = "pageSize" storeIdKey = "storeId" + onlyIfStaleKey = "onlyIfStale" defaultPageSize = 10 defaultPageNum = 1 ) +type JobStatusMessage string + +const ( + MetadataNotStale JobStatusMessage = "Not enough time has elapsed since last job run" + JobRunning JobStatusMessage = "Job is already running" + JobTriggered JobStatusMessage = "Job triggered successfully" +) + // GetTableMetadata returns a paginated response of table metadata and statistics. This is not a live view of // the table data but instead is cached data that had been precomputed at an earlier time. // @@ -670,14 +684,13 @@ func (a *apiV2Server) getDBMetadata( // responses: // // "200": -// description: A tmUpdateJobStatusResponse +// description: A tmUpdateJobStatusResponse for GET requests and tmJobTriggeredResponse for POST requests. // "404": // description: Not found if the user doesn't have the correct authorizations func (a *apiV2Server) TableMetadataJob(w http.ResponseWriter, r *http.Request) { ctx := r.Context() ctx = a.sqlServer.AnnotateCtx(ctx) sqlUser := authserver.UserFromHTTPAuthInfoContext(ctx) - authorized, err := a.updateTableMetadataJobAuthorized(ctx, sqlUser) if err != nil { srverrors.APIV2InternalError(ctx, err, w) @@ -693,6 +706,13 @@ func (a *apiV2Server) TableMetadataJob(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: resp, err = a.getTableMetadataUpdateJobStatus(ctx) + case http.MethodPost: + // onlyIfStale will be true if the query param exists and has any value other than "false" + var onlyIfStale bool + if r.URL.Query().Has(onlyIfStaleKey) { + onlyIfStale = r.URL.Query().Get(onlyIfStaleKey) != "false" + } + resp, err = a.triggerTableMetadataUpdateJob(ctx, onlyIfStale) default: http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return @@ -755,6 +775,34 @@ func (a *apiV2Server) getTableMetadataUpdateJobStatus( return jobStatus, nil } +// triggerTableMetadataUpdateJob will trigger the table metadata update job if it isn't currently running and if it +// is stale, if onlyIfStale is true. +func (a *apiV2Server) triggerTableMetadataUpdateJob( + ctx context.Context, onlyIfStale bool, +) (tmJobTriggeredResponse, error) { + jobStatus, err := a.getTableMetadataUpdateJobStatus(ctx) + if err != nil { + return tmJobTriggeredResponse{}, err + } + + stalenessDuration := tablemetadatacache.DataValidDurationSetting.Get(&a.sqlServer.execCfg.Settings.SV) + if onlyIfStale && jobStatus.LastCompletedTime != nil && timeutil.Since(*jobStatus.LastCompletedTime) < stalenessDuration { + return tmJobTriggeredResponse{JobTriggered: false, Message: MetadataNotStale}, nil + } + + _, err = a.status.UpdateTableMetadataCache(ctx, &serverpb.UpdateTableMetadataCacheRequest{Local: false}) + if err != nil { + st, ok := status.FromError(err) + if ok { + if st.Code() == codes.Aborted { + return tmJobTriggeredResponse{JobTriggered: false, Message: JobRunning}, nil + } + } + return tmJobTriggeredResponse{}, err + } + return tmJobTriggeredResponse{JobTriggered: true, Message: JobTriggered}, nil +} + func (a *apiV2Server) updateTableMetadataJobAuthorized( ctx context.Context, sqlUser username.SQLUsername, ) (isAuthorized bool, retErr error) { @@ -835,3 +883,8 @@ type tmUpdateJobStatusResponse struct { LastCompletedTime *time.Time `json:"last_completed_time"` LastUpdatedTime *time.Time `json:"last_updated_time"` } + +type tmJobTriggeredResponse struct { + JobTriggered bool `json:"job_triggered"` + Message JobStatusMessage `json:"message"` +} diff --git a/pkg/server/api_v2_databases_metadata_test.go b/pkg/server/api_v2_databases_metadata_test.go index ca3965961086..6d350bd8aaef 100644 --- a/pkg/server/api_v2_databases_metadata_test.go +++ b/pkg/server/api_v2_databases_metadata_test.go @@ -21,10 +21,15 @@ import ( "slices" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache" + tablemetadatacache_util "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" @@ -45,7 +50,7 @@ func descendingComparator[T any](comparator func(first, second T) int) func(firs func TestGetTableMetadata(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) + testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{}) ctx := context.Background() defer testCluster.Stopper().Stop(ctx) conn := testCluster.ServerConn(0) @@ -73,7 +78,7 @@ func TestGetTableMetadata(t *testing.T) { require.Contains(t, string(respBytes), "Method Not Allowed") }) t.Run("unknown db id", func(t *testing.T) { - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath("/api/v2/table_metadata/?dbId=1000").String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath("/api/v2/table_metadata/?dbId=1000").String(), http.MethodGet) require.Len(t, mdResp.Results, 0) require.Equal(t, int64(0), mdResp.PaginationInfo.TotalResults) }) @@ -84,14 +89,14 @@ func TestGetTableMetadata(t *testing.T) { // Assert that the test user gets an empty response for db 1 uri1 := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d", db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) // Assert that the test user gets an empty response for db 2 uri2 := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d", db2Id) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -99,13 +104,13 @@ func TestGetTableMetadata(t *testing.T) { // Grant connect access to DB 1 _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE %s TO %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user now see results for db1 require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) // Assert that the test user gets an empty response for db 2 - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -113,7 +118,7 @@ func TestGetTableMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user no longer sees results from db1 require.Empty(t, mdResp.Results) @@ -122,13 +127,13 @@ func TestGetTableMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user now see results for db1 require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) // Assert that user now see results for db1 - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) @@ -211,7 +216,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range sortTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/%s&dbId=%d", tt.queryString, tt.dbId) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, tt.comparator)) for _, tbmd := range mdResp.Results { @@ -242,7 +247,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range tableNameTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&name=%s", tt.dbId, tt.nameFilter) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(tt.expectedCount), mdResp.PaginationInfo.TotalResults) }) @@ -263,7 +268,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range pageTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/%s&dbId=%d", tt.queryString, db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.LessOrEqual(t, len(mdResp.Results), tt.expectedPageSize) require.Equal(t, tt.expectedPageSize, mdResp.PaginationInfo.PageSize) @@ -273,14 +278,14 @@ func TestGetTableMetadata(t *testing.T) { t.Run("large page num", func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&pageSize=1&pageNum=100", db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Empty(t, mdResp.Results) }) }) t.Run("filter store id", func(t *testing.T) { storeIds := []int64{1, 2} uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&storeId=%d&storeId=%d", db1Id, storeIds[0], storeIds[1]) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) for _, tmdr := range mdResp.Results { require.Condition(t, func() (success bool) { @@ -313,7 +318,7 @@ func TestGetTableMetadata(t *testing.T) { }) t.Run("no views", func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&name=%s", db1Id, "view") - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(0), mdResp.PaginationInfo.TotalResults) }) @@ -322,7 +327,7 @@ func TestGetTableMetadata(t *testing.T) { func TestGetDBMetadata(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) + testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{}) ctx := context.Background() defer testCluster.Stopper().Stop(ctx) conn := testCluster.ServerConn(0) @@ -384,7 +389,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range sortTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/%s", tt.queryString) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) isSorted := slices.IsSortedFunc(mdResp.Results, tt.comparator) require.True(t, isSorted) @@ -399,7 +404,7 @@ func TestGetDBMetadata(t *testing.T) { // Assert that the test user gets an empty response for db 1 uri := "/api/v2/database_metadata/" - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -407,7 +412,7 @@ func TestGetDBMetadata(t *testing.T) { // Grant connect access to DB 1 _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE %s TO %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user now see results for db1 require.Len(t, mdResp.Results, 1) @@ -417,7 +422,7 @@ func TestGetDBMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user no longer sees results from db1 require.Empty(t, mdResp.Results) @@ -425,7 +430,7 @@ func TestGetDBMetadata(t *testing.T) { // Make user admin _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user now see results for all dbs (new_test_db_1, new_test_db_2, system, postgres, and defaultdb) require.Len(t, mdResp.Results, 5) @@ -447,7 +452,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range pageTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/%s", tt.queryString) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.LessOrEqual(t, len(mdResp.Results), tt.expectedPageSize) require.Equal(t, tt.expectedPageSize, mdResp.PaginationInfo.PageSize) @@ -472,7 +477,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range dbtableNameTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/?name=%s", tt.nameFilter) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(tt.expectedCount), mdResp.PaginationInfo.TotalResults) }) @@ -481,7 +486,7 @@ func TestGetDBMetadata(t *testing.T) { t.Run("filter store id", func(t *testing.T) { storeIds := []int64{8, 9} uri := fmt.Sprintf("/api/v2/database_metadata/?storeId=%d&storeId=%d", storeIds[0], storeIds[1]) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) for _, dmdr := range mdResp.Results { require.Condition(t, func() (success bool) { return slices.Contains(dmdr.StoreIds, storeIds[0]) || slices.Contains(dmdr.StoreIds, storeIds[1]) @@ -512,7 +517,7 @@ func TestGetDBMetadata(t *testing.T) { }) t.Run("table count only includes tables", func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/?name=%s", db1Name) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(1), mdResp.PaginationInfo.TotalResults) // This count should not include views, materialized views, or sequences @@ -521,7 +526,7 @@ func TestGetDBMetadata(t *testing.T) { t.Run("empty database", func(t *testing.T) { mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, - ts.AdminURL().WithPath("/api/v2/database_metadata/?name=defaultdb").String()) + ts.AdminURL().WithPath("/api/v2/database_metadata/?name=defaultdb").String(), http.MethodGet) require.Equal(t, int64(1), mdResp.PaginationInfo.TotalResults) }) @@ -530,7 +535,7 @@ func TestGetDBMetadata(t *testing.T) { func TestGetTableMetadataUpdateJobStatus(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) + testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{}) ctx := context.Background() defer testCluster.Stopper().Stop(ctx) conn := testCluster.ServerConn(0) @@ -544,29 +549,121 @@ func TestGetTableMetadataUpdateJobStatus(t *testing.T) { userClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(sessionUsername, false, 1) require.NoError(t, err) - failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String()) + failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, http.StatusText(http.StatusNotFound), failed) _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE defaultdb TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp := makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus) _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE defaultdb FROM %s", sessionUsername.Normalized())) require.NoError(t, e) - failed = makeApiRequest[string](t, userClient, ts.AdminURL().WithPath(uri).String()) + failed = makeApiRequest[string](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, http.StatusText(http.StatusNotFound), failed) _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus) }) } -func makeApiRequest[T any](t *testing.T, client http.Client, uri string) (mdResp T) { - req, err := http.NewRequest("GET", uri, nil) +func TestTriggerMetadataUpdateJob(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStress(t, "too slow under stress") + jobCompletedChan := make(chan interface{}) + jobReadyChan := make(chan interface{}) + defer close(jobCompletedChan) + defer close(jobReadyChan) + testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + + Knobs: base.TestingKnobs{ + TableMetadata: &tablemetadatacache_util.TestingKnobs{ + OnJobReady: func() { + jobReadyChan <- struct{}{} + }, + OnJobComplete: func() { + jobCompletedChan <- struct{}{} + }, + }, + }, + }, + }) + ctx := context.Background() + defer testCluster.Stopper().Stop(ctx) + conn := testCluster.ServerConn(0) + defer conn.Close() + runner := sqlutils.MakeSQLRunner(conn) + ts := testCluster.Server(0) + + client, err := ts.GetAdminHTTPClient() + require.NoError(t, err) + uri := "/api/v2/table_metadata/updatejob/" + url := ts.AdminURL().WithPath(uri).String() + <-jobReadyChan + t.Run("job triggered", func(t *testing.T) { + assertJobTriggered(t, client, url, jobCompletedChan) + }) + + t.Run("authorization", func(t *testing.T) { + sessionUsername := username.TestUserName() + userClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(sessionUsername, false, 1) + require.NoError(t, err) + + // User isn't authorized and will receive a 404 response + failed := makeApiRequest[interface{}](t, userClient, url, http.MethodPost) + require.Equal(t, http.StatusText(http.StatusNotFound), failed) + + runner.Exec(t, fmt.Sprintf("GRANT CONNECT ON DATABASE defaultdb TO %s", sessionUsername.Normalized())) + + // User is now authorized and will trigger job + assertJobTriggered(t, client, url, jobCompletedChan) + + runner.Exec(t, fmt.Sprintf("REVOKE CONNECT ON DATABASE defaultdb FROM %s", sessionUsername.Normalized())) + failed = makeApiRequest[interface{}](t, userClient, url, http.MethodPost) + require.Equal(t, http.StatusText(http.StatusNotFound), failed) + + runner.Exec(t, fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) + assertJobTriggered(t, client, url, jobCompletedChan) + }) + + t.Run("staleness", func(t *testing.T) { + assertJobTriggered(t, client, url, jobCompletedChan) + // Trigger again should succeed + assertJobTriggered(t, client, url, jobCompletedChan) + + tablemetadatacache.DataValidDurationSetting.Override(ctx, &ts.ClusterSettings().SV, time.Minute) + // call trigger job api with onlyIfStale flag. This shouldn't trigger the job again since a minute hasn't passed + resp := makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale").String(), http.MethodPost) + require.Contains(t, resp.Message, "Not enough time has elapsed since last job run") + require.False(t, resp.JobTriggered) + + // onlyIfStale=false won't check DataValidDurationSetting value + assertJobTriggered(t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale=false").String(), jobCompletedChan) + + // onlyIfStale with non "false" value will check DataValidDurationSetting value + resp = makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale=somevalue").String(), http.MethodPost) + require.Contains(t, resp.Message, "Not enough time has elapsed since last job run") + require.False(t, resp.JobTriggered) + + // set data_valid_duration to 1ms + tablemetadatacache.DataValidDurationSetting.Override(ctx, &ts.ClusterSettings().SV, time.Millisecond) + // call trigger job api with onlyIfStale flag. This should trigger the job again since 1ms has passed since last + // completion + assertJobTriggered(t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale").String(), jobCompletedChan) + }) +} + +func makeApiRequest[T any]( + t *testing.T, client http.Client, uri string, httpMethod string, +) (mdResp T) { + req, err := http.NewRequest(httpMethod, uri, nil) require.NoError(t, err) resp, err := client.Do(req) require.NoError(t, err) @@ -578,10 +675,18 @@ func makeApiRequest[T any](t *testing.T, client http.Client, uri string) (mdResp if strings.Contains(contentType, "text/plain") { data = []byte(fmt.Sprintf(`"%s"`, strings.TrimSpace(string(data)))) } - require.NoError(t, json.Unmarshal(data, &mdResp)) + err = json.Unmarshal(data, &mdResp) + require.NoError(t, err) return mdResp } +func assertJobTriggered(t *testing.T, client http.Client, url string, c chan interface{}) { + resp := makeApiRequest[tmJobTriggeredResponse](t, client, url, http.MethodPost) + require.Contains(t, resp.Message, "Job triggered successfully") + require.True(t, resp.JobTriggered) + <-c +} + func setupTest(t *testing.T, conn *gosql.DB, db1 string, db2 string) (dbId1 int, dbId2 int) { _, err := conn.Exec(`CREATE DATABASE IF NOT EXISTS ` + db1) require.NoError(t, err) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index e084dfe64e63..1d8b25ac6cfc 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -113,6 +113,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache" + tablemetadatacacheutil "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/ts" @@ -1151,6 +1152,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { tableStatsTestingKnobs = tableStatsKnobs.(*stats.TableStatsTestingKnobs) } + if tableMetadataKnobs := cfg.TestingKnobs.TableMetadata; tableMetadataKnobs != nil { + execCfg.TableMetadataKnobs = tableMetadataKnobs.(*tablemetadatacacheutil.TestingKnobs) + + } // Set up internal memory metrics for use by internal SQL executors. // Don't add them to the registry now because it will be added as part of pgServer metrics. sqlMemMetrics := sql.MakeMemMetrics("sql", cfg.HistogramWindowInterval()) diff --git a/pkg/server/status.go b/pkg/server/status.go index 169e12f430ed..b4b13d7c988b 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -4211,7 +4211,7 @@ func (s *statusServer) localUpdateTableMetadataCache() ( select { case s.updateTableMetadataJobSignal <- struct{}{}: default: - return nil, status.Errorf(codes.Unavailable, "update table metadata cache job is not ready to start execution") + return nil, status.Errorf(codes.Aborted, "update table metadata cache job is not ready to start execution") } return &serverpb.UpdateTableMetadataCacheResponse{}, nil } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index b99479dacb10..b053698c57fa 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -529,6 +529,7 @@ go_library( "//pkg/sql/storageparam/tablestorageparam", "//pkg/sql/syntheticprivilege", "//pkg/sql/syntheticprivilegecache", + "//pkg/sql/tablemetadatacache/util", "//pkg/sql/ttl/ttlbase", "//pkg/sql/types", "//pkg/sql/vtable", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 607dbbb15d82..c2b94bdf8a52 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -108,6 +108,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache" + tablemetadatacache_util "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/upgrade" @@ -1309,6 +1310,7 @@ type ExecutorConfig struct { ExternalConnectionTestingKnobs *externalconn.TestingKnobs EventLogTestingKnobs *EventLogTestingKnobs InsightsTestingKnobs *insights.TestingKnobs + TableMetadataKnobs *tablemetadatacache_util.TestingKnobs // HistogramWindowInterval is (server.Config).HistogramWindowInterval. HistogramWindowInterval time.Duration diff --git a/pkg/sql/tablemetadatacache/cluster_settings.go b/pkg/sql/tablemetadatacache/cluster_settings.go index 532b621f0ddc..97dfe723bc89 100644 --- a/pkg/sql/tablemetadatacache/cluster_settings.go +++ b/pkg/sql/tablemetadatacache/cluster_settings.go @@ -26,7 +26,7 @@ var tableMetadataCacheAutoUpdatesEnabled = settings.RegisterBoolSetting( false, settings.WithPublic) -var tableMetadataCacheValidDuration = settings.RegisterDurationSetting( +var DataValidDurationSetting = settings.RegisterDurationSetting( settings.ApplicationLevel, "obs.tablemetadata.data_valid_duration", "the duration for which the data in system.table_metadata is considered valid", diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go index 12a69ffd119e..824a4b3ac718 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go @@ -38,13 +38,17 @@ var _ jobs.Resumer = (*tableMetadataUpdateJobResumer)(nil) // Resume is part of the jobs.Resumer interface. func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI interface{}) error { - log.Infof(ctx, "starting table metadata update job") j.job.MarkIdle(true) execCtx := execCtxI.(sql.JobExecContext) metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct(). JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics) - + var onJobStartKnob, onJobCompleteKnob, onJobReady func() + if execCtx.ExecCfg().TableMetadataKnobs != nil { + onJobStartKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobStart + onJobCompleteKnob = execCtx.ExecCfg().TableMetadataKnobs.OnJobComplete + onJobReady = execCtx.ExecCfg().TableMetadataKnobs.OnJobReady + } // We must reset the job's num runs to 0 so that it doesn't get // delayed by the job system's exponential backoff strategy. if err := j.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { @@ -68,7 +72,7 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int default: } }) - tableMetadataCacheValidDuration.SetOnChange(&settings.SV, func(_ context.Context) { + DataValidDurationSetting.SetOnChange(&settings.SV, func(_ context.Context) { select { case scheduleSettingsCh <- struct{}{}: default: @@ -76,9 +80,12 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int }) var timer timeutil.Timer + if onJobReady != nil { + onJobReady() + } for { if tableMetadataCacheAutoUpdatesEnabled.Get(&settings.SV) { - timer.Reset(tableMetadataCacheValidDuration.Get(&settings.SV)) + timer.Reset(DataValidDurationSetting.Get(&settings.SV)) } select { case <-scheduleSettingsCh: @@ -93,6 +100,9 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int return ctx.Err() } + if onJobStartKnob != nil { + onJobStartKnob() + } // Run table metadata update job. metrics.NumRuns.Inc(1) j.markAsRunning(ctx) @@ -100,6 +110,9 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int log.Errorf(ctx, "error running table metadata update job: %s", err) } j.markAsCompleted(ctx) + if onJobCompleteKnob != nil { + onJobCompleteKnob() + } } } diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go index f57552afa96e..19e93570c106 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go @@ -163,7 +163,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { t.Run("AutomaticUpdatesEnabled", func(t *testing.T) { conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`) - tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) + DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) err := waitForJobRuns(3, 10*time.Second) require.NoError(t, err, "Job should have run at least 3 times") mockCallsCount := getMockCallCount() @@ -185,7 +185,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { t.Run("AutomaticUpdatesDisabled", func(t *testing.T) { conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = f`) - tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) + DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) initialCount := getMockCallCount() err := waitForJobRuns(1, 200*time.Millisecond) require.Error(t, err, "Job should not run after being disabled") diff --git a/pkg/sql/tablemetadatacache/util/BUILD.bazel b/pkg/sql/tablemetadatacache/util/BUILD.bazel new file mode 100644 index 000000000000..e1c5e7377bde --- /dev/null +++ b/pkg/sql/tablemetadatacache/util/BUILD.bazel @@ -0,0 +1,8 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "util", + srcs = ["test_utils.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache/util", + visibility = ["//visibility:public"], +) diff --git a/pkg/sql/tablemetadatacache/util/test_utils.go b/pkg/sql/tablemetadatacache/util/test_utils.go new file mode 100644 index 000000000000..edbb14289083 --- /dev/null +++ b/pkg/sql/tablemetadatacache/util/test_utils.go @@ -0,0 +1,24 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tablemetadatacacheutil + +// TestingKnobs provides hooks into the table metadata cache job +type TestingKnobs struct { + // onJobResume is called when the job is ready + OnJobReady func() + // onJobStart is called when the job starts + OnJobStart func() + // onJobComplete is called when the job completes + OnJobComplete func() +} + +// ModuleTestingKnobs implements base.ModuleTestingKnobs interface. +func (*TestingKnobs) ModuleTestingKnobs() {}