diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index fcd75b545328..82f03dca50a3 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1056,6 +1056,7 @@ GO_TARGETS = [ "//pkg/kv/kvclient/rangefeed/rangefeedcache:rangefeedcache_test", "//pkg/kv/kvclient/rangefeed:rangefeed", "//pkg/kv/kvclient/rangefeed:rangefeed_test", + "//pkg/kv/kvclient/rangestats:rangestats", "//pkg/kv/kvclient:kvclient", "//pkg/kv/kvnemesis:kvnemesis", "//pkg/kv/kvnemesis:kvnemesis_test", @@ -2327,6 +2328,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvclient/rangefeed:get_x_data", "//pkg/kv/kvclient/rangefeed/rangefeedbuffer:get_x_data", "//pkg/kv/kvclient/rangefeed/rangefeedcache:get_x_data", + "//pkg/kv/kvclient/rangestats:get_x_data", "//pkg/kv/kvnemesis:get_x_data", "//pkg/kv/kvprober:get_x_data", "//pkg/kv/kvserver:get_x_data", diff --git a/pkg/ccl/partitionccl/BUILD.bazel b/pkg/ccl/partitionccl/BUILD.bazel index 5f6b5929cf03..5ca7906b97b6 100644 --- a/pkg/ccl/partitionccl/BUILD.bazel +++ b/pkg/ccl/partitionccl/BUILD.bazel @@ -13,23 +13,17 @@ go_library( "//pkg/sql/catalog", "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/colinfo", - "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", - "//pkg/sql/catalog/tabledesc", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/rowenc", "//pkg/sql/rowenc/valueside", "//pkg/sql/schemachanger/scdeps", "//pkg/sql/sem/eval", - "//pkg/sql/sem/normalize", "//pkg/sql/sem/tree", - "//pkg/sql/sem/tree/treecmp", "//pkg/sql/sem/volatility", - "//pkg/sql/types", "//pkg/util/encoding", "//pkg/util/errorutil/unimplemented", - "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", ], ) @@ -79,7 +73,6 @@ go_test( "//pkg/sql/rowenc", "//pkg/sql/scrub", "//pkg/sql/scrub/scrubtestutils", - "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/tests", "//pkg/sql/types", diff --git a/pkg/ccl/partitionccl/partition.go b/pkg/ccl/partitionccl/partition.go index f06349386b1b..d08adc7f0fee 100644 --- a/pkg/ccl/partitionccl/partition.go +++ b/pkg/ccl/partitionccl/partition.go @@ -18,23 +18,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/sql/sem/normalize" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -408,215 +402,6 @@ func createPartitioning( return newImplicitCols, newPartitioning, err } -// selectPartitionExprs constructs an expression for selecting all rows in the -// given partitions. -func selectPartitionExprs( - evalCtx *eval.Context, tableDesc catalog.TableDescriptor, partNames tree.NameList, -) (tree.Expr, error) { - exprsByPartName := make(map[string]tree.TypedExpr) - for _, partName := range partNames { - exprsByPartName[string(partName)] = nil - } - - a := &tree.DatumAlloc{} - var prefixDatums []tree.Datum - if err := catalog.ForEachIndex(tableDesc, catalog.IndexOpts{ - AddMutations: true, - }, func(idx catalog.Index) error { - return selectPartitionExprsByName( - a, evalCtx, tableDesc, idx, idx.GetPartitioning(), prefixDatums, exprsByPartName, true /* genExpr */) - }); err != nil { - return nil, err - } - - var expr tree.TypedExpr = tree.DBoolFalse - for _, partName := range partNames { - partExpr, ok := exprsByPartName[string(partName)] - if !ok || partExpr == nil { - return nil, errors.Errorf("unknown partition: %s", partName) - } - expr = tree.NewTypedOrExpr(expr, partExpr) - } - - var err error - expr, err = normalize.Expr(evalCtx, expr) - if err != nil { - return nil, err - } - // In order to typecheck during simplification and normalization, we used - // dummy IndexVars. Swap them out for actual column references. - finalExpr, err := tree.SimpleVisit(expr, func(e tree.Expr) (recurse bool, newExpr tree.Expr, _ error) { - if ivar, ok := e.(*tree.IndexedVar); ok { - col, err := tableDesc.FindColumnWithID(descpb.ColumnID(ivar.Idx)) - if err != nil { - return false, nil, err - } - return false, &tree.ColumnItem{ColumnName: tree.Name(col.GetName())}, nil - } - return true, e, nil - }) - return finalExpr, err -} - -// selectPartitionExprsByName constructs an expression for selecting all rows in -// each partition and subpartition in the given index. To make it easy to -// simplify and normalize the exprs, references to table columns are represented -// as TypedOrdinalReferences with an ordinal of the column ID. -// -// NB Subpartitions do not affect the expression for their parent partitions. So -// if a partition foo (a=3) is then subpartitiond by (b=5) and no DEFAULT, the -// expression for foo is still `a=3`, not `a=3 AND b=5`. This means that if some -// partition is requested, we can omit all of the subpartitions, because they'll -// also necessarily select subsets of the rows it will. "requested" here is -// indicated by the caller by setting the corresponding name in the -// `exprsByPartName` map to nil. In this case, `genExpr` is then set to false -// for subpartitions of this call, which causes each subpartition to only -// register itself in the map with a placeholder entry (so we can still verify -// that the requested partitions are all valid). -func selectPartitionExprsByName( - a *tree.DatumAlloc, - evalCtx *eval.Context, - tableDesc catalog.TableDescriptor, - idx catalog.Index, - part catalog.Partitioning, - prefixDatums tree.Datums, - exprsByPartName map[string]tree.TypedExpr, - genExpr bool, -) error { - if part.NumColumns() == 0 { - return nil - } - - // Setting genExpr to false skips the expression generation and only - // registers each descendent partition in the map with a placeholder entry. - if !genExpr { - err := part.ForEachList(func(name string, _ [][]byte, subPartitioning catalog.Partitioning) error { - exprsByPartName[name] = tree.DBoolFalse - var fakeDatums tree.Datums - return selectPartitionExprsByName(a, evalCtx, tableDesc, idx, subPartitioning, fakeDatums, exprsByPartName, genExpr) - }) - if err != nil { - return err - } - return part.ForEachRange(func(name string, _, _ []byte) error { - exprsByPartName[name] = tree.DBoolFalse - return nil - }) - } - - var colVars tree.Exprs - { - // The recursive calls of selectPartitionExprsByName don't pass though - // the column ordinal references, so reconstruct them here. - colVars = make(tree.Exprs, len(prefixDatums)+part.NumColumns()) - for i := range colVars { - col, err := tabledesc.FindPublicColumnWithID(tableDesc, idx.GetKeyColumnID(i)) - if err != nil { - return err - } - colVars[i] = tree.NewTypedOrdinalReference(int(col.GetID()), col.GetType()) - } - } - - if part.NumLists() > 0 { - type exprAndPartName struct { - expr tree.TypedExpr - name string - } - // Any partitions using DEFAULT must specifically exclude any relevant - // higher specificity partitions (e.g for partitions `(1, DEFAULT)`, - // `(1, 2)`, the expr for the former must exclude the latter. This is - // done by bucketing the expression for each partition value by the - // number of DEFAULTs it involves. - partValueExprs := make([][]exprAndPartName, part.NumColumns()+1) - - err := part.ForEachList(func(name string, values [][]byte, subPartitioning catalog.Partitioning) error { - for _, valueEncBuf := range values { - t, _, err := rowenc.DecodePartitionTuple(a, evalCtx.Codec, tableDesc, idx, part, valueEncBuf, prefixDatums) - if err != nil { - return err - } - allDatums := append(prefixDatums, t.Datums...) - - // When len(allDatums) < len(colVars), the missing elements are DEFAULTs, so - // we can simply exclude them from the expr. - typContents := make([]*types.T, len(allDatums)) - for i, d := range allDatums { - typContents[i] = d.ResolvedType() - } - tupleTyp := types.MakeTuple(typContents) - partValueExpr := tree.NewTypedComparisonExpr( - treecmp.MakeComparisonOperator(treecmp.EQ), - tree.NewTypedTuple(tupleTyp, colVars[:len(allDatums)]), - tree.NewDTuple(tupleTyp, allDatums...), - ) - partValueExprs[len(t.Datums)] = append(partValueExprs[len(t.Datums)], exprAndPartName{ - expr: partValueExpr, - name: name, - }) - - genExpr := true - if _, ok := exprsByPartName[name]; ok { - // Presence of a partition name in the exprsByPartName map - // means the caller has expressed an interested in this - // partition, which means any subpartitions can be skipped - // (because they must by definition be a subset of this - // partition). This saves us a little work and also helps - // out the normalization & simplification of the resulting - // expression, since it doesn't have to account for which - // partitions overlap. - genExpr = false - } - if err := selectPartitionExprsByName( - a, evalCtx, tableDesc, idx, subPartitioning, allDatums, exprsByPartName, genExpr, - ); err != nil { - return err - } - } - return nil - }) - if err != nil { - return err - } - - // Walk backward through partValueExprs, so partition values with fewest - // DEFAULTs to most. As we go, keep an expression to be AND NOT'd with - // each partition value's expression in `excludeExpr`. This handles the - // exclusion of `(1, 2)` from the expression for `(1, DEFAULT)` in the - // example above. - // - // TODO(dan): The result of the way this currently works is correct but - // too broad. In a two column partitioning with cases for `(a, b)` and - // `(c, DEFAULT)`, the expression generated for `(c, DEFAULT)` will - // needlessly exclude `(a, b)`. Concretely, we end up with expressions - // like `(a) IN (1) AND ... (a, b) != (2, 3)`, where the `!= (2, 3)` - // part is irrelevant. This only happens in fairly unrealistic - // partitionings, so it's unclear if anything really needs to be done - // here. - excludeExpr := tree.TypedExpr(tree.DBoolFalse) - for i := len(partValueExprs) - 1; i >= 0; i-- { - nextExcludeExpr := tree.TypedExpr(tree.DBoolFalse) - for _, v := range partValueExprs[i] { - nextExcludeExpr = tree.NewTypedOrExpr(nextExcludeExpr, v.expr) - partValueExpr := tree.NewTypedAndExpr(v.expr, tree.NewTypedNotExpr(excludeExpr)) - // We can get multiple expressions for the same partition in - // a single-col `PARTITION foo VALUES IN ((1), (2))`. - if e, ok := exprsByPartName[v.name]; !ok || e == nil { - exprsByPartName[v.name] = partValueExpr - } else { - exprsByPartName[v.name] = tree.NewTypedOrExpr(e, partValueExpr) - } - } - excludeExpr = tree.NewTypedOrExpr(excludeExpr, nextExcludeExpr) - } - } - - if part.NumRanges() > 0 { - log.Fatal(evalCtx.Context, "TODO(dan): unsupported for range partitionings") - } - return nil -} - func init() { sql.CreatePartitioningCCL = createPartitioning scdeps.CreatePartitioningCCL = createPartitioning diff --git a/pkg/ccl/partitionccl/partition_test.go b/pkg/ccl/partitionccl/partition_test.go index 786aae2de26e..eb5ce9cee527 100644 --- a/pkg/ccl/partitionccl/partition_test.go +++ b/pkg/ccl/partitionccl/partition_test.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/randgen" - "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -1241,85 +1240,6 @@ func TestInitialPartitioning(t *testing.T) { } } -func TestSelectPartitionExprs(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // TODO(dan): PartitionExprs for range partitions is waiting on the new - // range partitioning syntax. - testData := partitioningTest{ - name: `partition exprs`, - schema: `CREATE TABLE %s ( - a INT, b INT, c INT, PRIMARY KEY (a, b, c) - ) PARTITION BY LIST (a, b) ( - PARTITION p33p44 VALUES IN ((3, 3), (4, 4)) PARTITION BY LIST (c) ( - PARTITION p335p445 VALUES IN (5), - PARTITION p33dp44d VALUES IN (DEFAULT) - ), - PARTITION p6d VALUES IN ((6, DEFAULT)), - PARTITION pdd VALUES IN ((DEFAULT, DEFAULT)) - )`, - } - if err := testData.parse(); err != nil { - t.Fatalf("%+v", err) - } - - tests := []struct { - // partitions is a comma-separated list of input partitions - partitions string - // expr is the expected output - expr string - }{ - {`p33p44`, `((a, b) = (3, 3)) OR ((a, b) = (4, 4))`}, - {`p335p445`, `((a, b, c) = (3, 3, 5)) OR ((a, b, c) = (4, 4, 5))`}, - {`p33dp44d`, `(((a, b) = (3, 3)) AND (NOT ((a, b, c) = (3, 3, 5)))) OR (((a, b) = (4, 4)) AND (NOT ((a, b, c) = (4, 4, 5))))`}, - // NB See the TODO in the impl for why this next case has some clearly - // unrelated `!=`s. - {`p6d`, `((a,) = (6,)) AND (NOT (((a, b) = (3, 3)) OR ((a, b) = (4, 4))))`}, - {`pdd`, `NOT ((((a, b) = (3, 3)) OR ((a, b) = (4, 4))) OR ((a,) = (6,)))`}, - - {`p335p445,p6d`, `(((a, b, c) = (3, 3, 5)) OR ((a, b, c) = (4, 4, 5))) OR (((a,) = (6,)) AND (NOT (((a, b) = (3, 3)) OR ((a, b) = (4, 4)))))`}, - - // TODO(dan): The expression simplification in this method is all done - // by our normal SQL expression simplification code. Seems like it could - // use some targeted work to clean these up. Ideally the following would - // all simplyify to `(a, b) IN ((3, 3), (4, 4))`. Some of them work - // because for every requested partition, all descendent partitions are - // omitted, which is an optimization to save a little work with the side - // benefit of making more of these what we want. - {`p335p445,p33dp44d`, `(((a, b, c) = (3, 3, 5)) OR ((a, b, c) = (4, 4, 5))) OR ((((a, b) = (3, 3)) AND (NOT ((a, b, c) = (3, 3, 5)))) OR (((a, b) = (4, 4)) AND (NOT ((a, b, c) = (4, 4, 5)))))`}, - {`p33p44,p335p445`, `((a, b) = (3, 3)) OR ((a, b) = (4, 4))`}, - {`p33p44,p335p445,p33dp44d`, `((a, b) = (3, 3)) OR ((a, b) = (4, 4))`}, - } - - evalCtx := &eval.Context{ - Codec: keys.SystemSQLCodec, - Settings: cluster.MakeTestingClusterSettings(), - } - for _, test := range tests { - t.Run(test.partitions, func(t *testing.T) { - var partNames tree.NameList - for _, p := range strings.Split(test.partitions, `,`) { - partNames = append(partNames, tree.Name(p)) - } - expr, err := selectPartitionExprs(evalCtx, testData.parsed.tableDesc, partNames) - if err != nil { - t.Fatalf("%+v", err) - } - if exprStr := expr.String(); exprStr != test.expr { - t.Errorf("got\n%s\nexpected\n%s", exprStr, test.expr) - } - }) - } - t.Run("error", func(t *testing.T) { - partNames := tree.NameList{`p33p44`, `nope`} - _, err := selectPartitionExprs(evalCtx, testData.parsed.tableDesc, partNames) - if !testutils.IsError(err, `unknown partition`) { - t.Errorf(`expected "unknown partition" error got: %+v`, err) - } - }) -} - func TestRepartitioning(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvclient/rangestats/BUILD.bazel b/pkg/kv/kvclient/rangestats/BUILD.bazel new file mode 100644 index 000000000000..7ea99deb0865 --- /dev/null +++ b/pkg/kv/kvclient/rangestats/BUILD.bazel @@ -0,0 +1,16 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "rangestats", + srcs = ["fetcher.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv", + "//pkg/roachpb", + "//pkg/sql/sem/eval", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvclient/rangestats/fetcher.go b/pkg/kv/kvclient/rangestats/fetcher.go new file mode 100644 index 000000000000..7ac8dc89adbf --- /dev/null +++ b/pkg/kv/kvclient/rangestats/fetcher.go @@ -0,0 +1,52 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package rangestats contains code to fetch range stats. +package rangestats + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" +) + +// Fetcher implements eval.RangeStatsFetcher. +type Fetcher struct { + db *kv.DB +} + +var _ eval.RangeStatsFetcher = (*Fetcher)(nil) + +// NewFetcher constructs a new Fetcher. +func NewFetcher(db *kv.DB) *Fetcher { + return &Fetcher{db: db} +} + +// RangeStats is part of the eval.RangeStatsFetcher interface. +func (f Fetcher) RangeStats( + ctx context.Context, keys ...roachpb.Key, +) ([]*roachpb.RangeStatsResponse, error) { + var ba kv.Batch + reqs := make([]roachpb.RangeStatsRequest, len(keys)) + for i, k := range keys { + reqs[i].Key = k + ba.AddRawRequest(&reqs[i]) + } + if err := f.db.Run(ctx, &ba); err != nil { + return nil, err + } + resps := make([]*roachpb.RangeStatsResponse, len(keys)) + for i, r := range ba.RawResponse().Responses { + resps[i] = r.GetInner().(*roachpb.RangeStatsResponse) + } + return resps, nil +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 376d3750523d..ab85cc4c817a 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -97,6 +97,7 @@ go_library( "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/kvtenant", "//pkg/kv/kvclient/rangefeed", + "//pkg/kv/kvclient/rangestats", "//pkg/kv/kvprober", "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/storepool", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 33a28d4205f4..ef763d46954d 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" @@ -623,6 +624,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { bulkSenderLimiter := bulk.MakeAndRegisterConcurrencyLimiter(&cfg.Settings.SV) + rangeStatsFetcher := rangestats.NewFetcher(cfg.db) + // Set up the DistSQL server. distSQLCfg := execinfra.ServerConfig{ AmbientContext: cfg.AmbientCtx, @@ -680,6 +683,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { SQLSQLResponseAdmissionQ: cfg.sqlSQLResponseAdmissionQ, CollectionFactory: collectionFactory, ExternalIORecorder: cfg.costController, + RangeStatsFetcher: rangeStatsFetcher, } cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist) if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil { @@ -839,6 +843,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ConsistencyChecker: consistencychecker.NewConsistencyChecker(cfg.db), RangeProber: rangeprober.NewRangeProber(cfg.db), DescIDGenerator: descidgen.NewGenerator(codec, cfg.db), + RangeStatsFetcher: rangeStatsFetcher, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { diff --git a/pkg/sql/colexec/BUILD.bazel b/pkg/sql/colexec/BUILD.bazel index abd007e903f6..f6e80b921573 100644 --- a/pkg/sql/colexec/BUILD.bazel +++ b/pkg/sql/colexec/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "ordered_aggregator.go", "parallel_unordered_synchronizer.go", "partially_ordered_distinct.go", + "range_stats.go", "serial_unordered_synchronizer.go", "sort.go", "sort_chunks.go", @@ -41,6 +42,7 @@ go_library( "//pkg/col/coldata", "//pkg/col/coldataext", # keep "//pkg/col/typeconv", # keep + "//pkg/roachpb", "//pkg/server/telemetry", # keep "//pkg/settings", "//pkg/sql/catalog/colinfo", # keep diff --git a/pkg/sql/colexec/builtin_funcs.go b/pkg/sql/colexec/builtin_funcs.go index 72f10be9858b..a8250a98f997 100644 --- a/pkg/sql/colexec/builtin_funcs.go +++ b/pkg/sql/colexec/builtin_funcs.go @@ -124,6 +124,16 @@ func NewBuiltinFunctionOperator( return newSubstringOperator( allocator, columnTypes, argumentCols, outputIdx, input, ), nil + case tree.CrdbInternalRangeStats: + if len(argumentCols) != 1 { + return nil, errors.AssertionFailedf( + "expected 1 input column to crdb_internal.range_stats, got %d", + len(argumentCols), + ) + } + return newRangeStatsOperator( + evalCtx.RangeStatsFetcher, allocator, argumentCols[0], outputIdx, input, + ) default: return &defaultBuiltinFuncOperator{ OneInputHelper: colexecop.MakeOneInputHelper(input), diff --git a/pkg/sql/colexec/range_stats.go b/pkg/sql/colexec/range_stats.go new file mode 100644 index 000000000000..7c98466d7b91 --- /dev/null +++ b/pkg/sql/colexec/range_stats.go @@ -0,0 +1,119 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexec + +import ( + gojson "encoding/json" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/colexecop" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/util/json" +) + +type rangeStatsOperator struct { + colexecop.OneInputHelper + fetcher eval.RangeStatsFetcher + allocator *colmem.Allocator + argumentCol int + outputIdx int +} + +var _ colexecop.Operator = (*rangeStatsOperator)(nil) + +// newRangeStatsOperator constructs a vectorized operator to fetch range +// statistics. Importantly, this operator issues RangeStatsRequests in +// parallel to help amortize the latency required to issue the requests +// to nodes in remote regions. +func newRangeStatsOperator( + fetcher eval.RangeStatsFetcher, + allocator *colmem.Allocator, + argumentCol int, + outputIdx int, + input colexecop.Operator, +) (colexecop.Operator, error) { + return &rangeStatsOperator{ + OneInputHelper: colexecop.MakeOneInputHelper(input), + allocator: allocator, + argumentCol: argumentCol, + outputIdx: outputIdx, + fetcher: fetcher, + }, nil +} + +// TODO(ajwerner): Generalize this operator to deal with other very similar +// builtins like crdb_internal.leaseholder. Some of the places which retrieve +// range statistics also call that function; optimizing one without the other +// is pointless, and they are very similar. + +func (r *rangeStatsOperator) Next() coldata.Batch { + // Naively take the input batch and use it to define the batch size. + // + // TODO(ajwerner): As a first step towards being more sophisticated, + // this code could accumulate up to some minimum batch size before + // sending the first batch. + batch := r.Input.Next() + n := batch.Length() + if n == 0 { + return coldata.ZeroBatch + } + inSel := batch.Selection() + inVec := batch.ColVec(r.argumentCol) + inBytes := inVec.Bytes() + + output := batch.ColVec(r.outputIdx) + jsonOutput := output.JSON() + r.allocator.PerformOperation( + []coldata.Vec{output}, + func() { + keys := make([]roachpb.Key, 0, batch.Length()) + if inSel == nil { + for i := 0; i < batch.Length(); i++ { + keys = append(keys, inBytes.Get(i)) + } + } else { + for _, idx := range inSel { + keys = append(keys, inBytes.Get(idx)) + } + } + // TODO(ajwerner): Reserve memory for the responses. We know they'll + // at least, on average, contain keys so it'll be 2x the size of the + // keys plus some constant multiple. + res, err := r.fetcher.RangeStats(r.Ctx, keys...) + if err != nil { + colexecerror.ExpectedError(err) + } + readResponse := func(resultIndex, outputIndex int) { + jsonStr, err := gojson.Marshal(&res[resultIndex].MVCCStats) + if err != nil { + colexecerror.ExpectedError(err) + } + jsonDatum, err := json.ParseJSON(string(jsonStr)) + if err != nil { + colexecerror.ExpectedError(err) + } + jsonOutput.Set(outputIndex, jsonDatum) + } + if inSel != nil { + for i, s := range inSel { + readResponse(i, s) + } + } else { + for i := range res { + readResponse(i, i) + } + } + }) + return batch +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index dfa34b64daaf..f092a922c0ce 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2695,6 +2695,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo CatalogBuiltins: &p.evalCatalogBuiltins, QueryCancelKey: ex.queryCancelKey, DescIDGenerator: ex.getDescIDGenerator(), + RangeStatsFetcher: p.execCfg.RangeStatsFetcher, }, Tracing: &ex.sessionTracing, MemMetrics: &ex.memMetrics, diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 00b8906d9117..9f5ba3bbc12f 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -383,6 +383,7 @@ func (ds *ServerImpl) setupFlow( SQLStatsController: ds.ServerConfig.SQLStatsController, SchemaTelemetryController: ds.ServerConfig.SchemaTelemetryController, IndexUsageStatsController: ds.ServerConfig.IndexUsageStatsController, + RangeStatsFetcher: ds.ServerConfig.RangeStatsFetcher, } evalCtx.SetStmtTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.StmtTimestampNanos)) evalCtx.SetTxnTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.TxnTimestampNanos)) diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 82c0970d2031..1f523e090964 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1337,6 +1337,9 @@ type ExecutorConfig struct { // SyntheticPrivilegeCache SyntheticPrivilegeCache *cacheutil.Cache + + // RangeStatsFetcher is used to fetch RangeStats. + RangeStatsFetcher eval.RangeStatsFetcher } // UpdateVersionSystemSettingHook provides a callback that allows us diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index ef9def87ab9b..8e21ff2c7a5d 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -192,6 +192,9 @@ type ServerConfig struct { // ExternalIORecorder is used to record reads and writes from // external services (such as external storage) ExternalIORecorder multitenant.TenantSideExternalIORecorder + + // RangeStatsFetcher is used to fetch range stats for keys. + RangeStatsFetcher eval.RangeStatsFetcher } // RuntimeStats is an interface through which the rowexec layer can get diff --git a/pkg/sql/opt/exec/execbuilder/testdata/range_stats b/pkg/sql/opt/exec/execbuilder/testdata/range_stats new file mode 100644 index 000000000000..6787c656896e --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/range_stats @@ -0,0 +1,13 @@ +# LogicTest: local +# +# This file tests that we build a specialized vectorized operator for the +# crdb_internal.range_stats function. + +query T +EXPLAIN (VEC) SELECT crdb_internal.range_stats(start_key) + FROM crdb_internal.ranges_no_leases; +---- +│ +└ Node 1 + └ *colexec.rangeStatsOperator + └ *sql.planNodeToRowSource diff --git a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go index 26aeca7622d4..99f94c36dc2c 100644 --- a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go +++ b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go @@ -389,6 +389,13 @@ func TestExecBuild_prepare( runExecBuildLogicTest(t, "prepare") } +func TestExecBuild_range_stats( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runExecBuildLogicTest(t, "range_stats") +} + func TestExecBuild_scalar( t *testing.T, ) { diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 78c7c3a5f9dc..3f90790473d5 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -486,6 +486,7 @@ func internalExtendedEvalCtx( SchemaTelemetryController: schemaTelemetryController, IndexUsageStatsController: indexUsageStatsController, StmtDiagnosticsRequestInserter: execCfg.StmtDiagnosticsRecorder.InsertRequest, + RangeStatsFetcher: execCfg.RangeStatsFetcher, }, Tracing: &SessionTracing{}, Descs: tables, diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 379a7da560e9..c1745eee8276 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -1362,6 +1362,7 @@ func TestSchemaChangeRetry(t *testing.T) { } const maxValue = 2000 + ctx, cancel := context.WithCancel(context.Background()) params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ WriteCheckpointInterval: time.Nanosecond, @@ -1379,10 +1380,17 @@ func TestSchemaChangeRetry(t *testing.T) { }, // Decrease the adopt loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + GCJob: &sql.GCJobTestingKnobs{ + RunBeforeResume: func(jobID jobspb.JobID) error { + <-ctx.Done() + return ctx.Err() + }, + }, } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.Background()) + defer cancel() if _, err := sqlDB.Exec(` CREATE DATABASE t; diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index 7475eeb58366..b7eca29fcf7c 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -5414,20 +5414,18 @@ value if you rely on the HLC for accuracy.`, Types: tree.ArgTypes{ {"key", types.Bytes}, }, - ReturnType: tree.FixedReturnType(types.Jsonb), + SpecializedVecBuiltin: tree.CrdbInternalRangeStats, + ReturnType: tree.FixedReturnType(types.Jsonb), Fn: func(ctx *eval.Context, args tree.Datums) (tree.Datum, error) { - key := []byte(tree.MustBeDBytes(args[0])) - b := &kv.Batch{} - b.AddRawRequest(&roachpb.RangeStatsRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: key, - }, - }) - if err := ctx.Txn.Run(ctx.Context, b); err != nil { + if args[0] == tree.DNull { + return tree.DNull, nil + } + resps, err := ctx.RangeStatsFetcher.RangeStats(ctx.Ctx(), + roachpb.Key(tree.MustBeDBytes(args[0]))) + if err != nil { return nil, pgerror.Wrap(err, pgcode.InvalidParameterValue, "error fetching range stats") } - resp := b.RawResponse().Responses[0].GetInner().(*roachpb.RangeStatsResponse).MVCCStats - jsonStr, err := gojson.Marshal(&resp) + jsonStr, err := gojson.Marshal(&resps[0].MVCCStats) if err != nil { return nil, err } diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index cd03b27fb84e..9024a21cf6b1 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -223,6 +223,9 @@ type Context struct { QueryCancelKey pgwirecancel.BackendKeyData DescIDGenerator DescIDGenerator + + // RangeStatsFetcher is used to fetch RangeStats. + RangeStatsFetcher RangeStatsFetcher } // DescIDGenerator generates unique descriptor IDs. @@ -230,6 +233,13 @@ type DescIDGenerator interface { GenerateUniqueDescID(ctx context.Context) (catid.DescID, error) } +// RangeStatsFetcher is used to fetch RangeStats. +type RangeStatsFetcher interface { + + // RangeStats fetches the stats for the ranges which contain the passed keys. + RangeStats(ctx context.Context, keys ...roachpb.Key) ([]*roachpb.RangeStatsResponse, error) +} + var _ tree.ParseTimeContext = &Context{} // ConsistencyCheckRunner is an interface embedded in eval.Context used by diff --git a/pkg/sql/sem/tree/overload.go b/pkg/sql/sem/tree/overload.go index 367d3be22a01..2b94b7e00a34 100644 --- a/pkg/sql/sem/tree/overload.go +++ b/pkg/sql/sem/tree/overload.go @@ -40,6 +40,7 @@ type SpecializedVectorizedBuiltin int const ( _ SpecializedVectorizedBuiltin = iota SubstringStringIntInt + CrdbInternalRangeStats ) // AggregateOverload is an opaque type which is used to box an eval.AggregateOverload. diff --git a/pkg/sql/unsplit_range_test.go b/pkg/sql/unsplit_range_test.go index 3f0483dc3940..a1fb5267cd5c 100644 --- a/pkg/sql/unsplit_range_test.go +++ b/pkg/sql/unsplit_range_test.go @@ -309,11 +309,6 @@ func TestUnsplitRanges(t *testing.T) { if _, err := sqlDB.Exec(tc.query); err != nil { t.Fatal(err) } - // Push a new zone config for a few tables with TTL=0 so the data - // is deleted immediately. - if _, err := sqltestutils.AddImmediateGCZoneConfig(sqlDB, tableDesc.GetID()); err != nil { - t.Fatal(err) - } // Check GC worked! testutils.SucceedsSoon(t, func() error {