class MODEL_REGISTRY:
    """String constants related to the model registry."""

    # Name of the folder, inside a model version directory, that holds the
    # model files (.../Models/{name}/{version}/Files).
    MODEL_FILES_DIR_NAME = "Files"
    # Path prefix named after the HopsFS mount point; its consumers are not
    # visible from this module.
    HOPSFS_MOUNT_PREFIX = "/hopsfs/"


class MODEL_SERVING:
    """String constants related to model serving."""

    # Name of the folder, inside a model version directory, that holds the
    # deployment artifacts (.../{version}/Artifacts/{artifact_version}).
    ARTIFACTS_DIR_NAME = "Artifacts"
    # Name of the dataset the models are stored in.
    MODELS_DATASET = "Models"
@property
def artifact_files_path(self):
    """Path of the artifact files deployed by the predictor.

    The value is read straight from the underlying predictor object.
    """
    predictor = self._predictor
    return predictor.artifact_files_path


@property
def artifact_path(self):
    """Path of the model artifact deployed by the predictor.

    Marked for deprecation (see the TODO below); the value is read straight
    from the underlying predictor object.
    """
    # TODO: deprecated
    predictor = self._predictor
    return predictor.artifact_path
def _download_files_from_hopsfs_recursive(
    self,
    from_hdfs_path: str,
    to_local_path: str,
    update_download_progress,
    n_dirs,
    n_files,
):
    """Download the content of a HopsFS directory into a local directory, recursively.

    Every entry listed under `from_hdfs_path` is mirrored below
    `to_local_path`: files are downloaded through the local engine and
    directories are created locally and descended into. A directory named
    like `constants.MODEL_SERVING.ARTIFACTS_DIR_NAME` is skipped.

    # Arguments
        from_hdfs_path: Remote directory to list and download.
        to_local_path: Existing local directory receiving the content.
        update_download_progress: Callback invoked with the keyword arguments
            `n_dirs` and `n_files` after each downloaded file or completed
            directory.
        n_dirs: Number of directories completed so far.
        n_files: Number of files downloaded so far.

    # Returns
        The updated `(n_dirs, n_files)` counters.
    """

    listing = self._dataset_api.list(from_hdfs_path, sort_by="NAME:desc")

    # NOTE(review): presumably the backend omits "items" when the directory
    # is empty -- confirm. A listing without entries is treated as an empty
    # directory instead of failing with a KeyError.
    for entry in listing.get("items") or []:
        path_attr = entry["attributes"]
        path = path_attr["path"]
        basename = os.path.basename(path)
        local_path = os.path.join(to_local_path, basename)

        if path_attr.get("dir", False):
            if basename == constants.MODEL_SERVING.ARTIFACTS_DIR_NAME:
                # TODO: Not needed anymore
                continue  # skip Artifacts subfolder, without a progress update
            # it's a folder: create it locally and download its content
            os.mkdir(local_path)
            n_dirs, n_files = self._download_files_from_hopsfs_recursive(
                from_hdfs_path=path,
                to_local_path=local_path,
                update_download_progress=update_download_progress,
                n_dirs=n_dirs,
                n_files=n_files,
            )
            # the folder is counted only once its content has been downloaded
            n_dirs += 1
        else:
            # it's a file: download it
            self._engine.download(path, local_path)
            n_files += 1

        update_download_progress(n_dirs=n_dirs, n_files=n_files)

    return n_dirs, n_files


def _download_files_from_hopsfs(
    self, from_hdfs_path: str, to_local_path: str, update_download_progress
):
    """Download all files under a HopsFS path into a local directory.

    Starts the recursive download with both counters at zero and, once it
    has finished, reports the final counters with `done=True`.

    # Arguments
        from_hdfs_path: Remote directory to download.
        to_local_path: Existing local directory receiving the content.
        update_download_progress: Callback accepting the keyword arguments
            `n_dirs`, `n_files` and `done`.
    """

    n_dirs, n_files = self._download_files_from_hopsfs_recursive(
        from_hdfs_path=from_hdfs_path,
        to_local_path=to_local_path,
        update_download_progress=update_download_progress,
        n_dirs=0,
        n_files=0,
    )
    update_download_progress(n_dirs=n_dirs, n_files=n_files, done=True)
%s" + % (n_dirs, n_files, "DONE" if done else ""), + end="\r", + ) try: - self._dataset_api.download(from_artifact_zip_path, to_artifact_zip_path) - util.decompress(to_artifact_zip_path, extract_dir=to_artifacts_path) - os.remove(to_artifact_zip_path) - finally: - if os.path.exists(to_artifact_zip_path): - os.remove(to_artifact_zip_path) - - return to_artifact_version_path + from_hdfs_path = deployment_instance.artifact_files_path + if from_hdfs_path.startswith("hdfs:/"): + projects_index = from_hdfs_path.find("/Projects", 0) + from_hdfs_path = from_hdfs_path[projects_index:] + + self._download_files_from_hopsfs( + from_hdfs_path=from_hdfs_path, + to_local_path=artifact_files_path, + update_download_progress=update_download_progress, + ) + except BaseException as be: + raise be + + return artifact_files_path def create(self, deployment_instance): try: diff --git a/python/hsml/model.py b/python/hsml/model.py index dc92df99d..50bbd9128 100644 --- a/python/hsml/model.py +++ b/python/hsml/model.py @@ -22,7 +22,7 @@ import humps from hopsworks_common import usage -from hsml import client, util, constants +from hsml import client, constants, util from hsml.constants import ARTIFACT_VERSION from hsml.constants import INFERENCE_ENDPOINTS as IE from hsml.core import explicit_provenance @@ -544,11 +544,14 @@ def model_path(self): def version_path(self): """path of the model including version folder. Resolves to /Projects/{project_name}/Models/{name}/{version}""" return "{}/{}".format(self.model_path, str(self.version)) - + @property - def files_path(self): + def model_files_path(self): """path of the model files including version and files folder. 
@property
def artifact_files_path(self):
    """Path of the artifact files deployed by the predictor. Resolves to {model_path}/{model_version}/Artifacts/{artifact_version}"""
    artifacts_dir = constants.MODEL_SERVING.ARTIFACTS_DIR_NAME
    model_version = str(self._model_version)
    artifact_version = str(self._artifact_version)
    return f"{self._model_path}/{model_version}/{artifacts_dir}/{artifact_version}"