diff --git a/CHANGES.md b/CHANGES.md index 037d278f..22b52eb8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -20,6 +20,8 @@ when the repository does not exist. With previous versions of CrateDB, it was `RepositoryUnknownException`. +- Add baseline infrastructure for emulating materialized views. + ## 2023/06/27 0.0.0 - Import "data retention" implementation from . diff --git a/cratedb_toolkit/materialized/__init__.py b/cratedb_toolkit/materialized/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/materialized/core.py b/cratedb_toolkit/materialized/core.py new file mode 100644 index 00000000..5a4d89cb --- /dev/null +++ b/cratedb_toolkit/materialized/core.py @@ -0,0 +1,57 @@ +# Copyright (c) 2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging + +import sqlalchemy as sa + +from cratedb_toolkit.materialized.model import MaterializedViewSettings +from cratedb_toolkit.materialized.store import MaterializedViewStore +from cratedb_toolkit.model import TableAddress + +logger = logging.getLogger(__name__) + + +class MaterializedViewManager: + """ + The main application, implementing basic synthetic materialized views. + """ + + def __init__(self, settings: MaterializedViewSettings): + # Runtime context settings. + self.settings = settings + + # Materialized view store API. + self.store = MaterializedViewStore(settings=self.settings) + + def refresh(self, name: str): + """ + Resolve materialized view, and refresh it. 
+ """ + logger.info(f"Refreshing materialized view: {name}") + + table_schema, table_name = name.split(".") + table_address = TableAddress(schema=table_schema, table=table_name) + mview = self.store.get_by_table(table_address) + logger.info(f"Loaded materialized view definition: {mview}") + + sql_ddl = f"DROP TABLE IF EXISTS {mview.staging_table_fullname}" + logger.info(f"Dropping materialized view (staging): {sql_ddl}") + self.store.execute(sa.text(sql_ddl)) + + # TODO: IF NOT EXISTS + sql_ddl = f"CREATE TABLE {mview.staging_table_fullname} AS (\n{mview.sql}\n)" + logger.info(f"Creating materialized view (staging): {sql_ddl}") + self.store.execute(sa.text(sql_ddl)) + sql_refresh = f"REFRESH TABLE {mview.staging_table_fullname}" + self.store.execute(sa.text(sql_refresh)) + + # sql_ddl = f"DROP TABLE IF EXISTS {mview.table_fullname}" + # logger.info(f"Dropping materialized view (live): {sql_ddl}") + # self.store.execute(sa.text(sql_ddl)) + + # FIXME: SQLParseException[Target table name must not include a schema] + sql_ddl = f"ALTER TABLE {mview.staging_table_fullname} RENAME TO {mview.table_name}" + logger.info(f"Activating materialized view: {sql_ddl}") + self.store.execute(sa.text(sql_ddl)) + sql_refresh = f"REFRESH TABLE {mview.table_fullname}" + self.store.execute(sa.text(sql_refresh)) diff --git a/cratedb_toolkit/materialized/model.py b/cratedb_toolkit/materialized/model.py new file mode 100644 index 00000000..6f3f2335 --- /dev/null +++ b/cratedb_toolkit/materialized/model.py @@ -0,0 +1,98 @@ +# Copyright (c) 2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import dataclasses +import os +import typing as t + +from cratedb_toolkit.model import DatabaseAddress, TableAddress + + +@dataclasses.dataclass +class MaterializedView: + """ + Manage the database representation of a "materialized view" entity. + + This layout has to be synchronized with the corresponding table definition + per SQL DDL statement within `schema.sql`. 
+ """ + + table_schema: t.Optional[str] = dataclasses.field( + default=None, + metadata={"help": "The target table schema"}, + ) + table_name: t.Optional[str] = dataclasses.field( + default=None, + metadata={"help": "The target table name"}, + ) + sql: t.Optional[str] = dataclasses.field( + default=None, + metadata={"help": "The SQL statement defining the emulated materialized view"}, + ) + + id: t.Optional[str] = dataclasses.field( # noqa: A003 + default=None, + metadata={"help": "The materialized view identifier"}, + ) + + @property + def table_fullname(self) -> str: + return f'"{self.table_schema}"."{self.table_name}"' + + @property + def staging_table_fullname(self) -> str: + # FIXME: SQLParseException[Target table name must not include a schema] + # TODO: CrateDB does not support renaming to a different schema, thus the target + # table identifier must not include a schema. This is an artificial limitation. + # Technically, it can be done. + # https://github.com/crate/crate/blob/5.3.3/server/src/main/java/io/crate/analyze/AlterTableAnalyzer.java#L97-L102 + # return f'"{self.table_schema}-staging"."{self.table_name}"' + return f'"{self.table_schema}"."{self.table_name}-staging"' + + @classmethod + def from_record(cls, record) -> "MaterializedView": + return cls(**record) + + def to_storage_dict(self, identifier: t.Optional[str] = None) -> t.Dict[str, str]: + """ + Return representation suitable for storing into database table using SQLAlchemy. + """ + + # Serialize to dictionary. + data = dataclasses.asdict(self) + + # Optionally add identifier. + if identifier is not None: + data["id"] = identifier + + return data + + +def default_table_address(): + """ + The default address of the materialized view management table. + """ + schema = os.environ.get("CRATEDB_EXT_SCHEMA", "ext") + return TableAddress(schema=schema, table="materialized_view") + + +@dataclasses.dataclass +class MaterializedViewSettings: + """ + Bundle all configuration and runtime settings. 
+ """ + + # Database connection URI. + database: DatabaseAddress = dataclasses.field( + default_factory=lambda: DatabaseAddress.from_string("crate://localhost/") + ) + + # The address of the materialized view table. + materialized_table: TableAddress = dataclasses.field(default_factory=default_table_address) + + # Only pretend to invoke statements. + dry_run: t.Optional[bool] = False + + def to_dict(self): + data = dataclasses.asdict(self) + data["materialized_table"] = self.materialized_table + return data diff --git a/cratedb_toolkit/materialized/schema.py b/cratedb_toolkit/materialized/schema.py new file mode 100644 index 00000000..11b5962a --- /dev/null +++ b/cratedb_toolkit/materialized/schema.py @@ -0,0 +1,36 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging +from importlib.resources import read_text + +from cratedb_toolkit.materialized.model import MaterializedViewSettings +from cratedb_toolkit.util.database import run_sql + +logger = logging.getLogger(__name__) + + +def setup_schema(settings: MaterializedViewSettings): + """ + Set up the materialized view management table schema. + + TODO: Refactor to `store` module. + """ + + logger.info( + f"Installing materialized view management table at " + f"database '{settings.database.safe}', table {settings.materialized_table}" + ) + + # Read SQL DDL statement. + sql = read_text("cratedb_toolkit.materialized", "schema.sql") + + tplvars = settings.to_dict() + sql = sql.format_map(tplvars) + + if settings.dry_run: + logger.info(f"Pretending to execute SQL statement:\n{sql}") + return + + # Materialize table schema. 
+ run_sql(settings.database.dburi, sql) + run_sql(settings.database.dburi, f"REFRESH TABLE {settings.materialized_table.fullname}") diff --git a/cratedb_toolkit/materialized/schema.sql b/cratedb_toolkit/materialized/schema.sql new file mode 100644 index 00000000..ae392cc2 --- /dev/null +++ b/cratedb_toolkit/materialized/schema.sql @@ -0,0 +1,14 @@ +-- Set up the materialized view management database table schema. +CREATE TABLE IF NOT EXISTS {materialized_table.fullname} ( + + "id" TEXT NOT NULL PRIMARY KEY, + + -- Target: The database table to be populated. + "table_schema" TEXT, -- The target table schema. + "table_name" TEXT, -- The target table name. + + -- The SQL statement defining the emulated materialized view. + "sql" TEXT + +) +CLUSTERED INTO 1 SHARDS; diff --git a/cratedb_toolkit/materialized/store.py b/cratedb_toolkit/materialized/store.py new file mode 100644 index 00000000..3781a202 --- /dev/null +++ b/cratedb_toolkit/materialized/store.py @@ -0,0 +1,157 @@ +# Copyright (c) 2021-2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging +import typing as t +import uuid + +import sqlalchemy as sa +from sqlalchemy import MetaData, Result, Table +from sqlalchemy.orm import Session + +from cratedb_toolkit.exception import TableNotFound +from cratedb_toolkit.materialized.model import MaterializedView, MaterializedViewSettings +from cratedb_toolkit.model import TableAddress +from cratedb_toolkit.util.database import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +class MaterializedViewStore: + """ + A wrapper around the materialized view management table. + """ + + def __init__(self, settings: MaterializedViewSettings): + self.settings = settings + + logger.info( + f"Connecting to database {self.settings.database.safe}, " + f"table {self.settings.materialized_table.fullname}" + ) + + # Set up generic database adapter. 
+ self.database: DatabaseAdapter = DatabaseAdapter(dburi=self.settings.database.dburi) + + # Set up SQLAlchemy Core adapter for materialized view management table. + metadata = MetaData(schema=self.settings.materialized_table.schema) + self.table = Table(self.settings.materialized_table.table, metadata, autoload_with=self.database.engine) + + def create(self, mview: MaterializedView, ignore: t.Optional[str] = None): + """ + Create a new materialized view, and return its identifier. + + TODO: Generalize, see `RetentionPolicyStore`. + """ + + # TODO: Sanity check, whether target table already exists? + + ignore = ignore or "" + + # Sanity checks. + if mview.table_schema is None: + raise ValueError("Table schema needs to be defined") + if mview.table_name is None: + raise ValueError("Table name needs to be defined") + if self.exists(mview): + if not ignore.startswith("DuplicateKey"): + raise ValueError(f"Materialized view '{mview.table_schema}.{mview.table_name}' already exists") + + table = self.table + # TODO: Add UUID as converter to CrateDB driver? + identifier = str(uuid.uuid4()) + data = mview.to_storage_dict(identifier=identifier) + insertable = sa.insert(table).values(**data).returning(table.c.id) + cursor = self.execute(insertable) + identifier = cursor.one()[0] + self.synchronize() + return identifier + + def retrieve(self): + """ + Retrieve all records from database table. + + TODO: Add filtering capabilities. + TODO: Generalize, see `RetentionPolicyStore`. + """ + + # Run SELECT statement, and return result. + selectable = sa.select(self.table) + records = self.query(selectable) + return records + + def get_by_table(self, table_address: TableAddress) -> MaterializedView: + """ + Retrieve materialized view definition by target table schema and name. 
+ """ + table = self.table + selectable = sa.select(table).where( + table.c.table_schema == table_address.schema, + table.c.table_name == table_address.table, + ) + logger.info(f"View definition DQL: {selectable}") + try: + record = self.query(selectable)[0] + except IndexError: + raise KeyError( + f"Synthetic materialized table definition does not exist: {table_address.schema}.{table_address.table}" + ) + mview = MaterializedView.from_record(record) + return mview + + def delete(self, identifier: str) -> int: + """ + Delete materialized view by identifier. + + TODO: Generalize, see `RetentionPolicyStore`. + """ + table = self.table + constraint = table.c.id == identifier + deletable = sa.delete(table).where(constraint) + result = self.execute(deletable) + self.synchronize() + if result.rowcount == 0: + logger.warning(f"Materialized view not found with id: {identifier}") + return result.rowcount + + def execute(self, statement) -> Result: + """ + Execute SQL statement, and return result object. + + TODO: Generalize, see `RetentionPolicyStore`. + """ + with Session(self.database.engine) as session: + result = session.execute(statement) + session.commit() + return result + + def query(self, statement) -> t.List[t.Dict]: + """ + Execute SQL statement, fetch result rows, and return them converted to dictionaries. + + TODO: Generalize, see `RetentionPolicyStore`. + """ + cursor = self.execute(statement) + rows = cursor.mappings().fetchall() + records = [dict(row.items()) for row in rows] + return records + + def exists(self, mview: MaterializedView): + """ + Check if materialized view definition for specific table already exists. + + TODO: Generalize, see `RetentionPolicyStore`. + """ + table = self.table + selectable = sa.select(table).where( + table.c.table_schema == mview.table_schema, + table.c.table_name == mview.table_name, + ) + result = self.query(selectable) + return bool(result) + + def synchronize(self): + """ + Synchronize data by issuing `REFRESH TABLE` statement. 
+ """ + sql = f"REFRESH TABLE {self.settings.materialized_table.fullname};" + self.database.run_sql(sql) diff --git a/examples/materialized_view.py b/examples/materialized_view.py new file mode 100644 index 00000000..f2e5e319 --- /dev/null +++ b/examples/materialized_view.py @@ -0,0 +1,206 @@ +# Copyright (c) 2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +""" +About +===== + +Example program demonstrating how to create and maintain a basic variant +of materialized views. + +It initializes the materialized view subsystem, inserts a bunch of data, +and creates a materialized view on it. After that, it inserts more data, +and refreshes the materialized view. + +The program obtains a single positional argument from the command line, +the database URI, in SQLAlchemy-compatible string format. By default, +the program connects to a CrateDB instance on localhost. + +Synopsis +======== +:: + + # Install package + pip install cratedb-toolkit + + # General. + python examples/materialized_view.py crate://:@:4200?ssl=true + + # Default. + python examples/materialized_view.py crate://localhost:4200 + +""" +import logging +import os +from textwrap import dedent + +from cratedb_toolkit.materialized.core import MaterializedViewManager +from cratedb_toolkit.materialized.model import MaterializedView, MaterializedViewSettings +from cratedb_toolkit.materialized.schema import setup_schema +from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util import DatabaseAdapter, boot_with_dburi + +logger = logging.getLogger(__name__) + + +class MaterializedViewExample: + """ + An example program demonstrating basic materialized views. + """ + + def __init__(self, dburi): + # Set up a generic database adapter. + self.db = DatabaseAdapter(dburi=dburi) + + # Configure store API to use the `examples` schema. 
+ self.settings = MaterializedViewSettings(database=DatabaseAddress.from_string(dburi)) + if "PYTEST_CURRENT_TEST" not in os.environ: + self.settings.materialized_table.schema = "examples" + + # Drop all tables used within this example. + self.cleanup() + + # Create the SQL DDL schema for the materialized views management table. + # TODO: Refactor to `MaterializedViewManager`. + setup_schema(settings=self.settings) + + # Provide manager instance. + self.manager = MaterializedViewManager(settings=self.settings) + + def cleanup(self): + """ + Drop materialized view management table and data tables. + """ + self.db.run_sql(f"DROP TABLE IF EXISTS {self.settings.materialized_table.fullname};") + self.db.run_sql('DROP TABLE IF EXISTS "examples"."raw_metrics";') + self.db.run_sql('DROP TABLE IF EXISTS "examples"."raw_metrics_view";') + self.db.run_sql('DROP TABLE IF EXISTS "examples"."raw_metrics_view-staging";') + + def setup(self): + """ + Create materialized view record in management table. + """ + + logger.info("Creating materialized view") + + # Add a record. + mview = MaterializedView( + table_schema="examples", + table_name="raw_metrics_view", + sql=dedent( + """ + SELECT variable, MIN(value), MAX(value), AVG(value) + FROM "examples"."raw_metrics" + GROUP BY variable + """ + ).strip(), + ) + self.manager.store.create(mview, ignore="DuplicateKeyException") + + self.db.run_sql(f"REFRESH TABLE {self.settings.materialized_table.fullname};") + + def setup_data(self): + """ + Provision and populate data table. 
+ """ + ddl = """ + CREATE TABLE "examples"."raw_metrics" ( + "variable" TEXT, + "timestamp" TIMESTAMP WITH TIME ZONE, + "ts_day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"), + "value" REAL, + "quality" INTEGER, + PRIMARY KEY ("variable", "timestamp", "ts_day") + ) + PARTITIONED BY ("ts_day") + CLUSTERED INTO 1 SHARDS + ; + """ + self.db.run_sql(ddl) + + dml = """ + INSERT INTO "examples"."raw_metrics" + (variable, timestamp, value, quality) + SELECT + 'temperature' AS variable, + generate_series AS timestamp, + 12 + RANDOM()*40 AS value, + 0 AS quality + FROM generate_series('2023-06-01', '2023-06-30', '2 days'::INTERVAL); + """ + self.db.run_sql(dml) + + dml = """ + INSERT INTO "examples"."raw_metrics" + (variable, timestamp, value, quality) + SELECT + 'humidity' AS variable, + generate_series AS timestamp, + 40 + RANDOM()*50 AS value, + 0 AS quality + FROM generate_series('2023-06-01', '2023-06-30', '2 days'::INTERVAL); + """ + self.db.run_sql(dml) + + self.db.run_sql('REFRESH TABLE "examples"."raw_metrics";') + + def refresh(self): + """ + Refresh materialized view. + """ + self.manager.refresh(name="examples.raw_metrics_view") + # self.db.run_sql('REFRESH TABLE "examples"."raw_metrics_view";') + + +def main(dburi: str): + """ + Create and refresh emulated materialized views. + """ + + logger.info("Running example application") + + # Set up all the jazz. + logger.info("Provisioning database") + example = MaterializedViewExample(dburi=dburi) + example.setup() + example.setup_data() + + logger.info("Invoking materialized view refresh") + + # Invoke materialized view refresh. + example.refresh() + + # Report about number of records in table after first materialized view refresh. + count = example.db.count_records("examples.raw_metrics_view") + logger.info(f"Database table `examples.raw_metrics_view` contains {count} records") + + # Insert more data. 
+ dml = """ + INSERT INTO "examples"."raw_metrics" + (variable, timestamp, value, quality) + SELECT + 'battery' AS variable, + generate_series AS timestamp, + 0 + RANDOM()*100 AS value, + 0 AS quality + FROM generate_series('2023-06-01', '2023-06-30', '2 days'::INTERVAL); + """ + example.db.run_sql(dml) + example.db.run_sql('REFRESH TABLE "examples"."raw_metrics";') + + # Invoke materialized view refresh. + example.refresh() + + # Report about number of records in table after second materialized view refresh. + count = example.db.count_records("examples.raw_metrics_view") + logger.info(f"Database table `examples.raw_metrics_view` contains {count} records") + + example.cleanup() + + +if __name__ == "__main__": + """ + The program obtains a single positional argument from the command line, + the database URI, in SQLAlchemy-compatible string format. + """ + dburi = boot_with_dburi() + main(dburi) diff --git a/tests/materialized/__init__.py b/tests/materialized/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/materialized/conftest.py b/tests/materialized/conftest.py new file mode 100644 index 00000000..cacc55a2 --- /dev/null +++ b/tests/materialized/conftest.py @@ -0,0 +1,38 @@ +import pytest + +from cratedb_toolkit.materialized.model import MaterializedViewSettings +from cratedb_toolkit.materialized.schema import setup_schema +from cratedb_toolkit.materialized.store import MaterializedViewStore +from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util.database import DatabaseAdapter + + +@pytest.fixture() +def settings(cratedb): + """ + Provide configuration and runtime settings object, parameterized for the test suite. 
+ """ + database_url = cratedb.get_connection_url() + settings = MaterializedViewSettings(database=DatabaseAddress.from_string(database_url)) + # job_settings.policy_table.schema = TESTDRIVE_EXT_SCHEMA + return settings + + +@pytest.fixture() +def database(cratedb, settings): + """ + Provide a client database adapter, which is connected to the test database instance. + """ + yield DatabaseAdapter(dburi=settings.database.dburi) + + +@pytest.fixture() +def store(database, settings): + """ + Provide a materialized view store, which is connected to the test database instance. + The materialized view management table schema has been established. + """ + # Set up the management table schema before creating the store. + setup_schema(settings=settings) + store = MaterializedViewStore(settings=settings) + yield store diff --git a/tests/materialized/test_core.py b/tests/materialized/test_core.py new file mode 100644 index 00000000..5d292bed --- /dev/null +++ b/tests/materialized/test_core.py @@ -0,0 +1,64 @@ +import re + +import pytest +from sqlalchemy.exc import ProgrammingError + +from cratedb_toolkit.materialized.core import MaterializedViewManager +from cratedb_toolkit.materialized.model import MaterializedView +from tests.retention.conftest import TESTDRIVE_DATA_SCHEMA, TESTDRIVE_EXT_SCHEMA + + +@pytest.fixture +def mview(store) -> MaterializedView: + item = MaterializedView( + table_schema=TESTDRIVE_DATA_SCHEMA, + table_name="foobar", + sql=f'SELECT * FROM "{TESTDRIVE_DATA_SCHEMA}"."raw_metrics"', + id=None, + ) + store.create(item, ignore="DuplicateKeyException") + return item + + +def foo(database): + sdcsdc + # database.run_sql("DROP TABLE IF EXISTS testdrive.foobar;") + database.run_sql(f'CREATE TABLE "{TESTDRIVE_EXT_SCHEMA}"."foobar" AS SELECT 1;') + database.run_sql(f'CREATE TABLE "{TESTDRIVE_DATA_SCHEMA}"."raw_metrics" AS SELECT 1;') + # database.run_sql('CREATE TABLE "testdrive"."foobar" AS SELECT 1;') + + +def test_materialized_undefined(settings, database, store): + mvm = MaterializedViewManager(settings=settings) + 
with pytest.raises(KeyError) as ex: + mvm.refresh("unknown.unknown") + ex.match("Synthetic materialized table definition does not exist: unknown.unknown") + + +def test_materialized_missing_schema(settings, database, store, mview): + mvm = MaterializedViewManager(settings=settings) + with pytest.raises(ProgrammingError) as ex: + mvm.refresh(f"{TESTDRIVE_DATA_SCHEMA}.foobar") + ex.match(re.escape("SchemaUnknownException[Schema 'testdrive-data' unknown")) + + +def test_materialized_missing_table(settings, database, store, mview): + database.run_sql(f'CREATE TABLE "{TESTDRIVE_DATA_SCHEMA}"."foobar" AS SELECT 1;') + + mvm = MaterializedViewManager(settings=settings) + with pytest.raises(ProgrammingError) as ex: + mvm.refresh(f"{TESTDRIVE_DATA_SCHEMA}.foobar") + ex.match(re.escape("RelationUnknown[Relation 'testdrive-data.raw_metrics' unknown]")) + + +def te2st_materialized_success(settings, database, store, mview): + # TODO: Does not work. + # database.run_sql("CREATE TABLE IF NOT EXISTS testdrive.foobar AS SELECT 1") + + # database.run_sql("DROP TABLE IF EXISTS testdrive.foobar;") + # database.run_sql('CREATE TABLE "testdrive"."raw_data" AS SELECT 1;') + # database.run_sql('CREATE TABLE "testdrive"."foobar" AS SELECT 1;') + # database.run_sql('REFRESH TABLE "testdrive"."foobar";') + + mvm = MaterializedViewManager(settings=settings) + print(mvm.refresh("testdrive.foobar")) diff --git a/tests/materialized/test_examples.py b/tests/materialized/test_examples.py new file mode 100644 index 00000000..d61782f8 --- /dev/null +++ b/tests/materialized/test_examples.py @@ -0,0 +1,11 @@ +# Copyright (c) 2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. + + +def test_example_materialized_view(store): + """ + Verify that the program `examples/materialized_view.py` works. + """ + from examples.materialized_view import main + + main(dburi=store.database.dburi)