Skip to content

Commit

Permalink
Dynamic Table.
Browse files Browse the repository at this point in the history
A Dynamic Table is an auto-refreshing materialized view which can
be built from base tables, external tables, materialized views
and other dynamic tables.
And it could be used to answer query by AQUMV.
As normal tables in CBDB, dynamic tables could also have distribution
keys.

The purpose of Dynamic Tables is to solve the problem often raised by
customers who are big fans of a lakehouse architecture: how can we run
queries on external tables as fast as internal tables?

CREATE DYNAMIC TABLE:

CREATE DYNAMIC TABLE dt0 SCHEDULE '5 * * * *' AS
  SELECT a, b, sum(c) FROM t1 GROUP BY a, b WITH NO DATA DISTRIBUTED
BY(b);
CREATE DYNAMIC TABLE

\d
                 List of relations
 Schema | Name |     Type      |  Owner  | Storage
--------+------+---------------+---------+---------
 public | dt0  | dynamic table | gpadmin | heap
 public | t1   | table         | gpadmin | heap
(2 rows)

CREATE DYNAMIC TABLE xxx AS Query
The Query may be any SELECT statement that is valid for a Materialized
View: over a single relation or multiple relations (base tables,
materialized views, and dynamic tables as well), with joins, subqueries,
aggregation, GROUP BY, etc.

SCHEDULE:
A string used to schedule background job which auto-refreshes the
dynamic table.
We accept the schedule strings of the pg_cron extension, which supports
Linux crontab syntax; see https://crontab.guru

 ┌───────────── min (0 - 59)
 │ ┌────────────── hour (0 - 23)
 │ │ ┌─────────────── day of month (1 - 31) or last day of the month ($)
 │ │ │ ┌──────────────── month (1 - 12)
 │ │ │ │ ┌───────────────── day of week (0 - 6) (0 to 6 are Sunday to
 │ │ │ │ │                  Saturday, or use names; 7 is also Sunday)
 │ │ │ │ │
 │ │ │ │ │
 * * * * *

You can also use '[1-59] seconds' to schedule a job based on an
interval.
The example creates a cron job refreshing the dynamic table at minute 5
of each hour.
For convenience, SCHEDULE is optional. If the user doesn't specify it, a
default schedule is provided: at every 5th minute.

WITH NO DATA:
Same as Materialized View, will create an empty Dynamic Table if
specified.

DISTRIBUTED BY:
Same as normal tables in CBDB, Dynamic Tables could support distribution
keys as materialized views.

Refresh Dynamic Table
As seen in pg_task, we put a command to auto-refresh dynamic tables.
However, if users want to do a REFRESH manually, exec command REFRESH
DYNAMIC TABLE is also supported.
REFRESH DYNAMIC TABLE dt0;
REFRESH DYNAMIC TABLE

Refresh WITH NO DATA
Same as for Materialized Views, REFRESH ... WITH NO DATA truncates the
Dynamic Table and leaves it in an unpopulated state.
REFRESH DYNAMIC TABLE dt0 WITH NO DATA;
REFRESH DYNAMIC TABLE

Drop Dynamic Table
Dropping a Dynamic Table drops its scheduler job automatically.
DROP DYNAMIC TABLE dt0;
DROP DYNAMIC TABLE

Like Materialized Views, Dynamic Tables could be used to answer query
too. This is limited by AQUMV.

Authored-by: Zhang Mingli [email protected]
  • Loading branch information
avamingli committed Nov 21, 2024
1 parent f92faf0 commit a2c9b3c
Show file tree
Hide file tree
Showing 27 changed files with 695 additions and 19 deletions.
1 change: 1 addition & 0 deletions contrib/pg_stat_statements/pg_stat_statements.c
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,7 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
rows = (qc && (qc->commandTag == CMDTAG_COPY ||
qc->commandTag == CMDTAG_FETCH ||
qc->commandTag == CMDTAG_SELECT ||
qc->commandTag == CMDTAG_REFRESH_DYNAMIC_TABLE ||
qc->commandTag == CMDTAG_REFRESH_MATERIALIZED_VIEW)) ?
qc->nprocessed : 0;

