✨ Enable overriding params at predict time in KedroPipelineModel #612

Draft · wants to merge 11 commits into base: master
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,12 @@

## [Unreleased]

- :sparkles: Enable overriding parameters and the runner at predict time in ``KedroPipelineModel`` ([#445](https://github.com/Galileo-Galilei/kedro-mlflow/issues/445), [#612](https://github.com/Galileo-Galilei/kedro-mlflow/pull/612))

### Changed

- :boom: :pushpin: Pin ``mlflow>=2.7.0`` to support predict parameters for custom models (see the feature above)

## [0.13.3] - 2024-10-29

### Added
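A minimal usage sketch of the feature described in the changelog, assuming a model already logged by this PR's hook; the `<run_id>` is a placeholder, and the `threshold` and `runner` parameter names are taken from the tests further down:

```python
import mlflow
import pandas as pd

# load a KedroPipelineModel logged with an inference-params signature
trained_model = mlflow.pyfunc.load_model("runs:/<run_id>/model")

inference_data = pd.DataFrame(data=[0.2, 0.6, 0.9], columns=["a"])

# no params: the defaults recorded in the signature are used (threshold=0.5, SequentialRunner)
trained_model.predict(inference_data)

# override a pipeline parameter and the runner at predict time (requires mlflow>=2.7.0)
trained_model.predict(
    inference_data,
    params={"threshold": 0.8, "runner": "ThreadRunner"},
)
```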
13 changes: 12 additions & 1 deletion kedro_mlflow/framework/hooks/mlflow_hook.py
@@ -389,7 +389,18 @@ def after_pipeline_run(
        if isinstance(model_signature, str):
            if model_signature == "auto":
                input_data = catalog.load(pipeline.input_name)
                model_signature = infer_signature(model_input=input_data)

                # all pipeline params will be overridable at predict time: https://mlflow.org/docs/latest/model/signatures.html#model-signatures-with-inference-params
                # I add the special "runner" parameter to be able to choose it at runtime
                pipeline_params = {
                    ds_name[7:]: catalog.load(ds_name)
                    for ds_name in pipeline.inference.inputs()
                    if ds_name.startswith("params:")
                } | {"runner": "SequentialRunner"}
                model_signature = infer_signature(
                    model_input=input_data,
                    params=pipeline_params,
                )

        mlflow.pyfunc.log_model(
            python_model=kedro_pipeline_model,
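To see what the hook's `infer_signature` call produces, here is a standalone sketch; the literal param values are illustrative stand-ins for the `pipeline_params` dict built from the catalog above:

```python
import pandas as pd
from mlflow.models import infer_signature

input_data = pd.DataFrame(data=[0.5], columns=["a"])

# mimic pipeline_params: each inference param with its training-time value as default,
# plus the special "runner" key
signature = infer_signature(
    model_input=input_data,
    params={"threshold": 0.5, "runner": "SequentialRunner"},
)

# each entry records a name, a type, a default and a shape, e.g.
# {"name": "runner", "type": "string", "default": "SequentialRunner", "shape": null}
print(signature.to_dict()["params"])
```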
35 changes: 33 additions & 2 deletions kedro_mlflow/mlflow/kedro_pipeline_model.py
@@ -6,6 +6,7 @@
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import Pipeline
from kedro.runner import AbstractRunner, SequentialRunner
from kedro.utils import load_obj
from kedro_datasets.pickle import PickleDataset
from mlflow.pyfunc import PythonModel

@@ -196,17 +197,47 @@ def load_context(self, context):
            updated_catalog._datasets[name]._filepath = Path(uri)
            self.loaded_catalog.save(name=name, data=updated_catalog.load(name))

    def predict(self, context, model_input):
    def predict(self, context, model_input, params=None):
        # we create an empty hook manager but do NOT register hooks
        # because we want this model to be executable outside of a kedro project

        # params we may eventually pass:
        # TODO globals
        # TODO runtime
        # TODO parameters -> I'd prefer not to have them, but it would require the catalog to be able to stay partially unresolved if we want to pass runtime params and globals
        # TODO hooks
        # TODO runner

        params = params or {}

        runner_class = params.pop("runner", "SequentialRunner")

        # we don't want to recreate the runner object on each predict
        # because reimporting comes with a performance penalty in a serving setup,
        # so if it is the default we just use the existing runner
        runner = (
            self.runner
            if runner_class == type(self.runner).__name__
            else load_obj(runner_class, "kedro.runner")()  # do not forget to instantiate the class with a trailing ()
        )

        hook_manager = _create_hook_manager()
        # _register_hooks(hook_manager, predict_params.hooks)

        for name, value in params.items():
            # no need to check whether the params are in the catalog, because mlflow already checks that the params match the signature
            param = f"params:{name}"
            self._logger.info(f"Using {param}={value} for the prediction")
            self.loaded_catalog.save(name=param, data=value)

        self.loaded_catalog.save(
            name=self.input_name,
            data=model_input,
        )

        run_output = self.runner.run(
        run_output = runner.run(
            pipeline=self.pipeline,
            catalog=self.loaded_catalog,
            hook_manager=hook_manager,
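The runner-selection logic in `predict` can be sketched in isolation as follows (`resolve_runner` is a hypothetical helper mirroring the conditional above; `load_obj` and the runner classes are real kedro APIs):

```python
from kedro.runner import SequentialRunner
from kedro.utils import load_obj


def resolve_runner(runner_class: str, current_runner):
    # reuse the existing runner if the requested class is the one we already have,
    # otherwise import it from kedro.runner and instantiate it
    if runner_class == type(current_runner).__name__:
        return current_runner
    return load_obj(runner_class, "kedro.runner")()


default_runner = SequentialRunner()
assert resolve_runner("SequentialRunner", default_runner) is default_runner
thread_runner = resolve_runner("ThreadRunner", default_runner)  # fresh ThreadRunner instance
# resolve_runner("non_existing_runner", default_runner) raises AttributeError, as the tests below assert
```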
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
kedro>=0.19.0, <0.20.0
kedro_datasets
mlflow>=1.29.0, <3.0.0
mlflow>=2.7.0, <3.0.0
pydantic>=1.0.0, <3.0.0
195 changes: 176 additions & 19 deletions tests/framework/hooks/test_hook_pipeline_ml.py
@@ -45,7 +45,7 @@ def preprocess_fun(data):
        return data

    def train_fun(data, param):
        return 2
        return 1

    def predict_fun(model, data):
        return data * model
@@ -105,7 +105,7 @@ def remove_stopwords(data, stopwords):
        return data

    def train_fun_hyperparam(data, hyperparam):
        return 2
        return 1

    def predict_fun(model, data):
        return data * model
@@ -156,10 +156,36 @@ def convert_probs_to_pred(data, threshold):
    return pipeline_ml_with_parameters


@pytest.fixture
def catalog_with_parameters(kedro_project_with_mlflow_conf):
    catalog_with_parameters = DataCatalog(
        {
            "data": MemoryDataset(pd.DataFrame(data=[0.5], columns=["a"])),
            "cleaned_data": MemoryDataset(),
            "params:stopwords": MemoryDataset(["Hello", "Hi"]),
            "params:penalty": MemoryDataset(0.1),
            "model": PickleDataset(
                filepath=(
                    kedro_project_with_mlflow_conf / "data" / "model.csv"
                ).as_posix()
            ),
            "params:threshold": MemoryDataset(0.5),
        }
    )
    return catalog_with_parameters


@pytest.fixture
def dummy_signature(dummy_catalog, dummy_pipeline_ml):
    input_data = dummy_catalog.load(dummy_pipeline_ml.input_name)
    dummy_signature = infer_signature(input_data)
    params_dict = {
        key: dummy_catalog.load(key)
        for key in dummy_pipeline_ml.inference.inputs()
        if key.startswith("params:")
    }
    dummy_signature = infer_signature(
        model_input=input_data, params={**params_dict, "runner": "SequentialRunner"}
    )
    return dummy_signature


@@ -303,7 +329,7 @@ def test_mlflow_hook_save_pipeline_ml(
        assert trained_model.metadata.signature.to_dict() == {
            "inputs": '[{"type": "long", "name": "a", "required": true}]',
            "outputs": None,
            "params": None,
            "params": '[{"name": "runner", "type": "string", "default": "SequentialRunner", "shape": null}]',
        }


@@ -434,28 +460,14 @@ def test_mlflow_hook_save_pipeline_ml_with_default_copy_mode_assign(
def test_mlflow_hook_save_pipeline_ml_with_parameters(
    kedro_project_with_mlflow_conf,  # a fixture to be in a kedro project
    pipeline_ml_with_parameters,
    catalog_with_parameters,
    dummy_run_params,
):
    # config_with_base_mlflow_conf is a conftest fixture
    bootstrap_project(kedro_project_with_mlflow_conf)
    with KedroSession.create(project_path=kedro_project_with_mlflow_conf) as session:
        context = session.load_context()

        catalog_with_parameters = DataCatalog(
            {
                "data": MemoryDataset(pd.DataFrame(data=[1], columns=["a"])),
                "cleaned_data": MemoryDataset(),
                "params:stopwords": MemoryDataset(["Hello", "Hi"]),
                "params:penalty": MemoryDataset(0.1),
                "model": PickleDataset(
                    filepath=(
                        kedro_project_with_mlflow_conf / "data" / "model.csv"
                    ).as_posix()
                ),
                "params:threshold": MemoryDataset(0.5),
            }
        )

        mlflow_hook = MlflowHook()
        mlflow_hook.after_context_created(context)
@@ -687,3 +699,148 @@ def test_mlflow_hook_save_pipeline_ml_with_dataset_factory(
        trained_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/artifacts")
        # the real test is that the model is loaded without error
        assert trained_model is not None


def test_mlflow_hook_save_and_load_pipeline_ml_with_inference_parameters(
    kedro_project_with_mlflow_conf,  # a fixture to be in a kedro project
    pipeline_ml_with_parameters,
    catalog_with_parameters,
    dummy_run_params,
):
    bootstrap_project(kedro_project_with_mlflow_conf)
    with KedroSession.create(project_path=kedro_project_with_mlflow_conf) as session:
        context = session.load_context()

        mlflow_hook = MlflowHook()
        mlflow_hook.after_context_created(context)

        runner = SequentialRunner()
        mlflow_hook.after_catalog_created(
            catalog=catalog_with_parameters,
            # `after_catalog_created` is not using any of the arguments below,
            # so we are setting them to empty values.
            conf_catalog={},
            conf_creds={},
            feed_dict={},
            save_version="",
            load_versions="",
        )
        mlflow_hook.before_pipeline_run(
            run_params=dummy_run_params,
            pipeline=pipeline_ml_with_parameters,
            catalog=catalog_with_parameters,
        )
        runner.run(
            pipeline_ml_with_parameters, catalog_with_parameters, session._hook_manager
        )

        current_run_id = mlflow.active_run().info.run_id

        # This is what we want to test: parameters should be passed by default to the signature
        mlflow_hook.after_pipeline_run(
            run_params=dummy_run_params,
            pipeline=pipeline_ml_with_parameters,
            catalog=catalog_with_parameters,
        )

        # test 1: parameters should have been logged
        trained_model = mlflow.pyfunc.load_model(f"runs:/{current_run_id}/model")

        # The "threshold" parameter of the inference pipeline should be in the signature
        # {
        #     key: dummy_catalog.load(key)
        #     for key in dummy_pipeline_ml.inference.inputs()
        #     if key.startswith("params:")
        # }

        assert (
            '{"name": "threshold", "type": "double", "default": 0.5, "shape": null}'
            in trained_model.metadata.signature.to_dict()["params"]
        )

        # test 2: we get different results when passing parameters

        inference_data = pd.DataFrame(data=[0.2, 0.6, 0.9], columns=["a"])

        assert all(
            trained_model.predict(inference_data)
            == pd.DataFrame([0, 1, 1]).values  # no param: the default threshold 0.5 is used
        )

        assert all(
            trained_model.predict(
                inference_data,
                params={"threshold": 0.8},
            )
            == pd.DataFrame([0, 0, 1]).values  # 0.6 is now below the threshold
        )


def test_mlflow_hook_save_and_load_pipeline_ml_specify_runner(
    kedro_project_with_mlflow_conf,  # a fixture to be in a kedro project
    pipeline_ml_with_parameters,
    catalog_with_parameters,
    dummy_run_params,
):
    bootstrap_project(kedro_project_with_mlflow_conf)
    with KedroSession.create(project_path=kedro_project_with_mlflow_conf) as session:
        context = session.load_context()

        mlflow_hook = MlflowHook()
        mlflow_hook.after_context_created(context)

        runner = SequentialRunner()
        mlflow_hook.after_catalog_created(
            catalog=catalog_with_parameters,
            # `after_catalog_created` is not using any of the arguments below,
            # so we are setting them to empty values.
            conf_catalog={},
            conf_creds={},
            feed_dict={},
            save_version="",
            load_versions="",
        )
        mlflow_hook.before_pipeline_run(
            run_params=dummy_run_params,
            pipeline=pipeline_ml_with_parameters,
            catalog=catalog_with_parameters,
        )
        runner.run(
            pipeline_ml_with_parameters, catalog_with_parameters, session._hook_manager
        )

        current_run_id = mlflow.active_run().info.run_id

        # This is what we want to test: parameters should be passed by default to the signature
        mlflow_hook.after_pipeline_run(
            run_params=dummy_run_params,
            pipeline=pipeline_ml_with_parameters,
            catalog=catalog_with_parameters,
        )

        # the parameters should have been logged
        trained_model = mlflow.pyfunc.load_model(f"runs:/{current_run_id}/model")

        # test 1: the params in the signature should contain the runner with a default of "SequentialRunner"
        assert (
            '{"name": "runner", "type": "string", "default": "SequentialRunner", "shape": null}'
            in trained_model.metadata.signature.to_dict()["params"]
        )

        inference_data = pd.DataFrame(data=[0.2, 0.6, 0.9], columns=["a"])

        # test 2: predicting with a non-existing runner raises an error
        with pytest.raises(
            AttributeError,
            match="module 'kedro.runner' has no attribute 'non_existing_runner'",
        ):
            trained_model.predict(
                inference_data, params={"runner": "non_existing_runner"}
            )

        # test 3: run with another runner (I should test that it is indeed the other one
        # which is picked; the log clearly shows it)
        assert all(
            trained_model.predict(inference_data, params={"runner": "ThreadRunner"})
            == pd.DataFrame([0, 1, 1]).values  # no param: the default threshold 0.5 is used
        )
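Since the comments in `predict` mention a serving setup, here is a hedged sketch of passing the new params through the MLflow scoring server REST API (payload shape per the MLflow docs; the served model, run id and port are assumptions):

```python
import requests

# assumes `mlflow models serve -m runs:/<run_id>/model -p 5000` is running
payload = {
    "dataframe_split": {"columns": ["a"], "data": [[0.2], [0.6], [0.9]]},
    "params": {"threshold": 0.8, "runner": "ThreadRunner"},
}
response = requests.post("http://127.0.0.1:5000/invocations", json=payload)
print(response.json())
```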