Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add baseline infrastructure for emulating materialized views #56

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
when the repository does not exist. With previous versions of CrateDB, it was
`RepositoryUnknownException`.

- Add baseline infrastructure for emulating materialized views.

## 2023/06/27 0.0.0

- Import "data retention" implementation from <https://github.com/crate/crate-airflow-tutorial>.
Expand Down
Empty file.
57 changes: 57 additions & 0 deletions cratedb_toolkit/materialized/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) 2023, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging

import sqlalchemy as sa

from cratedb_toolkit.materialized.model import MaterializedViewSettings
from cratedb_toolkit.materialized.store import MaterializedViewStore
from cratedb_toolkit.model import TableAddress

logger = logging.getLogger(__name__)


class MaterializedViewManager:
"""
The main application, implementing basic synthetic materialized views.
"""

def __init__(self, settings: MaterializedViewSettings):
# Runtime context settings.
self.settings = settings

# Retention policy store API.
self.store = MaterializedViewStore(settings=self.settings)

def refresh(self, name: str):
"""
Resolve materialized view, and refresh it.
"""
logger.info(f"Refreshing materialized view: {name}")

table_schema, table_name = name.split(".")
table_address = TableAddress(schema=table_schema, table=table_name)
mview = self.store.get_by_table(table_address)
logger.info(f"Loaded materialized view definition: {mview}")

sql_ddl = f"DROP TABLE IF EXISTS {mview.staging_table_fullname}"
logger.info(f"Dropping materialized view (staging): {sql_ddl}")
self.store.execute(sa.text(sql_ddl))

# TODO: IF NOT EXISTS
sql_ddl = f"CREATE TABLE {mview.staging_table_fullname} AS (\n{mview.sql}\n)"
logger.info(f"Creating materialized view (staging): {sql_ddl}")
self.store.execute(sa.text(sql_ddl))
Comment on lines +41 to +44
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another spot where I discovered a shortcoming in SQL syntax: CREATE TABLE ... AS ... does not support the IF NOT EXISTS infix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We filed a database issue about it.

sql_refresh = f"REFRESH TABLE {mview.staging_table_fullname}"
self.store.execute(sa.text(sql_refresh))

# sql_ddl = f"DROP TABLE IF EXISTS {mview.table_fullname}"
# logger.info(f"Dropping materialized view (live): {sql_ddl}")
# self.store.execute(sa.text(sql_ddl))

# FIXME: SQLParseException[Target table name must not include a schema]
sql_ddl = f"ALTER TABLE {mview.staging_table_fullname} RENAME TO {mview.table_name}"
logger.info(f"Activating materialized view: {sql_ddl}")
self.store.execute(sa.text(sql_ddl))
Comment on lines +52 to +55
Copy link
Member Author

@amotl amotl Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a ticket in crate/crate, about that currently, you can't rename a table "into a different schema", i.e. it will always be doc.

Copy link
Member Author

@amotl amotl Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like resolving this issue might not be too difficult. If someone has the capacity to file a corresponding issue at crate/crate, I will appreciate it. /cc @hammerhead, @hlcianfagna 🌻

// we do not support renaming to a different schema, thus the target table identifier must not include a schema
// this is an artificial limitation, technically it can be done
List<String> newIdentParts = node.newName().getParts();
if (newIdentParts.size() > 1) {
    throw new IllegalArgumentException("Target table name must not include a schema");
}

-- https://github.com/crate/crate/blob/5.4.3/server/src/main/java/io/crate/analyze/AlterTableAnalyzer.java#L96C1-L102C10

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hlcianfagna suggested to use ALTER CLUSTER SWAP TABLE as a workaround at crate/crate#14833 (comment). Thank you.

sql_refresh = f"REFRESH TABLE {mview.table_fullname}"
self.store.execute(sa.text(sql_refresh))
98 changes: 98 additions & 0 deletions cratedb_toolkit/materialized/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright (c) 2023, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import dataclasses
import os
import typing as t

from cratedb_toolkit.model import DatabaseAddress, TableAddress


@dataclasses.dataclass
class MaterializedView:
"""
Manage the database representation of a "materialized view" entity.

This layout has to be synchronized with the corresponding table definition
per SQL DDL statement within `schema.sql`.
"""