Expand Down
1 change: 1 addition & 0 deletions src/backend/catalog/heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,7 @@ InsertPgClassTuple(Relation pg_class_desc,
values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
values[Anum_pg_class_relisivm - 1] = BoolGetDatum(rd_rel->relisivm);
values[Anum_pg_class_relisdynamic - 1] = BoolGetDatum(rd_rel->relisdynamic);
if (relacl != (Datum) 0)
values[Anum_pg_class_relacl - 1] = relacl;
else
Expand Down
1 change: 1 addition & 0 deletions src/backend/catalog/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,7 @@ index_create_internal(Relation heapRelation,
indexRelation->rd_rel->relam = accessMethodObjectId;
indexRelation->rd_rel->relispartition = OidIsValid(parentIndexRelid);
indexRelation->rd_rel->relisivm = false;
indexRelation->rd_rel->relisdynamic = false;

/*
* store index's pg_class entry
Expand Down
20 changes: 17 additions & 3 deletions src/backend/catalog/objectaddress.c
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,9 @@ static const struct object_type_map
{
"materialized view", OBJECT_MATVIEW
},
{
"dynamic table", OBJECT_MATVIEW
},
{
"composite type", -1
}, /* unmapped */
Expand Down Expand Up @@ -4384,8 +4387,16 @@ getRelationDescription(StringInfo buffer, Oid relid, bool missing_ok)
relname);
break;
case RELKIND_MATVIEW:
appendStringInfo(buffer, _("materialized view %s"),
relname);
if (relForm->relisdynamic)
{
appendStringInfo(buffer, _("dynamic table %s"),
relname);
}
else
{
appendStringInfo(buffer, _("materialized view %s"),
relname);
}
break;
case RELKIND_COMPOSITE_TYPE:
appendStringInfo(buffer, _("composite type %s"),
Expand Down Expand Up @@ -4954,7 +4965,10 @@ getRelationTypeDescription(StringInfo buffer, Oid relid, int32 objectSubId,
appendStringInfoString(buffer, "view");
break;
case RELKIND_MATVIEW:
appendStringInfoString(buffer, "materialized view");
if (relForm->relisdynamic)
appendStringInfoString(buffer, "dynamic table");
else
appendStringInfoString(buffer, "materialized view");
break;
case RELKIND_COMPOSITE_TYPE:
appendStringInfoString(buffer, "composite type");
Expand Down
47 changes: 47 additions & 0 deletions src/backend/commands/createas.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "commands/prepare.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/taskcmds.h"
#include "commands/trigger.h"
#include "commands/view.h"
#include "miscadmin.h"
Expand Down Expand Up @@ -119,6 +120,9 @@ static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_conte
static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList);
static bool check_aggregate_supports_ivm(Oid aggfnoid);

/*
 * Schedule used when CREATE DYNAMIC TABLE omits the SCHEDULE clause.
 *
 * The documented default is "at every 5th minute", which in crontab syntax
 * requires a step value in the minute field.  A bare 5 in that field would
 * instead mean "at minute 5 of every hour".
 */
#define DYNAMIC_TABLE_DEFAULT_REFRESH_INTERVAL "*/5 * * * *"
static void create_dynamic_table_auto_refresh_task(ParseState *pstate, Relation DynamicTableRel, char *schedule);

/*
* create_ctas_internal
*
Expand Down Expand Up @@ -537,6 +541,14 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
CreateIvmTriggersOnBaseTables(query_immv, matviewOid);
}
}

/* Set Dynamic Tables. */
if (into->dynamicTbl)
{
SetDynamicTableState(matviewRel);
create_dynamic_table_auto_refresh_task(pstate, matviewRel, into->schedule);
}

table_close(matviewRel, NoLock);
}

Expand Down Expand Up @@ -1808,3 +1820,38 @@ get_primary_key_attnos_from_query(Query *query, List **constraintList)

return keys;
}

/*
 * create_dynamic_table_auto_refresh_task
 *		Register the scheduled task that auto-refreshes a Dynamic Table.
 *
 * pstate			parse state passed through to DefineTask().
 * DynamicTableRel	the Dynamic Table's relation.
 * schedule			schedule string for the task, or NULL to fall back to
 *					DYNAMIC_TABLE_DEFAULT_REFRESH_INTERVAL.
 *
 * The task is named "gp_dynamic_table_refresh_<relation oid>" and its
 * command is "REFRESH DYNAMIC TABLE <schema-qualified name>".  After the
 * task is defined, a DEPENDENCY_AUTO entry from the task to the relation is
 * recorded.
 *
 * NOTE(review): the relation name is baked into the command text at creation
 * time; confirm that renaming or moving the table to another schema updates
 * the task elsewhere.
 */
