Skip to content

Commit

Permalink
Merge #131141
Browse files Browse the repository at this point in the history
131141: opt: add rule to decorrelate union operators r=DrewKimball a=DrewKimball

#### opt: improve EnsureKey custom function

This commit makes a small improvement to the `EnsureKey` custom function
(used in decorrelation rules), so that it can add passthrough columns to
a `Project` operator in an effort to find a key. This can prevent rules
from adding unnecessary `Ordinality` operators to the query plan.

Epic: None

Release note: None

#### opt: improve exists subquery hoisting

This commit makes a small improvement to the subquery-hoisting rules so
that hoisting an `EXISTS` subquery can often avoid projecting a new
column to check for NULL values. This can allow other optimization rules
to match later on.

Epic: None

Release note: None

#### opt: add rule to decorrelate unions in EXISTS subqueries

This commit adds a new rule `TryDecorrelateUnion`, which matches on a
`Union` or `UnionAll` operator in the input of a `ScalarGroupBy`. The
`ScalarGroupBy` must have "any-not-null" semantics, meaning it produces
an arbitrary non-null value from each input column.

If these conditions are satisfied, the `Union` operator is replaced by an
`InnerJoin` between two `ScalarGroupBy` operators. A `Project` coalesces
columns from each side of the join to produce the final aggregated values.

This transformation does not itself decorrelate the `Union` operators, but
it does make it easier for other rules to do so.

Release note: None

Epic: None

Co-authored-by: Drew Kimball <[email protected]>
  • Loading branch information
