Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
127584: changefeedccl: unify the age filter of "SHOW JOBS" and "SHOW CHANGEFEED JOBS" r=asg0451 a=XiaochenCui

Previously, "SHOW CHANGEFEED JOBS" return jobs from the last 14 days whereas "SHOW JOBS" return jobs from the last 12 hours.

This commit addresses this issue by:

- Use the same "age filter" for these two commands.
- Add a new knob "StubTimeNow" for job updater.

Fixes: #124882

Release note (sql change): Unify the age filter of commands "SHOW JOBS" and "SHOW CHANGEFEED JOBS".

130963: roachtest, teamcity: publish runner logs to same directory r=srosenberg,herkolategan a=DarrylWong

Previously we would publish the runner logs to the top level directory, causing it to be hard to find. This change publishes them to their own directory.

Fixes: #125298
Epic: none
Release note: none

130981: opt: fix HoistJoinSubquery r=mgartner a=mgartner

This commit fixes a bug in `HoistJoinSubquery` that would cause it to
generate a Project expression that produced columns from the RHS of the
hoisted semi- or anti-join.

This bug has existed since the rule was introduced. I believe it was not
discovered because this rule only fires in very rare circumstances for
semi- and anti-joins.

Fixes #130398

Release note (bug fix): A bug has been fixed that could cause a very
rare internal error "lists in SetPrivate are not all the same length"
when executing queries.


131012: sql/catalog/lease: collect stats in TestDescriptorRefreshOnRetry r=fqazi a=spilchen

A flake was observed in TestDescriptorRefreshOnRetry where a lease wasn't relinquished in time. The cause is unclear, so this change adds stack trace collection when acquiring the lease for the table under test.

Epic: None
Closes: #130520
Release note: None

Co-authored-by: XiaochenCui <[email protected]>
Co-authored-by: DarrylWong <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
  • Loading branch information
5 people committed Sep 19, 2024
5 parents 6cb413a + c65943f + c92fbc2 + e7ad630 + e0f5fde commit 3e338af
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 8 deletions.
49 changes: 49 additions & 0 deletions pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -562,3 +565,49 @@ func TestShowChangefeedJobsAuthorization(t *testing.T) {
// Only enterprise sinks create jobs.
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

// TestShowChangefeedJobsDefaultFilter verifies that "SHOW JOBS" AND "SHOW CHANGEFEED JOBS"
// use the same age filter (12 hours).
func TestShowChangefeedJobsDefaultFilter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

countChangefeedJobs := func() (count int) {
query := `select count(*) from [SHOW CHANGEFEED JOBS]`
sqlDB.QueryRow(t, query).Scan(&count)
return count
}
changefeedJobExists := func(id catpb.JobID) bool {
rows := sqlDB.Query(t, `SHOW CHANGEFEED JOB $1`, id)
defer rows.Close()
return rows.Next()
}

sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
waitForJobStatus(sqlDB, t, foo.(cdctest.EnterpriseTestFeed).JobID(), jobs.StatusRunning)
require.Equal(t, 1, countChangefeedJobs())

// The job is not visible after closed (and its finished time is older than 12 hours).
closeFeed(t, foo)
require.Equal(t, 0, countChangefeedJobs())

// We can still see the job if we explicitly ask for it.
jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
require.True(t, changefeedJobExists(jobID))
}

updateKnobs := func(opts *feedTestOptions) {
opts.knobsFn = func(knobs *base.TestingKnobs) {
knobs.JobsTestingKnobs.(*jobs.TestingKnobs).StubTimeNow = func() time.Time {
return timeutil.Now().Add(-13 * time.Hour)
}
}
}

