Skip to content

Commit

Permalink
[AQUMV] Answer Aggregation Query directly.
Browse files Browse the repository at this point in the history
This commit enables answering queries that contain aggregation directly. Use the
results of a view that has aggregations to avoid computing them from the origin
table.
This may lead to significant efficiency gains if the SQL has a large
amount of data.
AQUMV will always return results immediately.

If we have a valid view like:
create materialized view mv as
  select sum(c1) as mc1, count(c2) as mc2, avg(c3) as mc3, count(*) as
mc4
  from t where c1 > 90;

SQL:
select count(*), sum(c1), count(c2), avg(c3), abs(count(*) - 21) from t
where c1 > 90;

Could be rewritten to:

select mc4, mc1, mc2, mc3, abs((mc4 - 21)) from mv;

Plan:
explain(verbose, costs off)
select count(*), sum(c1), count(c2), avg(c3), abs(count(*) - 21) from t
where c1 > 90;
                                    QUERY PLAN
------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   Output: mc4, mc1, mc2, mc3, (abs((mc4 - 21)))
   ->  Seq Scan on mv
         Output: mc4, mc1, mc2, mc3, abs((mc4 - 21))
 Settings: enable_answer_query_using_materialized_views = 'on',
optimizer = 'off'
 Optimizer: Postgres query optimizer
(6 rows)

View query with Group By is not supported yet.

If some HAVING quals only exist in the origin query and they could be
computed from the view query's target list, then we could keep them like
post_quals. But as the view has aggregations, the additional quals should
be moved to WHERE instead of HAVING.

create table t(c1 int, c2 int, c3 int, c4 int);
create materialized view mv as
  select sum(c1) as mc1, count(c2) as mc2, avg(c3) as mc3, count(*) as
mc4
  from t where c1 > 90;

SQL:
select count(*), sum(c1) from t where c1 > 90 having abs(count(*) - 21)
> 0 and 2 > 1 and avg(c3) > 97;

Could be rewritten to (The HAVING clause has been rewritten to WHERE
clause):

select mc4, mc1 from mv where mc3 > 97 and abs(mc4 - 21) > 0;

Plan:
explain(verbose, costs off)
select count(*), sum(c1) from t where c1 > 90 having abs(count(*) - 21)
> 0 and 2 > 1 and avg(c3) > 97;
                                    QUERY PLAN
------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   Output: mc4, mc1
   ->  Seq Scan on aqumv.mv
         Output: mc4, mc1
         Filter: ((mv.mc3 > '97'::numeric) AND (abs((mv.mc4 - 21)) > 0))
 Optimizer: Postgres query optimizer
(7 rows)

There are two additional HAVING quals:
Expression: 2 > 1 (would be eliminated during planner). Expression:
abs(count(*) - 21) > 0, it could be computed from view as:
	abs(mc4 - 21) > 0

The rewritten qual is put into the WHERE clause and finally acts as a Filter.

There is a trick for ORDER BY for both the origin query and the view query. As
we have no Group By currently, the aggregation result would be either
one or zero rows, which makes the Order By clause pointless.
We could avoid considering the sort columns if they are junk entries for view
matching.

As we have no Group By for views with aggregations now, the final result would be
either one or zero rows.
The LIMIT and OFFSET clauses of the origin query could be applied to the view if
they are constants.

create incremental materialized view mv as
  select sum(c1) as mc1, count(c2) as mc2, avg(c3) as mc3, count(*) as
mc4
  from t where c1 > 90;

Query:
  select count(*), sum(c1) from t where c1 > 90 limit 2;

Could be rewritten to:
  select mc4, mc1 from mv limit 2;

Authored-by: Zhang Mingli [email protected]
  • Loading branch information
