Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init MLflow support #2

Merged
merged 38 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
3c62ef5
Init MLflow support
mwiewior Apr 25, 2023
a22d632
Fixing linters
mwiewior Apr 25, 2023
2d0d397
Addin support in starter
mwiewior Apr 25, 2023
150a48d
Fix for isort
mwiewior Apr 25, 2023
54f5b60
Fixing Unit tests
mwiewior Apr 25, 2023
6e91f0f
Fixing Unit tests
mwiewior Apr 25, 2023
b5c3947
Fixing ut linting
mwiewior Apr 25, 2023
5d65a8b
Applying comments
mwiewior May 15, 2023
7435c6e
Passing mlflow config
mwiewior Jun 7, 2023
9480fe1
mlflow stage_name
mwiewior Jun 7, 2023
7349e15
mlflow stage_name
mwiewior Jun 7, 2023
c880eb5
mlflow stage_name
mwiewior Jun 7, 2023
83dcd45
Adding mlflow_helpers
mwiewior Jun 7, 2023
6daf8d2
Fixing stage name
mwiewior Jun 7, 2023
115a60d
Removing eval
mwiewior Jun 8, 2023
5d64d40
Adding pipeline name to each task/node
mwiewior Jun 8, 2023
94647ca
Docs update
mwiewior Jun 8, 2023
acccc84
Doc for implementation details
mwiewior Jun 8, 2023
7dc2061
UDF inference
mwiewior Jun 12, 2023
57abf33
Adding run finalizer hook
mwiewior Jun 14, 2023
85eea44
Starter updates for MLflow
mwiewior Jun 15, 2023
d81b84d
Fixes for metrics and model upload
mwiewior Jun 15, 2023
013522f
Jinja cookie cutting corrections for mlflow enablement
Lasica Jun 16, 2023
2c31cc3
limiting kedro version to 0.18.8 because of dataset bugs
Lasica Jun 16, 2023
ddc021b
removed extra _ in name
Lasica Jun 16, 2023
f70bc4b
Un-hardcode MLflow config
marrrcin Jun 19, 2023
062273e
fix calling mlflow procedure
Lasica Jun 19, 2023
efec3dc
fixing mlflow task name
Lasica Jun 19, 2023
f223c07
updated docs
Lasica Jun 19, 2023
99674ae
changelog
Lasica Jun 19, 2023
d44034c
added spellcheck to precommit, fixed spellcheck issues
Lasica Jun 19, 2023
637c73a
docs: updated placeholder
Lasica Jun 19, 2023
9d0a63b
refactor: fixed typo in function name
Lasica Jun 19, 2023
5ec874d
refactor: Changed enable mlflow param to allow lowercase
Lasica Jun 19, 2023
b506515
docs: added link to mlflow snowflake integration
Lasica Jun 19, 2023
587b848
docs: fix syntax highlight
Lasica Jun 19, 2023
84335f7
Merge branch 'develop' into feature/mlflow-support
Lasica Jun 20, 2023
9d990f9
docs: spellcheck dict
Lasica Jun 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/images/mlflow-support.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Welcome to Kedro Snowflake plugin documentation!
Quickstart <source/03_quickstart.rst>
Data Assets <source/04_data_assets.rst>
Development <source/05_development.md>
MLflow support <source/06_mlflow.md>


Indices and tables
Expand Down
88 changes: 88 additions & 0 deletions docs/source/06_mlflow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# [Beta] MLflow support

## High level architecture
The key challenge is to provide access to the external service endpoints (like MLflow)
that is currently not yet supported natively in Snowpark (External Access feature is on the Snowflake roadmap). Snowflake external
functions are the preferred workaround.
![MLflow and Kedro-snowflake](../images/mlflow-support.png)