table_schema: t.Optional[str] = dataclasses.field(
default=None,
metadata={"help": "The target table schema"},
)
table_name: t.Optional[str] = dataclasses.field(
default=None,
metadata={"help": "The target table name"},
)
sql: t.Optional[str] = dataclasses.field(
default=None,
metadata={"help": "The SQL statement defining the emulated materialized view"},
)

id: t.Optional[str] = dataclasses.field( # noqa: A003
default=None,
metadata={"help": "The materialized view identifier"},
)

@property
def table_fullname(self) -> str:
return f'"{self.table_schema}"."{self.table_name}"'

@property
def staging_table_fullname(self) -> str:
# FIXME: SQLParseException[Target table name must not include a schema]
# TODO: CrateDB does not support renaming to a different schema, thus the target
# table identifier must not include a schema. This is an artificial limitation.
# Technically, it can be done.
# https://github.com/crate/crate/blob/5.3.3/server/src/main/java/io/crate/analyze/AlterTableAnalyzer.java#L97-L102
# return f'"{self.table_schema}-staging"."{self.table_name}"'
return f'"{self.table_schema}"."{self.table_name}-staging"'

@classmethod
def from_record(cls, record) -> "MaterializedView":
return cls(**record)

def to_storage_dict(self, identifier: t.Optional[str] = None) -> t.Dict[str, str]:
"""
Return representation suitable for storing into database table using SQLAlchemy.
"""

# Serialize to dictionary.
data = dataclasses.asdict(self)

# Optionally add identifier.
if identifier is not None:
data["id"] = identifier

return data


def default_table_address():
"""
The default address of the materialized view management table.
"""
schema = os.environ.get("CRATEDB_EXT_SCHEMA", "ext")
return TableAddress(schema=schema, table="materialized_view")


@dataclasses.dataclass
class MaterializedViewSettings:
"""
Bundle all configuration and runtime settings.
"""

# Database connection URI.
database: DatabaseAddress = dataclasses.field(
default_factory=lambda: DatabaseAddress.from_string("crate://localhost/")
)

# The address of the materialized view table.
materialized_table: TableAddress = dataclasses.field(default_factory=default_table_address)

# Only pretend to invoke statements.
dry_run: t.Optional[bool] = False

def to_dict(self):
data = dataclasses.asdict(self)
data["materialized_table"] = self.materialized_table
return data
36 changes: 36 additions & 0 deletions cratedb_toolkit/materialized/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (c) 2021-2023, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
from importlib.resources import read_text

from cratedb_toolkit.materialized.model import MaterializedViewSettings
from cratedb_toolkit.util.database import run_sql

logger = logging.getLogger(__name__)


def setup_schema(settings: MaterializedViewSettings):
"""
Set up the materialized view management table schema.

TODO: Refactor to `store` module.
"""

logger.info(
f"Installing materialized view management table at "
f"database '{settings.database.safe}', table {settings.materialized_table}"
)

# Read SQL DDL statement.
sql = read_text("cratedb_toolkit.materialized", "schema.sql")

tplvars = settings.to_dict()
sql = sql.format_map(tplvars)

if settings.dry_run:
logger.info(f"Pretending to execute SQL statement:\n{sql}")
return

# Materialize table schema.
run_sql(settings.database.dburi, sql)
run_sql(settings.database.dburi, f"REFRESH TABLE {settings.materialized_table.fullname}")
14 changes: 14 additions & 0 deletions cratedb_toolkit/materialized/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Set up the materialized view management database table schema.
CREATE TABLE IF NOT EXISTS {materialized_table.fullname} (

"id" TEXT NOT NULL PRIMARY KEY,

-- Target: The database table to be populated.
"table_schema" TEXT, -- The source table schema.
"table_name" TEXT, -- The source table name.

-- The SQL statement defining the emulated materialized view.
"sql" TEXT

)
CLUSTERED INTO 1 SHARDS;
157 changes: 157 additions & 0 deletions cratedb_toolkit/materialized/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright (c) 2021-2023, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
import typing as t
import uuid

import sqlalchemy as sa
from sqlalchemy import MetaData, Result, Table
from sqlalchemy.orm import Session

