From 7a5cb735e2e9f7a22a63f31181aa36e4bfe4a5b3 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 8 Aug 2022 11:32:59 -0700 Subject: [PATCH 1/4] partitionccl: remove unused functions This commit removes a couple of functions that are only used in tests. Release note: None --- pkg/ccl/partitionccl/BUILD.bazel | 7 - pkg/ccl/partitionccl/partition.go | 215 ------------------------- pkg/ccl/partitionccl/partition_test.go | 80 --------- 3 files changed, 302 deletions(-) diff --git a/pkg/ccl/partitionccl/BUILD.bazel b/pkg/ccl/partitionccl/BUILD.bazel index 5f6b5929cf03..5ca7906b97b6 100644 --- a/pkg/ccl/partitionccl/BUILD.bazel +++ b/pkg/ccl/partitionccl/BUILD.bazel @@ -13,23 +13,17 @@ go_library( "//pkg/sql/catalog", "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/colinfo", - "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", - "//pkg/sql/catalog/tabledesc", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/rowenc", "//pkg/sql/rowenc/valueside", "//pkg/sql/schemachanger/scdeps", "//pkg/sql/sem/eval", - "//pkg/sql/sem/normalize", "//pkg/sql/sem/tree", - "//pkg/sql/sem/tree/treecmp", "//pkg/sql/sem/volatility", - "//pkg/sql/types", "//pkg/util/encoding", "//pkg/util/errorutil/unimplemented", - "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", ], ) @@ -79,7 +73,6 @@ go_test( "//pkg/sql/rowenc", "//pkg/sql/scrub", "//pkg/sql/scrub/scrubtestutils", - "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/tests", "//pkg/sql/types", diff --git a/pkg/ccl/partitionccl/partition.go b/pkg/ccl/partitionccl/partition.go index f06349386b1b..d08adc7f0fee 100644 --- a/pkg/ccl/partitionccl/partition.go +++ b/pkg/ccl/partitionccl/partition.go @@ -18,23 +18,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/sql/sem/normalize" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -408,215 +402,6 @@ func createPartitioning( return newImplicitCols, newPartitioning, err } -// selectPartitionExprs constructs an expression for selecting all rows in the -// given partitions. -func selectPartitionExprs( - evalCtx *eval.Context, tableDesc catalog.TableDescriptor, partNames tree.NameList, -) (tree.Expr, error) { - exprsByPartName := make(map[string]tree.TypedExpr) - for _, partName := range partNames { - exprsByPartName[string(partName)] = nil - } - - a := &tree.DatumAlloc{} - var prefixDatums []tree.Datum - if err := catalog.ForEachIndex(tableDesc, catalog.IndexOpts{ - AddMutations: true, - }, func(idx catalog.Index) error { - return selectPartitionExprsByName( - a, evalCtx, tableDesc, idx, idx.GetPartitioning(), prefixDatums, exprsByPartName, true /* genExpr */) - }); err != nil { - return nil, err - } - - var expr tree.TypedExpr = tree.DBoolFalse - for _, partName := range partNames { - partExpr, ok := exprsByPartName[string(partName)] - if !ok || partExpr == nil { - return nil, errors.Errorf("unknown partition: %s", partName) - } - expr = tree.NewTypedOrExpr(expr, partExpr) - } - - var err error - expr, err = normalize.Expr(evalCtx, expr) - if err != nil { - return nil, err - } - // In order to typecheck during simplification and normalization, we used - // dummy IndexVars. Swap them out for actual column references. - finalExpr, err := tree.SimpleVisit(expr, func(e tree.Expr) (recurse bool, newExpr tree.Expr, _ error) { - if ivar, ok := e.(*tree.IndexedVar); ok { - col, err := tableDesc.FindColumnWithID(descpb.ColumnID(ivar.Idx)) - if err != nil { - return false, nil, err - } - return false, &tree.ColumnItem{ColumnName: tree.Name(col.GetName())}, nil - } - return true, e, nil - }) - return finalExpr, err -} - -// selectPartitionExprsByName constructs an expression for selecting all rows in -// each partition and subpartition in the given index. To make it easy to -// simplify and normalize the exprs, references to table columns are represented -// as TypedOrdinalReferences with an ordinal of the column ID. -// -// NB Subpartitions do not affect the expression for their parent partitions. So -// if a partition foo (a=3) is then subpartitiond by (b=5) and no DEFAULT, the -// expression for foo is still `a=3`, not `a=3 AND b=5`. This means that if some -// partition is requested, we can omit all of the subpartitions, because they'll -// also necessarily select subsets of the rows it will. "requested" here is -// indicated by the caller by setting the corresponding name in the -// `exprsByPartName` map to nil. In this case, `genExpr` is then set to false -// for subpartitions of this call, which causes each subpartition to only -// register itself in the map with a placeholder entry (so we can still verify -// that the requested partitions are all valid). -func selectPartitionExprsByName( - a *tree.DatumAlloc, - evalCtx *eval.Context, - tableDesc catalog.TableDescriptor, - idx catalog.Index, - part catalog.Partitioning, - prefixDatums tree.Datums, - exprsByPartName map[string]tree.TypedExpr, - genExpr bool, -) error { - if part.NumColumns() == 0 { - return nil - } - - // Setting genExpr to false skips the expression generation and only - // registers each descendent partition in the map with a placeholder entry. - if !genExpr { - err := part.ForEachList(func(name string, _ [][]byte, subPartitioning catalog.Partitioning) error { - exprsByPartName[name] = tree.DBoolFalse - var fakeDatums tree.Datums - return selectPartitionExprsByName(a, evalCtx, tableDesc, idx, subPartitioning, fakeDatums, exprsByPartName, genExpr) - }) - if err != nil { - return err - } - return part.ForEachRange(func(name string, _, _ []byte) error { - exprsByPartName[name] = tree.DBoolFalse - return nil - }) - } - - var colVars tree.Exprs - { - // The recursive calls of selectPartitionExprsByName don't pass though - // the column ordinal references, so reconstruct them here. - colVars = make(tree.Exprs, len(prefixDatums)+part.NumColumns()) - for i := range colVars { - col, err := tabledesc.FindPublicColumnWithID(tableDesc, idx.GetKeyColumnID(i)) - if err != nil { - return err - } - colVars[i] = tree.NewTypedOrdinalReference(int(col.GetID()), col.GetType()) - } - } - - if part.NumLists() > 0 { - type exprAndPartName struct { - expr tree.TypedExpr - name string - } - // Any partitions using DEFAULT must specifically exclude any relevant - // higher specificity partitions (e.g for partitions `(1, DEFAULT)`, - // `(1, 2)`, the expr for the former must exclude the latter. This is - // done by bucketing the expression for each partition value by the - // number of DEFAULTs it involves. - partValueExprs := make([][]exprAndPartName, part.NumColumns()+1) - - err := part.ForEachList(func(name string, values [][]byte, subPartitioning catalog.Partitioning) error { - for _, valueEncBuf := range values { - t, _, err := rowenc.DecodePartitionTuple(a, evalCtx.Codec, tableDesc, idx, part, valueEncBuf, prefixDatums) - if err != nil { - return err - } - allDatums := append(prefixDatums, t.Datums...) - - // When len(allDatums) < len(colVars), the missing elements are DEFAULTs, so - // we can simply exclude them from the expr. - typContents := make([]*types.T, len(allDatums)) - for i, d := range allDatums { - typContents[i] = d.ResolvedType() - } - tupleTyp := types.MakeTuple(typContents) - partValueExpr := tree.NewTypedComparisonExpr( - treecmp.MakeComparisonOperator(treecmp.EQ), - tree.NewTypedTuple(tupleTyp, colVars[:len(allDatums)]), - tree.NewDTuple(tupleTyp, allDatums...), - ) - partValueExprs[len(t.Datums)] = append(partValueExprs[len(t.Datums)], exprAndPartName{ - expr: partValueExpr, - name: name, - }) - - genExpr := true - if _, ok := exprsByPartName[name]; ok { - // Presence of a partition name in the exprsByPartName map - // means the caller has expressed an interested in this - // partition, which means any subpartitions can be skipped - // (because they must by definition be a subset of this - // partition). This saves us a little work and also helps - // out the normalization & simplification of the resulting - // expression, since it doesn't have to account for which - // partitions overlap. - genExpr = false - } - if err := selectPartitionExprsByName( - a, evalCtx, tableDesc, idx, subPartitioning, allDatums, exprsByPartName, genExpr, - ); err != nil { - return err - } - } - return nil - }) - if err != nil { - return err - } - - // Walk backward through partValueExprs, so partition values with fewest - // DEFAULTs to most. As we go, keep an expression to be AND NOT'd with - // each partition value's expression in `excludeExpr`. This handles the - // exclusion of `(1, 2)` from the expression for `(1, DEFAULT)` in the - // example above. - // - // TODO(dan): The result of the way this currently works is correct but - // too broad. In a two column partitioning with cases for `(a, b)` and - // `(c, DEFAULT)`, the expression generated for `(c, DEFAULT)` will - // needlessly exclude `(a, b)`. Concretely, we end up with expressions - // like `(a) IN (1) AND ... (a, b) != (2, 3)`, where the `!= (2, 3)` - // part is irrelevant. This only happens in fairly unrealistic - // partitionings, so it's unclear if anything really needs to be done - // here. - excludeExpr := tree.TypedExpr(tree.DBoolFalse) - for i := len(partValueExprs) - 1; i >= 0; i-- { - nextExcludeExpr := tree.TypedExpr(tree.DBoolFalse) - for _, v := range partValueExprs[i] { - nextExcludeExpr = tree.NewTypedOrExpr(nextExcludeExpr, v.expr) - partValueExpr := tree.NewTypedAndExpr(v.expr, tree.NewTypedNotExpr(excludeExpr)) - // We can get multiple expressions for the same partition in - // a single-col `PARTITION foo VALUES IN ((1), (2))`. - if e, ok := exprsByPartName[v.name]; !ok || e == nil { - exprsByPartName[v.name] = partValueExpr - } else { - exprsByPartName[v.name] = tree.NewTypedOrExpr(e, partValueExpr) - } - } - excludeExpr = tree.NewTypedOrExpr(excludeExpr, nextExcludeExpr) - } - } - - if part.NumRanges() > 0 { - log.Fatal(evalCtx.Context, "TODO(dan): unsupported for range partitionings") - } - return nil -} - func init() { sql.CreatePartitioningCCL = createPartitioning scdeps.CreatePartitioningCCL = createPartitioning diff --git a/pkg/ccl/partitionccl/partition_test.go b/pkg/ccl/partitionccl/partition_test.go index 786aae2de26e..eb5ce9cee527 100644 --- a/pkg/ccl/partitionccl/partition_test.go +++ b/pkg/ccl/partitionccl/partition_test.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/randgen" - "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -1241,85 +1240,6 @@ func TestInitialPartitioning(t *testing.T) { } } -func TestSelectPartitionExprs(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // TODO(dan): PartitionExprs for range partitions is waiting on the new - // range partitioning syntax. - testData := partitioningTest{ - name: `partition exprs`, - schema: `CREATE TABLE %s ( - a INT, b INT, c INT, PRIMARY KEY (a, b, c) - ) PARTITION BY LIST (a, b) ( - PARTITION p33p44 VALUES IN ((3, 3), (4, 4)) PARTITION BY LIST (c) ( - PARTITION p335p445 VALUES IN (5), - PARTITION p33dp44d VALUES IN (DEFAULT) - ), - PARTITION p6d VALUES IN ((6, DEFAULT)), - PARTITION pdd VALUES IN ((DEFAULT, DEFAULT)) - )`, - } - if err := testData.parse(); err != nil { - t.Fatalf("%+v", err) - } - - tests := []struct { - // partitions is a comma-separated list of input partitions - partitions string - // expr is the expected output - expr string - }{ - {`p33p44`, `((a, b) = (3, 3)) OR ((a, b) = (4, 4))`}, - {`p335p445`, `((a, b, c) = (3, 3, 5)) OR ((a, b, c) = (4, 4, 5))`}, - {`p33dp44d`, `(((a, b) = (3, 3)) AND (NOT ((a, b, c) = (3, 3, 5)))) OR (((a, b) = (4, 4)) AND (NOT ((a, b, c) = (4, 4, 5))))`}, - // NB See the TODO in the impl for why this next case has some clearly - // unrelated `!=`s. - {`p6d`, `((a,) = (6,)) AND (NOT (((a, b) = (3, 3)) OR ((a, b) = (4, 4))))`}, - {`pdd`, `NOT ((((a, b) = (3, 3)) OR ((a, b) = (4, 4))) OR ((a,) = (6,)))`}, - - {`p335p445,p6d`, `(((a, b, c) = (3, 3, 5)) OR ((a, b, c) = (4, 4, 5))) OR (((a,) = (6,)) AND (NOT (((a, b) = (3, 3)) OR ((a, b) = (4, 4)))))`}, - - // TODO(dan): The expression simplification in this method is all done - // by our normal SQL expression simplification code. Seems like it could - // use some targeted work to clean these up. Ideally the following would - // all simplyify to `(a, b) IN ((3, 3), (4, 4))`. Some of them work - // because for every requested partition, all descendent partitions are - // omitted, which is an optimization to save a little work with the side - // benefit of making more of these what we want. - {`p335p445,p33dp44d`, `(((a, b, c) = (3, 3, 5)) OR ((a, b, c) = (4, 4, 5))) OR ((((a, b) = (3, 3)) AND (NOT ((a, b, c) = (3, 3, 5)))) OR (((a, b) = (4, 4)) AND (NOT ((a, b, c) = (4, 4, 5)))))`}, - {`p33p44,p335p445`, `((a, b) = (3, 3)) OR ((a, b) = (4, 4))`}, - {`p33p44,p335p445,p33dp44d`, `((a, b) = (3, 3)) OR ((a, b) = (4, 4))`}, - } - - evalCtx := &eval.Context{ - Codec: keys.SystemSQLCodec, - Settings: cluster.MakeTestingClusterSettings(), - } - for _, test := range tests { - t.Run(test.partitions, func(t *testing.T) { - var partNames tree.NameList - for _, p := range strings.Split(test.partitions, `,`) { - partNames = append(partNames, tree.Name(p)) - } - expr, err := selectPartitionExprs(evalCtx, testData.parsed.tableDesc, partNames) - if err != nil { - t.Fatalf("%+v", err) - } - if exprStr := expr.String(); exprStr != test.expr { - t.Errorf("got\n%s\nexpected\n%s", exprStr, test.expr) - } - }) - } - t.Run("error", func(t *testing.T) { - partNames := tree.NameList{`p33p44`, `nope`} - _, err := selectPartitionExprs(evalCtx, testData.parsed.tableDesc, partNames) - if !testutils.IsError(err, `unknown partition`) { - t.Errorf(`expected "unknown partition" error got: %+v`, err) - } - }) -} - func TestRepartitioning(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) From fba19fcd29e1da72efddabb054085a0f20df90b9 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 14 Aug 2022 23:27:08 -0400 Subject: [PATCH 2/4] sql: deflake TestSchemaChangeRetry Now that GC happens eagerly in the GC job, we need to hold off the GC job to make sure that key counts match the test's expectations. Fixes flakes like [this one](https://teamcity.cockroachdb.com/buildConfiguration/Cockroach_BazelEssentialCi/6093943) Release note: None --- pkg/sql/schema_changer_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 379a7da560e9..c1745eee8276 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -1362,6 +1362,7 @@ func TestSchemaChangeRetry(t *testing.T) { } const maxValue = 2000 + ctx, cancel := context.WithCancel(context.Background()) params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ WriteCheckpointInterval: time.Nanosecond, @@ -1379,10 +1380,17 @@ func TestSchemaChangeRetry(t *testing.T) { }, // Decrease the adopt loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + GCJob: &sql.GCJobTestingKnobs{ + RunBeforeResume: func(jobID jobspb.JobID) error { + <-ctx.Done() + return ctx.Err() + }, + }, } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.Background()) + defer cancel() if _, err := sqlDB.Exec(` CREATE DATABASE t; From 5485899956ac27ccf3d7c40e0008c84e270a400d Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Fri, 1 Jul 2022 11:32:30 -0400 Subject: [PATCH 3/4] sql/colexec: specialize RangeStats operator to achieve parallelism Release note (performance improvement): The crdb_internal.range_statistics function now uses a vectorized implementation which allows the lookup of range metadata to occur in parallel. --- pkg/BUILD.bazel | 2 + pkg/kv/kvclient/rangestats/BUILD.bazel | 16 +++ pkg/kv/kvclient/rangestats/fetcher.go | 52 ++++++++ pkg/server/BUILD.bazel | 1 + pkg/server/server_sql.go | 5 + pkg/sql/colexec/BUILD.bazel | 2 + pkg/sql/colexec/builtin_funcs.go | 10 ++ pkg/sql/colexec/range_stats.go | 119 ++++++++++++++++++ pkg/sql/conn_executor.go | 1 + pkg/sql/distsql/server.go | 1 + pkg/sql/exec_util.go | 3 + pkg/sql/execinfra/server_config.go | 3 + .../opt/exec/execbuilder/testdata/range_stats | 13 ++ .../execbuilder/tests/local/generated_test.go | 7 ++ pkg/sql/planner.go | 1 + pkg/sql/sem/builtins/builtins.go | 20 ++- pkg/sql/sem/eval/context.go | 10 ++ pkg/sql/sem/tree/overload.go | 1 + 18 files changed, 256 insertions(+), 11 deletions(-) create mode 100644 pkg/kv/kvclient/rangestats/BUILD.bazel create mode 100644 pkg/kv/kvclient/rangestats/fetcher.go create mode 100644 pkg/sql/colexec/range_stats.go create mode 100644 pkg/sql/opt/exec/execbuilder/testdata/range_stats diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index fcd75b545328..82f03dca50a3 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1056,6 +1056,7 @@ GO_TARGETS = [ "//pkg/kv/kvclient/rangefeed/rangefeedcache:rangefeedcache_test", "//pkg/kv/kvclient/rangefeed:rangefeed", "//pkg/kv/kvclient/rangefeed:rangefeed_test", + "//pkg/kv/kvclient/rangestats:rangestats", "//pkg/kv/kvclient:kvclient", "//pkg/kv/kvnemesis:kvnemesis", "//pkg/kv/kvnemesis:kvnemesis_test", @@ -2327,6 +2328,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvclient/rangefeed:get_x_data", "//pkg/kv/kvclient/rangefeed/rangefeedbuffer:get_x_data", "//pkg/kv/kvclient/rangefeed/rangefeedcache:get_x_data", + "//pkg/kv/kvclient/rangestats:get_x_data", "//pkg/kv/kvnemesis:get_x_data", "//pkg/kv/kvprober:get_x_data", "//pkg/kv/kvserver:get_x_data", diff --git a/pkg/kv/kvclient/rangestats/BUILD.bazel b/pkg/kv/kvclient/rangestats/BUILD.bazel new file mode 100644 index 000000000000..7ea99deb0865 --- /dev/null +++ b/pkg/kv/kvclient/rangestats/BUILD.bazel @@ -0,0 +1,16 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "rangestats", + srcs = ["fetcher.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv", + "//pkg/roachpb", + "//pkg/sql/sem/eval", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvclient/rangestats/fetcher.go b/pkg/kv/kvclient/rangestats/fetcher.go new file mode 100644 index 000000000000..7ac8dc89adbf --- /dev/null +++ b/pkg/kv/kvclient/rangestats/fetcher.go @@ -0,0 +1,52 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package rangestats contains code to fetch range stats. +package rangestats + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" +) + +// Fetcher implements eval.RangeStatsFetcher. +type Fetcher struct { + db *kv.DB +} + +var _ eval.RangeStatsFetcher = (*Fetcher)(nil) + +// NewFetcher constructs a new Fetcher. +func NewFetcher(db *kv.DB) *Fetcher { + return &Fetcher{db: db} +} + +// RangeStats is part of the eval.RangeStatsFetcher interface. +func (f Fetcher) RangeStats( + ctx context.Context, keys ...roachpb.Key, +) ([]*roachpb.RangeStatsResponse, error) { + var ba kv.Batch + reqs := make([]roachpb.RangeStatsRequest, len(keys)) + for i, k := range keys { + reqs[i].Key = k + ba.AddRawRequest(&reqs[i]) + } + if err := f.db.Run(ctx, &ba); err != nil { + return nil, err + } + resps := make([]*roachpb.RangeStatsResponse, len(keys)) + for i, r := range ba.RawResponse().Responses { + resps[i] = r.GetInner().(*roachpb.RangeStatsResponse) + } + return resps, nil +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 376d3750523d..ab85cc4c817a 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -97,6 +97,7 @@ go_library( "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/kvtenant", "//pkg/kv/kvclient/rangefeed", + "//pkg/kv/kvclient/rangestats", "//pkg/kv/kvprober", "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/storepool", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 33a28d4205f4..ef763d46954d 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" @@ -623,6 +624,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { bulkSenderLimiter := bulk.MakeAndRegisterConcurrencyLimiter(&cfg.Settings.SV) + rangeStatsFetcher := rangestats.NewFetcher(cfg.db) + // Set up the DistSQL server. distSQLCfg := execinfra.ServerConfig{ AmbientContext: cfg.AmbientCtx, @@ -680,6 +683,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { SQLSQLResponseAdmissionQ: cfg.sqlSQLResponseAdmissionQ, CollectionFactory: collectionFactory, ExternalIORecorder: cfg.costController, + RangeStatsFetcher: rangeStatsFetcher, } cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist) if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil { @@ -839,6 +843,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ConsistencyChecker: consistencychecker.NewConsistencyChecker(cfg.db), RangeProber: rangeprober.NewRangeProber(cfg.db), DescIDGenerator: descidgen.NewGenerator(codec, cfg.db), + RangeStatsFetcher: rangeStatsFetcher, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { diff --git a/pkg/sql/colexec/BUILD.bazel b/pkg/sql/colexec/BUILD.bazel index abd007e903f6..f6e80b921573 100644 --- a/pkg/sql/colexec/BUILD.bazel +++ b/pkg/sql/colexec/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "ordered_aggregator.go", "parallel_unordered_synchronizer.go", "partially_ordered_distinct.go", + "range_stats.go", "serial_unordered_synchronizer.go", "sort.go", "sort_chunks.go", @@ -41,6 +42,7 @@ go_library( "//pkg/col/coldata", "//pkg/col/coldataext", # keep "//pkg/col/typeconv", # keep + "//pkg/roachpb", "//pkg/server/telemetry", # keep "//pkg/settings", "//pkg/sql/catalog/colinfo", # keep diff --git a/pkg/sql/colexec/builtin_funcs.go b/pkg/sql/colexec/builtin_funcs.go index 72f10be9858b..a8250a98f997 100644 --- a/pkg/sql/colexec/builtin_funcs.go +++ b/pkg/sql/colexec/builtin_funcs.go @@ -124,6 +124,16 @@ func NewBuiltinFunctionOperator( return newSubstringOperator( allocator, columnTypes, argumentCols, outputIdx, input, ), nil + case tree.CrdbInternalRangeStats: + if len(argumentCols) != 1 { + return nil, errors.AssertionFailedf( + "expected 1 input column to crdb_internal.range_stats, got %d", + len(argumentCols), + ) + } + return newRangeStatsOperator( + evalCtx.RangeStatsFetcher, allocator, argumentCols[0], outputIdx, input, + ) default: return &defaultBuiltinFuncOperator{ OneInputHelper: colexecop.MakeOneInputHelper(input), diff --git a/pkg/sql/colexec/range_stats.go b/pkg/sql/colexec/range_stats.go new file mode 100644 index 000000000000..7c98466d7b91 --- /dev/null +++ b/pkg/sql/colexec/range_stats.go @@ -0,0 +1,119 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexec + +import ( + gojson "encoding/json" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/colexecop" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/util/json" +) + +type rangeStatsOperator struct { + colexecop.OneInputHelper + fetcher eval.RangeStatsFetcher + allocator *colmem.Allocator + argumentCol int + outputIdx int +} + +var _ colexecop.Operator = (*rangeStatsOperator)(nil) + +// newRangeStatsOperator constructs a vectorized operator to fetch range +// statistics. Importantly, this operator issues RangeStatsRequests in +// parallel to help amortize the latency required to issue the requests +// to nodes in remote regions. +func newRangeStatsOperator( + fetcher eval.RangeStatsFetcher, + allocator *colmem.Allocator, + argumentCol int, + outputIdx int, + input colexecop.Operator, +) (colexecop.Operator, error) { + return &rangeStatsOperator{ + OneInputHelper: colexecop.MakeOneInputHelper(input), + allocator: allocator, + argumentCol: argumentCol, + outputIdx: outputIdx, + fetcher: fetcher, + }, nil +} + +// TODO(ajwerner): Generalize this operator to deal with other very similar +// builtins like crdb_internal.leaseholder. Some of the places which retrieve +// range statistics also call that function; optimizing one without the other +// is pointless, and they are very similar. + +func (r *rangeStatsOperator) Next() coldata.Batch { + // Naively take the input batch and use it to define the batch size. + // + // TODO(ajwerner): As a first step towards being more sophisticated, + // this code could accumulate up to some minimum batch size before + // sending the first batch. + batch := r.Input.Next() + n := batch.Length() + if n == 0 { + return coldata.ZeroBatch + } + inSel := batch.Selection() + inVec := batch.ColVec(r.argumentCol) + inBytes := inVec.Bytes() + + output := batch.ColVec(r.outputIdx) + jsonOutput := output.JSON() + r.allocator.PerformOperation( + []coldata.Vec{output}, + func() { + keys := make([]roachpb.Key, 0, batch.Length()) + if inSel == nil { + for i := 0; i < batch.Length(); i++ { + keys = append(keys, inBytes.Get(i)) + } + } else { + for _, idx := range inSel { + keys = append(keys, inBytes.Get(idx)) + } + } + // TODO(ajwerner): Reserve memory for the responses. We know they'll + // at least, on average, contain keys so it'll be 2x the size of the + // keys plus some constant multiple. + res, err := r.fetcher.RangeStats(r.Ctx, keys...) + if err != nil { + colexecerror.ExpectedError(err) + } + readResponse := func(resultIndex, outputIndex int) { + jsonStr, err := gojson.Marshal(&res[resultIndex].MVCCStats) + if err != nil { + colexecerror.ExpectedError(err) + } + jsonDatum, err := json.ParseJSON(string(jsonStr)) + if err != nil { + colexecerror.ExpectedError(err) + } + jsonOutput.Set(outputIndex, jsonDatum) + } + if inSel != nil { + for i, s := range inSel { + readResponse(i, s) + } + } else { + for i := range res { + readResponse(i, i) + } + } + }) + return batch +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index dfa34b64daaf..f092a922c0ce 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2695,6 +2695,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo CatalogBuiltins: &p.evalCatalogBuiltins, QueryCancelKey: ex.queryCancelKey, DescIDGenerator: ex.getDescIDGenerator(), + RangeStatsFetcher: p.execCfg.RangeStatsFetcher, }, Tracing: &ex.sessionTracing, MemMetrics: &ex.memMetrics, diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 00b8906d9117..9f5ba3bbc12f 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -383,6 +383,7 @@ func (ds *ServerImpl) setupFlow( SQLStatsController: ds.ServerConfig.SQLStatsController, SchemaTelemetryController: ds.ServerConfig.SchemaTelemetryController, IndexUsageStatsController: ds.ServerConfig.IndexUsageStatsController, + RangeStatsFetcher: ds.ServerConfig.RangeStatsFetcher, } evalCtx.SetStmtTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.StmtTimestampNanos)) evalCtx.SetTxnTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.TxnTimestampNanos)) diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 82c0970d2031..1f523e090964 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1337,6 +1337,9 @@ type ExecutorConfig struct { // SyntheticPrivilegeCache SyntheticPrivilegeCache *cacheutil.Cache + + // RangeStatsFetcher is used to fetch RangeStats. + RangeStatsFetcher eval.RangeStatsFetcher } // UpdateVersionSystemSettingHook provides a callback that allows us diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index ef9def87ab9b..8e21ff2c7a5d 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -192,6 +192,9 @@ type ServerConfig struct { // ExternalIORecorder is used to record reads and writes from // external services (such as external storage) ExternalIORecorder multitenant.TenantSideExternalIORecorder + + // RangeStatsFetcher is used to fetch range stats for keys. + RangeStatsFetcher eval.RangeStatsFetcher } // RuntimeStats is an interface through which the rowexec layer can get diff --git a/pkg/sql/opt/exec/execbuilder/testdata/range_stats b/pkg/sql/opt/exec/execbuilder/testdata/range_stats new file mode 100644 index 000000000000..6787c656896e --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/range_stats @@ -0,0 +1,13 @@ +# LogicTest: local +# +# This file tests that we build a specialized vectorized operator for the +# crdb_internal.range_stats function. + +query T +EXPLAIN (VEC) SELECT crdb_internal.range_stats(start_key) + FROM crdb_internal.ranges_no_leases; +---- +│ +└ Node 1 + └ *colexec.rangeStatsOperator + └ *sql.planNodeToRowSource diff --git a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go index 26aeca7622d4..99f94c36dc2c 100644 --- a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go +++ b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go @@ -389,6 +389,13 @@ func TestExecBuild_prepare( runExecBuildLogicTest(t, "prepare") } +func TestExecBuild_range_stats( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runExecBuildLogicTest(t, "range_stats") +} + func TestExecBuild_scalar( t *testing.T, ) { diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 78c7c3a5f9dc..3f90790473d5 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -486,6 +486,7 @@ func internalExtendedEvalCtx( SchemaTelemetryController: schemaTelemetryController, IndexUsageStatsController: indexUsageStatsController, StmtDiagnosticsRequestInserter: execCfg.StmtDiagnosticsRecorder.InsertRequest, + RangeStatsFetcher: execCfg.RangeStatsFetcher, }, Tracing: &SessionTracing{}, Descs: tables, diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index 7475eeb58366..b7eca29fcf7c 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -5414,20 +5414,18 @@ value if you rely on the HLC for accuracy.`, Types: tree.ArgTypes{ {"key", types.Bytes}, }, - ReturnType: tree.FixedReturnType(types.Jsonb), + SpecializedVecBuiltin: tree.CrdbInternalRangeStats, + ReturnType: tree.FixedReturnType(types.Jsonb), Fn: func(ctx *eval.Context, args tree.Datums) (tree.Datum, error) { - key := []byte(tree.MustBeDBytes(args[0])) - b := &kv.Batch{} - b.AddRawRequest(&roachpb.RangeStatsRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: key, - }, - }) - if err := ctx.Txn.Run(ctx.Context, b); err != nil { + if args[0] == tree.DNull { + return tree.DNull, nil + } + resps, err := ctx.RangeStatsFetcher.RangeStats(ctx.Ctx(), + roachpb.Key(tree.MustBeDBytes(args[0]))) + if err != nil { return nil, pgerror.Wrap(err, pgcode.InvalidParameterValue, "error fetching range stats") } - resp := b.RawResponse().Responses[0].GetInner().(*roachpb.RangeStatsResponse).MVCCStats - jsonStr, err := gojson.Marshal(&resp) + jsonStr, err := gojson.Marshal(&resps[0].MVCCStats) if err != nil { return nil, err } diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index cd03b27fb84e..9024a21cf6b1 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -223,6 +223,9 @@ type Context struct { QueryCancelKey pgwirecancel.BackendKeyData DescIDGenerator DescIDGenerator + + // RangeStatsFetcher is used to fetch RangeStats. + RangeStatsFetcher RangeStatsFetcher } // DescIDGenerator generates unique descriptor IDs. @@ -230,6 +233,13 @@ type DescIDGenerator interface { GenerateUniqueDescID(ctx context.Context) (catid.DescID, error) } +// RangeStatsFetcher is used to fetch RangeStats. +type RangeStatsFetcher interface { + + // RangeStats fetches the stats for the ranges which contain the passed keys. + RangeStats(ctx context.Context, keys ...roachpb.Key) ([]*roachpb.RangeStatsResponse, error) +} + var _ tree.ParseTimeContext = &Context{} // ConsistencyCheckRunner is an interface embedded in eval.Context used by diff --git a/pkg/sql/sem/tree/overload.go b/pkg/sql/sem/tree/overload.go index 367d3be22a01..2b94b7e00a34 100644 --- a/pkg/sql/sem/tree/overload.go +++ b/pkg/sql/sem/tree/overload.go @@ -40,6 +40,7 @@ type SpecializedVectorizedBuiltin int const ( _ SpecializedVectorizedBuiltin = iota SubstringStringIntInt + CrdbInternalRangeStats ) // AggregateOverload is an opaque type which is used to box an eval.AggregateOverload. From c4e6a61679219cdd5ca6e81036644938f1a53053 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 14 Aug 2022 23:49:07 -0400 Subject: [PATCH 4/4] sql: deflake TestUnsplit The bug here is that we were writing the new zone config after the GC job has already deleted the old one. Release note: None --- pkg/sql/unsplit_range_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/sql/unsplit_range_test.go b/pkg/sql/unsplit_range_test.go index 3f0483dc3940..a1fb5267cd5c 100644 --- a/pkg/sql/unsplit_range_test.go +++ b/pkg/sql/unsplit_range_test.go @@ -309,11 +309,6 @@ func TestUnsplitRanges(t *testing.T) { if _, err := sqlDB.Exec(tc.query); err != nil { t.Fatal(err) } - // Push a new zone config for a few tables with TTL=0 so the data - // is deleted immediately. - if _, err := sqltestutils.AddImmediateGCZoneConfig(sqlDB, tableDesc.GetID()); err != nil { - t.Fatal(err) - } // Check GC worked! testutils.SucceedsSoon(t, func() error {