diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
index 2a8d767006c1..f579c0ea6a0c 100644
--- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
+++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
@@ -16,6 +16,7 @@ import (
 	"strconv"
 	"strings"
 	"testing"
+	"time"

 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
@@ -23,12 +24,14 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/stretchr/testify/require"
 )

@@ -562,3 +565,49 @@ func TestShowChangefeedJobsAuthorization(t *testing.T) {
 	// Only enterprise sinks create jobs.
 	cdcTest(t, testFn, feedTestEnterpriseSinks)
 }
+
+// TestShowChangefeedJobsDefaultFilter verifies that "SHOW JOBS" and "SHOW CHANGEFEED JOBS"
+// use the same age filter (12 hours).
+func TestShowChangefeedJobsDefaultFilter(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+
+		countChangefeedJobs := func() (count int) {
+			query := `select count(*) from [SHOW CHANGEFEED JOBS]`
+			sqlDB.QueryRow(t, query).Scan(&count)
+			return count
+		}
+		changefeedJobExists := func(id catpb.JobID) bool {
+			rows := sqlDB.Query(t, `SHOW CHANGEFEED JOB $1`, id)
+			defer rows.Close()
+			return rows.Next()
+		}
+
+		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
+
+		foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
+		waitForJobStatus(sqlDB, t, foo.(cdctest.EnterpriseTestFeed).JobID(), jobs.StatusRunning)
+		require.Equal(t, 1, countChangefeedJobs())
+
+		// Once the feed is closed, the job is no longer listed: its stubbed finished time is more than 12 hours in the past.
+		closeFeed(t, foo)
+		require.Equal(t, 0, countChangefeedJobs())
+
+		// We can still see the job if we explicitly ask for it.
+		jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
+		require.True(t, changefeedJobExists(jobID))
+	}
+
+	updateKnobs := func(opts *feedTestOptions) {
+		opts.knobsFn = func(knobs *base.TestingKnobs) {
+			knobs.JobsTestingKnobs.(*jobs.TestingKnobs).StubTimeNow = func() time.Time {
+				return timeutil.Now().Add(-13 * time.Hour)
+			}
+		}
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("kafka"), updateKnobs)
+}
diff --git a/pkg/cmd/roachtest/roachtestflags/flags.go b/pkg/cmd/roachtest/roachtestflags/flags.go
index 33e5afb9b777..e27b27a3e4ac 100644
--- a/pkg/cmd/roachtest/roachtestflags/flags.go
+++ b/pkg/cmd/roachtest/roachtestflags/flags.go
@@ -435,6 +435,14 @@ var (
 	providers. Set this to false when running many concurrent Azure tests.
 	Azure can return stale VM information when many PUT calls are made in succession.`,
 	})
+
+	AlwaysCollectArtifacts bool = false
+	_ = registerRunFlag(&AlwaysCollectArtifacts, FlagInfo{
+		Name: "always-collect-artifacts",
+		Usage: `
+		Always collect artifacts during test teardown, even if the test did not
+		time out or fail.`,
+	})
 )

 // The flags below override the final cluster configuration. They have no
diff --git a/pkg/cmd/roachtest/run.go b/pkg/cmd/roachtest/run.go
index 951bfcf5b58c..e808bb608176 100644
--- a/pkg/cmd/roachtest/run.go
+++ b/pkg/cmd/roachtest/run.go
@@ -214,7 +214,7 @@ func runTests(register func(registry.Registry), filter *registry.TestFilter) err
 	if roachtestflags.TeamCity {
 		// Collect the runner logs.
-		fmt.Printf("##teamcity[publishArtifacts '%s']\n", filepath.Join(literalArtifactsDir, runnerLogsDir))
+		fmt.Printf("##teamcity[publishArtifacts '%s' => '%s']\n", filepath.Join(literalArtifactsDir, runnerLogsDir), runnerLogsDir)
 	}

 	if summaryErr := maybeDumpSummaryMarkdown(runner); summaryErr != nil {
diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go
index 40403db39f25..8fb357b57e76 100644
--- a/pkg/cmd/roachtest/test_runner.go
+++ b/pkg/cmd/roachtest/test_runner.go
@@ -1535,7 +1535,7 @@ func (r *testRunner) postTestAssertions(
 func (r *testRunner) teardownTest(
 	ctx context.Context, t *testImpl, c *clusterImpl, timedOut bool,
 ) (string, error) {
-	if timedOut || t.Failed() {
+	if timedOut || t.Failed() || roachtestflags.AlwaysCollectArtifacts {
 		snapURL := ""
 		if timedOut {
 			// If the Side-Eye integration was configured, capture a snapshot of the
diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go
index f4ea23794382..4278f6c228c4 100644
--- a/pkg/jobs/jobs.go
+++ b/pkg/jobs/jobs.go
@@ -466,7 +466,13 @@ func (u Updater) canceled(ctx context.Context) error {
 			return fmt.Errorf("job with status %s cannot be requested to be canceled", md.Status)
 		}
 		ju.UpdateStatus(StatusCanceled)
-		md.Payload.FinishedMicros = timeutil.ToUnixMicros(u.j.registry.clock.Now().GoTime())
+		var now time.Time
+		if u.j.registry.knobs.StubTimeNow != nil {
+			now = u.j.registry.knobs.StubTimeNow()
+		} else {
+			now = u.j.registry.clock.Now().GoTime()
+		}
+		md.Payload.FinishedMicros = timeutil.ToUnixMicros(now)
 		ju.UpdatePayload(md.Payload)
 		return nil
 	})
diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go
index a27178146e56..8b1e842f4e71 100644
--- a/pkg/jobs/testing_knobs.go
+++ b/pkg/jobs/testing_knobs.go
@@ -83,6 +83,14 @@ type TestingKnobs struct {
 	// BeforeWaitForJobsQuery is called once per invocation of the
 	// poll-show-jobs query in WaitForJobs.
 	BeforeWaitForJobsQuery func(jobs []jobspb.JobID)
+
+	// StubTimeNow, if set, is used by the jobs updater to get the current
+	// time. If nil, "registry.clock.Now" is used.
+	//
+	// TODO (xiaochen): currently only "Updater.canceled" uses this knob;
+	// we may want to extend this to other parts of the jobs package and
+	// give this knob a default value (so it won't be nil).
+	StubTimeNow func() time.Time
 }

 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
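Note: the sketch below is illustrative only and is not part of the patch. It shows how the new jobs.TestingKnobs.StubTimeNow knob could be wired through base.TestServerArgs by a test outside the cdcTest harness, mirroring the pattern used in TestShowChangefeedJobsDefaultFilter above; the helper name is hypothetical.

// Hypothetical helper (not in this change): stub the jobs clock 13 hours into
// the past so that finished jobs fall outside the 12-hour SHOW JOBS window.
//
// Assumed imports:
//	"time"
//	"github.com/cockroachdb/cockroach/pkg/base"
//	"github.com/cockroachdb/cockroach/pkg/jobs"
//	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
func makeArgsWithStubbedJobClock() base.TestServerArgs {
	return base.TestServerArgs{
		Knobs: base.TestingKnobs{
			JobsTestingKnobs: &jobs.TestingKnobs{
				// Every job finish time recorded via this knob will appear to
				// be 13 hours old, i.e. outside the default 12-hour filter.
				StubTimeNow: func() time.Time {
					return timeutil.Now().Add(-13 * time.Hour)
				},
			},
		},
	}
}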
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index 01d73b31435b..ca9ce0005bea 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
+	"runtime/debug"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -929,6 +930,7 @@ func TestDescriptorRefreshOnRetry(t *testing.T) {
 	fooReleaseCount := int32(0)
 	var tableID int64

+	ctx := context.Background()
 	var params base.TestServerArgs
 	params.Knobs = base.TestingKnobs{
 		SQLLeaseManager: &lease.ManagerTestingKnobs{
@@ -938,10 +940,12 @@ func TestDescriptorRefreshOnRetry(t *testing.T) {
 			RemoveOnceDereferenced: true,
 			LeaseAcquiredEvent: func(desc catalog.Descriptor, _ error) {
 				if desc.GetName() == "foo" {
+					log.Infof(ctx, "lease acquirer stack trace: %s", debug.Stack())
 					atomic.AddInt32(&fooAcquiredCount, 1)
 				}
 			},
 			LeaseReleasedEvent: func(id descpb.ID, _ descpb.DescriptorVersion, _ error) {
+				log.Infof(ctx, "releasing lease for ID %d", int64(id))
 				if int64(id) == atomic.LoadInt64(&tableID) {
 					atomic.AddInt32(&fooReleaseCount, 1)
 				}
@@ -970,6 +974,7 @@ CREATE TABLE t.foo (v INT);
 	tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "t", "foo")
 	atomic.StoreInt64(&tableID, int64(tableDesc.GetID()))
+	log.Infof(ctx, "table ID for foo is %d", tableDesc.GetID())

 	tx, err := sqlDB.Begin()
 	if err != nil {
diff --git a/pkg/sql/delegate/show_changefeed_jobs.go b/pkg/sql/delegate/show_changefeed_jobs.go
index e90ad52bf9b2..657f112e54e4 100644
--- a/pkg/sql/delegate/show_changefeed_jobs.go
+++ b/pkg/sql/delegate/show_changefeed_jobs.go
@@ -73,8 +73,8 @@ FROM
 	if n.Jobs == nil {
 		// The query intends to present:
 		// - first all the running jobs sorted in order of start time,
-		// - then all completed jobs sorted in order of completion time.
-		//
+		// - then all completed jobs sorted in order of completion time (only those that finished within the last 12 hours).
+		whereClause = fmt.Sprintf(`WHERE %s`, ageFilter)
 		// The "ORDER BY" clause below exploits the fact that all
 		// running jobs have finished = NULL.
 		orderbyClause = `ORDER BY COALESCE(finished, now()) DESC, started DESC`
diff --git a/pkg/sql/delegate/show_jobs.go b/pkg/sql/delegate/show_jobs.go
index e3b28042e32e..0860e72a3897 100644
--- a/pkg/sql/delegate/show_jobs.go
+++ b/pkg/sql/delegate/show_jobs.go
@@ -20,6 +20,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 )

+const ageFilter = `(finished IS NULL OR finished > now() - '12h':::interval)`
+
 func constructSelectQuery(n *tree.ShowJobs) string {
 	var baseQuery strings.Builder
 	baseQuery.WriteString(`SELECT job_id, job_type, `)
@@ -75,9 +77,9 @@ func constructSelectQuery(n *tree.ShowJobs) string {

 	// The query intends to present:
 	// - first all the running jobs sorted in order of start time,
-	// - then all completed jobs sorted in order of completion time.
+	// - then all completed jobs sorted in order of completion time (only those that finished within the last 12 hours).
 	whereClause = fmt.Sprintf(
-		`WHERE %s AND (finished IS NULL OR finished > now() - '12h':::interval)`, typePredicate)
+		`WHERE %s AND %s`, typePredicate, ageFilter)
 	// The "ORDER BY" clause below exploits the fact that all
 	// running jobs have finished = NULL.
orderbyClause = `ORDER BY COALESCE(finished, now()) DESC, started DESC` diff --git a/pkg/sql/opt/norm/decorrelate_funcs.go b/pkg/sql/opt/norm/decorrelate_funcs.go index 123670f11d54..6799a4d00957 100644 --- a/pkg/sql/opt/norm/decorrelate_funcs.go +++ b/pkg/sql/opt/norm/decorrelate_funcs.go @@ -391,7 +391,13 @@ func (c *CustomFuncs) HoistJoinSubquery( } join := c.ConstructApplyJoin(op, left, hoister.input(), newFilters, private) - passthrough := c.OutputCols(left).Union(c.OutputCols(right)) + var passthrough opt.ColSet + switch op { + case opt.SemiJoinOp, opt.AntiJoinOp: + passthrough = c.OutputCols(left) + default: + passthrough = c.OutputCols(left).Union(c.OutputCols(right)) + } return c.f.ConstructProject(join, memo.EmptyProjectionsExpr, passthrough) } diff --git a/pkg/sql/opt/norm/testdata/rules/decorrelate b/pkg/sql/opt/norm/testdata/rules/decorrelate index b4693be21830..5bab69f4c5ac 100644 --- a/pkg/sql/opt/norm/testdata/rules/decorrelate +++ b/pkg/sql/opt/norm/testdata/rules/decorrelate @@ -5757,6 +5757,202 @@ project └── filters └── case:18 OR (x:8 IS NULL) [outer=(8,18)] +# Regression test for #130398. Do not include columns from the RHS of the +# semi-join when constructin the Project expression. The opt directive is used +# here because the SplitDisjunctionOfJoinTerms exploration rule must fire to +# trigger HoistJoinSubquery. +opt expect=HoistJoinSubquery +SELECT EXISTS( + SELECT 1 + FROM a + WHERE EXISTS( + SELECT + FROM a AS a2 + WHERE a2.i = 0 OR (a2.s = a2.s AND a1.i IN (SELECT i FROM a)) + ) +) +FROM a AS a1 +UNION +SELECT true FROM a +---- +union + ├── columns: exists:44!null + ├── left columns: exists:32 + ├── right columns: bool:43 + ├── cardinality: [0 - 2] + ├── key: (44) + ├── project + │ ├── columns: exists:32!null + │ ├── group-by (hash) + │ │ ├── columns: a1.k:1!null true_agg:34 + │ │ ├── grouping columns: a1.k:1!null + │ │ ├── key: (1) + │ │ ├── fd: (1)-->(34) + │ │ ├── left-join-apply + │ │ │ ├── columns: a1.k:1!null a1.i:2 true:33 + │ │ │ ├── fd: (1)-->(2) + │ │ │ ├── scan a [as=a1] + │ │ │ │ ├── columns: a1.k:1!null a1.i:2 + │ │ │ │ ├── key: (1) + │ │ │ │ └── fd: (1)-->(2) + │ │ │ ├── project + │ │ │ │ ├── columns: true:33!null + │ │ │ │ ├── outer: (2) + │ │ │ │ ├── fd: ()-->(33) + │ │ │ │ ├── semi-join (cross) + │ │ │ │ │ ├── outer: (2) + │ │ │ │ │ ├── scan a + │ │ │ │ │ ├── scan a [as=a2] + │ │ │ │ │ │ └── columns: a2.i:16 a2.s:18 + │ │ │ │ │ └── filters + │ │ │ │ │ └── or [outer=(2,16,18), correlated-subquery] + │ │ │ │ │ ├── a2.i:16 = 0 + │ │ │ │ │ └── and + │ │ │ │ │ ├── (a2.s:18 IS DISTINCT FROM CAST(NULL AS STRING)) OR CAST(NULL AS BOOL) + │ │ │ │ │ └── any: eq + │ │ │ │ │ ├── scan a + │ │ │ │ │ │ └── columns: a.i:23 + │ │ │ │ │ └── a1.i:2 + │ │ │ │ └── projections + │ │ │ │ └── true [as=true:33] + │ │ │ └── filters (true) + │ │ └── aggregations + │ │ └── const-not-null-agg [as=true_agg:34, outer=(33)] + │ │ └── true:33 + │ └── projections + │ └── true_agg:34 IS NOT NULL [as=exists:32, outer=(34)] + └── project + ├── columns: bool:43!null + ├── fd: ()-->(43) + ├── scan a + └── projections + └── true [as=bool:43] + +# This is a more synthetic regression test for #130398. 
+exprnorm expect=HoistJoinSubquery disable=PushFilterIntoJoinLeft +(SemiJoin + (Scan [ (Table "xy") (Cols "x,y") ]) + (Select + (Scan [ (Table "uv") (Cols "u,v") ]) + [ (Is (Var "v") (Null "int")) ] + ) + [ + (Exists + (Select + (Scan [ (Table "cd") (Cols "c,d") ]) + [ (Eq (Var "x") (Var "d")) ] + ) + [] + ) + ] + [] +) +---- +group-by (hash) + ├── columns: x:1!null y:2 + ├── grouping columns: x:1!null + ├── key: (1) + ├── fd: (1)-->(2) + ├── inner-join (cross) + │ ├── columns: x:1!null y:2 v:6 d:10!null + │ ├── fd: ()-->(6), (1)-->(2), (1)==(10), (10)==(1) + │ ├── select + │ │ ├── columns: v:6 + │ │ ├── fd: ()-->(6) + │ │ ├── scan uv + │ │ │ └── columns: v:6 + │ │ └── filters + │ │ └── v:6 IS NOT DISTINCT FROM CAST(NULL AS INT8) [outer=(6), constraints=(/6: [/NULL - /NULL]; tight), fd=()-->(6)] + │ ├── inner-join (hash) + │ │ ├── columns: x:1!null y:2 d:10!null + │ │ ├── multiplicity: left-rows(zero-or-more), right-rows(zero-or-one) + │ │ ├── fd: (1)-->(2), (1)==(10), (10)==(1) + │ │ ├── scan xy + │ │ │ ├── columns: x:1!null y:2 + │ │ │ ├── key: (1) + │ │ │ └── fd: (1)-->(2) + │ │ ├── scan cd + │ │ │ └── columns: d:10!null + │ │ └── filters + │ │ └── x:1 = d:10 [outer=(1,10), constraints=(/1: (/NULL - ]; /10: (/NULL - ]), fd=(1)==(10), (10)==(1)] + │ └── filters (true) + └── aggregations + └── const-agg [as=y:2, outer=(2)] + └── y:2 + +# This is a synthetic regression test for #130398 that tests an anti-join. +exprnorm expect=HoistJoinSubquery disable=PushFilterIntoJoinLeft +(AntiJoin + (Scan [ (Table "xy") (Cols "x,y") ]) + (Select + (Scan [ (Table "uv") (Cols "u,v") ]) + [ (Is (Var "v") (Null "int")) ] + ) + [ + (Exists + (Select + (Scan [ (Table "cd") (Cols "c,d") ]) + [ (Eq (Var "x") (Var "d")) ] + ) + [] + ) + ] + [] +) +---- +anti-join-apply + ├── columns: x:1!null y:2 + ├── key: (1) + ├── fd: (1)-->(2) + ├── scan xy + │ ├── columns: x:1!null y:2 + │ ├── key: (1) + │ └── fd: (1)-->(2) + ├── project + │ ├── columns: exists:15!null + │ ├── outer: (1) + │ ├── group-by (hash) + │ │ ├── columns: u:5!null true_agg:14 + │ │ ├── grouping columns: u:5!null + │ │ ├── outer: (1) + │ │ ├── key: (5) + │ │ ├── fd: (5)-->(14) + │ │ ├── left-join (cross) + │ │ │ ├── columns: u:5!null v:6 true:13 + │ │ │ ├── outer: (1) + │ │ │ ├── fd: ()-->(6) + │ │ │ ├── select + │ │ │ │ ├── columns: u:5!null v:6 + │ │ │ │ ├── key: (5) + │ │ │ │ ├── fd: ()-->(6) + │ │ │ │ ├── scan uv + │ │ │ │ │ ├── columns: u:5!null v:6 + │ │ │ │ │ ├── key: (5) + │ │ │ │ │ └── fd: (5)-->(6) + │ │ │ │ └── filters + │ │ │ │ └── v:6 IS NOT DISTINCT FROM CAST(NULL AS INT8) [outer=(6), constraints=(/6: [/NULL - /NULL]; tight), fd=()-->(6)] + │ │ │ ├── project + │ │ │ │ ├── columns: true:13!null + │ │ │ │ ├── outer: (1) + │ │ │ │ ├── fd: ()-->(13) + │ │ │ │ ├── select + │ │ │ │ │ ├── columns: d:10!null + │ │ │ │ │ ├── outer: (1) + │ │ │ │ │ ├── fd: ()-->(10) + │ │ │ │ │ ├── scan cd + │ │ │ │ │ │ └── columns: d:10!null + │ │ │ │ │ └── filters + │ │ │ │ │ └── x:1 = d:10 [outer=(1,10), constraints=(/1: (/NULL - ]; /10: (/NULL - ]), fd=(1)==(10), (10)==(1)] + │ │ │ │ └── projections + │ │ │ │ └── true [as=true:13] + │ │ │ └── filters (true) + │ │ └── aggregations + │ │ └── const-not-null-agg [as=true_agg:14, outer=(13)] + │ │ └── true:13 + │ └── projections + │ └── true_agg:14 IS NOT NULL [as=exists:15, outer=(14)] + └── filters + └── exists:15 [outer=(15), constraints=(/15: [/true - /true]; tight), fd=()-->(15)] # -------------------------------------------------- # HoistValuesSubquery