From c65943f493de0a7f5a08150f466702ddfd1fd325 Mon Sep 17 00:00:00 2001 From: XiaochenCui Date: Sat, 20 Jul 2024 22:30:40 +0000 Subject: [PATCH 1/5] changefeedccl: update the age filter of "show changefeed jobs" Previously, "SHOW CHANGEFEED JOBS" returned jobs from the last 14 days whereas "SHOW JOBS" returned jobs from the last 12 hours. This commit addresses this inconsistency by: - Using the same "age filter" for both commands. - Adding a new knob "StubTimeNow" for the job updater. Fixes: #124882 Release note (sql change): Unify the age filter of commands "SHOW JOBS" and "SHOW CHANGEFEED JOBS". --- .../show_changefeed_jobs_test.go | 49 +++++++++++++++++++ pkg/jobs/jobs.go | 8 ++- pkg/jobs/testing_knobs.go | 8 +++ pkg/sql/delegate/show_changefeed_jobs.go | 4 +- pkg/sql/delegate/show_jobs.go | 6 ++- 5 files changed, 70 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 2a8d767006c1..f579c0ea6a0c 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -16,6 +16,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" @@ -23,12 +24,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -562,3 +565,49 @@ func 
TestShowChangefeedJobsAuthorization(t *testing.T) { // Only enterprise sinks create jobs. cdcTest(t, testFn, feedTestEnterpriseSinks) } + +// TestShowChangefeedJobsDefaultFilter verifies that "SHOW JOBS" and "SHOW CHANGEFEED JOBS" +// use the same age filter (12 hours). +func TestShowChangefeedJobsDefaultFilter(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + countChangefeedJobs := func() (count int) { + query := `select count(*) from [SHOW CHANGEFEED JOBS]` + sqlDB.QueryRow(t, query).Scan(&count) + return count + } + changefeedJobExists := func(id catpb.JobID) bool { + rows := sqlDB.Query(t, `SHOW CHANGEFEED JOB $1`, id) + defer rows.Close() + return rows.Next() + } + + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) + waitForJobStatus(sqlDB, t, foo.(cdctest.EnterpriseTestFeed).JobID(), jobs.StatusRunning) + require.Equal(t, 1, countChangefeedJobs()) + + // After the feed is closed, the job is no longer visible because its finished time is more than 12 hours old. + closeFeed(t, foo) + require.Equal(t, 0, countChangefeedJobs()) + + // We can still see the job if we explicitly ask for it. 
+ jobID := foo.(cdctest.EnterpriseTestFeed).JobID() + require.True(t, changefeedJobExists(jobID)) + } + + updateKnobs := func(opts *feedTestOptions) { + opts.knobsFn = func(knobs *base.TestingKnobs) { + knobs.JobsTestingKnobs.(*jobs.TestingKnobs).StubTimeNow = func() time.Time { + return timeutil.Now().Add(-13 * time.Hour) + } + } + } + + cdcTest(t, testFn, feedTestForceSink("kafka"), updateKnobs) +} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index f4ea23794382..4278f6c228c4 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -466,7 +466,13 @@ func (u Updater) canceled(ctx context.Context) error { return fmt.Errorf("job with status %s cannot be requested to be canceled", md.Status) } ju.UpdateStatus(StatusCanceled) - md.Payload.FinishedMicros = timeutil.ToUnixMicros(u.j.registry.clock.Now().GoTime()) + var now time.Time + if u.j.registry.knobs.StubTimeNow != nil { + now = u.j.registry.knobs.StubTimeNow() + } else { + now = u.j.registry.clock.Now().GoTime() + } + md.Payload.FinishedMicros = timeutil.ToUnixMicros(now) ju.UpdatePayload(md.Payload) return nil }) diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index a27178146e56..8b1e842f4e71 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -83,6 +83,14 @@ type TestingKnobs struct { // BeforeWaitForJobsQuery is called once per invocation of the // poll-show-jobs query in WaitForJobs. BeforeWaitForJobsQuery func(jobs []jobspb.JobID) + + // Jobs updater will use this function to get the current time. If nil, + // "registry.clock.Now" will be used. + // + // TODO (xiaochen): currently only "Updater.canceled" uses this knob, + // we may want to extend this to other parts of the jobs package and + // give this knob a default value (so it won't be nil). + StubTimeNow func() time.Time } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. 
diff --git a/pkg/sql/delegate/show_changefeed_jobs.go b/pkg/sql/delegate/show_changefeed_jobs.go index e90ad52bf9b2..657f112e54e4 100644 --- a/pkg/sql/delegate/show_changefeed_jobs.go +++ b/pkg/sql/delegate/show_changefeed_jobs.go @@ -73,8 +73,8 @@ FROM if n.Jobs == nil { // The query intends to present: // - first all the running jobs sorted in order of start time, - // - then all completed jobs sorted in order of completion time. - // + // - then all completed jobs sorted in order of completion time (no more than 12 hours). + whereClause = fmt.Sprintf(`WHERE %s`, ageFilter) // The "ORDER BY" clause below exploits the fact that all // running jobs have finished = NULL. orderbyClause = `ORDER BY COALESCE(finished, now()) DESC, started DESC` diff --git a/pkg/sql/delegate/show_jobs.go b/pkg/sql/delegate/show_jobs.go index e3b28042e32e..0860e72a3897 100644 --- a/pkg/sql/delegate/show_jobs.go +++ b/pkg/sql/delegate/show_jobs.go @@ -20,6 +20,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" ) +const ageFilter = `(finished IS NULL OR finished > now() - '12h':::interval)` + func constructSelectQuery(n *tree.ShowJobs) string { var baseQuery strings.Builder baseQuery.WriteString(`SELECT job_id, job_type, `) @@ -75,9 +77,9 @@ func constructSelectQuery(n *tree.ShowJobs) string { // The query intends to present: // - first all the running jobs sorted in order of start time, - // - then all completed jobs sorted in order of completion time. + // - then all completed jobs sorted in order of completion time (no more than 12 hours). whereClause = fmt.Sprintf( - `WHERE %s AND (finished IS NULL OR finished > now() - '12h':::interval)`, typePredicate) + `WHERE %s AND %s`, typePredicate, ageFilter) // The "ORDER BY" clause below exploits the fact that all // running jobs have finished = NULL. 
orderbyClause = `ORDER BY COALESCE(finished, now()) DESC, started DESC` From 0aca993b58e8c55a178f63f3f8eea78a5a232059 Mon Sep 17 00:00:00 2001 From: DarrylWong Date: Wed, 18 Sep 2024 10:27:56 -0400 Subject: [PATCH 2/5] roachtest: add always-collect-artifacts flag This flag makes it so the test runner will collect artifacts during test teardown, even if the test did not fail or time out. --- pkg/cmd/roachtest/roachtestflags/flags.go | 8 ++++++++ pkg/cmd/roachtest/test_runner.go | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/roachtestflags/flags.go b/pkg/cmd/roachtest/roachtestflags/flags.go index 33e5afb9b777..e27b27a3e4ac 100644 --- a/pkg/cmd/roachtest/roachtestflags/flags.go +++ b/pkg/cmd/roachtest/roachtestflags/flags.go @@ -435,6 +435,14 @@ var ( providers. Set this to false when running many concurrent Azure tests. Azure can return stale VM information when many PUT calls are made in succession.`, }) + + AlwaysCollectArtifacts bool = false + _ = registerRunFlag(&AlwaysCollectArtifacts, FlagInfo{ + Name: "always-collect-artifacts", + Usage: ` + Always collect artifacts during test teardown, even if the test did not + time out or fail.`, + }) ) // The flags below override the final cluster configuration. 
They have no diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go index 40403db39f25..8fb357b57e76 100644 --- a/pkg/cmd/roachtest/test_runner.go +++ b/pkg/cmd/roachtest/test_runner.go @@ -1535,7 +1535,7 @@ func (r *testRunner) postTestAssertions( func (r *testRunner) teardownTest( ctx context.Context, t *testImpl, c *clusterImpl, timedOut bool, ) (string, error) { - if timedOut || t.Failed() { + if timedOut || t.Failed() || roachtestflags.AlwaysCollectArtifacts { snapURL := "" if timedOut { // If the Side-Eye integration was configured, capture a snapshot of the From c92fbc2d8706c3d48d96a6a5cfbb7fcb18da3355 Mon Sep 17 00:00:00 2001 From: DarrylWong Date: Wed, 18 Sep 2024 18:07:36 -0400 Subject: [PATCH 3/5] roachtest, teamcity: publish runner logs to same directory Previously we would publish the runner logs to the top level directory, causing it to be hard to find. This change publishes them to their own directory. --- pkg/cmd/roachtest/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/run.go b/pkg/cmd/roachtest/run.go index 951bfcf5b58c..e808bb608176 100644 --- a/pkg/cmd/roachtest/run.go +++ b/pkg/cmd/roachtest/run.go @@ -214,7 +214,7 @@ func runTests(register func(registry.Registry), filter *registry.TestFilter) err if roachtestflags.TeamCity { // Collect the runner logs. 
- fmt.Printf("##teamcity[publishArtifacts '%s']\n", filepath.Join(literalArtifactsDir, runnerLogsDir)) + fmt.Printf("##teamcity[publishArtifacts '%s' => '%s']\n", filepath.Join(literalArtifactsDir, runnerLogsDir), runnerLogsDir) } if summaryErr := maybeDumpSummaryMarkdown(runner); summaryErr != nil { From e7ad630244b028da18acb9939284c6e36fda5a20 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Wed, 18 Sep 2024 19:39:43 -0400 Subject: [PATCH 4/5] opt: fix HoistJoinSubquery This commit fixes a bug in `HoistJoinSubquery` that would cause it to generate a Project expression that produced columns from the RHS of the hoisted semi- or anti-join. This bug has existed since the rule was introduced. I believe it was not discovered because this rule only fires in very rare circumstances for semi- and anti-joins. Fixes #130398 Release note (bug fix): A bug has been fixed that could cause a very rare internal error "lists in SetPrivate are not all the same length" when executing queries. --- pkg/sql/opt/norm/decorrelate_funcs.go | 8 +- pkg/sql/opt/norm/testdata/rules/decorrelate | 196 ++++++++++++++++++++ 2 files changed, 203 insertions(+), 1 deletion(-) diff --git a/pkg/sql/opt/norm/decorrelate_funcs.go b/pkg/sql/opt/norm/decorrelate_funcs.go index 53537a9c90dc..d036b8bf9fbc 100644 --- a/pkg/sql/opt/norm/decorrelate_funcs.go +++ b/pkg/sql/opt/norm/decorrelate_funcs.go @@ -391,7 +391,13 @@ func (c *CustomFuncs) HoistJoinSubquery( } join := c.ConstructApplyJoin(op, left, hoister.input(), newFilters, private) - passthrough := c.OutputCols(left).Union(c.OutputCols(right)) + var passthrough opt.ColSet + switch op { + case opt.SemiJoinOp, opt.AntiJoinOp: + passthrough = c.OutputCols(left) + default: + passthrough = c.OutputCols(left).Union(c.OutputCols(right)) + } return c.f.ConstructProject(join, memo.EmptyProjectionsExpr, passthrough) } diff --git a/pkg/sql/opt/norm/testdata/rules/decorrelate b/pkg/sql/opt/norm/testdata/rules/decorrelate index e5a6ab73ab3d..43e3720b4744 100644 
--- a/pkg/sql/opt/norm/testdata/rules/decorrelate +++ b/pkg/sql/opt/norm/testdata/rules/decorrelate @@ -5757,6 +5757,202 @@ project └── filters └── case:18 OR (x:8 IS NULL) [outer=(8,18)] +# Regression test for #130398. Do not include columns from the RHS of the +# semi-join when constructing the Project expression. The opt directive is used +# here because the SplitDisjunctionOfJoinTerms exploration rule must fire to +# trigger HoistJoinSubquery. +opt expect=HoistJoinSubquery +SELECT EXISTS( + SELECT 1 + FROM a + WHERE EXISTS( + SELECT + FROM a AS a2 + WHERE a2.i = 0 OR (a2.s = a2.s AND a1.i IN (SELECT i FROM a)) + ) +) +FROM a AS a1 +UNION +SELECT true FROM a +---- +union + ├── columns: exists:44!null + ├── left columns: exists:32 + ├── right columns: bool:43 + ├── cardinality: [0 - 2] + ├── key: (44) + ├── project + │ ├── columns: exists:32!null + │ ├── group-by (hash) + │ │ ├── columns: a1.k:1!null true_agg:34 + │ │ ├── grouping columns: a1.k:1!null + │ │ ├── key: (1) + │ │ ├── fd: (1)-->(34) + │ │ ├── left-join-apply + │ │ │ ├── columns: a1.k:1!null a1.i:2 true:33 + │ │ │ ├── fd: (1)-->(2) + │ │ │ ├── scan a [as=a1] + │ │ │ │ ├── columns: a1.k:1!null a1.i:2 + │ │ │ │ ├── key: (1) + │ │ │ │ └── fd: (1)-->(2) + │ │ │ ├── project + │ │ │ │ ├── columns: true:33!null + │ │ │ │ ├── outer: (2) + │ │ │ │ ├── fd: ()-->(33) + │ │ │ │ ├── semi-join (cross) + │ │ │ │ │ ├── outer: (2) + │ │ │ │ │ ├── scan a + │ │ │ │ │ ├── scan a [as=a2] + │ │ │ │ │ │ └── columns: a2.i:16 a2.s:18 + │ │ │ │ │ └── filters + │ │ │ │ │ └── or [outer=(2,16,18), correlated-subquery] + │ │ │ │ │ ├── a2.i:16 = 0 + │ │ │ │ │ └── and + │ │ │ │ │ ├── (a2.s:18 IS DISTINCT FROM CAST(NULL AS STRING)) OR CAST(NULL AS BOOL) + │ │ │ │ │ └── any: eq + │ │ │ │ │ ├── scan a + │ │ │ │ │ │ └── columns: a.i:23 + │ │ │ │ │ └── a1.i:2 + │ │ │ │ └── projections + │ │ │ │ └── true [as=true:33] + │ │ │ └── filters (true) + │ │ └── aggregations + │ │ └── const-not-null-agg [as=true_agg:34, outer=(33)] + │ │ └── true:33 
+ │ └── projections + │ └── true_agg:34 IS NOT NULL [as=exists:32, outer=(34)] + └── project + ├── columns: bool:43!null + ├── fd: ()-->(43) + ├── scan a + └── projections + └── true [as=bool:43] + +# This is a more synthetic regression test for #130398. +exprnorm expect=HoistJoinSubquery disable=PushFilterIntoJoinLeft +(SemiJoin + (Scan [ (Table "xy") (Cols "x,y") ]) + (Select + (Scan [ (Table "uv") (Cols "u,v") ]) + [ (Is (Var "v") (Null "int")) ] + ) + [ + (Exists + (Select + (Scan [ (Table "cd") (Cols "c,d") ]) + [ (Eq (Var "x") (Var "d")) ] + ) + [] + ) + ] + [] +) +---- +group-by (hash) + ├── columns: x:1!null y:2 + ├── grouping columns: x:1!null + ├── key: (1) + ├── fd: (1)-->(2) + ├── inner-join (cross) + │ ├── columns: x:1!null y:2 v:6 d:10!null + │ ├── fd: ()-->(6), (1)-->(2), (1)==(10), (10)==(1) + │ ├── select + │ │ ├── columns: v:6 + │ │ ├── fd: ()-->(6) + │ │ ├── scan uv + │ │ │ └── columns: v:6 + │ │ └── filters + │ │ └── v:6 IS NOT DISTINCT FROM CAST(NULL AS INT8) [outer=(6), constraints=(/6: [/NULL - /NULL]; tight), fd=()-->(6)] + │ ├── inner-join (hash) + │ │ ├── columns: x:1!null y:2 d:10!null + │ │ ├── multiplicity: left-rows(zero-or-more), right-rows(zero-or-one) + │ │ ├── fd: (1)-->(2), (1)==(10), (10)==(1) + │ │ ├── scan xy + │ │ │ ├── columns: x:1!null y:2 + │ │ │ ├── key: (1) + │ │ │ └── fd: (1)-->(2) + │ │ ├── scan cd + │ │ │ └── columns: d:10!null + │ │ └── filters + │ │ └── x:1 = d:10 [outer=(1,10), constraints=(/1: (/NULL - ]; /10: (/NULL - ]), fd=(1)==(10), (10)==(1)] + │ └── filters (true) + └── aggregations + └── const-agg [as=y:2, outer=(2)] + └── y:2 + +# This is a synthetic regression test for #130398 that tests an anti-join. 
+exprnorm expect=HoistJoinSubquery disable=PushFilterIntoJoinLeft +(AntiJoin + (Scan [ (Table "xy") (Cols "x,y") ]) + (Select + (Scan [ (Table "uv") (Cols "u,v") ]) + [ (Is (Var "v") (Null "int")) ] + ) + [ + (Exists + (Select + (Scan [ (Table "cd") (Cols "c,d") ]) + [ (Eq (Var "x") (Var "d")) ] + ) + [] + ) + ] + [] +) +---- +anti-join-apply + ├── columns: x:1!null y:2 + ├── key: (1) + ├── fd: (1)-->(2) + ├── scan xy + │ ├── columns: x:1!null y:2 + │ ├── key: (1) + │ └── fd: (1)-->(2) + ├── project + │ ├── columns: exists:15!null + │ ├── outer: (1) + │ ├── group-by (hash) + │ │ ├── columns: u:5!null true_agg:14 + │ │ ├── grouping columns: u:5!null + │ │ ├── outer: (1) + │ │ ├── key: (5) + │ │ ├── fd: (5)-->(14) + │ │ ├── left-join (cross) + │ │ │ ├── columns: u:5!null v:6 true:13 + │ │ │ ├── outer: (1) + │ │ │ ├── fd: ()-->(6) + │ │ │ ├── select + │ │ │ │ ├── columns: u:5!null v:6 + │ │ │ │ ├── key: (5) + │ │ │ │ ├── fd: ()-->(6) + │ │ │ │ ├── scan uv + │ │ │ │ │ ├── columns: u:5!null v:6 + │ │ │ │ │ ├── key: (5) + │ │ │ │ │ └── fd: (5)-->(6) + │ │ │ │ └── filters + │ │ │ │ └── v:6 IS NOT DISTINCT FROM CAST(NULL AS INT8) [outer=(6), constraints=(/6: [/NULL - /NULL]; tight), fd=()-->(6)] + │ │ │ ├── project + │ │ │ │ ├── columns: true:13!null + │ │ │ │ ├── outer: (1) + │ │ │ │ ├── fd: ()-->(13) + │ │ │ │ ├── select + │ │ │ │ │ ├── columns: d:10!null + │ │ │ │ │ ├── outer: (1) + │ │ │ │ │ ├── fd: ()-->(10) + │ │ │ │ │ ├── scan cd + │ │ │ │ │ │ └── columns: d:10!null + │ │ │ │ │ └── filters + │ │ │ │ │ └── x:1 = d:10 [outer=(1,10), constraints=(/1: (/NULL - ]; /10: (/NULL - ]), fd=(1)==(10), (10)==(1)] + │ │ │ │ └── projections + │ │ │ │ └── true [as=true:13] + │ │ │ └── filters (true) + │ │ └── aggregations + │ │ └── const-not-null-agg [as=true_agg:14, outer=(13)] + │ │ └── true:13 + │ └── projections + │ └── true_agg:14 IS NOT NULL [as=exists:15, outer=(14)] + └── filters + └── exists:15 [outer=(15), constraints=(/15: [/true - /true]; tight), fd=()-->(15)] # 
-------------------------------------------------- # HoistValuesSubquery From e0f5fde563603286e6e8eec52b4d376e39ca4090 Mon Sep 17 00:00:00 2001 From: Matt Spilchen Date: Thu, 19 Sep 2024 10:19:05 -0300 Subject: [PATCH 5/5] sql/catalog/lease: collect stats in TestDescriptorRefreshOnRetry A flake was observed in TestDescriptorRefreshOnRetry where a lease wasn't relinquished in time. The cause is unclear, so this change adds stack trace collection when acquiring the lease for the table under test. Epic: None Closes: #130520 Release note: None --- pkg/sql/catalog/lease/lease_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 01d73b31435b..ca9ce0005bea 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -17,6 +17,7 @@ import ( "context" gosql "database/sql" "fmt" + "runtime/debug" "strings" "sync" "sync/atomic" @@ -929,6 +930,7 @@ func TestDescriptorRefreshOnRetry(t *testing.T) { fooReleaseCount := int32(0) var tableID int64 + ctx := context.Background() var params base.TestServerArgs params.Knobs = base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{ @@ -938,10 +940,12 @@ func TestDescriptorRefreshOnRetry(t *testing.T) { RemoveOnceDereferenced: true, LeaseAcquiredEvent: func(desc catalog.Descriptor, _ error) { if desc.GetName() == "foo" { + log.Infof(ctx, "lease acquirer stack trace: %s", debug.Stack()) atomic.AddInt32(&fooAcquiredCount, 1) } }, LeaseReleasedEvent: func(id descpb.ID, _ descpb.DescriptorVersion, _ error) { + log.Infof(ctx, "releasing lease for ID %d", int64(id)) if int64(id) == atomic.LoadInt64(&tableID) { atomic.AddInt32(&fooReleaseCount, 1) } @@ -970,6 +974,7 @@ CREATE TABLE t.foo (v INT); tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "t", "foo") atomic.StoreInt64(&tableID, int64(tableDesc.GetID())) + log.Infof(ctx, "table ID for foo is %d", tableDesc.GetID()) tx, err := 
sqlDB.Begin() if err != nil {