Skip to content

Commit

Permalink
Transparently invoke REFRESH TABLE after inserts, updates, and deletes
Browse files Browse the repository at this point in the history
Remove previous strategy, where hooking into the corresponding database
wrapper function was extremely invasive, and not sustainable.
  • Loading branch information
amotl committed Sep 7, 2023
1 parent 14d7c0d commit fb8c740
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 58 deletions.
28 changes: 28 additions & 0 deletions mlflow_cratedb/adapter/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import sqlalchemy as sa
import sqlparse
from sqlalchemy.event import listen


def _setup_db_create_tables(engine: sa.Engine):
Expand Down Expand Up @@ -31,3 +32,30 @@ def _setup_db_drop_tables():
TODO: Not implemented yet.
"""
pass


def enable_refresh_after_dml():
    """
    Run `REFRESH TABLE <tablename>` after each INSERT, UPDATE, and DELETE operation.

    CrateDB is eventually consistent, i.e. write operations are not flushed to
    disk immediately, so readers may see stale data. A traditional OLTP-like
    application, such as MLflow, does not expect that: it assumes written data
    can be read back right away.

    This SQLAlchemy extension makes sure that data is synchronized after each
    operation manipulating data, by registering `do_refresh` as a listener for
    the `after_insert`, `after_update`, and `after_delete` events on every
    class mapped in MLflow's declarative `Base` registry.

    The function is idempotent: calling it repeatedly does not register the
    same listener twice, so each write triggers only a single refresh.
    """
    # Imported lazily, mirroring the original placement of the MLflow import.
    from mlflow.store.db.base_sql_model import Base
    from sqlalchemy.event import contains

    event_names = ("after_insert", "after_update", "after_delete")
    for mapper in Base.registry.mappers:
        model_class = mapper.class_
        for event_name in event_names:
            # Guard against duplicate registration: a listener that is
            # attached again would be invoked once per registration.
            if not contains(model_class, event_name, do_refresh):
                listen(model_class, event_name, do_refresh)


def do_refresh(mapper, connection, target):
    """
    Mapper event hook issuing `REFRESH TABLE` for the table of the affected object.

    Intended to be registered for the `after_insert`, `after_update`, and
    `after_delete` events.

    :param mapper: Mapper passed in by SQLAlchemy; not used by this handler.
    :param connection: Connection on which the refresh statement is executed.
    :param target: Mapped instance; its `__tablename__` names the table to refresh.
    """
    table_name = target.__tablename__
    statement = sa.text(f"REFRESH TABLE {table_name}")
    connection.execute(statement)
2 changes: 0 additions & 2 deletions mlflow_cratedb/monkey/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from mlflow_cratedb.monkey.environment_variables import patch_environment_variables
from mlflow_cratedb.monkey.models import patch_models
from mlflow_cratedb.monkey.server import patch_run_server
from mlflow_cratedb.monkey.tracking import patch_sqlalchemy_store

logger = logging.getLogger("mlflow")

Expand All @@ -19,7 +18,6 @@ def patch_all():

patch_environment_variables()
patch_models()
patch_sqlalchemy_store()
patch_dbtypes()
patch_db_utils()
patch_run_server()
3 changes: 3 additions & 0 deletions mlflow_cratedb/monkey/db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import sqlalchemy as sa

from mlflow_cratedb.adapter.db import enable_refresh_after_dml


def patch_db_utils():
import mlflow.store.db.utils as db_utils
Expand All @@ -21,6 +23,7 @@ def _initialize_tables(engine: sa.Engine):

from mlflow_cratedb.adapter.db import _setup_db_create_tables

enable_refresh_after_dml()
patch_sqlalchemy_inspector(engine)
_logger.info("Creating initial MLflow database tables...")
_setup_db_create_tables(engine)
Expand Down
56 changes: 0 additions & 56 deletions mlflow_cratedb/monkey/tracking.py

This file was deleted.

0 comments on commit fb8c740

Please sign in to comment.