from cratedb_toolkit.exception import TableNotFound
from cratedb_toolkit.materialized.model import MaterializedView, MaterializedViewSettings
from cratedb_toolkit.model import TableAddress
from cratedb_toolkit.util.database import DatabaseAdapter

logger = logging.getLogger(__name__)


class MaterializedViewStore:
"""
A wrapper around the materialized view management table.
"""

def __init__(self, settings: MaterializedViewSettings):
self.settings = settings

logger.info(
f"Connecting to database {self.settings.database.safe}, "
f"table {self.settings.materialized_table.fullname}"
)

# Set up generic database adapter.
self.database: DatabaseAdapter = DatabaseAdapter(dburi=self.settings.database.dburi)

# Set up SQLAlchemy Core adapter for materialized view management table.
metadata = MetaData(schema=self.settings.materialized_table.schema)
self.table = Table(self.settings.materialized_table.table, metadata, autoload_with=self.database.engine)

def create(self, mview: MaterializedView, ignore: t.Optional[str] = None):
"""
Create a new materialized view, and return its identifier.

TODO: Generalize, see `RetentionPolicyStore`.
"""

# TODO: Sanity check, whether target table already exists?

ignore = ignore or ""

# Sanity checks.
if mview.table_schema is None:
raise ValueError("Table schema needs to be defined")
if mview.table_name is None:
raise ValueError("Table name needs to be defined")
if self.exists(mview):
if not ignore.startswith("DuplicateKey"):
raise ValueError(f"Materialized view '{mview.table_schema}.{mview.table_name}' already exists")

table = self.table
# TODO: Add UUID as converter to CrateDB driver?
identifier = str(uuid.uuid4())
data = mview.to_storage_dict(identifier=identifier)
insertable = sa.insert(table).values(**data).returning(table.c.id)
cursor = self.execute(insertable)
identifier = cursor.one()[0]
self.synchronize()
return identifier

def retrieve(self):
"""
Retrieve all records from database table.

TODO: Add filtering capabilities.
TODO: Generalize, see `RetentionPolicyStore`.
"""

# Run SELECT statement, and return result.
selectable = sa.select(self.table)
records = self.query(selectable)
return records

def get_by_table(self, table_address: TableAddress) -> MaterializedView:
"""
Retrieve effective policies to process, by strategy and tags.
"""
table = self.table
selectable = sa.select(table).where(
table.c.table_schema == table_address.schema,
table.c.table_name == table_address.table,
)
logger.info(f"View definition DQL: {selectable}")
try:
record = self.query(selectable)[0]
except IndexError:
raise KeyError(
f"Synthetic materialized table definition does not exist: {table_address.schema}.{table_address.table}"
)
mview = MaterializedView.from_record(record)
return mview

def delete(self, identifier: str) -> int:
"""
Delete materialized view by identifier.

TODO: Generalize, see `RetentionPolicyStore`.
"""
table = self.table
constraint = table.c.id == identifier
deletable = sa.delete(table).where(constraint)
result = self.execute(deletable)
self.synchronize()
if result.rowcount == 0:
logger.warning(f"Materialized view not found with id: {identifier}")
return result.rowcount

def execute(self, statement) -> Result:
"""
Execute SQL statement, and return result object.

TODO: Generalize, see `RetentionPolicyStore`.
"""
with Session(self.database.engine) as session:
result = session.execute(statement)
session.commit()
return result

def query(self, statement) -> t.List[t.Dict]:
"""
Execute SQL statement, fetch result rows, and return them converted to dictionaries.

TODO: Generalize, see `RetentionPolicyStore`.
"""
cursor = self.execute(statement)
rows = cursor.mappings().fetchall()
records = [dict(row.items()) for row in rows]
return records

def exists(self, mview: MaterializedView):
"""
Check if retention policy for specific table already exists.

TODO: Generalize, see `RetentionPolicyStore`.
"""
table = self.table
selectable = sa.select(table).where(
table.c.table_schema == mview.table_schema,
table.c.table_name == mview.table_name,
)
result = self.query(selectable)
return bool(result)

def synchronize(self):
"""
Synchronize data by issuing `REFRESH TABLE` statement.
"""
sql = f"REFRESH TABLE {self.settings.materialized_table.fullname};"
self.database.run_sql(sql)
Loading
Loading