Add adapter wrapper for MLflow/CrateDB, based on monkeypatching #5

Merged · 18 commits · Sep 12, 2023

Changes shown are from 5 commits.
3 changes: 3 additions & 0 deletions .gitignore
@@ -12,3 +12,6 @@ coverage.xml
*.egg-info
*.pyc
__pycache__

/mlartifacts
/mlruns
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -0,0 +1 @@
recursive-include mlflow_cratedb *.sql
25 changes: 23 additions & 2 deletions README.md
@@ -3,7 +3,8 @@

## About

-A wrapper around [MLflow] to use [CrateDB] as storage database for [MLflow Tracking].
An adapter wrapper for [MLflow] to use [CrateDB] as a storage database
for [MLflow Tracking].


## Setup
@@ -16,7 +17,27 @@ pip install --upgrade 'git+https://github.com/crate-workbench/mlflow-cratedb'

## Usage

-TODO.
In order to spin up a CrateDB instance without further ado, you can use
Docker or Podman.
```shell
docker run --rm -it --publish=4200:4200 --publish=5432:5432 \
--env=CRATE_HEAP_SIZE=4g crate \
-Cdiscovery.type=single-node \
-Ccluster.routing.allocation.disk.threshold_enabled=false
```

Member commented:
I don't think that it's a good idea to advertise disabling this threshold in examples. If disabled, writing data into CrateDB can lead to a machine crash, due to out-of-disk-space issues.

Member Author replied:
Yeah, true. I will remove it. It slipped in because I am running it this way, since apparently my disk is always full, or pretends to be.

Member Author replied:
Fixed with 9b02b1a.

Start the MLflow server, pointing it to your [CrateDB] instance,
running on `localhost`.
```shell
mlflow-cratedb server --backend-store-uri='crate://crate@localhost/?schema=mlflow' --dev
```

Please note that you need to invoke the `mlflow-cratedb` command, which
runs MLflow amalgamated with the necessary changes to support CrateDB.

Also note that we recommend using a dedicated schema for storing MLflow's
tables. This way, the default schema `"doc"` is not populated by tables
of 3rd-party systems.
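
To exercise the setup end to end, a minimal tracking client could look like
the sketch below. It assumes the server started above listens on MLflow's
default port 5000, and it only uses the standard MLflow client API; the
experiment name is made up.
```python
import mlflow

# Point the client at the MLflow server backed by CrateDB.
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("cratedb-smoke-test")

with mlflow.start_run():
    mlflow.log_param("batch_size", 32)
    mlflow.log_metric("accuracy", 0.42)
```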


## Development
7 changes: 7 additions & 0 deletions mlflow_cratedb/__init__.py
@@ -0,0 +1,7 @@
from mlflow.utils import logging_utils

from mlflow_cratedb.monkey import patch_all

# Enable logging, and activate monkeypatch.
logging_utils.enable_logging()
patch_all()
Empty file.
61 changes: 61 additions & 0 deletions mlflow_cratedb/adapter/db.py
@@ -0,0 +1,61 @@
import importlib.resources

import sqlalchemy as sa
import sqlparse
from sqlalchemy.event import listen


def _setup_db_create_tables(engine: sa.Engine):
    """
    Because CrateDB does not play well with a full-fledged SQLAlchemy data model and
    corresponding Alembic migrations, shortcut that and replace it with a classic
    database schema provisioning based on SQL DDL.

    It will cause additional maintenance, but well, c'est la vie.

    TODO: Currently, the path is hardcoded to `cratedb.sql`.
    """
    schema_name = engine.url.query.get("schema")
    schema_prefix = ""
    if schema_name is not None:
        schema_prefix = f'"{schema_name}".'
    with importlib.resources.path("mlflow_cratedb.adapter", "ddl") as ddl:
        sql_file = ddl.joinpath("cratedb.sql")
        sql_statements = sql_file.read_text().format(schema_prefix=schema_prefix)
        with engine.connect() as connection:
            for statement in sqlparse.split(sql_statements):
                connection.execute(sa.text(statement))


def _setup_db_drop_tables():
    """
    TODO: Not implemented yet.
    """
    pass


def enable_refresh_after_dml():
    """
    Run `REFRESH TABLE <tablename>` after each INSERT, UPDATE, and DELETE operation.

    CrateDB is eventually consistent, i.e. write operations are not immediately
    visible to subsequent reads, so readers may see stale data. For an OLTP-style
    application like MLflow, which expects read-after-write semantics, that is
    not acceptable.

    These SQLAlchemy event hooks make sure that data is synchronized after each
    operation which manipulates data.
    """
    from mlflow.store.db.base_sql_model import Base

    for mapper in Base.registry.mappers:
        listen(mapper.class_, "after_insert", do_refresh)
        listen(mapper.class_, "after_update", do_refresh)
        listen(mapper.class_, "after_delete", do_refresh)


def do_refresh(mapper, connection, target):
    """
    SQLAlchemy event handler for `after_{insert,update,delete}` events,
    invoking `REFRESH TABLE`.
    """
    sql = f"REFRESH TABLE {target.__tablename__}"
    connection.execute(sa.text(sql))
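
To illustrate the consistency gap this addresses, here is a minimal sketch,
assuming a local CrateDB instance and the `crate` SQLAlchemy dialect are
installed; the table and values are only for demonstration:
```python
import sqlalchemy as sa

engine = sa.create_engine("crate://crate@localhost:4200")
with engine.connect() as connection:
    connection.execute(sa.text("INSERT INTO tags (key, value, run_uuid) VALUES ('k', 'v', 'r1')"))
    # Without an intermediate REFRESH, the follow-up SELECT may not see the new row yet.
    connection.execute(sa.text("REFRESH TABLE tags"))
    rows = connection.execute(sa.text("SELECT * FROM tags WHERE run_uuid = 'r1'")).fetchall()
```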
137 changes: 137 additions & 0 deletions mlflow_cratedb/adapter/ddl/cratedb.sql
@@ -0,0 +1,137 @@
CREATE TABLE IF NOT EXISTS {schema_prefix}"datasets" (
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea with the search_path, I think the code in this repository is already using that technique partly through SQLAlchemy.

Concluding that, I may also think, that when using that here again, we would not need to populate the schema name (here: {schema_prefix}) into the SQL DDL statements at all, so the code could be simplified?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concluding that, I may also think, that when using that here again, we would not need to populate the schema name (here: {schema_prefix}) into the SQL DDL statements at all, so the code could be simplified?

Yes right.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improved with 3488790.

"dataset_uuid" TEXT NOT NULL,
"experiment_id" BIGINT NOT NULL,
"name" TEXT NOT NULL,
"digest" TEXT NOT NULL,
"dataset_source_type" TEXT NOT NULL,
"dataset_source" TEXT NOT NULL,
"dataset_schema" TEXT,
"dataset_profile" TEXT,
PRIMARY KEY ("experiment_id", "name", "digest")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"experiment_tags" (
"key" TEXT NOT NULL,
"value" TEXT,
"experiment_id" BIGINT NOT NULL,
PRIMARY KEY ("key", "experiment_id")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"experiments" (
"experiment_id" BIGINT NOT NULL, -- default=autoincrement
"name" TEXT NOT NULL,
"artifact_location" TEXT,
"lifecycle_stage" TEXT,
"creation_time" BIGINT, -- default=get_current_time_millis
"last_update_time" BIGINT, -- default=get_current_time_millis
PRIMARY KEY ("experiment_id")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"inputs" (
"input_uuid" TEXT NOT NULL,
"source_type" TEXT NOT NULL,
"source_id" TEXT NOT NULL,
"destination_type" TEXT NOT NULL,
"destination_id" TEXT NOT NULL,
PRIMARY KEY ("source_type", "source_id", "destination_type", "destination_id")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"input_tags" (
"input_uuid" TEXT NOT NULL,
"name" TEXT NOT NULL,
"value" TEXT NOT NULL,
PRIMARY KEY ("input_uuid", "name")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"latest_metrics" (
"key" TEXT NOT NULL,
"value" REAL NOT NULL,
"timestamp" BIGINT NOT NULL,
"step" BIGINT NOT NULL,
"is_nan" BOOLEAN NOT NULL,
"run_uuid" TEXT NOT NULL,
PRIMARY KEY ("key", "run_uuid")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"metrics" (
"key" TEXT NOT NULL,
"value" REAL NOT NULL,
"timestamp" BIGINT NOT NULL,
"step" BIGINT NOT NULL,
"is_nan" BOOLEAN NOT NULL,
"run_uuid" TEXT NOT NULL,
PRIMARY KEY ("key", "timestamp", "step", "run_uuid", "is_nan")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"model_versions" (
"name" TEXT NOT NULL,
"version" INTEGER NOT NULL,
"creation_time" BIGINT, -- default=get_current_time_millis
"last_update_time" BIGINT, -- default=get_current_time_millis
"description" TEXT,
"user_id" TEXT,
"current_stage" TEXT,
"source" TEXT,
"run_id" TEXT,
"run_link" TEXT,
"status" TEXT,
"status_message" TEXT
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"model_version_tags" (
"name" TEXT NOT NULL,
"version" INTEGER NOT NULL,
"key" TEXT NOT NULL,
"value" TEXT
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"params" (
"key" TEXT NOT NULL,
"value" TEXT NOT NULL,
"run_uuid" TEXT NOT NULL,
PRIMARY KEY ("key", "run_uuid")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"registered_models" (
"name" TEXT NOT NULL,
"key" TEXT NOT NULL,
"value" TEXT
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"registered_model_aliases" (
"name" TEXT NOT NULL,
"alias" TEXT NOT NULL,
"version" TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"registered_model_tags" (
"name" TEXT NOT NULL,
"creation_time" BIGINT, -- default=get_current_time_millis
"last_update_time" BIGINT, -- default=get_current_time_millis
"description" TEXT
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"runs" (
"run_uuid" TEXT NOT NULL,
"name" TEXT,
"source_type" TEXT,
"source_name" TEXT,
"entry_point_name" TEXT,
"user_id" TEXT,
"status" TEXT,
"start_time" BIGINT,
"end_time" BIGINT,
"deleted_time" BIGINT,
"source_version" TEXT,
"lifecycle_stage" TEXT,
"artifact_uri" TEXT,
"experiment_id" BIGINT,
PRIMARY KEY ("run_uuid")
);

CREATE TABLE IF NOT EXISTS {schema_prefix}"tags" (
"key" TEXT NOT NULL,
"value" TEXT,
"run_uuid" TEXT NOT NULL,
PRIMARY KEY ("key", "run_uuid")
);
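
For orientation: with the server invocation from the README,
`--backend-store-uri='crate://crate@localhost/?schema=mlflow'`, the
`_setup_db_create_tables` routine renders `{schema_prefix}` as `"mlflow".`,
so the first statement above effectively reads
`CREATE TABLE IF NOT EXISTS "mlflow"."datasets" (...)`.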
15 changes: 15 additions & 0 deletions mlflow_cratedb/adapter/ddl/drop.sql
@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS {schema_prefix}"datasets";
DROP TABLE IF EXISTS {schema_prefix}"experiment_tags";
DROP TABLE IF EXISTS {schema_prefix}"experiments";
DROP TABLE IF EXISTS {schema_prefix}"inputs";
DROP TABLE IF EXISTS {schema_prefix}"input_tags";
DROP TABLE IF EXISTS {schema_prefix}"latest_metrics";
DROP TABLE IF EXISTS {schema_prefix}"metrics";
DROP TABLE IF EXISTS {schema_prefix}"model_versions";
DROP TABLE IF EXISTS {schema_prefix}"model_version_tags";
DROP TABLE IF EXISTS {schema_prefix}"params";
DROP TABLE IF EXISTS {schema_prefix}"registered_models";
DROP TABLE IF EXISTS {schema_prefix}"registered_model_aliases";
DROP TABLE IF EXISTS {schema_prefix}"registered_model_tags";
DROP TABLE IF EXISTS {schema_prefix}"runs";
DROP TABLE IF EXISTS {schema_prefix}"tags";
8 changes: 8 additions & 0 deletions mlflow_cratedb/adapter/util.py
@@ -0,0 +1,8 @@
from vasuki import generate_nagamani19_int


def generate_unique_integer() -> int:
    """
    Produce a short, unique, non-sequential identifier based on Hashids.
    """
    return generate_nagamani19_int(size=10)
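
Presumably, this generator substitutes the autoincrement column defaults
annotated with `-- default=autoincrement` in the DDL above, since CrateDB
does not provide server-side sequences. A standalone sketch:
```python
from mlflow_cratedb.adapter.util import generate_unique_integer

# Two invocations yield two different short, non-sequential integers.
a, b = generate_unique_integer(), generate_unique_integer()
assert a != b
```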
2 changes: 2 additions & 0 deletions mlflow_cratedb/cli.py
@@ -0,0 +1,2 @@
# Intercept CLI entrypoint for monkeypatching.
from mlflow.cli import cli # noqa: F401
23 changes: 23 additions & 0 deletions mlflow_cratedb/monkey/__init__.py
@@ -0,0 +1,23 @@
import logging

from mlflow_cratedb.monkey.db_types import patch_dbtypes
from mlflow_cratedb.monkey.db_utils import patch_db_utils
from mlflow_cratedb.monkey.environment_variables import patch_environment_variables
from mlflow_cratedb.monkey.models import patch_models
from mlflow_cratedb.monkey.server import patch_run_server

logger = logging.getLogger("mlflow")

ANSI_YELLOW = "\033[93m"
ANSI_RESET = "\033[0m"


def patch_all():
    logger.info(f"{ANSI_YELLOW}Amalgamating MLflow for CrateDB{ANSI_RESET}")
    logger.debug("To undo that, run `pip uninstall mlflow-cratedb`")

    patch_environment_variables()
    patch_models()
    patch_dbtypes()
    patch_db_utils()
    patch_run_server()
10 changes: 10 additions & 0 deletions mlflow_cratedb/monkey/db_types.py
@@ -0,0 +1,10 @@
def patch_dbtypes():
    """
    Register CrateDB as an available database type.
    """
    import mlflow.store.db.db_types as db_types

    db_types.CRATEDB = "crate"

    if db_types.CRATEDB not in db_types.DATABASE_ENGINES:
        db_types.DATABASE_ENGINES.append(db_types.CRATEDB)
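
A quick sanity check for this patch, sketched using only the module shown above:
```python
import mlflow.store.db.db_types as db_types

from mlflow_cratedb.monkey.db_types import patch_dbtypes

patch_dbtypes()
# After patching, "crate" is registered as a known database engine.
assert db_types.CRATEDB == "crate"
assert db_types.CRATEDB in db_types.DATABASE_ENGINES
```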
55 changes: 55 additions & 0 deletions mlflow_cratedb/monkey/db_utils.py
@@ -0,0 +1,55 @@
import functools
import typing as t

import sqlalchemy as sa

from mlflow_cratedb.adapter.db import enable_refresh_after_dml


def patch_db_utils():
    import mlflow.store.db.utils as db_utils

    db_utils._initialize_tables = _initialize_tables
    db_utils._verify_schema = _verify_schema


@functools.cache
def _initialize_tables(engine: sa.Engine):
    """
    Skip SQLAlchemy schema provisioning and Alembic migrations.
    Both don't play well with CrateDB.
    """
    from mlflow.store.db.utils import _logger

    from mlflow_cratedb.adapter.db import _setup_db_create_tables

    enable_refresh_after_dml()
    patch_sqlalchemy_inspector(engine)
    _logger.info("Creating initial MLflow database tables...")
    _setup_db_create_tables(engine)


def _verify_schema(engine: sa.Engine):
    """
    With Alembic being skipped, schema verification is a no-op.
    """
    pass


Member (@seut) commented on Sep 8, 2023:
Isn't adjusting the search_path a better way to set a default schema to use?

Member Author (@amotl) replied on Sep 8, 2023:
Thanks for spotting. I observed some flaws here, but I will revisit it to find out why the search_path may not be honored here. I think the other parts of the application are already using that technique well.

Member Author replied:
Indeed, patching that function was unnecessary, and got removed with 186c9e4 again.

Member Author replied:
Hi again. We needed to bring back this patch with e8acfc2. #12 (commits) has more details.

def patch_sqlalchemy_inspector(engine: sa.Engine):
    """
    When using `get_table_names()`, make sure the correct schema name gets used.

    TODO: Submit this to SQLAlchemy?
    """
    get_table_names_dist = engine.dialect.get_table_names
    schema_name = engine.url.query.get("schema")
    if isinstance(schema_name, tuple):
        schema_name = schema_name[0]

    def get_table_names(connection: sa.Connection, schema: t.Optional[str] = None, **kw: t.Any) -> t.List[str]:
        if schema is None:
            schema = schema_name
        return get_table_names_dist(connection=connection, schema=schema, **kw)

    engine.dialect.get_table_names = get_table_names  # type: ignore
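
The effect can be sketched as follows, assuming a reachable CrateDB instance
with tables provisioned in the `mlflow` schema; reflection then falls back to
the schema encoded in the SQLAlchemy URL when no schema is passed explicitly:
```python
import sqlalchemy as sa

from mlflow_cratedb.monkey.db_utils import patch_sqlalchemy_inspector

engine = sa.create_engine("crate://crate@localhost/?schema=mlflow")
patch_sqlalchemy_inspector(engine)

# `get_table_names()` now defaults to the "mlflow" schema instead of "doc".
print(sa.inspect(engine).get_table_names())
```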
8 changes: 8 additions & 0 deletions mlflow_cratedb/monkey/environment_variables.py
@@ -0,0 +1,8 @@
def patch_environment_variables():
    """
    Do not retry HTTP requests multiple times when the connection is unstable.
    """
    import mlflow.environment_variables as envvars
    from mlflow.environment_variables import _EnvironmentVariable

    envvars.MLFLOW_HTTP_REQUEST_MAX_RETRIES = _EnvironmentVariable("MLFLOW_HTTP_REQUEST_MAX_RETRIES", int, 0)
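
After patching, the effective retry setting can be inspected; a sketch,
assuming the environment variable itself is not set:
```python
import mlflow.environment_variables as envvars

from mlflow_cratedb.monkey.environment_variables import patch_environment_variables

patch_environment_variables()
# Retries are now disabled per default.
print(envvars.MLFLOW_HTTP_REQUEST_MAX_RETRIES.get())  # 0
```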