@@ -1,78 +1,91 @@
- """Load inference model from Mlflow registry."""
+ """Load inference model from MLflow registry."""

import os
from pathlib import Path
- from typing import Any, Literal, Optional
-
+ from typing import Any, Literal
import dagshub
import mlflow
import pandas as pd
- from rich import print
from loguru import logger

from ecoml.data_preparation.features import (
    get_convolutional_features,
    get_dense_features,
    get_pooling_features,
)
from ecoml.data_preparation.pytorch_utils import PytorchLayer
+ import importlib.resources as pkg_resources
+ import ecoml

ALLOWED_LAYER_TYPES = Literal["convolutional", "pooling", "dense"]


class InferenceModel:
    """Inference Model.

-     It downloads model from MLFlow Registry on DagsHub, if not present on the first run.
+     Downloads model from MLflow Registry on DagsHub if not already present.
    """

    def __init__(
        self,
        layer_type: ALLOWED_LAYER_TYPES,
        model_version: int,
        verbose: bool = False,
-         dagshub_repo_owner: Optional[str] = "fuzzylabs",
-         dagshub_repo_name: Optional[str] = "edge-vision-power-estimation",
+         dagshub_repo_owner: str = "fuzzylabs",
+         dagshub_repo_name: str = "edge-vision-power-estimation",
+         use_packaged_models: bool = False,
    ):
        self.layer_type = layer_type
        self.model_version = model_version
        self.verbose = verbose
        self.repo_name = dagshub_repo_name
        self.repo_owner = dagshub_repo_owner
-         self.base_model_dir = Path.cwd() / "ecoml_models" / self.layer_type
-         # Download model from MLFlow Registry if not present on first run
+         self.use_packaged_models = use_packaged_models
+
+         if not self.use_packaged_models:
+             self.base_model_dir = Path.cwd() / "ecoml_models" / self.layer_type
+         else:
+             self.base_model_dir = pkg_resources.files(ecoml).joinpath("ecoml_models", self.layer_type)
+
        self.runtime_model = self.load_model(model_type="runtime")
-

-     def _download_model(self, model_uri: str, dst_path: str) -> None:
+     def _download_model(self, model_uri: str, dst_path: Path) -> None:
        """Download model from MLflow registry to local filesystem.

        Args:
            model_uri: URI pointing to model artifact.
-             dst_path: Path of the local filesystem destination directory
-                 to which to download the specified artifacts.
+             dst_path: Destination directory path.
        """
        if self.verbose:
-             logger.info(f"Downloading model to {dst_path} folder")
+             logger.info(f"Downloading model to {dst_path}")
        dagshub.init(repo_name=self.repo_name, repo_owner=self.repo_owner, mlflow=True)
        mlflow.artifacts.download_artifacts(artifact_uri=model_uri, dst_path=str(dst_path))

    def load_model(self, model_type: str) -> Any:
-         """Download and load power or runtime model from MLflow Registry.
-
-         Download is skipped if model exists in the local filesystem.
+         """Download and load runtime or power model from MLflow Registry.

        Returns:
-             Power or runtime model from MLflow Registry.
+             Loaded runtime or power model.
        """
+         if self.use_packaged_models:
+             model_path = self.base_model_dir / model_type / "model.pkl"
+             if not model_path.exists():
+                 raise FileNotFoundError(
+                     f"Packaged model file not found: {model_path}. "
+                     "Ensure ecoml_models are packaged correctly."
+                 )
+             model_dir = model_path.parent
+             if self.verbose:
+                 logger.info(f"Loading packaged {model_type} model from {model_dir}")
+             return mlflow.pyfunc.load_model(str(model_dir))
+
+         # Using DagsHub / MLflow remote model
        model_name = f"{self.layer_type}_{model_type}_model"
        model_uri = f"models:/{model_name}/{self.model_version}"
        dst_path = self.base_model_dir / model_type
-         # TODO: Tighter check to see if current model version is present
-         # instead of checking only if directory exists
        version_file = dst_path / "version.txt"
-         need_download = True

+         need_download = True
        if version_file.exists():
            local_version = version_file.read_text().strip()
            if local_version == str(self.model_version):
@@ -83,23 +96,21 @@ def load_model(self, model_type: str) -> Any:
            version_file.write_text(str(self.model_version))
        else:
            if self.verbose:
-                 logger.info("Model already downloaded")
+                 logger.info(f"{model_type.capitalize()} model version {self.model_version} already downloaded.")

        if self.verbose:
-             logger.info(f"Loading the {model_type} trained model from {dst_path} folder")
-             logger.info(f"Loading {model_type} model from {dst_path}")
+             logger.info(f"Loading {model_type} model from {dst_path}")

-     def get_features(self, layer_info: PytorchLayer) -> pd.DataFrame:
-         """Get features for the model to run prediction.
+         return mlflow.pyfunc.load_model(str(dst_path))

-         Each layer type creates input features required by
-         power and runtime models using Pytorch model summary file.
+     def get_features(self, layer_info: PytorchLayer) -> pd.DataFrame:
+         """Get features for prediction based on layer type.

        Args:
-             layer_info: Pydantic class containing all layer information.
+             layer_info: Layer details.

        Returns:
-             Pandas dataframe containing input features.
+             DataFrame containing features.
        """
        if self.layer_type == "convolutional":
            features = get_convolutional_features(layer_info)
@@ -108,6 +119,6 @@ def get_features(self, layer_info: PytorchLayer) -> pd.DataFrame:
        elif self.layer_type == "dense":
            features = get_dense_features(layer_info)
        else:
            raise ValueError(f"Unsupported layer type: {self.layer_type}")

        return pd.DataFrame([features])
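For reviewers, a minimal usage sketch of the new `use_packaged_models` flag. The class, argument names, and caching behaviour come from this diff; the import path `ecoml.inference.model` and the `layer_info` handling are assumptions, not confirmed by the diff.

```python
from ecoml.inference.model import InferenceModel  # hypothetical module path

# New offline mode: load the model.pkl files bundled inside the installed
# `ecoml` package (resolved via importlib.resources) instead of pulling
# from the DagsHub MLflow registry.
packaged = InferenceModel(
    layer_type="convolutional",
    model_version=1,
    verbose=True,
    use_packaged_models=True,
)

# Default behaviour is unchanged: download from the MLflow registry on the
# first run and cache under ./ecoml_models/<layer_type>/<model_type>/,
# with version.txt guarding against stale model versions.
remote = InferenceModel(layer_type="convolutional", model_version=1)

# `layer_info` must be a PytorchLayer built from a model summary (its schema
# is not shown in this diff); prediction then follows the existing flow:
# features = packaged.get_features(layer_info)
# runtime = packaged.runtime_model.predict(features)
```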