diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 176ce1967609..07b3ae98d38c 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -279,6 +279,7 @@ go_library( "//pkg/sql/stats", "//pkg/sql/stmtdiagnostics", "//pkg/sql/syntheticprivilegecache", + "//pkg/sql/tablemetadatacache", "//pkg/sql/ttl/ttljob", "//pkg/sql/ttl/ttlschedule", "//pkg/storage", @@ -530,6 +531,7 @@ go_test( "//pkg/sql/sessiondata", "//pkg/sql/sqlstats", "//pkg/sql/sqlstats/insights", + "//pkg/sql/tablemetadatacache", "//pkg/storage", "//pkg/storage/disk", "//pkg/storage/fs", diff --git a/pkg/server/api_v2_databases_metadata.go b/pkg/server/api_v2_databases_metadata.go index 943efbdbcfb0..6da85773928c 100644 --- a/pkg/server/api_v2_databases_metadata.go +++ b/pkg/server/api_v2_databases_metadata.go @@ -21,9 +21,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/apiutil" "github.com/cockroachdb/cockroach/pkg/server/authserver" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/srverrors" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache" "github.com/cockroachdb/cockroach/pkg/util/safesql" "github.com/cockroachdb/errors" ) @@ -36,6 +38,7 @@ const ( pageNumKey = "pageNum" pageSizeKey = "pageSize" storeIdKey = "storeId" + onlyIfStaleKey = "onlyIfStale" defaultPageSize = 10 defaultPageNum = 1 ) @@ -667,7 +670,7 @@ func (a *apiV2Server) getDBMetadata( // responses: // // "200": -// description: A tmUpdateJobStatusResponse +// description: A tmUpdateJobStatusResponse for GET requests and tmJobTriggeredResponse for POST requests. // "404": // description: Not found if the user doesn't have the correct authorizations func (a *apiV2Server) TableMetadataJob(w http.ResponseWriter, r *http.Request) { @@ -690,6 +693,13 @@ func (a *apiV2Server) TableMetadataJob(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: resp, err = a.getTableMetadataUpdateJobStatus(ctx) + case http.MethodPost: + // onlyIfStale will be true if the query param exists and has any value other than "false" + var onlyIfStale bool + if r.URL.Query().Has(onlyIfStaleKey) { + onlyIfStale = r.URL.Query().Get(onlyIfStaleKey) != "false" + } + resp, err = a.triggerTableMetadataUpdateJob(ctx, onlyIfStale) default: http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return @@ -752,6 +762,31 @@ func (a *apiV2Server) getTableMetadataUpdateJobStatus( return jobStatus, nil } +// triggerTableMetadataUpdateJob will trigger the table metadata update job if it isn't currently running and if it +// is stale, if onlyIfStale is true. +func (a *apiV2Server) triggerTableMetadataUpdateJob( + ctx context.Context, onlyIfStale bool, +) (tmJobTriggeredResponse, error) { + status, err := a.getTableMetadataUpdateJobStatus(ctx) + if err != nil { + return tmJobTriggeredResponse{}, err + } + if status.CurrentStatus != "NOT_RUNNING" { + return tmJobTriggeredResponse{JobTriggered: false, Message: "Job is already running"}, nil + } + stalenessDuration := tablemetadatacache.DataValidDurationSetting.Get(&a.sqlServer.execCfg.Settings.SV) + if onlyIfStale && status.LastCompletedTime != nil && time.Since(*status.LastCompletedTime) < stalenessDuration { + return tmJobTriggeredResponse{JobTriggered: false, Message: "Not enough time has elapsed since last job run"}, nil + } + + _, err = a.status.UpdateTableMetadataCache(ctx, &serverpb.UpdateTableMetadataCacheRequest{Local: false}) + if err != nil { + return tmJobTriggeredResponse{}, err + } + + return tmJobTriggeredResponse{JobTriggered: true, Message: "Job triggered successfully"}, nil +} + func (a *apiV2Server) updateTableMetadataJobAuthorized( ctx context.Context, sqlUser username.SQLUsername, ) (isAuthorized bool, retErr error) { @@ -832,3 +867,8 @@ type tmUpdateJobStatusResponse struct { LastCompletedTime *time.Time `json:"last_completed_time"` LastUpdatedTime *time.Time `json:"last_updated_time"` } + +type tmJobTriggeredResponse struct { + JobTriggered bool `json:"job_triggered"` + Message string `json:"message"` +} diff --git a/pkg/server/api_v2_databases_metadata_test.go b/pkg/server/api_v2_databases_metadata_test.go index e0ee0d7f337b..f2c4776db508 100644 --- a/pkg/server/api_v2_databases_metadata_test.go +++ b/pkg/server/api_v2_databases_metadata_test.go @@ -21,12 +21,19 @@ import ( "slices" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -73,7 +80,7 @@ func TestGetTableMetadata(t *testing.T) { require.Contains(t, string(respBytes), "Method Not Allowed") }) t.Run("unknown db id", func(t *testing.T) { - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath("/api/v2/table_metadata/?dbId=1000").String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath("/api/v2/table_metadata/?dbId=1000").String(), http.MethodGet) require.Len(t, mdResp.Results, 0) require.Equal(t, int64(0), mdResp.PaginationInfo.TotalResults) }) @@ -84,14 +91,14 @@ func TestGetTableMetadata(t *testing.T) { // Assert that the test user gets an empty response for db 1 uri1 := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d", db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) // Assert that the test user gets an empty response for db 2 uri2 := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d", db2Id) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -99,13 +106,13 @@ func TestGetTableMetadata(t *testing.T) { // Grant connect access to DB 1 _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE %s TO %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user now see results for db1 require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) // Assert that the test user gets an empty response for db 2 - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -113,7 +120,7 @@ func TestGetTableMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user no longer sees results from db1 require.Empty(t, mdResp.Results) @@ -122,13 +129,13 @@ func TestGetTableMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri1).String(), http.MethodGet) // Assert that user now see results for db1 require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) // Assert that user now see results for db1 - mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String()) + mdResp = makeApiRequest[PaginatedResponse[[]tableMetadata]](t, userClient, ts.AdminURL().WithPath(uri2).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, defaultTMComparator)) @@ -211,7 +218,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range sortTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/%s&dbId=%d", tt.queryString, tt.dbId) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.True(t, slices.IsSortedFunc(mdResp.Results, tt.comparator)) for _, tbmd := range mdResp.Results { @@ -242,7 +249,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range tableNameTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&name=%s", tt.dbId, tt.nameFilter) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(tt.expectedCount), mdResp.PaginationInfo.TotalResults) }) @@ -263,7 +270,7 @@ func TestGetTableMetadata(t *testing.T) { for _, tt := range pageTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/%s&dbId=%d", tt.queryString, db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.LessOrEqual(t, len(mdResp.Results), tt.expectedPageSize) require.Equal(t, tt.expectedPageSize, mdResp.PaginationInfo.PageSize) @@ -273,14 +280,14 @@ func TestGetTableMetadata(t *testing.T) { t.Run("large page num", func(t *testing.T) { uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&pageSize=1&pageNum=100", db1Id) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Empty(t, mdResp.Results) }) }) t.Run("filter store id", func(t *testing.T) { storeIds := []int64{1, 2} uri := fmt.Sprintf("/api/v2/table_metadata/?dbId=%d&storeId=%d&storeId=%d", db1Id, storeIds[0], storeIds[1]) - mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]tableMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) for _, tmdr := range mdResp.Results { require.Condition(t, func() (success bool) { @@ -367,7 +374,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range sortTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/%s", tt.queryString) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) isSorted := slices.IsSortedFunc(mdResp.Results, tt.comparator) require.True(t, isSorted) @@ -382,7 +389,7 @@ func TestGetDBMetadata(t *testing.T) { // Assert that the test user gets an empty response for db 1 uri := "/api/v2/database_metadata/" - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Empty(t, mdResp.Results) require.Zero(t, mdResp.PaginationInfo.TotalResults) @@ -390,7 +397,7 @@ func TestGetDBMetadata(t *testing.T) { // Grant connect access to DB 1 _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE %s TO %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user now see results for db1 require.Len(t, mdResp.Results, 1) @@ -400,7 +407,7 @@ func TestGetDBMetadata(t *testing.T) { // Revoke connect access from db1 _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db1Name, sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user no longer sees results from db1 require.Empty(t, mdResp.Results) @@ -408,7 +415,7 @@ func TestGetDBMetadata(t *testing.T) { // Make user admin _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) // Assert that user now see results for all dbs require.Len(t, mdResp.Results, 2) @@ -430,7 +437,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range pageTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/%s", tt.queryString) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.NotEmpty(t, mdResp.Results) require.LessOrEqual(t, len(mdResp.Results), tt.expectedPageSize) require.Equal(t, tt.expectedPageSize, mdResp.PaginationInfo.PageSize) @@ -455,7 +462,7 @@ func TestGetDBMetadata(t *testing.T) { for _, tt := range dbtableNameTests { t.Run(tt.name, func(t *testing.T) { uri := fmt.Sprintf("/api/v2/database_metadata/?name=%s", tt.nameFilter) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, int64(tt.expectedCount), mdResp.PaginationInfo.TotalResults) }) @@ -464,7 +471,7 @@ func TestGetDBMetadata(t *testing.T) { t.Run("filter store id", func(t *testing.T) { storeIds := []int64{8, 9} uri := fmt.Sprintf("/api/v2/database_metadata/?storeId=%d&storeId=%d", storeIds[0], storeIds[1]) - mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) for _, dmdr := range mdResp.Results { require.Condition(t, func() (success bool) { return slices.Contains(dmdr.StoreIds, storeIds[0]) || slices.Contains(dmdr.StoreIds, storeIds[1]) @@ -512,29 +519,153 @@ func TestGetTableMetadataUpdateJobStatus(t *testing.T) { userClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(sessionUsername, false, 1) require.NoError(t, err) - failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String()) + failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, http.StatusText(http.StatusNotFound), failed) _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE defaultdb TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp := makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp := makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus) _, e = conn.Exec(fmt.Sprintf("REVOKE CONNECT ON DATABASE defaultdb FROM %s", sessionUsername.Normalized())) require.NoError(t, e) - failed = makeApiRequest[string](t, userClient, ts.AdminURL().WithPath(uri).String()) + failed = makeApiRequest[string](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, http.StatusText(http.StatusNotFound), failed) _, e = conn.Exec(fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized())) require.NoError(t, e) - mdResp = makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String()) + mdResp = makeApiRequest[tmUpdateJobStatusResponse](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet) require.Equal(t, "NOT_RUNNING", mdResp.CurrentStatus) }) } -func makeApiRequest[T any](t *testing.T, client http.Client, uri string) (mdResp T) { - req, err := http.NewRequest("GET", uri, nil) +func TestTriggerMetadataUpdateJob(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStress(t, "too slow under stress") + + testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) + ctx := context.Background() + defer testCluster.Stopper().Stop(ctx) + conn := testCluster.ServerConn(0) + defer conn.Close() + ts := testCluster.Server(0) + + // Get the node id that claimed the update job. We'll issue the + // RPC to a node that doesn't own the job to test that the RPC can + // propagate the request to the correct node. + var nodeID int + testutils.SucceedsSoon(t, func() error { + row, err := conn.Query(` +SELECT claim_instance_id FROM system.jobs +WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJobID) + require.NoError(t, err) + if !row.Next() { + return errors.New("no node has claimed the job") + } + require.NoError(t, row.Scan(&nodeID)) + + rpcGatewayNode := (nodeID + 1) % 3 + _, err = testCluster.Server(rpcGatewayNode).GetStatusClient(t).UpdateTableMetadataCache(ctx, + &serverpb.UpdateTableMetadataCacheRequest{Local: false}) + if err != nil { + return err + } + // The job shouldn't be busy. + return nil + }) + + client, err := ts.GetAdminHTTPClient() + require.NoError(t, err) + uri := "/api/v2/table_metadata/updatejob/" + waitUntilJobCompletes := func(t *testing.T, client http.Client) { + testutils.SucceedsSoon(t, func() error { + resp := makeApiRequest[tmUpdateJobStatusResponse](t, client, ts.AdminURL().WithPath(uri).String(), http.MethodGet) + if resp.CurrentStatus != "NOT_RUNNING" { + return errors.New("Job is running") + } + return nil + }) + } + waitUntilJobCompletes(t, client) + + t.Run("job triggered", func(t *testing.T) { + resp := makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri).String(), http.MethodPost) + require.True(t, resp.JobTriggered) + require.Contains(t, "Job triggered successfully", resp.Message) + }) + + t.Run("authorization", func(t *testing.T) { + sessionUsername := username.TestUserName() + userClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(sessionUsername, false, 1) + require.NoError(t, err) + + // User isn't authorized and will receive a 404 response + failed := makeApiRequest[interface{}](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodPost) + require.Equal(t, http.StatusText(http.StatusNotFound), failed) + + _, e := conn.Exec(fmt.Sprintf("GRANT CONNECT ON DATABASE defaultdb TO %s", sessionUsername.Normalized())) + require.NoError(t, e) + + // User is now authorized and will receive a response + resp := makeApiRequest[tmJobTriggeredResponse]( + t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodPost) + require.NotEmpty(t, resp) + }) + + t.Run("staleness", func(t *testing.T) { + // make sure job isn't running + waitUntilJobCompletes(t, client) + resp := makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri).String(), http.MethodPost) + require.True(t, resp.JobTriggered) + + waitUntilJobCompletes(t, client) + require.Contains(t, "Job triggered successfully", resp.Message) + resp = makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri).String(), http.MethodPost) + require.True(t, resp.JobTriggered) + require.Contains(t, "Job triggered successfully", resp.Message) + + waitUntilJobCompletes(t, client) + tablemetadatacache.DataValidDurationSetting.Override(ctx, &ts.ClusterSettings().SV, time.Minute) + // call trigger job api with onlyIfStale flag. This shouldn't trigger the job again since a minute hasn't passed + resp = makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale").String(), http.MethodPost) + require.False(t, resp.JobTriggered) + require.Contains(t, "Not enough time has elapsed since last job run", resp.Message) + + // onlyIfStale=false won't check DataValidDurationSetting value + resp = makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale=false").String(), http.MethodPost) + require.True(t, resp.JobTriggered) + require.Contains(t, "Job triggered successfully", resp.Message) + + waitUntilJobCompletes(t, client) + // onlyIfStale with non "false" value will check DataValidDurationSetting value + resp = makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale=somevalue").String(), http.MethodPost) + require.False(t, resp.JobTriggered) + require.Contains(t, "Not enough time has elapsed since last job run", resp.Message) + + waitUntilJobCompletes(t, client) + // set data_valid_duration to 1ms + tablemetadatacache.DataValidDurationSetting.Override(ctx, &ts.ClusterSettings().SV, time.Millisecond) + // call trigger job api with onlyIfStale flag. This should trigger the job again since 1ms has passed since last + // completion + resp = makeApiRequest[tmJobTriggeredResponse]( + t, client, ts.AdminURL().WithPath(uri+"?onlyIfStale").String(), http.MethodPost) + require.True(t, resp.JobTriggered) + require.Contains(t, "Job triggered successfully", resp.Message) + }) +} + +func makeApiRequest[T any]( + t *testing.T, client http.Client, uri string, httpMethod string, +) (mdResp T) { + req, err := http.NewRequest(httpMethod, uri, nil) require.NoError(t, err) resp, err := client.Do(req) require.NoError(t, err) diff --git a/pkg/sql/tablemetadatacache/cluster_settings.go b/pkg/sql/tablemetadatacache/cluster_settings.go index 532b621f0ddc..97dfe723bc89 100644 --- a/pkg/sql/tablemetadatacache/cluster_settings.go +++ b/pkg/sql/tablemetadatacache/cluster_settings.go @@ -26,7 +26,7 @@ var tableMetadataCacheAutoUpdatesEnabled = settings.RegisterBoolSetting( false, settings.WithPublic) -var tableMetadataCacheValidDuration = settings.RegisterDurationSetting( +var DataValidDurationSetting = settings.RegisterDurationSetting( settings.ApplicationLevel, "obs.tablemetadata.data_valid_duration", "the duration for which the data in system.table_metadata is considered valid", diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go index 12a69ffd119e..41aacb561cdd 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go @@ -68,7 +68,7 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int default: } }) - tableMetadataCacheValidDuration.SetOnChange(&settings.SV, func(_ context.Context) { + DataValidDurationSetting.SetOnChange(&settings.SV, func(_ context.Context) { select { case scheduleSettingsCh <- struct{}{}: default: @@ -78,7 +78,7 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int var timer timeutil.Timer for { if tableMetadataCacheAutoUpdatesEnabled.Get(&settings.SV) { - timer.Reset(tableMetadataCacheValidDuration.Get(&settings.SV)) + timer.Reset(DataValidDurationSetting.Get(&settings.SV)) } select { case <-scheduleSettingsCh: diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go index f57552afa96e..19e93570c106 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go @@ -163,7 +163,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { t.Run("AutomaticUpdatesEnabled", func(t *testing.T) { conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`) - tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) + DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) err := waitForJobRuns(3, 10*time.Second) require.NoError(t, err, "Job should have run at least 3 times") mockCallsCount := getMockCallCount() @@ -185,7 +185,7 @@ func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) { t.Run("AutomaticUpdatesDisabled", func(t *testing.T) { conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = f`) - tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) + DataValidDurationSetting.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond) initialCount := getMockCallCount() err := waitForJobRuns(1, 200*time.Millisecond) require.Error(t, err, "Job should not run after being disabled")