From 62d349d0532ff2656deba81c4a168f7028ca4973 Mon Sep 17 00:00:00 2001 From: Sergey Serebryakov Date: Thu, 15 Dec 2022 11:43:26 -0800 Subject: [PATCH 1/4] ## Introduction This commit implements one possible version of what a CMF fluent API can look like. It tries to achieve the following goals: - Remove some rarely used features from public API (such as typed parameters for pipelines and steps). - Automatically create steps if none are present when users call fluent API (e.g., `log_dataset`). - Initialize CMF in different usage contexts, for instance, retrieve initialization parameters from environment variables. - Automatically identify artifact association with steps (consumed/produced) in certain usage scenarios. ## Example Assuming a user has developed four functions - `fetch`, `preprocess`, `train` and `test`, the following is the example of CMF fluent API: ```python import cmflib.contrib.fluent as cmf cmf.set_cmf_parameters(filename='mlmd', graph=False) for step in (fetch, preprocess, train, test): with cmf.start_step(pipeline='my_pipeline', step=step.__name__): step() ``` ## API methods Methods can be categorized into three buckets: - Set CMF parameters (`set_cmf_parameters`). These parameters control CMF initialization, and do not include information about pipelines, steps and executions. - Start/end steps (`start_step` and `end_step`). These methods start a new pipeline step and ends currently active pipeline steps. The `start_step` method returns an instance of the `Step` class that can be used as a python context manager to automatically end steps. - Logging methods (`log_dataset`, `log_dataset_with_version`, `log_model`, `log_execution_metrics`, `log_metric` and `log_validation_output`). These methods log input/output artifacts. When these methods accept artifact URL, users can provide file system object instead (e.g., the one returned by `builtins.open` function). In this case, the association (input/output) is identified automatically, e.g.: ```python with open(_workspace / 'iris.pkl', 'rb') as stream: dataset: t.Dict = pickle.load(stream) cmf.log_dataset(stream) ``` All these methods will create a new step of one does not present. --- cmflib/contrib/fluent.py | 314 ++++++++++++++++++++++++++++++++ examples/fluent_api/pipeline.py | 98 ++++++++++ 2 files changed, 412 insertions(+) create mode 100644 cmflib/contrib/fluent.py create mode 100644 examples/fluent_api/pipeline.py diff --git a/cmflib/contrib/fluent.py b/cmflib/contrib/fluent.py new file mode 100644 index 00000000..b3a8f653 --- /dev/null +++ b/cmflib/contrib/fluent.py @@ -0,0 +1,314 @@ +import abc +import json +import os +from pathlib import Path +import typing as t +from cmflib.cmf import Cmf +import logging +import atexit + +""" +## Introduction + +This module implements one possible version of what a CMF fluent API can look like. It tries to achieve the following +goals: + + - Remove some rarely used features from public API (such as typed parameters for pipelines and steps). + - Automatically create steps if none are present when users call fluent API (e.g., `log_dataset`). + - Initialize CMF in different usage contexts, for instance, retrieve initialization parameters from environment + variables. + - Automatically identify artifact association with steps (consumed/produced) in certain usage scenarios. 
+ +## Example + +Assuming a user has developed four functions - `fetch`, `preprocess`, `train` and `test`, the following is the example +of CMF fluent API: + +```python +import cmflib.contrib.fluent as cmf + +cmf.set_cmf_parameters(filename='mlmd', graph=False) +for step in (fetch, preprocess, train, test): + with cmf.start_step(pipeline='my_pipeline', step=step.__name__): + step() +``` + +## API methods + +Methods can be categorized into three buckets: +- Set CMF parameters (`set_cmf_parameters`). These parameters control CMF initialization, and do not include information + about pipelines, steps and executions. +- Start/end steps (`start_step` and `end_step`). These methods start a new pipeline step and ends currently active + pipeline steps. The `start_step` method returns an instance of the `Step` class that can be used as a python context + manager to automatically end steps. +- Logging methods (`log_dataset`, `log_dataset_with_version`, `log_model`, `log_execution_metrics`, `log_metric` and + `log_validation_output`). These methods log input/output artifacts. When these methods accept artifact URL, users + can provide file system object instead (e.g., the one returned by `builtins.open` function). In this case, + the association (input/output) is identified automatically, e.g.: + ```python + with open(_workspace / 'iris.pkl', 'rb') as stream: + dataset: t.Dict = pickle.load(stream) + cmf.log_dataset(stream) + ``` + All these methods will create a new step of one does not present. +""" + +logger = logging.getLogger('cmf.fluent') + +__all__ = [ + 'Step', + 'start_step', 'end_step', + 'log_dataset', 'log_dataset_with_version', + 'log_model', + 'log_execution_metrics', 'log_metric', + 'log_validation_output' +] + + +class Step: + """Object that contains parameters of an active step that can be used as a python context manager. + + It is used with `start_step` method: + ```python + with start_step(pipeline='mnist', step='preprocess', properties={'train_size': 0.7}): + train() + ``` + """ + + def __init__(self) -> None: + self._end_step: t.Callable = end_step + self.pipeline_info = {'name': _cmf.parent_context.name, 'id': _cmf.parent_context.id} + self.step_info = {'name': _cmf.child_context.name, 'id': _cmf.child_context.id} + self.step_exec_info = {'name': _cmf.execution.name, 'id': _cmf.execution.id} + + def __enter__(self) -> None: + """Nothing to do on enter.""" + ... + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + """End active step on exit.""" + self._end_step() + + +_cmf: t.Optional[Cmf] = None +"""If not None, active step is present.""" + +_cmf_params: t.Optional[t.Dict] = None +"""CMF initialization parameters (such as filename and graph) excluding pipeline parameters.""" + +_URL = t.Union[str, Path, t.IO] +_Event = t.Optional[str] +_Properties = t.Optional[t.Dict] + + +def set_cmf_parameters(filename: t.Optional[str] = None, graph: t.Optional[bool] = None) -> None: + """Set CMF initialization parameters.""" + global _cmf_params + if _cmf_params is None: + _cmf_params = {} + if filename is not None: + _cmf_params['filename'] = filename + if graph is not None: + _cmf_params['graph'] = graph + + +def start_step(pipeline: t.Optional[str] = None, step: t.Optional[str] = None, properties: t.Dict = None) -> Step: + """Initialize a new pipeline step. + + If active exists, it will be ended. The return object can be used as a context manager to automatically end the + step. The `CMF_FLUENT_INIT_METHOD` environment variable can specify initialization mechanism. 
Currently, the + following is supported: + - `env`: initialize step using environment variables. + + The following sequence defines parameter priorities (from highest to lowest): user -> env -> default. + """ + global _cmf + if _cmf is not None: + logger.warning("[start_step] ending active CMF step") + end_step() + + params = DefaultParams() + if os.environ.get('CMF_FLUENT_INIT_METHOD', None) == 'env': + params = EnvParams(params) + params = UserParams(params, pipeline, step, properties) + + _cmf = Cmf(**params.cmf_parameters(), pipeline_name=params.pipeline(), custom_properties=None) + _ = _cmf.create_context(pipeline_stage=params.step(), custom_properties=None) + _ = _cmf.create_execution(execution_type=params.step(), custom_properties=params.step_properties()) + + return Step() + + +def end_step() -> None: + """End active step if present. + + This method will commit metrics if there are any. + """ + global _cmf + if _cmf is not None: + for name in _cmf.metrics.keys(): + _cmf.commit_metrics(metrics_name=name) + _cmf = None + + +def log_dataset(url: _URL, event: _Event = None, properties: _Properties = None) -> None: + """Log dataset artifact.""" + _maybe_initialize_step() + url, event = _check_artifact_io(url, event) + _ = _cmf.log_dataset(url, event, properties) + + +def log_dataset_with_version(url: _URL, version: str, event: _Event = None, properties: _Properties = None) -> None: + """Log dataset artifact with version.""" + _maybe_initialize_step() + url, event = _check_artifact_io(url, event) + _ = _cmf.log_dataset_with_version(url, version, event, properties) + + +def log_model(path: _URL, event: _Event = None, model_framework: str = "Default", model_type: str = "Default", + model_name: str = "Default", properties: _Properties = None) -> None: + """Log model artifact.""" + _maybe_initialize_step() + url, event = _check_artifact_io(path, event) + _ = _cmf.log_model(path, event, model_framework, model_type, model_name, properties) + + +def log_execution_metrics(metrics_name: str, properties: _Properties = None) -> None: + """Log execution metrics.""" + _maybe_initialize_step() + _ = _cmf.log_execution_metrics(metrics_name, properties) + + +def log_metric(metrics_name: str, properties: _Properties = None) -> None: + """Log in-progress metrics.""" + _maybe_initialize_step() + _cmf.log_metric(metrics_name, properties) + + +def log_validation_output(version: str, properties: _Properties = None) -> None: + """Log model validation metrics.""" + _maybe_initialize_step() + _ = _cmf.log_validation_output(version, properties) + + +def _maybe_initialize_step() -> None: + """Start a new pipeline step if none exists.""" + global _cmf + if _cmf is None: + _ = start_step() + + +def _check_artifact_io(url: _URL, event: t.Optional[str] = None) -> t.Tuple[str, str]: + """Convert artifact `url` to string and maybe identify its direction (input/output). 
+ """ + if isinstance(url, t.IO): + event = 'input' if 'r' in url.mode else 'output' + url = Path(url.name).absolute() + if isinstance(url, Path): + url = url.absolute().as_posix() + return url, event + + +@atexit.register +def unload(): + """Callback function to end the active step if present when this module is unloaded.""" + end_step() + + +class Params(abc.ABC): + """Base class that defines APIs to retrieve pipeline and step parameters.""" + + def __init__(self, parent: t.Optional['Params'] = None) -> None: + self.parent = parent + + def cmf_parameters(self) -> t.Dict: + """Return dictionary of CMF initialization parameters (excluding pipeline information).""" + ... + + def pipeline(self) -> str: + """Return pipeline name.""" + ... + + def step(self) -> str: + """Return step name.""" + ... + + def step_properties(self) -> t.Dict: + """Return step execution properties.""" + ... + + +class DefaultParams(Params): + """Default parameter provider.""" + + def __init__(self) -> None: + super().__init__() + + def cmf_parameters(self) -> t.Dict: + return {'filename': 'mlmd', 'graph': False} + + def pipeline(self) -> str: + return 'default' + + def step(self) -> str: + return 'default' + + def step_properties(self) -> t.Dict: + return {} + + +class EnvParams(Params): + """Parameter provider that retrieves information from environment.""" + + def __init__(self, parent: t.Optional[Params] = None) -> None: + super().__init__(parent or DefaultParams()) + + def cmf_parameters(self) -> t.Dict: + return self._get_dict('CMF_FLUENT_CMF_PARAMS', self.parent.cmf_parameters()) + + def pipeline(self) -> str: + return self._get_string('CMF_FLUENT_PIPELINE', self.parent.pipeline()) + + def step(self) -> str: + return self._get_string('CMF_FLUENT_STEP', self.parent.step()) + + def step_properties(self) -> t.Dict: + return self._get_dict('CMF_FLUENT_STEP_PROPERTIES', self.parent.step_properties()) + + @staticmethod + def _get_string(env_var: str, default: str) -> str: + value = os.environ.get(env_var, None) + return value if value is not None else default + + @staticmethod + def _get_dict(env_var: str, default: t.Dict) -> t.Dict: + params = os.environ.get(env_var, None) + return json.loads(params) if params else default + + +class UserParams(Params): + """Parameter provider that uses parameters provided by a user.""" + + def __init__(self, parent: t.Optional[Params] = None, + pipeline: t.Optional[str] = None, step: t.Optional[str] = None, + step_properties: t.Optional[t.Dict] = None) -> None: + super().__init__(parent or DefaultParams()) + self._pipeline = pipeline + self._step = step + self._step_properties = step_properties + + def cmf_parameters(self) -> t.Dict: + return self._get_value(_cmf_params, self.parent.cmf_parameters()) + + def pipeline(self) -> str: + return self._get_value(self._pipeline, self.parent.pipeline()) + + def step(self) -> str: + return self._get_value(self._step, self.parent.step()) + + def step_properties(self) -> t.Dict: + return self._get_value(self._step_properties, self.parent.step_properties()) + + @staticmethod + def _get_value(value: t.Any, default: t.Any) -> t.Any: + return value if value is not None else default diff --git a/examples/fluent_api/pipeline.py b/examples/fluent_api/pipeline.py new file mode 100644 index 00000000..331a290e --- /dev/null +++ b/examples/fluent_api/pipeline.py @@ -0,0 +1,98 @@ +import typing as t + +from sklearn.metrics import accuracy_score +from sklearn.tree import DecisionTreeClassifier + +import cmflib.contrib.fluent as cmf +from pathlib import 
Path +import pickle +from sklearn.datasets import load_iris +from sklearn.model_selection import train_test_split +from sklearn.utils import Bunch + + +_pipeline = 'iris' +"""Pipeline name.""" + +_workspace = Path(__file__).parent / 'workspace' +"""Path to a pipeline workspace containing serialized artifacts (datasets and ML models).""" + + +def fetch() -> None: + """Ingest the IRIS dataset into a pipeline.""" + with open(_workspace / 'iris.pkl', 'wb') as stream: + iris: Bunch = load_iris() + pickle.dump( + {'data': iris['data'], 'target': iris['target']}, + stream + ) + cmf.log_dataset(stream, properties={'name': 'iris', 'type': 'raw'}) + + +def preprocess(params: t.Optional[t.Dict] = None) -> None: + """Split raw dataset into train/test splits.""" + params = params or {} + with open(_workspace / 'iris.pkl', 'rb') as stream: + dataset: t.Dict = pickle.load(stream) + cmf.log_dataset(stream) + + x_train, x_test, y_train, y_test = train_test_split( + dataset['data'], + dataset['target'], + train_size=float(params.get('train_size', 0.7)), + shuffle=params.get('shuffle', 'true').lower() == 'true' + ) + + with open(_workspace / 'train.pkl', 'wb') as stream: + pickle.dump({'x': x_train, 'y': y_train}, stream) + cmf.log_dataset(stream, properties={'name': 'iris', 'type': 'preprocessed', 'split': 'train'}) + with open(_workspace / 'test.pkl', 'wb') as stream: + pickle.dump({'x': x_test, 'y': y_test}, stream) + cmf.log_dataset(stream, properties={'name': 'iris', 'type': 'preprocessed', 'split': 'test'}) + + +def train() -> None: + """Train a model.""" + with open(_workspace / 'train.pkl', 'rb') as stream: + dataset: t.Dict = pickle.load(stream) + cmf.log_dataset(stream) + + clf = DecisionTreeClassifier() + clf = clf.fit(dataset['x'], dataset['y']) + cmf.log_execution_metrics( + 'train', + {'accuracy': accuracy_score(y_true=dataset['y'], y_pred=clf.predict(dataset['x']))} + ) + + with open(_workspace / 'model.pkl', 'wb') as stream: + pickle.dump(clf, stream) + cmf.log_model(stream) + + +def test() -> None: + """Test a model.""" + with open(_workspace / 'test.pkl', 'rb') as stream: + dataset: t.Dict = pickle.load(stream) + cmf.log_dataset(stream) + + with open(_workspace / 'model.pkl', 'rb') as stream: + clf: DecisionTreeClassifier = pickle.load(stream) + cmf.log_model(stream) + + cmf.log_execution_metrics( + 'test', + {'accuracy': accuracy_score(y_true=dataset['y'], y_pred=clf.predict(dataset['x']))} + ) + + +def pipeline(): + """Run IRIS ML pipeline.""" + _workspace.mkdir(parents=True, exist_ok=True) + cmf.set_cmf_parameters(filename='mlmd', graph=False) + for step in (fetch, preprocess, train, test): + with cmf.start_step(pipeline=_pipeline, step=step.__name__): + step() + + +if __name__ == '__main__': + pipeline() From 13a231d1172d461250f3c6cc116299c7e8a52551 Mon Sep 17 00:00:00 2001 From: Sergey Serebryakov Date: Mon, 19 Dec 2022 12:50:01 -0800 Subject: [PATCH 2/4] Refactoring and ray runner. - Disabling support for file objects in logging methods (happens to be a bad idea to commit open files). - Splitting example into a pipeline definition and pipeline runners. - Adding example that shows how to run CMF pipelines with fluent API on ray cluster. 
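

For illustration, a minimal sketch of the logging pattern this change expects (write and close the file first, then log the path with an explicit event). The `workspace` directory and the dummy payload below are placeholders, not part of the patch:

```python
import pickle
from pathlib import Path

import cmflib.contrib.fluent as cmf

# Placeholder artifact location; the examples in this patch use a 'workspace' directory.
ds_path = Path('workspace') / 'iris.pkl'
ds_path.parent.mkdir(parents=True, exist_ok=True)

# Write the artifact and close the file before logging it.
with open(ds_path, 'wb') as stream:
    pickle.dump({'data': [], 'target': []}, stream)

# File objects are no longer accepted, so pass the path and the event explicitly.
cmf.log_dataset(ds_path, 'output', properties={'name': 'iris', 'type': 'raw'})
```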
--- cmflib/contrib/fluent.py | 68 ++++++++++++++++++++++----- examples/fluent_api/local_runner.py | 15 ++++++ examples/fluent_api/pipeline.py | 70 +++++++++++----------------- examples/fluent_api/ray_runner.py | 43 +++++++++++++++++ examples/fluent_api/requirements.txt | 2 + 5 files changed, 143 insertions(+), 55 deletions(-) create mode 100644 examples/fluent_api/local_runner.py create mode 100644 examples/fluent_api/ray_runner.py create mode 100644 examples/fluent_api/requirements.txt diff --git a/cmflib/contrib/fluent.py b/cmflib/contrib/fluent.py index b3a8f653..ddad6739 100644 --- a/cmflib/contrib/fluent.py +++ b/cmflib/contrib/fluent.py @@ -1,4 +1,5 @@ import abc +import io import json import os from pathlib import Path @@ -43,12 +44,12 @@ manager to automatically end steps. - Logging methods (`log_dataset`, `log_dataset_with_version`, `log_model`, `log_execution_metrics`, `log_metric` and `log_validation_output`). These methods log input/output artifacts. When these methods accept artifact URL, users - can provide file system object instead (e.g., the one returned by `builtins.open` function). In this case, - the association (input/output) is identified automatically, e.g.: + can provide either a string or a Path object: ```python - with open(_workspace / 'iris.pkl', 'rb') as stream: + ds_path = _workspace / 'iris.pkl' + with open(ds_path, 'rb') as stream: dataset: t.Dict = pickle.load(stream) - cmf.log_dataset(stream) + cmf.log_dataset(ds_path, 'input') ``` All these methods will create a new step of one does not present. """ @@ -57,6 +58,7 @@ __all__ = [ 'Step', + 'set_cmf_parameters', 'get_work_directory', 'start_step', 'end_step', 'log_dataset', 'log_dataset_with_version', 'log_model', @@ -96,7 +98,7 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: _cmf_params: t.Optional[t.Dict] = None """CMF initialization parameters (such as filename and graph) excluding pipeline parameters.""" -_URL = t.Union[str, Path, t.IO] +_URL = t.Union[str, Path] _Event = t.Optional[str] _Properties = t.Optional[t.Dict] @@ -112,6 +114,12 @@ def set_cmf_parameters(filename: t.Optional[str] = None, graph: t.Optional[bool] _cmf_params['graph'] = graph +def get_work_directory() -> Path: + """Return artifact path.""" + db_file_path = (_cmf_params or {}).get('filename', None) + return Path(db_file_path).parent if db_file_path else Path.cwd() + + def start_step(pipeline: t.Optional[str] = None, step: t.Optional[str] = None, properties: t.Dict = None) -> Step: """Initialize a new pipeline step. 
@@ -127,8 +135,10 @@ def start_step(pipeline: t.Optional[str] = None, step: t.Optional[str] = None, p logger.warning("[start_step] ending active CMF step") end_step() + init_method = os.environ.get('CMF_FLUENT_INIT_METHOD', None) + params = DefaultParams() - if os.environ.get('CMF_FLUENT_INIT_METHOD', None) == 'env': + if init_method == 'env': params = EnvParams(params) params = UserParams(params, pipeline, step, properties) @@ -169,7 +179,7 @@ def log_model(path: _URL, event: _Event = None, model_framework: str = "Default" model_name: str = "Default", properties: _Properties = None) -> None: """Log model artifact.""" _maybe_initialize_step() - url, event = _check_artifact_io(path, event) + path, event = _check_artifact_io(path, event) _ = _cmf.log_model(path, event, model_framework, model_type, model_name, properties) @@ -201,11 +211,26 @@ def _maybe_initialize_step() -> None: def _check_artifact_io(url: _URL, event: t.Optional[str] = None) -> t.Tuple[str, str]: """Convert artifact `url` to string and maybe identify its direction (input/output). """ - if isinstance(url, t.IO): - event = 'input' if 'r' in url.mode else 'output' - url = Path(url.name).absolute() + if isinstance(url, (io.BufferedWriter, io.BufferedReader)): + raise RuntimeError( + f"This happens to be not a good idea. This file is still open at this point in time, and because of " + "buffering, parts of data can still be in memory. It's better to write and close the file, and then add it " + "to CMF.") + # _path = Path(url.name).absolute() + # _event = 'input' if 'r' in url.mode else 'output' + # if event and event != _event: + # raise ValueError(f"Inconsistent parameters: url(value={url}, type={type(url)}, path={_path}) while " + # f"used-provided event is `{event}`.") + # url, event = _path, _event + else: + if not event: + raise ValueError(f"When url is not a file object, event must be specified.") if isinstance(url, Path): url = url.absolute().as_posix() + if not isinstance(url, str): + raise ValueError(f"Unrecognized URL: url={url}, type={type(url)}.") + + assert isinstance(url, (str, Path)), f"BUG: url={url}, type={type(url)}, event={event}" return url, event @@ -239,7 +264,10 @@ def step_properties(self) -> t.Dict: class DefaultParams(Params): - """Default parameter provider.""" + """Default parameter provider. + + TODO: This does not work (probably, pipeline and step should not have the same names?) + """ def __init__(self) -> None: super().__init__() @@ -258,7 +286,23 @@ def step_properties(self) -> t.Dict: class EnvParams(Params): - """Parameter provider that retrieves information from environment.""" + """Parameter provider that retrieves information from environment. + + To enable this, set the following environment variable `CMF_FLUENT_INIT_METHOD = 'env'`, e.g: + ```python + step_env = { + 'CMF_FLUENT_INIT_METHOD': 'env', + 'CMF_FLUENT_CMF_PARAMS': json.dumps({'filename': 'mlmd', 'graph': False}), + 'CMF_FLUENT_PIPELINE': 'iris', + 'CMF_FLUENT_STEP': 'fetch' + } + ``` + The following environment variables are supported: + - `CMF_FLUENT_CMF_PARAMS`: JSON-string dictionary containing `filename` and `graph` keys. + - `CMF_FLUENT_PIPELINE`: string defining a pipeline name. + - `CMF_FLUENT_STEP`: string defining a step name. + - `CMF_FLUENT_STEP_PROPERTIES`: JSON-string dictionary containing execution properties. 
+ """ def __init__(self, parent: t.Optional[Params] = None) -> None: super().__init__(parent or DefaultParams()) diff --git a/examples/fluent_api/local_runner.py b/examples/fluent_api/local_runner.py new file mode 100644 index 00000000..687ef2e8 --- /dev/null +++ b/examples/fluent_api/local_runner.py @@ -0,0 +1,15 @@ +import cmflib.contrib.fluent as cmf +from pipeline import (fetch, preprocess, train, test) + + +def pipeline(): + """Run IRIS ML pipeline.""" + (cmf.get_work_directory() / 'workspace').mkdir(parents=True, exist_ok=True) + cmf.set_cmf_parameters(filename='mlmd', graph=False) + for step in (fetch, preprocess, train, test): + with cmf.start_step(pipeline='iris', step=step.__name__): + step() + + +if __name__ == '__main__': + pipeline() diff --git a/examples/fluent_api/pipeline.py b/examples/fluent_api/pipeline.py index 331a290e..b871eda3 100644 --- a/examples/fluent_api/pipeline.py +++ b/examples/fluent_api/pipeline.py @@ -4,37 +4,28 @@ from sklearn.tree import DecisionTreeClassifier import cmflib.contrib.fluent as cmf -from pathlib import Path import pickle from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split from sklearn.utils import Bunch -_pipeline = 'iris' -"""Pipeline name.""" - -_workspace = Path(__file__).parent / 'workspace' -"""Path to a pipeline workspace containing serialized artifacts (datasets and ML models).""" - - def fetch() -> None: """Ingest the IRIS dataset into a pipeline.""" - with open(_workspace / 'iris.pkl', 'wb') as stream: + ds_path = cmf.get_work_directory() / 'workspace' / 'iris.pkl' + with open(ds_path, 'wb') as stream: iris: Bunch = load_iris() - pickle.dump( - {'data': iris['data'], 'target': iris['target']}, - stream - ) - cmf.log_dataset(stream, properties={'name': 'iris', 'type': 'raw'}) + pickle.dump({'data': iris['data'], 'target': iris['target']}, stream) + cmf.log_dataset(ds_path, 'output', properties={'name': 'iris', 'type': 'raw'}) def preprocess(params: t.Optional[t.Dict] = None) -> None: """Split raw dataset into train/test splits.""" params = params or {} - with open(_workspace / 'iris.pkl', 'rb') as stream: + ds_path = cmf.get_work_directory() / 'workspace' / 'iris.pkl' + with open(ds_path, 'rb') as stream: dataset: t.Dict = pickle.load(stream) - cmf.log_dataset(stream) + cmf.log_dataset(ds_path, 'input') x_train, x_test, y_train, y_test = train_test_split( dataset['data'], @@ -43,56 +34,49 @@ def preprocess(params: t.Optional[t.Dict] = None) -> None: shuffle=params.get('shuffle', 'true').lower() == 'true' ) - with open(_workspace / 'train.pkl', 'wb') as stream: + ds_path = cmf.get_work_directory() / 'workspace' / 'train.pkl' + with open(ds_path, 'wb') as stream: pickle.dump({'x': x_train, 'y': y_train}, stream) - cmf.log_dataset(stream, properties={'name': 'iris', 'type': 'preprocessed', 'split': 'train'}) - with open(_workspace / 'test.pkl', 'wb') as stream: + cmf.log_dataset(ds_path, 'output', properties={'name': 'iris', 'type': 'preprocessed', 'split': 'train'}) + + d_path = cmf.get_work_directory() / 'workspace' / 'test.pkl' + with open(d_path, 'wb') as stream: pickle.dump({'x': x_test, 'y': y_test}, stream) - cmf.log_dataset(stream, properties={'name': 'iris', 'type': 'preprocessed', 'split': 'test'}) + cmf.log_dataset(d_path, 'output', properties={'name': 'iris', 'type': 'preprocessed', 'split': 'test'}) def train() -> None: """Train a model.""" - with open(_workspace / 'train.pkl', 'rb') as stream: + ds_path = cmf.get_work_directory() / 'workspace' / 'train.pkl' + with open(ds_path, 'rb') 
as stream: dataset: t.Dict = pickle.load(stream) - cmf.log_dataset(stream) + cmf.log_dataset(ds_path, 'input') - clf = DecisionTreeClassifier() - clf = clf.fit(dataset['x'], dataset['y']) + clf = DecisionTreeClassifier().fit(dataset['x'], dataset['y']) cmf.log_execution_metrics( 'train', {'accuracy': accuracy_score(y_true=dataset['y'], y_pred=clf.predict(dataset['x']))} ) - with open(_workspace / 'model.pkl', 'wb') as stream: + mdl_path = cmf.get_work_directory() / 'workspace' / 'model.pkl' + with open(mdl_path, 'wb') as stream: pickle.dump(clf, stream) - cmf.log_model(stream) + cmf.log_model(mdl_path, 'output') def test() -> None: """Test a model.""" - with open(_workspace / 'test.pkl', 'rb') as stream: + ds_path = cmf.get_work_directory() / 'workspace' / 'test.pkl' + with open(ds_path, 'rb') as stream: dataset: t.Dict = pickle.load(stream) - cmf.log_dataset(stream) + cmf.log_dataset(ds_path, 'input') - with open(_workspace / 'model.pkl', 'rb') as stream: + mdl_path = cmf.get_work_directory() / 'workspace' / 'model.pkl' + with open(mdl_path, 'rb') as stream: clf: DecisionTreeClassifier = pickle.load(stream) - cmf.log_model(stream) + cmf.log_model(mdl_path, 'input') cmf.log_execution_metrics( 'test', {'accuracy': accuracy_score(y_true=dataset['y'], y_pred=clf.predict(dataset['x']))} ) - - -def pipeline(): - """Run IRIS ML pipeline.""" - _workspace.mkdir(parents=True, exist_ok=True) - cmf.set_cmf_parameters(filename='mlmd', graph=False) - for step in (fetch, preprocess, train, test): - with cmf.start_step(pipeline=_pipeline, step=step.__name__): - step() - - -if __name__ == '__main__': - pipeline() diff --git a/examples/fluent_api/ray_runner.py b/examples/fluent_api/ray_runner.py new file mode 100644 index 00000000..efc7ea7f --- /dev/null +++ b/examples/fluent_api/ray_runner.py @@ -0,0 +1,43 @@ +from pathlib import Path +import json +import ray +from pipeline import (fetch as _fetch, preprocess as _preprocess, train as _train, test as _test) + + +# Make pipeline step functions ray remotes. +fetch = ray.remote(_fetch) +preprocess = ray.remote(_preprocess) +train = ray.remote(_train) +test = ray.remote(_test) + + +def pipeline() -> None: + """Run an IRIS ML pipeline on a ray cluster.""" + # Make sure the `workspace` directory exists. This is an analogy for MLflow's artifact path. There's probably + # a better way to define it here. + mlmd_store = Path.cwd() / 'mlmd' + (mlmd_store.parent / 'workspace').mkdir(parents=True, exist_ok=True) + + # The fluent API can initialize itself from environment variables. Define them. + pipeline_env = { + 'CMF_FLUENT_INIT_METHOD': 'env', + 'CMF_FLUENT_CMF_PARAMS': json.dumps({'filename': mlmd_store.as_posix(), 'graph': False}), + 'CMF_FLUENT_PIPELINE': 'iris', + 'CMF_FLUENT_STEP': None + } + + # Run steps in a synchronous manner one step at a time. The code also passes the environment variables so that all + # steps (running in different processes (including, maybe, remote hosts)), use the same CMF configuration. + for step in (fetch, preprocess, train, test): + step_env = pipeline_env.copy() + step_env['CMF_FLUENT_STEP'] = step.remote.__name__ + ref: ray.ObjectRef = step.options(runtime_env={'env_vars': step_env}).remote() + ray.get(ref) + + # PS: The code can be refactored. If steps return variables, and other steps accept these variables as inputs, it's + # possible to define a graph of steps, and just ask to run the last step. Ray will take care about distributing + # steps across available ray workers, possibly, parallelizing computations. 
+ + +if __name__ == '__main__': + pipeline() diff --git a/examples/fluent_api/requirements.txt b/examples/fluent_api/requirements.txt new file mode 100644 index 00000000..965dbd2c --- /dev/null +++ b/examples/fluent_api/requirements.txt @@ -0,0 +1,2 @@ +ray +scikit-learn From 7b64d20a2680e666560adf46ed81ade60d42642c Mon Sep 17 00:00:00 2001 From: Sergey Serebryakov Date: Mon, 19 Dec 2022 13:06:44 -0800 Subject: [PATCH 3/4] Command line utility to query the MLMD sqlite store. --- examples/fluent_api/query.py | 50 ++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 examples/fluent_api/query.py diff --git a/examples/fluent_api/query.py b/examples/fluent_api/query.py new file mode 100644 index 00000000..87566ae2 --- /dev/null +++ b/examples/fluent_api/query.py @@ -0,0 +1,50 @@ +import json + +import click +import typing as t +import pandas as pd +from cmflib import cmfquery +from tabulate import tabulate + +__all__ = ['query'] + + +def _print_executions_in_stage(cmf_query: cmfquery.CmfQuery, stage_name: str, frmt: str) -> None: + print('\n') + print('\n') + df: pd.DataFrame = cmf_query.get_all_executions_in_stage(stage_name) + + drop_cols = [] + if 'Git_Start_Commit' in df.columns: + drop_cols.append('Git_Start_Commit') + if 'Git_End_Commit' in df.columns: + drop_cols.append('Git_End_Commit') + if drop_cols: + df.drop(columns=drop_cols, inplace=True, axis=1) + + if frmt == 'table': + print(tabulate(df, headers='keys', tablefmt='psql')) + else: + print(json.dumps(df.to_dict("records"), indent=4)) + + +def query(mlmd_path: str, pipeline: str, steps: str, frmt: str) -> None: + cmf_query = cmfquery.CmfQuery(mlmd_path) + stages: t.List[str] = cmf_query.get_pipeline_stages(pipeline) + print(stages) + + for name in steps.split(','): + _print_executions_in_stage(cmf_query, name, frmt) + + +@click.command() +@click.argument('mlmd_path', required=True, type=str, help='Path to MLMD sqlite store.') +@click.argument('pipeline', required=True, type=str, help='Pipeline name.') +@click.argument('steps', required=True, type=str, help='Comma-separated string of step names.') +@click.argument('format', required=True, type=str, help='Output format (`table`, `json`)') +def query_cli(mlmd_path: str, pipeline: str, steps: str, frmt: str) -> None: + query(mlmd_path, pipeline, steps, frmt) + + +if __name__ == '__main__': + query_cli() From 1edf880f3ed9f5d331a06171d8e79d232582a945 Mon Sep 17 00:00:00 2001 From: Sergey Serebryakov Date: Mon, 19 Dec 2022 17:44:02 -0800 Subject: [PATCH 4/4] Refactoring to introduce a single point of entry for all environments (local / ray) ```python python pipeline.py -e local python pipeline.py -e ray ``` --- .../fluent_api/{local_runner.py => _local.py} | 6 +- .../fluent_api/{ray_runner.py => _ray.py} | 7 +-- examples/fluent_api/pipeline.py | 56 ++++++++++++++----- 3 files changed, 47 insertions(+), 22 deletions(-) rename examples/fluent_api/{local_runner.py => _local.py} (90%) rename examples/fluent_api/{ray_runner.py => _ray.py} (97%) diff --git a/examples/fluent_api/local_runner.py b/examples/fluent_api/_local.py similarity index 90% rename from examples/fluent_api/local_runner.py rename to examples/fluent_api/_local.py index 687ef2e8..c0846b1d 100644 --- a/examples/fluent_api/local_runner.py +++ b/examples/fluent_api/_local.py @@ -1,6 +1,8 @@ import cmflib.contrib.fluent as cmf from pipeline import (fetch, preprocess, train, test) +__all__ = ['pipeline'] + def pipeline(): """Run IRIS ML pipeline.""" @@ -9,7 +11,3 @@ def pipeline(): for step in 
(fetch, preprocess, train, test): with cmf.start_step(pipeline='iris', step=step.__name__): step() - - -if __name__ == '__main__': - pipeline() diff --git a/examples/fluent_api/ray_runner.py b/examples/fluent_api/_ray.py similarity index 97% rename from examples/fluent_api/ray_runner.py rename to examples/fluent_api/_ray.py index efc7ea7f..e654d823 100644 --- a/examples/fluent_api/ray_runner.py +++ b/examples/fluent_api/_ray.py @@ -4,6 +4,9 @@ from pipeline import (fetch as _fetch, preprocess as _preprocess, train as _train, test as _test) +__all__ = ['ray'] + + # Make pipeline step functions ray remotes. fetch = ray.remote(_fetch) preprocess = ray.remote(_preprocess) @@ -37,7 +40,3 @@ def pipeline() -> None: # PS: The code can be refactored. If steps return variables, and other steps accept these variables as inputs, it's # possible to define a graph of steps, and just ask to run the last step. Ray will take care about distributing # steps across available ray workers, possibly, parallelizing computations. - - -if __name__ == '__main__': - pipeline() diff --git a/examples/fluent_api/pipeline.py b/examples/fluent_api/pipeline.py index b871eda3..d3315fa5 100644 --- a/examples/fluent_api/pipeline.py +++ b/examples/fluent_api/pipeline.py @@ -1,8 +1,8 @@ import typing as t - +from pathlib import Path +import click from sklearn.metrics import accuracy_score from sklearn.tree import DecisionTreeClassifier - import cmflib.contrib.fluent as cmf import pickle from sklearn.datasets import load_iris @@ -10,19 +10,25 @@ from sklearn.utils import Bunch -def fetch() -> None: +def workspace(path: t.Optional[str] = None) -> Path: + """Return directory to read from or write to.""" + return Path(path) if path else cmf.get_work_directory() / 'workspace' + + +def fetch(output_dir: t.Optional[str] = None) -> None: """Ingest the IRIS dataset into a pipeline.""" - ds_path = cmf.get_work_directory() / 'workspace' / 'iris.pkl' + ds_path = workspace(output_dir) / 'iris.pkl' with open(ds_path, 'wb') as stream: iris: Bunch = load_iris() pickle.dump({'data': iris['data'], 'target': iris['target']}, stream) cmf.log_dataset(ds_path, 'output', properties={'name': 'iris', 'type': 'raw'}) -def preprocess(params: t.Optional[t.Dict] = None) -> None: +def preprocess(params: t.Optional[t.Dict] = None, + input_dir: t.Optional[str] = None, output_dir: t.Optional[str] = None) -> None: """Split raw dataset into train/test splits.""" params = params or {} - ds_path = cmf.get_work_directory() / 'workspace' / 'iris.pkl' + ds_path = workspace(input_dir) / 'iris.pkl' with open(ds_path, 'rb') as stream: dataset: t.Dict = pickle.load(stream) cmf.log_dataset(ds_path, 'input') @@ -34,20 +40,20 @@ def preprocess(params: t.Optional[t.Dict] = None) -> None: shuffle=params.get('shuffle', 'true').lower() == 'true' ) - ds_path = cmf.get_work_directory() / 'workspace' / 'train.pkl' + ds_path = workspace(output_dir) / 'train.pkl' with open(ds_path, 'wb') as stream: pickle.dump({'x': x_train, 'y': y_train}, stream) cmf.log_dataset(ds_path, 'output', properties={'name': 'iris', 'type': 'preprocessed', 'split': 'train'}) - d_path = cmf.get_work_directory() / 'workspace' / 'test.pkl' + d_path = workspace(output_dir) / 'test.pkl' with open(d_path, 'wb') as stream: pickle.dump({'x': x_test, 'y': y_test}, stream) cmf.log_dataset(d_path, 'output', properties={'name': 'iris', 'type': 'preprocessed', 'split': 'test'}) -def train() -> None: +def train(input_dir: t.Optional[str] = None, output_dir: t.Optional[str] = None) -> None: """Train a model.""" - 
ds_path = cmf.get_work_directory() / 'workspace' / 'train.pkl' + ds_path = workspace(input_dir) / 'train.pkl' with open(ds_path, 'rb') as stream: dataset: t.Dict = pickle.load(stream) cmf.log_dataset(ds_path, 'input') @@ -58,20 +64,20 @@ def train() -> None: {'accuracy': accuracy_score(y_true=dataset['y'], y_pred=clf.predict(dataset['x']))} ) - mdl_path = cmf.get_work_directory() / 'workspace' / 'model.pkl' + mdl_path = workspace(output_dir) / 'model.pkl' with open(mdl_path, 'wb') as stream: pickle.dump(clf, stream) cmf.log_model(mdl_path, 'output') -def test() -> None: +def test(input_dir: t.Optional[str] = None) -> None: """Test a model.""" - ds_path = cmf.get_work_directory() / 'workspace' / 'test.pkl' + ds_path = workspace(input_dir) / 'test.pkl' with open(ds_path, 'rb') as stream: dataset: t.Dict = pickle.load(stream) cmf.log_dataset(ds_path, 'input') - mdl_path = cmf.get_work_directory() / 'workspace' / 'model.pkl' + mdl_path = workspace(input_dir) / 'model.pkl' with open(mdl_path, 'rb') as stream: clf: DecisionTreeClassifier = pickle.load(stream) cmf.log_model(mdl_path, 'input') @@ -80,3 +86,25 @@ def test() -> None: 'test', {'accuracy': accuracy_score(y_true=dataset['y'], y_pred=clf.predict(dataset['x']))} ) + + +@click.command() +@click.option('-e', '--env', default='local', help='Environment to use (local, ray, pachyderm)') +def main(env: str) -> None: + pipeline_fn: t.Optional[t.Callable] = None + if env == 'local': + import _local + pipeline_fn = _local.pipeline + elif env == 'ray': + import _ray + pipeline_fn = _ray.pipeline + + if not pipeline_fn: + print(f"Unrecognized environment: '{env}'") + exit(1) + + pipeline_fn() + + +if __name__ == '__main__': + main()
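
As a closing note, the executions recorded by these runners can be inspected with `cmflib.cmfquery`, mirroring what `examples/fluent_api/query.py` does; the store path `mlmd` and pipeline name `iris` below assume the local runner's defaults:

```python
import typing as t

import pandas as pd
from cmflib import cmfquery

# Open the MLMD sqlite store produced by the local runner.
cmf_query = cmfquery.CmfQuery('mlmd')

# List the stages (steps) recorded for the pipeline and dump their executions.
stages: t.List[str] = cmf_query.get_pipeline_stages('iris')
for stage in stages:
    df: pd.DataFrame = cmf_query.get_all_executions_in_stage(stage)
    print(stage, df.to_dict('records'))
```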