## Implementation details
Kedro-Snowflake <-> MLflow integration is based on the following concepts:
* [Snowflake external functions](https://docs.snowflake.com/en/sql-reference/external-functions-introduction) that
are used for wrapping POST requests to the MLflow instance. In the minimal setup the following wrapping external functions for MLflow REST API calls must be created:
* [Create run](https://mlflow.org/docs/latest/rest-api.html#create-run)
* [Update run](https://mlflow.org/docs/latest/rest-api.html#update-run)
* [Log param](https://mlflow.org/docs/latest/rest-api.html#log-param)
* [Log metric](https://mlflow.org/docs/latest/rest-api.html#log-metric)
* [Search experiment](https://mlflow.org/docs/latest/rest-api.html#search-experiments)
* [Snowflake externa function translators](https://docs.snowflake.com/en/sql-reference/external-functions-translators) for
changing the format of the data sent/received from the MLflow instance.
* [Snowflake API integration](https://docs.snowflake.com/en/sql-reference/sql/create-api-integration) for setting up
a communication channel from the Snowflake instance to the cloud HTTPS proxy/gateway service
where your MLflow instance is hosted (e.g. Amazon API Gateway, Google Cloud API Gateway or Azure API Management).
* [Snowflake storage integration](https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration) to enable
your Snowflake instance to upload artifacts (e.g. serialized models) to the cloud storage (Amazon S3, Azure Blob Storage, Google Cloud Storage) used by the
MLflow instance.
## Configuration example

```yaml
mlflow:
# MLflow experiment name for tracking runs
experiment_name: demo-mlops
stage: "@MLFLOW_STAGE"
# Snowflake external functions needed for calling MLflow instance
functions:
experiment_get_by_name: demo.demo.mlflow_experiment_get_by_name
run_create: demo.demo.mlflow_run_create
run_update: demo.demo.mlflow_run_update
run_log_metric: demo.demo.mlflow_run_log_metric
run_log_parameter: demo.demo.mlflow_run_log_parameter
```

## Kedro starter
The provided Kedro starter (Snowflights) has a builtin MLflow support.
You can enable it during the project setup, i.e.:
```bash
TBD
Lasica marked this conversation as resolved.
Show resolved Hide resolved
```

## Deployment to Snowflake and inference

### Deployment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

### Inference with User Defined Function (UDF)
```sql
select
MLFLOW$SNOWFLIGHTS_MODEL(
"engines",
"passenger_capacity",
"crew",
"d_check_complete",
"moon_clearance_complete",
"iata_approved",
"company_rating",
"review_scores_rating"
) AS price
from
(
select
1 as "engines",
100 as "passenger_capacity",
5 as "crew",
true as "d_check_complete",
true as "moon_clearance_complete",
true as "iata_approved",
10.0 as "company_rating",
5.0 as "review_scores_rating"
union all
select
2,
20,
5,
false,
false,
false,
3.0,
5.0
);
```
1 change: 1 addition & 0 deletions docs/spellcheck_exceptions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,4 @@ kedroazureml
ly
svg
MLOps
natively
31 changes: 31 additions & 0 deletions kedro_snowflake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def check_credentials(cls, values):
class DependenciesConfig(BaseModel):
packages: List[str] = [
"snowflake-snowpark-python",
"mlflow",
"cachetools",
"pluggy",
"PyYAML==6.0",
Expand Down Expand Up @@ -80,9 +81,25 @@ class SnowflakeRuntimeConfig(BaseModel):
pipeline_name_mapping: Optional[Dict[str, str]] = {"__default__": "default"}


class MLflowFunctionsConfig(BaseModel):
experiment_get_by_name: str = "mlflow_experiment_get_by_name"
run_create: str = "mlflow_run_create"
run_update: str = "mlflow_run_update"
run_log_metric: str = "mlflow_run_log_metric"
run_log_parameter: str = "mlflow_run_log_parameter"


class SnowflakeMLflowConfig(BaseModel):
experiment_name: Optional[str]
functions: MLflowFunctionsConfig
run_id: Optional[str]
stage: Optional[str]


class SnowflakeConfig(BaseModel):
connection: SnowflakeConnectionConfig
runtime: SnowflakeRuntimeConfig
mlflow: SnowflakeMLflowConfig


class KedroSnowflakeConfig(BaseModel):
Expand Down Expand Up @@ -136,6 +153,7 @@ class KedroSnowflakeConfig(BaseModel):
# https://repo.anaconda.com/pkgs/snowflake/
packages:
- snowflake-snowpark-python
- mlflow
- cachetools
- pluggy
- PyYAML==6.0
Expand All @@ -152,9 +170,22 @@ class KedroSnowflakeConfig(BaseModel):
- more-itertools
- openpyxl
- backoff
- pydantic
# Optionally provide mapping for user-friendly pipeline names
pipeline_name_mapping:
__default__: default
# EXPERIMENTAL: Either MLflow experiment name to enable MLflow tracking
# or leave empty
mlflow:
experiment_name: ~
stage: ~
# Snowflake external functions needed for calling MLflow instance
functions:
experiment_get_by_name: mlflow_experiment_get_by_name
run_create: mlflow_run_create
run_update: mlflow_run_update
run_log_metric: mlflow_run_log_metric
run_log_parameter: mlflow_run_log_parameter
""".strip()

# This auto-validates the template above during import
Expand Down
123 changes: 112 additions & 11 deletions kedro_snowflake/generator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import os
import re
Expand Down Expand Up @@ -59,6 +60,9 @@ def __init__(
self.config = config
self.pipeline_name = pipeline_name
self.extra_env = extra_env
self.mlflow_enabled = (
True if self.config.snowflake.mlflow.experiment_name else False
)

def _get_pipeline_name_for_snowflake(self):
return (self.config.snowflake.runtime.pipeline_name_mapping or {}).get(
Expand All @@ -78,7 +82,9 @@ def _generate_task_sql(
warehouse=self.connection_parameters["warehouse"],
after_tasks=",".join(after_tasks),
task_body=self.TASK_BODY_TEMPLATE.format(
root_task_name=self._root_task_name,
root_task_name=self._root_task_name
if not self.mlflow_enabled
else self._mlflow_root_task_name,
environment=self.kedro_environment,
sproc_name=self.SPROC_NAME,
pipeline_name=pipeline_name,
Expand All @@ -101,31 +107,71 @@ def _generate_root_task_sql(self):
schedule=self.config.snowflake.runtime.schedule,
)

def _sanitize_node_name(self, node_name: str) -> str:
return re.sub(r"\W", "_", node_name)
def _generate_root_task_suspend_sql(self):
return """
alter task {task_name} suspend;
""".strip().format(
task_name=self._root_task_name
)

def _generate_mlflow_drop_task_sql(self):
return """
drop task if exists {task_name};
""".strip().format(
task_name=self._mlflow_root_task_name
)

def _generate_mlflow_root_task_sql(self):
return """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-use self.TASK_TEMPLATE?

create or replace task {task_name}
warehouse = '{warehouse}'
after {after_task}
as
call {root_sproc}();
""".strip().format(
task_name=self._mlflow_root_task_name,
warehouse=self.connection_parameters["warehouse"],
root_sproc=self._mlfow_root_sproc_name,
after_task=self._root_task_name,
)

def _standardize_node_name(self, node_name: str) -> str:
sanity_node_name = re.sub(r"\W", "_", node_name)
return f"kedro_{self._get_pipeline_name_for_snowflake()}_{sanity_node_name}"

def _generate_snowflake_tasks_sql(
self,
pipeline: Pipeline,
) -> List[str]:
sql_statements = [self._generate_root_task_sql()]
sql_statements = [
self._generate_root_task_sql(),
self._generate_root_task_suspend_sql(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be immediately execute suspend after execute? Will it work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not ? For a brand new pipeline it is always suspended - so does not take any effect - for a replaced one it should just change the state ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe it would make sense to always drop a whole pipeline and then recreate it from scratch - it would then fix #3 ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be, let's discuss it.
For the suspend vs execute - wouldn't the pipeline stop executing, because of the "suspend"?

]
if self.mlflow_enabled:
sql_statements.append(self._generate_mlflow_root_task_sql())
else:
sql_statements.append(self._generate_mlflow_drop_task_sql())

node_dependencies = (
pipeline.node_dependencies
) # <-- this one is not topological
for node in pipeline.nodes: # <-- this one is topological
after_tasks = [self._root_task_name] + [
self._sanitize_node_name(n.name) for n in node_dependencies[node]
f"{self._standardize_node_name(n.name)}"
for n in node_dependencies[node]
]
if self.mlflow_enabled:
after_tasks.append(self._mlflow_root_task_name)
sql_statements.append(
self._generate_task_sql(
self._sanitize_node_name(node.name),
self._standardize_node_name(node.name),
after_tasks,
self.pipeline_name,
[node.name],
self.extra_params,
)
)

return sql_statements

def _generate_task_execute_sql(self):
Expand All @@ -137,14 +183,25 @@ def _generate_task_execute_sql(self):

@property
def _root_task_name(self):
root_task_name = f"kedro_snowflake_start_{self._get_pipeline_name_for_snowflake()}_task".upper()
root_task_name = (
f"kedro_{self._get_pipeline_name_for_snowflake()}_start_task".upper()
)
return root_task_name

@property
def _root_sproc_name(self):
return (
f"kedro_snowflake_start_{self._get_pipeline_name_for_snowflake()}".upper()
def _mlflow_root_task_name(self):
mlflow_root_task_name = (
f"kedro_{self._get_pipeline_name_for_snowflake()}_mlflow_start_task".upper()
)
return mlflow_root_task_name

@property
def _root_sproc_name(self):
return f"kedro_{self._get_pipeline_name_for_snowflake()}_start".upper()

@property
def _mlfow_root_sproc_name(self):
Lasica marked this conversation as resolved.
Show resolved Hide resolved
return f"kedro_{self._get_pipeline_name_for_snowflake()}_start_mlflow".upper()

def generate(self) -> KedroSnowflakePipeline:
"""Generate a SnowflakePipeline object from a Kedro pipeline.
Expand Down Expand Up @@ -202,6 +259,13 @@ def generate(self) -> KedroSnowflakePipeline:
snowflake_stage_name
)

if self.mlflow_enabled:
mlflow_root_sproc = ( # noqa: F841
self._construct_kedro_snowflake_mlflow_root_sproc(
snowflake_stage_name
)
)

logger.info("Creating Kedro Snowflake Sproc")
snowflake_sproc = self._construct_kedro_snowflake_sproc(
imports=self._generate_imports_for_sproc(
Expand All @@ -219,7 +283,7 @@ def generate(self) -> KedroSnowflakePipeline:
pipeline_sql_statements,
self._generate_task_execute_sql(),
self._root_task_name,
[self._sanitize_node_name(n.name) for n in pipeline.nodes],
[self._standardize_node_name(n.name) for n in pipeline.nodes],
)

def _generate_imports_for_sproc(self, dependencies_dir, snowflake_stage_name):
Expand Down Expand Up @@ -270,6 +334,43 @@ def _drop_and_recreate_stages(self, *stages):
def snowflake_session(self):
return Session.builder.configs(self.connection_parameters).create()

def _construct_kedro_snowflake_mlflow_root_sproc(self, stage_location: str):
experiment_name = self.config.snowflake.mlflow.experiment_name
experiment_get_by_name_func = (
self.config.snowflake.mlflow.functions.experiment_get_by_name
)
run_create_func = self.config.snowflake.mlflow.functions.run_create
experiment_id = (
self.snowflake_session.sql(
f"SELECT {experiment_get_by_name_func}('{experiment_name}'):body.experiments[0].experiment_id"
).collect()[0][0]
).strip(" \"'\t\r\n")
mlflow_config = self.config.snowflake.mlflow.dict()

def mlflow_start_run(session: Session) -> str:
run_id = (
session.sql(
f"SELECT {run_create_func}({experiment_id}):body.run.info.run_id"
).collect()[0][0]
).strip(" \"'\t\r\n")
mlflow_config["run_id"] = run_id
mlflow_config_json = json.dumps(mlflow_config)
session.sql(
f"call system$set_return_value('{mlflow_config_json}');"
).collect()
return run_id
Lasica marked this conversation as resolved.
Show resolved Hide resolved

return sproc(
func=mlflow_start_run,
name=self._mlfow_root_sproc_name,
is_permanent=True,
replace=True,
stage_location=stage_location,
packages=["snowflake-snowpark-python"],
execute_as="caller",
session=self.snowflake_session,
)

def _construct_kedro_snowflake_root_sproc(self, stage_location: str):
def kedro_start_run(session: Session) -> str:
from uuid import uuid4
Expand Down
8 changes: 5 additions & 3 deletions kedro_snowflake/starters/snowflights/cookiecutter.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"snowflake_account": "",
"snowflake_user": "",
"snowflake_warehouse": "",
"snowflake_database": "KEDRO",
"snowflake_schema": "PUBLIC",
"snowflake_password_env_variable": "SNOWFLAKE_PASSWORD"
"snowflake_database": "DEMO",
"snowflake_schema": "DEMO",
"snowflake_password_env_variable": "SNOWFLAKE_PASSWORD",
"pipeline_name": "default",
"enable_mlflow_integration": "False"
}
Loading