craig[bot] and DrewKimball committed Oct 3, 2024
2 parents 06f78c6 + 1ec3568 commit 0c0af95
Show file tree
Hide file tree
Showing 13 changed files with 1,794 additions and 1,580 deletions.
16 changes: 8 additions & 8 deletions pkg/sql/opt/memo/testdata/stats/with
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ with &1 (t0)
├── stats: [rows=10000]
├── fd: ()-->(30)
├── inner-join (cross)
│ ├── columns: true_agg:28(bool!null)
│ ├── columns: canary_agg:28(bool!null)
│ ├── stats: [rows=10000]
│ ├── fd: ()-->(28)
│ ├── scan a
│ │ └── stats: [rows=5000]
│ ├── inner-join (cross)
│ │ ├── columns: true_agg:28(bool!null)
│ │ ├── columns: canary_agg:28(bool!null)
│ │ ├── cardinality: [0 - 2]
│ │ ├── multiplicity: left-rows(zero-or-one), right-rows(one-or-more)
│ │ ├── stats: [rows=2]
Expand All @@ -132,29 +132,29 @@ with &1 (t0)
│ │ │ ├── cardinality: [1 - 2]
│ │ │ └── stats: [rows=2]
│ │ ├── select
│ │ │ ├── columns: true_agg:28(bool!null)
│ │ │ ├── columns: canary_agg:28(bool!null)
│ │ │ ├── cardinality: [0 - 1]
│ │ │ ├── stats: [rows=1, distinct(28)=1, null(28)=0]
│ │ │ ├── key: ()
│ │ │ ├── fd: ()-->(28)
│ │ │ ├── scalar-group-by
│ │ │ │ ├── columns: true_agg:28(bool)
│ │ │ │ ├── columns: canary_agg:28(bool)
│ │ │ │ ├── cardinality: [1 - 1]
│ │ │ │ ├── stats: [rows=1, distinct(28)=1, null(28)=0]
│ │ │ │ ├── key: ()
│ │ │ │ ├── fd: ()-->(28)
│ │ │ │ ├── values
│ │ │ │ │ ├── columns: true:27(bool!null)
│ │ │ │ │ ├── columns: canary:27(bool!null)
│ │ │ │ │ ├── cardinality: [1 - 1]
│ │ │ │ │ ├── stats: [rows=1]
│ │ │ │ │ ├── key: ()
│ │ │ │ │ ├── fd: ()-->(27)
│ │ │ │ │ └── (true,) [type=tuple{bool}]
│ │ │ │ └── aggregations
│ │ │ │ └── const-agg [as=true_agg:28, type=bool, outer=(27)]
│ │ │ │ └── true:27 [type=bool]
│ │ │ │ └── const-agg [as=canary_agg:28, type=bool, outer=(27)]
│ │ │ │ └── canary:27 [type=bool]
│ │ │ └── filters
│ │ │ └── true_agg:28 IS NOT NULL [type=bool, outer=(28), constraints=(/28: (/NULL - ]; tight)]
│ │ │ └── canary_agg:28 IS NOT NULL [type=bool, outer=(28), constraints=(/28: (/NULL - ]; tight)]
│ │ └── filters (true)
│ └── filters (true)
└── projections
Expand Down
27 changes: 11 additions & 16 deletions pkg/sql/opt/memo/testdata/typing
Original file line number Diff line number Diff line change
Expand Up @@ -419,30 +419,25 @@ SELECT EXISTS(SELECT * FROM a WHERE expr<0) FROM (SELECT x+1 AS expr FROM a)
project
├── columns: exists:11(bool!null)
├── group-by (hash)
│ ├── columns: true_agg:13(bool) rownum:15(int!null)
│ ├── grouping columns: rownum:15(int!null)
│ ├── columns: x:1(int!null) canary_agg:12(int)
│ ├── grouping columns: x:1(int!null)
│ ├── left-join (cross)
│ │ ├── columns: expr:5(int!null) true:12(bool) rownum:15(int!null)
│ │ ├── ordinality
│ │ │ ├── columns: expr:5(int!null) rownum:15(int!null)
│ │ │ └── project
│ │ │ ├── columns: expr:5(int!null)
│ │ │ ├── scan a
│ │ │ │ └── columns: x:1(int!null)
│ │ │ └── projections
│ │ │ └── x:1 + 1 [as=expr:5, type=int]
│ │ ├── columns: x:1(int!null) expr:5(int!null) x:6(int)
│ │ ├── project
│ │ │ ├── columns: true:12(bool!null)
│ │ │ ├── columns: expr:5(int!null) x:1(int!null)
│ │ │ ├── scan a
│ │ │ │ └── columns: x:1(int!null)
│ │ │ └── projections
│ │ │ └── true [as=true:12, type=bool]
│ │ │ └── x:1 + 1 [as=expr:5, type=int]
│ │ ├── scan a
│ │ │ └── columns: x:6(int!null)
│ │ └── filters
│ │ └── expr:5 < 0 [type=bool]
│ └── aggregations
│ └── const-not-null-agg [as=true_agg:13, type=bool]
│ └── true:12 [type=bool]
│ └── const-not-null-agg [as=canary_agg:12, type=int]
│ └── x:6 [type=int]
└── projections
└── true_agg:13 IS NOT NULL [as=exists:11, type=bool]
└── canary_agg:12 IS NOT NULL [as=exists:11, type=bool]

