Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…achdb#133686 cockroachdb#133690

133234: workload: tpcc consistency check added flag as-of. r=srosenberg,nameisbhaskar,vidit-bhat a=shailendra-patel

While running the consistency checker on the tpcc database with an active tpcc workload, the consistency check fails with a retryable error, such as restart transaction:`TransactionRetryWithProtoRefreshError: ReadWithinUncertaintyIntervalError:`
To fix this, added a new flag `as-of` which allows to run consistency check using `AS OF SYSTEM TIME`.

Epic: none
Release note: None

133607: sql: check object type when revoking privilege r=rafiss a=rafiss

fixes cockroachdb#131157
Release note (bug fix): Fix an unhandled error that could occur when using `REVOKE ... ON SEQUENCE FROM ... user` on an object that is not a sequence.

133616: roachtest: validate token return in perturbation/* tests r=kvoli a=andrewbaptist

This commit adds validation that all RAC tokens are returned on all stable nodes at the end of the test.

Fixes: cockroachdb#133410

Release note: None

133686: rac2: order testingRCRange.mu before RaftMu in tests r=sumeerbhola a=kvoli

`testingRCRange.mu` was being acquired, and held before acquiring `RaftMu` in `testingRCRange.admit()`, which conflicted with different ordering (reversed). This was a test only issue with `TestRangeController`.

Order `testingRCRange.mu` before `RaftMu` in `admit()`.

Fixes: cockroachdb#133650
Release note: None

133690: roachtest: always pass a Context to queries r=kvoli a=andrewbaptist

Queries can hang if there is no context passed to them. In roachtests, a context can be cancelled if there is a VM preemption. It is always better to use the test context and avoid this risk. This change updates the perturbation/* tests to always pass a context.

Fixes: cockroachdb#133625

Release note: None

Co-authored-by: Shailendra Patel <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
5 people committed Oct 29, 2024
6 parents 0f9175f + 5d6fc67 + 4b26b96 + 865919a + 5d74d15 + 192a365 commit c14a15d
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 25 deletions.
46 changes: 44 additions & 2 deletions pkg/cmd/roachtest/tests/admission_control_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -901,6 +902,47 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)
t.L().Printf("validating stats after the perturbation")
failures = append(failures, isAcceptableChange(t.L(), baselineStats, afterStats, v.acceptableChange)...)
require.True(t, len(failures) == 0, strings.Join(failures, "\n"))
v.validateTokensReturned(ctx, t)
}

// validateTokensReturned ensures that all RAC tokens are returned to the pool
// at the end of the test.
func (v variations) validateTokensReturned(ctx context.Context, t test.Test) {
t.L().Printf("validating all tokens returned")
for _, node := range v.stableNodes() {
// Wait for the tokens to be returned to the pool. Normally this will
// pass immediately however it is possible that there is still some
// recovery so loop a few times.
testutils.SucceedsWithin(t, func() error {
db := v.Conn(ctx, t.L(), node)
defer db.Close()
for _, sType := range []string{"send", "eval"} {
for _, tType := range []string{"elastic", "regular"} {
statPrefix := fmt.Sprintf("kvflowcontrol.tokens.%s.%s", sType, tType)
query := fmt.Sprintf(`
SELECT d.value::INT8 AS deducted, r.value::INT8 AS returned
FROM
crdb_internal.node_metrics d,
crdb_internal.node_metrics r
WHERE
d.name='%s.deducted' AND
r.name='%s.returned'`,
statPrefix, statPrefix)
rows, err := db.QueryContext(ctx, query)
require.NoError(t, err)
require.True(t, rows.Next())
var deducted, returned int64
if err := rows.Scan(&deducted, &returned); err != nil {
return err
}
if deducted != returned {
return errors.Newf("tokens not returned for %s: deducted %d returned %d", statPrefix, deducted, returned)
}
}
}
return nil
}, 5*time.Second)
}
}

// trackedStat is a collection of the relevant values from the histogram. The
Expand Down Expand Up @@ -994,7 +1036,7 @@ func (v variations) waitForRebalanceToStop(ctx context.Context, t test.Test) {
Multiplier: 1,
}
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
if row := db.QueryRow(q); row != nil {
if row := db.QueryRowContext(ctx, q); row != nil {
var secondsSinceLastEvent int
if err := row.Scan(&secondsSinceLastEvent); err != nil && !errors.Is(err, gosql.ErrNoRows) {
t.Fatal(err)
Expand All @@ -1021,7 +1063,7 @@ func (v variations) waitForIOOverloadToEnd(ctx context.Context, t test.Test) {
anyOverloaded := false
for _, nodeId := range v.targetNodes() {
db := v.Conn(ctx, t.L(), nodeId)
if row := db.QueryRow(q); row != nil {
if row := db.QueryRowContext(ctx, q); row != nil {
var overload float64
if err := row.Scan(&overload); err != nil && !errors.Is(err, gosql.ErrNoRows) {
db.Close()
Expand Down
8 changes: 4 additions & 4 deletions pkg/cmd/roachtest/tests/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func profileTopStatements(

// Enable continuous statement diagnostics rather than just the first one.
sql := "SET CLUSTER SETTING sql.stmt_diagnostics.collect_continuously.enabled=true"
if _, err := db.Exec(sql); err != nil {
if _, err := db.ExecContext(ctx, sql); err != nil {
return err
}

Expand Down Expand Up @@ -199,7 +199,7 @@ FROM (
dbName,
minNumExpectedStmts,
)
if _, err := db.Exec(sql); err != nil {
if _, err := db.ExecContext(ctx, sql); err != nil {
return err
}
return nil
Expand All @@ -217,7 +217,7 @@ func downloadProfiles(
query := "SELECT id, collected_at FROM system.statement_diagnostics"
db := cluster.Conn(ctx, logger, 1)
defer db.Close()
idRow, err := db.Query(query)
idRow, err := db.QueryContext(ctx, query)
if err != nil {
return err
}
Expand All @@ -236,7 +236,7 @@ func downloadProfiles(
return err
}
url := urlPrefix + diagID
resp, err := client.Get(context.Background(), url)
resp, err := client.Get(ctx, url)
if err != nil {
return err
}
Expand Down
50 changes: 33 additions & 17 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,11 @@ type testingRCRange struct {
snapshots []testingTrackerSnapshot
raftLog raft.MemoryStorage

// mu is ordered after RaftMu.
//
// This is because we hold RaftMu when calling into the RangeController,
// which in turn may call back out to the testingRCRange for state
// information, as it mocks the dependencies of the RangeController.
mu struct {
syncutil.Mutex
r testingRange
Expand Down Expand Up @@ -506,26 +511,37 @@ func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPrior
}

func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av AdmittedVector) {
r.mu.Lock()
defer r.mu.Unlock()

for _, replDesc := range sortReplicasLocked(r) {
replica := r.mu.r.replicaSet[replDesc.ReplicaID]
if replica.desc.StoreID == storeID {
for _, v := range av.Admitted {
// Ensure that Match doesn't lag behind the highest index in the
// AdmittedVector.
replica.info.Match = max(replica.info.Match, v)
var replicaID roachpb.ReplicaID
var found bool
func() {
// We need to ensure that r.mu isn't held before (and while) holding
// RaftMu, in order to order the locks correctly (RaftMu before
// testingRCRange.mu).
r.mu.Lock()
defer r.mu.Unlock()
for _, replDesc := range sortReplicasLocked(r) {
replica := r.mu.r.replicaSet[replDesc.ReplicaID]
if replica.desc.StoreID == storeID {
for _, v := range av.Admitted {
// Ensure that Match doesn't lag behind the highest index in the
// AdmittedVector.
replica.info.Match = max(replica.info.Match, v)
}
replicaID = replica.desc.ReplicaID
r.mu.r.replicaSet[replicaID] = replica
found = true
break
}
r.mu.r.replicaSet[replica.desc.ReplicaID] = replica
func() {
r.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
defer r.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
r.rc.AdmitRaftMuLocked(ctx, replica.desc.ReplicaID, av)
}()
return
}
}()

if !found {
panic("replica not found")
}

r.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
defer r.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
r.rc.AdmitRaftMuLocked(ctx, replicaID, av)
}

type testingRange struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,12 @@ database_name schema_name table_name grantee privilege_type is_grantable
otherdb public tbl admin ALL true
otherdb public tbl root ALL true
otherdb public tbl testuser SELECT false

statement ok
CREATE TABLE t131157 (c1 INT)

statement ok
GRANT ALL ON t131157 TO testuser

statement error t131157 is not a sequence
REVOKE CREATE ON SEQUENCE t131157 FROM testuser
2 changes: 2 additions & 0 deletions pkg/sql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ func (p *planner) getDescriptorsFromTargetListForPrivilegeChange(
objectType: privilege.Sequence,
},
)
} else if targets.Tables.SequenceOnly {
return nil, pgerror.Newf(pgcode.WrongObjectType, "%s is not a sequence", tableDesc.GetName())
} else {
descs = append(
descs,
Expand Down
11 changes: 9 additions & 2 deletions pkg/workload/tpcc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type tpcc struct {
// context group for any background reset table operation to avoid goroutine leaks during long duration workloads
resetTableGrp ctxgroup.Group
resetTableCancelFn context.CancelFunc

asOfSystemTime string
}

type waitSetter struct {
Expand Down Expand Up @@ -260,7 +262,8 @@ var tpccMeta = workload.Meta{
`deprecated-fk-indexes`: {RuntimeOnly: true},
`query-trace-file`: {RuntimeOnly: true},
`fake-time`: {RuntimeOnly: true},
"txn-preamble-file": {RuntimeOnly: true},
`txn-preamble-file`: {RuntimeOnly: true},
`aost`: {RuntimeOnly: true, CheckConsistencyOnly: true},
}

g.flags.IntVar(&g.warehouses, `warehouses`, 1, `Number of warehouses for loading`)
Expand Down Expand Up @@ -300,6 +303,10 @@ var tpccMeta = workload.Meta{
g.flags.StringVar(&g.queryTraceFile, `query-trace-file`, ``, `File to write the query traces to. Defaults to no output`)
// Support executing a query file before each transaction.
g.flags.StringVar(&g.txnPreambleFile, "txn-preamble-file", "", "queries that will be injected before each txn")
g.flags.StringVar(&g.asOfSystemTime, "aost", "",
"This is an optional parameter to specify AOST; used exclusively in conjunction with the TPC-C consistency "+
"check. Example values are (\"'-1m'\", \"'-1h'\")")

RandomSeed.AddFlag(&g.flags)
g.connFlags = workload.NewConnFlags(&g.flags)
// Hardcode this since it doesn't seem like anyone will want to change
Expand Down Expand Up @@ -604,7 +611,7 @@ func (w *tpcc) Hooks() workload.Hooks {
}

start := timeutil.Now()
err := check.Fn(db, "" /* asOfSystemTime */)
err := check.Fn(db, w.asOfSystemTime /* asOfSystemTime */)
log.Infof(ctx, `check %s took %s`, check.Name, timeutil.Since(start))
if err != nil {
return errors.Wrapf(err, `check failed: %s`, check.Name)
Expand Down

0 comments on commit c14a15d

Please sign in to comment.