avamingli committed Nov 21, 2024
1 parent f92faf0 commit a3b07d7
Show file tree
Hide file tree
Showing 7 changed files with 822 additions and 78 deletions.
1 change: 0 additions & 1 deletion src/backend/optimizer/README.cbdb.aqumv
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ AQUMV_MVP
---------
Support SELECT FROM a single relation both for view_query and the origin_query.
Below are not supported now:
Aggregation (on view_query)
Subquery
Join
Sublink
Expand Down
300 changes: 229 additions & 71 deletions src/backend/optimizer/plan/aqumv.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@
#include "nodes/pathnodes.h"
#include "nodes/pg_list.h"

RelOptInfo *answer_query_using_materialized_views(PlannerInfo *root,
RelOptInfo *current_rel,
query_pathkeys_callback qp_callback,
void *qp_extra);

typedef struct
{
int varno;
Expand All @@ -70,6 +65,7 @@ static aqumv_equivalent_transformation_context* aqumv_init_context(List *view_tl
static bool aqumv_process_targetlist(aqumv_equivalent_transformation_context *context, List *query_tlist, List **mv_final_tlist);
static void aqumv_sort_targetlist(aqumv_equivalent_transformation_context* context);
static Node *aqumv_adjust_sub_matched_expr_mutator(Node *node, aqumv_equivalent_transformation_context *context);
static bool contain_var_or_aggstar_clause_walker(Node *node, void *context);

typedef struct
{
Expand All @@ -82,16 +78,52 @@ typedef struct
int count; /* Count of subnodes in this expression */
} expr_to_sort;

/*
 * contain_var_or_aggstar_clause
 *	  Convenience entry point: run contain_var_or_aggstar_clause_walker over
 *	  the expression tree rooted at 'node' and report its verdict.
 *
 * The walker only forwards its context argument, so none is supplied.
 */
static bool
contain_var_or_aggstar_clause(Node *node)
{
	void	   *walker_context = NULL;	/* the walker carries no state */

	return contain_var_or_aggstar_clause_walker(node, walker_context);
}

/*
 * contain_var_or_aggstar_clause_walker
 *	  Modeled on contain_var_clause_walker, with one addition: an Aggref
 *	  whose aggstar flag is set also counts as a hit.
 *
 * Returns true (stopping the traversal) on the first of:
 *	  - a Var with varlevelsup == 0,
 *	  - an Aggref with aggstar set,
 *	  - a CurrentOfExpr,
 *	  - a PlaceHolderVar with phlevelsup == 0.
 * Every other node is handed to expression_tree_walker for recursion.
 * 'context' is not inspected here; it is only passed through.
 */
static bool
contain_var_or_aggstar_clause_walker(Node *node, void *context)
{
	if (node == NULL)
		return false;

	if (IsA(node, Var))
	{
		Var		   *var = (Var *) node;

		/* Only a Var of the current query level counts; never recurse. */
		return (var->varlevelsup == 0);
	}
	else if (IsA(node, Aggref))
	{
		Aggref	   *agg = (Aggref *) node;

		/* A star aggregate contains no Vars but is still reported. */
		if (agg->aggstar)
			return true;
		/* otherwise examine the aggregate's sub-expressions below */
	}
	else if (IsA(node, CurrentOfExpr))
	{
		return true;
	}
	else if (IsA(node, PlaceHolderVar))
	{
		PlaceHolderVar *phv = (PlaceHolderVar *) node;

		if (phv->phlevelsup == 0)
			return true;
		/* upper-level placeholder: still check the contained expr below */
	}

	return expression_tree_walker(node,
								  contain_var_or_aggstar_clause_walker,
								  context);
}

/*
* Answer Query Using Materialized Views(AQUMV).
* This function modifies root(parse and etc.), current_rel in-place.
*/
RelOptInfo*
answer_query_using_materialized_views(PlannerInfo *root,
RelOptInfo *current_rel,
query_pathkeys_callback qp_callback,
void *qp_extra)
answer_query_using_materialized_views(PlannerInfo *root, AqumvContext aqumv_context)
{
RelOptInfo *current_rel = aqumv_context->current_rel;
query_pathkeys_callback qp_callback = aqumv_context->qp_callback;
void *qp_extra = aqumv_context->qp_extra;
List *raw_processed_tlist = aqumv_context->raw_processed_tlist;
Node *raw_havingQual = aqumv_context->raw_havingQual;

Query *parse = root->parse; /* Query of origin SQL. */
Query *viewQuery; /* Query of view. */
RelOptInfo *mv_final_rel = current_rel; /* Final rel after rewritten. */
Expand Down Expand Up @@ -202,18 +234,18 @@ answer_query_using_materialized_views(PlannerInfo *root,
* The Seqscan on a heap-storaged mv seems ordered, but it's a free lunch.
* A Parallel Seqscan breaks that hypothesis.
*/
if(viewQuery->hasAggs ||
viewQuery->hasWindowFuncs ||
if(viewQuery->hasWindowFuncs ||
viewQuery->hasDistinctOn ||
viewQuery->hasModifyingCTE ||
viewQuery->hasSubLinks ||
(limit_needed(viewQuery)) ||
(viewQuery->groupClause != NIL) ||
/* IVM doesn't support belows now, just in case. */
(viewQuery->rowMarks != NIL) ||
(viewQuery->distinctClause != NIL) ||
(viewQuery->cteList != NIL) ||
(viewQuery->setOperations != NULL) ||
(!viewQuery->hasAggs && (viewQuery->groupClause != NIL)) ||
((viewQuery->havingQual != NULL) && (viewQuery->groupClause == NIL)) ||
(viewQuery->scatterClause != NIL))
continue;

Expand Down Expand Up @@ -293,69 +325,195 @@ answer_query_using_materialized_views(PlannerInfo *root,

context = aqumv_init_context(viewQuery->targetList, matviewRel->rd_att);

/*
* Process and rewrite target list, return false if failed.
*/
if(!aqumv_process_targetlist(context, parse->targetList, &mv_final_tlist))
if (!parse->hasAggs && viewQuery->hasAggs)
continue;

viewQuery->targetList = mv_final_tlist;
if (parse->hasAggs && viewQuery->hasAggs)
{
if (parse->hasDistinctOn ||
parse->distinctClause != NIL ||
parse->groupClause != NIL ||
parse->groupingSets != NIL ||
parse->groupDistinct)
continue;

/* No Group by now. */
if (viewQuery->hasDistinctOn ||
viewQuery->distinctClause != NIL ||
viewQuery->groupClause != NIL ||
viewQuery->groupingSets != NIL ||
viewQuery->groupDistinct ||
viewQuery->havingQual != NULL || /* HAVING clause is not supported on IMMV yet. */
limit_needed(viewQuery)) /* LIMIT, OFFSET is not supported on IMMV yet. */
continue;

/*
* NB: Update processed_tlist again in case that tlist has been changed.
*/
preprocess_targetlist(subroot);
/*
* There is a trick for ORDER BY for both origin query and view query.
* As we has no Groupy By here, the aggregation results would be either one or
* zero rows that make the Order By clause pointless.
* We could avoid considering the sort columns if it's a junk for view matching.
* This in-place update raw_processed_tlist.
*/
if (parse->sortClause != NIL || viewQuery->sortClause != NIL)
{
ListCell *lc;
ListCell *lcc;

foreach (lc, raw_processed_tlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
if (!tle->resjunk || (0 == tle->ressortgroupref))
continue;

foreach (lcc, parse->sortClause)
{
SortGroupClause *srt = (SortGroupClause *) lfirst(lcc);
if (tle->ressortgroupref == srt->tleSortGroupRef)
foreach_delete_current(raw_processed_tlist, lc);
}
}
/* Earse view's sort caluse, it's ok to let alone view's target list. */
viewQuery->sortClause = NIL;
}

/*
* We have successfully processed target list, and all columns in Aggrefs
* could be computed from viewQuery.
*/
viewQuery->hasAggs = parse->hasAggs;
viewQuery->hasDistinctOn = parse->hasDistinctOn;
/*
* For HAVING quals have aggregations, we have already processed them in
* Aggrefs during aqumv_process_targetlist().
* For HAVING quals don't have aggregations, they may be pushed down to
* jointree's quals and would be processed in post_quals later.
* Set havingQual before we preprocess_aggrefs for that.
*/
viewQuery->havingQual = parse->havingQual;
if (viewQuery->hasAggs)
{
preprocess_aggrefs(subroot, (Node *) subroot->processed_tlist);
preprocess_aggrefs(subroot, viewQuery->havingQual);
/*
* Process Limit:
* The result would be one row at most.
* View may be useful even Limit clause is different, ex:
* View:
* create incremental materialized view mv as
* select count(*) as mc1 from t;
* Query:
* select count(*) from t limit 1;
* Rewrite to:
* select mc1 from mv limit 1;
*/
/* Below logic is based on view has no LIMIT/OFFSET. */
Assert(!limit_needed(viewQuery));
if (limit_needed(parse))
{
Node *node;
/*
* AQUMV don't support sublinks now.
* Use query's LIMIT/OFFSET if they are const in case.
*/
node = parse->limitCount;
if (node && !IsA(node, Const))
continue;

node = parse->limitOffset;
if (node && !IsA(node, Const))
continue;

viewQuery->limitCount = copyObject(parse->limitCount);
viewQuery->limitOffset = copyObject(parse->limitOffset);
viewQuery->limitOption = parse->limitOption;
}

preprocess_qual_conditions(subroot, (Node *) viewQuery->jointree);

if(!aqumv_process_from_quals(parse->jointree->quals, viewQuery->jointree->quals, &post_quals))
continue;

if (post_quals != NIL)
continue;

/* Move HAVING quals to WHERE quals. */
viewQuery->jointree->quals = aqumv_adjust_sub_matched_expr_mutator(copyObject(raw_havingQual), context);
if (context->has_unmatched)
continue;
subroot->hasHavingQual = false;

if(!aqumv_process_targetlist(context, raw_processed_tlist, &mv_final_tlist))
continue;

viewQuery->targetList = mv_final_tlist;
viewQuery->hasAggs = false;
subroot->agginfos = NIL;
subroot->aggtransinfos = NIL;
subroot->hasNonPartialAggs = false;
subroot->hasNonSerialAggs = false;
subroot->numOrderedAggs = false;
/* CBDB specifical */
subroot->hasNonCombine = false;
subroot->numPureOrderedAggs = false;
/*
* NB: Update processed_tlist again in case that tlist has been changed.
*/
subroot->processed_tlist = NIL;
preprocess_targetlist(subroot);
}
viewQuery->groupClause = parse->groupClause;
viewQuery->groupingSets = parse->groupingSets;
viewQuery->sortClause = parse->sortClause;
viewQuery->distinctClause = parse->distinctClause;
viewQuery->limitOption = parse->limitOption;
viewQuery->limitCount = parse->limitCount;
viewQuery->limitOffset = parse->limitOffset;
else
{
/*
* Process and rewrite target list, return false if failed.
*/
if(!aqumv_process_targetlist(context, parse->targetList, &mv_final_tlist))
continue;

/*
* AQUMV
* Process all quals to conjunctive normal form.
*
* We assume that the selection predicates of view and query expressions
* have been converted into conjunctive normal form(CNF) before we process
* them.
*/
preprocess_qual_conditions(subroot, (Node *) viewQuery->jointree);
viewQuery->targetList = mv_final_tlist;

/*
* Process quals, return false if failed.
* Else, post_quals are filled if there were.
* Like process target list, post_quals is used later to see if we could
* rewrite and apply it to mv relation.
*/
if(!aqumv_process_from_quals(parse->jointree->quals, viewQuery->jointree->quals, &post_quals))
continue;
/*
* NB: Update processed_tlist again in case that tlist has been changed.
*/
preprocess_targetlist(subroot);

/* Rewrite post_quals, return false if failed. */
post_quals = (List *)aqumv_adjust_sub_matched_expr_mutator((Node *)post_quals, context);
if (context->has_unmatched)
continue;
/*
* We have successfully processed target list, and all columns in Aggrefs
* could be computed from viewQuery.
*/
viewQuery->hasAggs = parse->hasAggs;
viewQuery->hasDistinctOn = parse->hasDistinctOn;
/*
* For HAVING quals don't have aggregations, they may be pushed down to
* jointree's quals and would be processed in post_quals later.
* Set havingQual before we preprocess_aggrefs for that.
*/
viewQuery->havingQual = parse->havingQual;
if (viewQuery->hasAggs)
{
preprocess_aggrefs(subroot, (Node *) subroot->processed_tlist);
preprocess_aggrefs(subroot, viewQuery->havingQual);
}

viewQuery->havingQual = aqumv_adjust_sub_matched_expr_mutator(viewQuery->havingQual, context);
if (context->has_unmatched)
continue;

viewQuery->groupClause = parse->groupClause;
viewQuery->groupingSets = parse->groupingSets;
viewQuery->sortClause = parse->sortClause;
viewQuery->distinctClause = parse->distinctClause;
viewQuery->limitOption = parse->limitOption;
viewQuery->limitCount = parse->limitCount;
viewQuery->limitOffset = parse->limitOffset;

/*
* AQUMV
* Process all quals to conjunctive normal form.
*
* We assume that the selection predicates of view and query expressions
* have been converted into conjunctive normal form(CNF) before we process
* them.
*/
preprocess_qual_conditions(subroot, (Node *) viewQuery->jointree);

/*
* Process quals, return false if failed.
* Else, post_quals are filled if there were.
* Like process target list, post_quals is used later to see if we could
* rewrite and apply it to mv relation.
*/
if(!aqumv_process_from_quals(parse->jointree->quals, viewQuery->jointree->quals, &post_quals))
continue;

/* Rewrite post_quals, return false if failed. */
post_quals = (List *)aqumv_adjust_sub_matched_expr_mutator((Node *)post_quals, context);
if (context->has_unmatched)
continue;

viewQuery->jointree->quals = (Node *)post_quals;
}

/*
* AQUMV
Expand All @@ -368,7 +526,6 @@ answer_query_using_materialized_views(PlannerInfo *root,
mvrte->relkind = RELKIND_MATVIEW;
mvrte->relid = matviewRel->rd_rel->oid;
viewQuery->rtable = list_make1(mvrte); /* rewrite to SELECT FROM mv itself. */
viewQuery->jointree->quals = (Node *)post_quals; /* Could be NULL, but doesn'y matter for now. */

/*
* Build a plan of new SQL.
Expand Down Expand Up @@ -396,6 +553,7 @@ answer_query_using_materialized_views(PlannerInfo *root,
/* CBDB specifical */
root->hasNonCombine = subroot->hasNonCombine;
root->numPureOrderedAggs = subroot->numPureOrderedAggs;
root->hasHavingQual = subroot->hasHavingQual;

/*
* Update pathkeys which may be changed by qp_callback.
Expand Down Expand Up @@ -454,8 +612,8 @@ aqumv_init_context(List *view_tlist, TupleDesc mv_tupledesc)
if (tle->resjunk)
continue;

/* Avoid expression has no Vars. */
if(!contain_var_clause((Node*)tle))
/* Avoid expression has no Vars, excpet for count(*). */
if(!contain_var_or_aggstar_clause((Node*)tle))
continue;

/* To be sorted later */
Expand Down Expand Up @@ -628,7 +786,7 @@ static Node *aqumv_adjust_sub_matched_expr_mutator(Node *node, aqumv_equivalent_
* And if expr doesn't have Vars, return it to upper.
* Keep TargetEntry expr no changed in case for count(*).
*/
if (!contain_var_clause((Node *)node_expr))
if (!contain_var_or_aggstar_clause((Node *)node_expr))
return is_targetEntry ? node : (Node *)node_expr;

/*
Expand Down
Loading

0 comments on commit a3b07d7

Please sign in to comment.