static void
create_dynamic_table_auto_refresh_task(ParseState *pstate, Relation DynamicTableRel, char *schedule)
{
	ObjectAddress refaddr;
	ObjectAddress address;
	StringInfoData buf;
	CreateTaskStmt *task_stmt;
	const char *task_schedule;
	char	   *dtname;

	/*
	 * Pick the effective schedule through a const pointer, so the string
	 * literal default is never reachable through a writable char *.
	 */
	task_schedule = (schedule != NULL) ? schedule : DYNAMIC_TABLE_DEFAULT_REFRESH_INTERVAL;

	/* Create auto refresh task. */
	task_stmt = makeNode(CreateTaskStmt);

	/* The task name is derived from the relation OID. */
	initStringInfo(&buf);
	appendStringInfo(&buf, "gp_dynamic_table_refresh_%u", RelationGetRelid(DynamicTableRel));
	task_stmt->taskname = pstrdup(buf.data);
	task_stmt->schedule = pstrdup(task_schedule);
	task_stmt->if_not_exists = false;	/* report error if failed. */

	/* Build the command from the schema-qualified, quoted relation name. */
	dtname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(DynamicTableRel)),
										RelationGetRelationName(DynamicTableRel));
	resetStringInfo(&buf);
	appendStringInfo(&buf, "REFRESH DYNAMIC TABLE %s", dtname);
	task_stmt->sql = pstrdup(buf.data);

	address = DefineTask(pstate, task_stmt);

	/* Record the task as AUTO-dependent on the Dynamic Table. */
	refaddr.classId = RelationRelationId;
	refaddr.objectId = RelationGetRelid(DynamicTableRel);
	refaddr.objectSubId = 0;
	recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO);
}
7 changes: 6 additions & 1 deletion src/backend/commands/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,12 @@ ExplainOneUtility(Node *utilityStmt, IntoClause *into, ExplainState *es,
if (ctas->objtype == OBJECT_TABLE)
ExplainDummyGroup("CREATE TABLE AS", NULL, es);
else if (ctas->objtype == OBJECT_MATVIEW)
ExplainDummyGroup("CREATE MATERIALIZED VIEW", NULL, es);
{
if(ctas->into && ctas->into->dynamicTbl)
ExplainDummyGroup("CREATE DYNAMIC TABLE", NULL, es);
else
ExplainDummyGroup("CREATE MATERIALIZED VIEW", NULL, es);
}
else
elog(ERROR, "unexpected object type: %d",
(int) ctas->objtype);
Expand Down
47 changes: 46 additions & 1 deletion src/backend/commands/matview.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,46 @@ MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation)
return refreshClause;
}

/*
 * SetDynamicTableState
 *		Mark a materialized view as a Dynamic Table.
 *
 * Sets relisdynamic to true in the relation's pg_class row.
 *
 * NOTE: caller must be holding an appropriate lock on the relation.
 */
void
SetDynamicTableState(Relation relation)
{
	Relation	classRel;
	HeapTuple	classTup;
	Form_pg_class classForm;

	Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);

	/*
	 * Fetch a modifiable copy of the relation's pg_class row.  Updating it
	 * has a crucial side-effect: other backends (and this one too!) are sent
	 * SI message to make them rebuild relcache entries.
	 */
	classRel = table_open(RelationRelationId, RowExclusiveLock);
	classTup = SearchSysCacheCopy1(RELOID,
								   ObjectIdGetDatum(RelationGetRelid(relation)));
	if (!HeapTupleIsValid(classTup))
		elog(ERROR, "cache lookup failed for relation %u",
			 RelationGetRelid(relation));

	/* Flip the flag on the copy and write it back to the catalog. */
	classForm = (Form_pg_class) GETSTRUCT(classTup);
	classForm->relisdynamic = true;
	CatalogTupleUpdate(classRel, &classTup->t_self, classTup);

	heap_freetuple(classTup);
	table_close(classRel, RowExclusiveLock);

	/* Make the updated pg_class row visible to later commands here. */
	CommandCounterIncrement();
}