# Cast
build
Expand Down
105 changes: 72 additions & 33 deletions pkg/sql/opt/norm/decorrelate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,14 +527,9 @@ func (c *CustomFuncs) ConstructApplyJoin(
// input expression (perhaps augmented with a key column(s) or wrapped by
// Ordinality).
func (c *CustomFuncs) EnsureKey(in memo.RelExpr) memo.RelExpr {
_, ok := c.CandidateKey(in)
if ok {
return in
}

// Try to add the preexisting primary key if the input is a Scan or Scan
// wrapped in a Select.
if res, ok := c.TryAddKeyToScan(in); ok {
if res, ok := c.tryFindExistingKey(in); ok {
return res
}

Expand All @@ -544,34 +539,34 @@ func (c *CustomFuncs) EnsureKey(in memo.RelExpr) memo.RelExpr {
return c.f.ConstructOrdinality(in, &private)
}

// TryAddKeyToScan checks whether the input expression is a non-virtual table
// Scan, either alone or wrapped in a Select. If so, it returns a new Scan
// (possibly wrapped in a Select) augmented with the preexisting primary key
// for the table.
func (c *CustomFuncs) TryAddKeyToScan(in memo.RelExpr) (_ memo.RelExpr, ok bool) {
augmentScan := func(scan *memo.ScanExpr) (_ memo.RelExpr, ok bool) {
private := scan.ScanPrivate
// tryFindExistingKey attempts to find an existing key for the input expression.
// It may modify the expression in order to project the key column.
func (c *CustomFuncs) tryFindExistingKey(in memo.RelExpr) (_ memo.RelExpr, ok bool) {
_, hasKey := c.CandidateKey(in)
if hasKey {
return in, true
}
switch t := in.(type) {
case *memo.ProjectExpr:
input, foundKey := c.tryFindExistingKey(t.Input)
if foundKey {
return c.f.ConstructProject(input, t.Projections, input.Relational().OutputCols), true
}

case *memo.ScanExpr:
private := t.ScanPrivate
tableID := private.Table
table := c.f.Metadata().Table(tableID)
if !table.IsVirtualTable() {
keyCols := c.PrimaryKeyCols(tableID)
private.Cols = private.Cols.Union(keyCols)
return c.f.ConstructScan(&private), true
}
return nil, false
}

switch t := in.(type) {
case *memo.ScanExpr:
if res, ok := augmentScan(t); ok {
return res, true
}

case *memo.SelectExpr:
if scan, ok := t.Input.(*memo.ScanExpr); ok {
if res, ok := augmentScan(scan); ok {
return c.f.ConstructSelect(res, t.Filters), true
}
input, foundKey := c.tryFindExistingKey(t.Input)
if foundKey {
return c.f.ConstructSelect(input, t.Filters), true
}
}

Expand Down Expand Up @@ -1129,19 +1124,34 @@ func (r *subqueryHoister) constructConditionalExpr(scalar opt.ScalarExpr) opt.Sc
// CONST_AGG which will need to be changed to a CONST_NOT_NULL_AGG (which is
// defined to ignore those nulls so that its result will be unaffected).
func (r *subqueryHoister) constructGroupByExists(subquery memo.RelExpr) memo.RelExpr {
trueColID := r.f.Metadata().AddColumn("true", types.Bool)
aggColID := r.f.Metadata().AddColumn("true_agg", types.Bool)
var canaryColTyp *types.T
var canaryColID opt.ColumnID
var subqueryWithCanary memo.RelExpr
if subquery.Relational().NotNullCols.Empty() {
canaryColTyp = types.Bool
canaryColID = r.f.Metadata().AddColumn("canary", types.Bool)
subqueryWithCanary = r.f.ConstructProject(
subquery,
memo.ProjectionsExpr{r.f.ConstructProjectionsItem(memo.TrueSingleton, canaryColID)},
opt.ColSet{},
)
} else {
canaryColID, _ = subquery.Relational().NotNullCols.Next(0)
canaryColTyp = r.mem.Metadata().ColumnMeta(canaryColID).Type
subqueryWithCanary = r.f.ConstructProject(
subquery,
memo.ProjectionsExpr{},
opt.MakeColSet(canaryColID),
)
}
aggColID := r.f.Metadata().AddColumn("canary_agg", canaryColTyp)
existsColID := r.f.Metadata().AddColumn("exists", types.Bool)

return r.f.ConstructProject(
r.f.ConstructScalarGroupBy(
r.f.ConstructProject(
subquery,
memo.ProjectionsExpr{r.f.ConstructProjectionsItem(memo.TrueSingleton, trueColID)},
opt.ColSet{},
),
subqueryWithCanary,
memo.AggregationsExpr{r.f.ConstructAggregationsItem(
r.f.ConstructConstAgg(r.f.ConstructVariable(trueColID)),
r.f.ConstructConstAgg(r.f.ConstructVariable(canaryColID)),
aggColID,
)},
memo.EmptyGroupingPrivate,
Expand Down Expand Up @@ -1527,3 +1537,32 @@ func getSubstituteColsSetOp(set memo.RelExpr, substituteCols opt.ColSet) opt.Col
}
return newSubstituteCols
}

// MakeCoalesceProjectionsForUnion builds a series of projections that coalesce
// columns from the left and right inputs of a union, projecting the result
// using the union operator's output columns.
func (c *CustomFuncs) MakeCoalesceProjectionsForUnion(
setPrivate *memo.SetPrivate,
) memo.ProjectionsExpr {
projections := make(memo.ProjectionsExpr, len(setPrivate.OutCols))
for i := range setPrivate.OutCols {
projections[i] = c.f.ConstructProjectionsItem(
c.f.ConstructCoalesce(memo.ScalarListExpr{
c.f.ConstructVariable(setPrivate.LeftCols[i]),
c.f.ConstructVariable(setPrivate.RightCols[i]),
}),
setPrivate.OutCols[i],
)
}
return projections
}

// MakeAnyNotNullScalarGroupBy wraps the input expression in a ScalarGroupBy
// that aggregates the input columns with AnyNotNull functions.
func (c *CustomFuncs) MakeAnyNotNullScalarGroupBy(input memo.RelExpr) memo.RelExpr {
return c.f.ConstructScalarGroupBy(
input,
c.MakeAggCols(opt.AnyNotNullAggOp, input.Relational().OutputCols),
memo.EmptyGroupingPrivate,
)
}
64 changes: 64 additions & 0 deletions pkg/sql/opt/norm/rules/decorrelate.opt
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,70 @@
(OutputCols2 $left $right)
)

# TryDecorrelateUnion replaces a Union/UnionAll beneath a ScalarGroupBy with a
# cross-join (InnerJoin on True) between two ScalarGroupBy operators. A Project
# operator coalesces columns from each join input to produce the final result.
# This transformation applies when the ScalarGroupBy has only "any-not-null"
# aggregations, which select an arbitrary non-null value from the input column.
#
# Here's a simplified example:
#
# scalar-group-by
# ├── union-all
# │ ├── scan foo
# │ └── scan bar (has-outer-cols)
# └── aggregations
# └── any-not-null
# =>
# project
# ├── inner-join (cross)
# │ ├── scalar-group-by
# │ │ └── scan foo
# │ ├── scalar-group-by
# │ │ └── scan bar
# │ └── filters (true)
# └── projections
# └── coalesce
#
# This situation occurs after a correlated EXISTS subquery containing a Union is
# hoisted. Note that TryDecorrelateUnion does not itself decorrelate the Union,
# but makes it easier for other rules to do so.
#
# NOTE: the outer Project operator is necessary just in case the ScalarGroupBy
# is synthesizing new columns, despite using any-not-null aggregations.
# NOTE: TryDecorrelateUnion should be ordered before TryDecorrelateScalarGroupBy
# to ensure that Union operators have a chance to be decorrelated.
#
# TODO(drewk): We could extend this rule to apply to other aggregations; for
# example, for a count() we can sum the counts taken on each side of the join.
# TODO(drewk): We could extend this rule to handle other set operations. For
# example, ExceptAll could become an AntiJoin.
[TryDecorrelateUnion, Normalize]
(ScalarGroupBy
$input:(Union | UnionAll $left:* $right:* $unionPrivate:*) &
(HasOuterCols $input)
$aggs:* & (AreAllAnyNotNullAggs $aggs)
$private:*
)
=>
(Project
(Project
(InnerJoin
(MakeAnyNotNullScalarGroupBy $left)
(MakeAnyNotNullScalarGroupBy $right)
[]
(EmptyJoinPrivate)
)
(MakeCoalesceProjectionsForUnion $unionPrivate)
(MakeEmptyColSet)
)
(ConvertAnyNotNullAggsToProjections $aggs)
(IntersectionCols
(GroupingOutputCols $private $aggs)
(OutputCols $input)
)
)

# TryDecorrelateScalarGroupBy "pushes down" a Join into a ScalarGroupBy
# operator, in an attempt to keep "digging" down to find and eliminate
# unnecessary correlation. The eventual hope is to trigger the DecorrelateJoin
Expand Down
Loading

0 comments on commit 0c0af95

Please sign in to comment.