cdcTest(t, testFn, feedTestForceSink("kafka"), updateKnobs)
}
8 changes: 8 additions & 0 deletions pkg/cmd/roachtest/roachtestflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,14 @@ var (
providers. Set this to false when running many concurrent Azure tests. Azure
can return stale VM information when many PUT calls are made in succession.`,
})

AlwaysCollectArtifacts bool = false
_ = registerRunFlag(&AlwaysCollectArtifacts, FlagInfo{
Name: "always-collect-artifacts",
Usage: `
Always collect artifacts during test teardown, even if the test did not
time out or fail.`,
})
)

// The flags below override the final cluster configuration. They have no
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func runTests(register func(registry.Registry), filter *registry.TestFilter) err

if roachtestflags.TeamCity {
// Collect the runner logs.
fmt.Printf("##teamcity[publishArtifacts '%s']\n", filepath.Join(literalArtifactsDir, runnerLogsDir))
fmt.Printf("##teamcity[publishArtifacts '%s' => '%s']\n", filepath.Join(literalArtifactsDir, runnerLogsDir), runnerLogsDir)
}

if summaryErr := maybeDumpSummaryMarkdown(runner); summaryErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,7 @@ func (r *testRunner) postTestAssertions(
func (r *testRunner) teardownTest(
ctx context.Context, t *testImpl, c *clusterImpl, timedOut bool,
) (string, error) {
if timedOut || t.Failed() {
if timedOut || t.Failed() || roachtestflags.AlwaysCollectArtifacts {
snapURL := ""
if timedOut {
// If the Side-Eye integration was configured, capture a snapshot of the
Expand Down
8 changes: 7 additions & 1 deletion pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,13 @@ func (u Updater) canceled(ctx context.Context) error {
return fmt.Errorf("job with status %s cannot be requested to be canceled", md.Status)
}
ju.UpdateStatus(StatusCanceled)
md.Payload.FinishedMicros = timeutil.ToUnixMicros(u.j.registry.clock.Now().GoTime())
var now time.Time
if u.j.registry.knobs.StubTimeNow != nil {
now = u.j.registry.knobs.StubTimeNow()
} else {
now = u.j.registry.clock.Now().GoTime()
}
md.Payload.FinishedMicros = timeutil.ToUnixMicros(now)
ju.UpdatePayload(md.Payload)
return nil
})
Expand Down
8 changes: 8 additions & 0 deletions pkg/jobs/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ type TestingKnobs struct {
// BeforeWaitForJobsQuery is called once per invocation of the
// poll-show-jobs query in WaitForJobs.
BeforeWaitForJobsQuery func(jobs []jobspb.JobID)

// Jobs updater will use this function to get the current time. If nil,
// "registry.clock.Now" will be used.
//
// TODO (xiaochen): currently only "Updater.canceled" uses this knob,
// we may want to extend this to other parts of the jobs package and
// give this knob a default value (so it won't be nil).
StubTimeNow func() time.Time
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -929,6 +930,7 @@ func TestDescriptorRefreshOnRetry(t *testing.T) {
fooReleaseCount := int32(0)
var tableID int64

ctx := context.Background()
var params base.TestServerArgs
params.Knobs = base.TestingKnobs{
SQLLeaseManager: &lease.ManagerTestingKnobs{
Expand All @@ -938,10 +940,12 @@ func TestDescriptorRefreshOnRetry(t *testing.T) {
RemoveOnceDereferenced: true,
LeaseAcquiredEvent: func(desc catalog.Descriptor, _ error) {
if desc.GetName() == "foo" {
log.Infof(ctx, "lease acquirer stack trace: %s", debug.Stack())
atomic.AddInt32(&fooAcquiredCount, 1)
}
},
LeaseReleasedEvent: func(id descpb.ID, _ descpb.DescriptorVersion, _ error) {
log.Infof(ctx, "releasing lease for ID %d", int64(id))
if int64(id) == atomic.LoadInt64(&tableID) {
atomic.AddInt32(&fooReleaseCount, 1)
}
Expand Down Expand Up @@ -970,6 +974,7 @@ CREATE TABLE t.foo (v INT);

tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "t", "foo")
atomic.StoreInt64(&tableID, int64(tableDesc.GetID()))
log.Infof(ctx, "table ID for foo is %d", tableDesc.GetID())

tx, err := sqlDB.Begin()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/delegate/show_changefeed_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ FROM
if n.Jobs == nil {
// The query intends to present:
// - first all the running jobs sorted in order of start time,
// - then all completed jobs sorted in order of completion time.
//
// - then all completed jobs sorted in order of completion time (no more than 12 hours).
whereClause = fmt.Sprintf(`WHERE %s`, ageFilter)
// The "ORDER BY" clause below exploits the fact that all
// running jobs have finished = NULL.
orderbyClause = `ORDER BY COALESCE(finished, now()) DESC, started DESC`
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/delegate/show_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
)

const ageFilter = `(finished IS NULL OR finished > now() - '12h':::interval)`

func constructSelectQuery(n *tree.ShowJobs) string {
var baseQuery strings.Builder
baseQuery.WriteString(`SELECT job_id, job_type, `)
Expand Down Expand Up @@ -75,9 +77,9 @@ func constructSelectQuery(n *tree.ShowJobs) string {

// The query intends to present:
// - first all the running jobs sorted in order of start time,
// - then all completed jobs sorted in order of completion time.
// - then all completed jobs sorted in order of completion time (no more than 12 hours).
whereClause = fmt.Sprintf(
`WHERE %s AND (finished IS NULL OR finished > now() - '12h':::interval)`, typePredicate)
`WHERE %s AND %s`, typePredicate, ageFilter)
// The "ORDER BY" clause below exploits the fact that all
// running jobs have finished = NULL.
orderbyClause = `ORDER BY COALESCE(finished, now()) DESC, started DESC`
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/opt/norm/decorrelate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,13 @@ func (c *CustomFuncs) HoistJoinSubquery(
}

join := c.ConstructApplyJoin(op, left, hoister.input(), newFilters, private)
passthrough := c.OutputCols(left).Union(c.OutputCols(right))
var passthrough opt.ColSet
switch op {
case opt.SemiJoinOp, opt.AntiJoinOp:
passthrough = c.OutputCols(left)
default:
passthrough = c.OutputCols(left).Union(c.OutputCols(right))
}
return c.f.ConstructProject(join, memo.EmptyProjectionsExpr, passthrough)
}

Expand Down
Loading

0 comments on commit 3e338af

Please sign in to comment.