/*
* SetMatViewIVMState
* Mark a materialized view as IVM, or not.
Expand Down Expand Up @@ -701,7 +741,12 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
* completion tag output might break applications using it.
*/
if (qc)
SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
{
if (stmt->isdynamic)
SetQueryCompletion(qc, CMDTAG_REFRESH_DYNAMIC_TABLE, processed);
else
SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
}

return address;
}
Expand Down
4 changes: 4 additions & 0 deletions src/backend/nodes/copyfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1808,6 +1808,8 @@ _copyIntoClause(const IntoClause *from)
COPY_SCALAR_FIELD(ivm);
COPY_SCALAR_FIELD(matviewOid);
COPY_STRING_FIELD(enrname);
COPY_SCALAR_FIELD(dynamicTbl);
COPY_STRING_FIELD(schedule);

return newnode;
}
Expand Down Expand Up @@ -4218,6 +4220,7 @@ _copyDropStmt(const DropStmt *from)
COPY_SCALAR_FIELD(behavior);
COPY_SCALAR_FIELD(missing_ok);
COPY_SCALAR_FIELD(concurrent);
COPY_SCALAR_FIELD(isdynamic);

return newnode;
}
Expand Down Expand Up @@ -4786,6 +4789,7 @@ _copyRefreshMatViewStmt(const RefreshMatViewStmt *from)
COPY_SCALAR_FIELD(concurrent);
COPY_SCALAR_FIELD(skipData);
COPY_NODE_FIELD(relation);
COPY_SCALAR_FIELD(isdynamic);

return newnode;
}
Expand Down
4 changes: 4 additions & 0 deletions src/backend/nodes/equalfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ _equalIntoClause(const IntoClause *a, const IntoClause *b)
COMPARE_SCALAR_FIELD(ivm);
COMPARE_SCALAR_FIELD(matviewOid);
COMPARE_STRING_FIELD(enrname);
COMPARE_SCALAR_FIELD(dynamicTbl);
COMPARE_STRING_FIELD(schedule);
return true;
}

Expand Down Expand Up @@ -1480,6 +1482,7 @@ _equalDropStmt(const DropStmt *a, const DropStmt *b)
COMPARE_SCALAR_FIELD(behavior);
COMPARE_SCALAR_FIELD(missing_ok);
COMPARE_SCALAR_FIELD(concurrent);
COMPARE_SCALAR_FIELD(isdynamic);

return true;
}
Expand Down Expand Up @@ -1962,6 +1965,7 @@ _equalRefreshMatViewStmt(const RefreshMatViewStmt *a, const RefreshMatViewStmt *
COMPARE_SCALAR_FIELD(concurrent);
COMPARE_SCALAR_FIELD(skipData);
COMPARE_NODE_FIELD(relation);
COMPARE_SCALAR_FIELD(isdynamic);

return true;
}
Expand Down
2 changes: 2 additions & 0 deletions src/backend/nodes/outfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,8 @@ _outIntoClause(StringInfo str, const IntoClause *node)
WRITE_BOOL_FIELD(ivm);
WRITE_OID_FIELD(matviewOid);
WRITE_STRING_FIELD(enrname);
WRITE_BOOL_FIELD(dynamicTbl);
WRITE_STRING_FIELD(schedule);
}

static void
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/outfuncs_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ _outDropStmt(StringInfo str, const DropStmt *node)
WRITE_ENUM_FIELD(behavior, DropBehavior);
WRITE_BOOL_FIELD(missing_ok);
WRITE_BOOL_FIELD(concurrent);
WRITE_BOOL_FIELD(isdynamic);
}

static void
Expand Down
2 changes: 2 additions & 0 deletions src/backend/nodes/readfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@ _readIntoClause(void)
READ_BOOL_FIELD(ivm);
READ_OID_FIELD(matviewOid);
READ_STRING_FIELD(enrname);
READ_BOOL_FIELD(dynamicTbl);
READ_STRING_FIELD(schedule);

READ_DONE();
}
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/readfuncs_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,7 @@ _readDropStmt(void)
READ_ENUM_FIELD(behavior,DropBehavior);
READ_BOOL_FIELD(missing_ok);
READ_BOOL_FIELD(concurrent);
READ_BOOL_FIELD(isdynamic);

/* Force 'missing_ok' in QEs */
#ifdef COMPILING_BINARY_FUNCS
Expand Down
Loading

0 comments on commit a2c9b3c

Please sign in to comment.