Skip to content

Commit

Permalink
[HWORKS-1037] Add support for downloading artifact files from deployment object
Browse files Browse the repository at this point in the history
  • Loading branch information
javierdlrm committed Oct 17, 2024
1 parent ae14cb3 commit 4f8604e
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 30 deletions.
2 changes: 1 addition & 1 deletion python/hopsworks_common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ class MODEL:

class MODEL_REGISTRY:
HOPSFS_MOUNT_PREFIX = "/hopsfs/"
ARTIFACTS_DIR_NAME = "Artifacts"
MODEL_FILES_DIR_NAME = "Files"


class MODEL_SERVING:
MODELS_DATASET = "Models"
ARTIFACTS_DIR_NAME = "Artifacts"


class ARTIFACT_VERSION:
Expand Down
12 changes: 9 additions & 3 deletions python/hsml/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,10 @@ def get_model(self):
)

@usage.method_logger
def download_artifact(self):
"""Download the model artifact served by the deployment"""
def download_artifact_files(self):
"""Download the artifact files served by the deployment"""

return self._serving_engine.download_artifact(self)
return self._serving_engine.download_artifact_files(self)

def get_logs(self, component="predictor", tail=10):
"""Prints the deployment logs of the predictor or transformer.
Expand Down Expand Up @@ -373,9 +373,15 @@ def artifact_version(self):
def artifact_version(self, artifact_version: Union[int, str]):
self._predictor.artifact_version = artifact_version

@property
def artifact_files_path(self):
    """Path of the artifact files deployed by the predictor.

    Read-only accessor: the value is taken as-is from the
    `artifact_files_path` attribute of the wrapped predictor.
    """
    predictor = self._predictor
    return predictor.artifact_files_path

@property
def artifact_path(self):
    """Path of the model artifact deployed by the predictor.

    Read-only accessor: the value is taken as-is from the
    `artifact_path` attribute of the wrapped predictor.
    """
    # TODO: deprecated
    predictor = self._predictor
    return predictor.artifact_path

@property
Expand Down
6 changes: 4 additions & 2 deletions python/hsml/engine/model_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ def _download_model_from_hopsfs_recursive(

if path_attr.get("dir", False):
# otherwise, make a recursive call for the folder
if basename == constants.MODEL_REGISTRY.ARTIFACTS_DIR_NAME:
if (
basename == constants.MODEL_SERVING.ARTIFACTS_DIR_NAME
): # TODO: Not needed anymore
continue # skip Artifacts subfolder
local_folder_path = os.path.join(to_local_path, basename)
os.mkdir(local_folder_path)
Expand Down Expand Up @@ -447,7 +449,7 @@ def update_download_progress(n_dirs, n_files, done=False):
)

try:
from_hdfs_model_path = model_instance.files_path
from_hdfs_model_path = model_instance.model_files_path
if from_hdfs_model_path.startswith("hdfs:/"):
projects_index = from_hdfs_model_path.find("/Projects", 0)
from_hdfs_model_path = from_hdfs_model_path[projects_index:]
Expand Down
107 changes: 88 additions & 19 deletions python/hsml/engine/serving_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
#

import os
import tempfile
import time
import uuid
from typing import Dict, List, Union

from hsml import util, constants
from hsml import constants
from hsml.client.exceptions import ModelServingException, RestAPIError
from hsml.client.istio.utils.infer_type import InferInput
from hsml.constants import (
Expand All @@ -31,6 +32,7 @@
INFERENCE_ENDPOINTS as IE,
)
from hsml.core import dataset_api, serving_api
from hsml.engine import local_engine
from tqdm.auto import tqdm


Expand All @@ -51,6 +53,8 @@ def __init__(self):
self._serving_api = serving_api.ServingApi()
self._dataset_api = dataset_api.DatasetApi()

self._engine = local_engine.LocalEngine()

def _poll_deployment_status(
self, deployment_instance, status: str, await_status: int, update_progress=None
):
Expand Down Expand Up @@ -304,7 +308,64 @@ def _get_stopped_instances(self, available_instances, requested_instances):
num_instances = requested_instances - available_instances
return num_instances if num_instances >= 0 else 0

def download_artifact(self, deployment_instance):
def _download_files_from_hopsfs_recursive(
    self,
    from_hdfs_path: str,
    to_local_path: str,
    update_download_progress,
    n_dirs,
    n_files,
):
    """Recursively copy the content of a HopsFS directory into a local directory.

    The remote directory is listed through the dataset API (requested with
    `sort_by="NAME:desc"`). Files are downloaded into `to_local_path`;
    sub-directories are created locally and traversed recursively.
    A sub-directory named `constants.MODEL_SERVING.ARTIFACTS_DIR_NAME`
    is skipped.

    # Arguments
        from_hdfs_path: Remote directory to list and download.
        to_local_path: Local directory that receives the files.
        update_download_progress: Callback invoked with the keyword
            arguments `n_dirs` and `n_files` after every downloaded file
            and every fully processed sub-directory.
        n_dirs: Number of directories processed so far.
        n_files: Number of files downloaded so far.

    # Returns
        Tuple `(n_dirs, n_files)` with the updated counters.
    """

    listing = self._dataset_api.list(from_hdfs_path, sort_by="NAME:desc")

    for item in listing["items"]:
        attributes = item["attributes"]
        remote_path = attributes["path"]
        name = os.path.basename(remote_path)
        local_path = os.path.join(to_local_path, name)

        if not attributes.get("dir", False):
            # plain file: download it and report progress
            self._engine.download(remote_path, local_path)
            n_files += 1
            update_download_progress(n_dirs=n_dirs, n_files=n_files)
            continue

        # from here on the entry is a folder
        if name == constants.MODEL_SERVING.ARTIFACTS_DIR_NAME:
            # TODO: Not needed anymore
            continue  # skip Artifacts subfolder

        # os.mkdir (not makedirs): the parent is expected to exist already
        os.mkdir(local_path)
        n_dirs, n_files = self._download_files_from_hopsfs_recursive(
            from_hdfs_path=remote_path,
            to_local_path=local_path,
            update_download_progress=update_download_progress,
            n_dirs=n_dirs,
            n_files=n_files,
        )
        # the folder itself is counted only once its content is done
        n_dirs += 1
        update_download_progress(n_dirs=n_dirs, n_files=n_files)

    return n_dirs, n_files

def _download_files_from_hopsfs(
    self, from_hdfs_path: str, to_local_path: str, update_download_progress
):
    """Download a HopsFS directory tree into a local directory.

    Starts the recursive download with both counters at zero and, once
    it returns, reports the final totals through the callback with
    `done=True`.

    # Arguments
        from_hdfs_path: Remote directory to download.
        to_local_path: Local directory that receives the files.
        update_download_progress: Callback accepting the keyword arguments
            `n_dirs`, `n_files` and `done`.
    """

    totals = self._download_files_from_hopsfs_recursive(
        from_hdfs_path=from_hdfs_path,
        to_local_path=to_local_path,
        update_download_progress=update_download_progress,
        n_dirs=0,
        n_files=0,
    )
    dirs_done, files_done = totals
    # final notification, flagged as done
    update_download_progress(n_dirs=dirs_done, n_files=files_done, done=True)

def download_artifact_files(self, deployment_instance):
if deployment_instance.id is None:
raise ModelServingException(
"Deployment is not created yet. To create the deployment use `.save()`"
Expand All @@ -316,30 +377,38 @@ def download_artifact(self, deployment_instance):
Download the model files by using `model.download()`"
)

from_artifact_zip_path = deployment_instance.artifact_path
to_artifacts_path = os.path.join(
os.getcwd(),
artifact_files_path = os.path.join(
tempfile.gettempdir(),
str(uuid.uuid4()),
deployment_instance.model_name,
str(deployment_instance.model_version),
constants.MODEL_REGISTRY.ARTIFACTS_DIR_NAME,
)
to_artifact_version_path = (
to_artifacts_path + "/" + str(deployment_instance.artifact_version)
constants.MODEL_SERVING.ARTIFACTS_DIR_NAME,
str(deployment_instance.artifact_version),
)
to_artifact_zip_path = to_artifact_version_path + ".zip"
os.makedirs(artifact_files_path)

os.makedirs(to_artifacts_path)
def update_download_progress(n_dirs, n_files, done=False):
print(
"Downloading artifact files (%s dirs, %s files)... %s"
% (n_dirs, n_files, "DONE" if done else ""),
end="\r",
)

try:
self._dataset_api.download(from_artifact_zip_path, to_artifact_zip_path)
util.decompress(to_artifact_zip_path, extract_dir=to_artifacts_path)
os.remove(to_artifact_zip_path)
finally:
if os.path.exists(to_artifact_zip_path):
os.remove(to_artifact_zip_path)

return to_artifact_version_path
from_hdfs_path = deployment_instance.artifact_files_path
if from_hdfs_path.startswith("hdfs:/"):
projects_index = from_hdfs_path.find("/Projects", 0)
from_hdfs_path = from_hdfs_path[projects_index:]

self._download_files_from_hopsfs(
from_hdfs_path=from_hdfs_path,
to_local_path=artifact_files_path,
update_download_progress=update_download_progress,
)
except BaseException as be:
raise be

return artifact_files_path

def create(self, deployment_instance):
try:
Expand Down
11 changes: 7 additions & 4 deletions python/hsml/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import humps
from hopsworks_common import usage
from hsml import client, util, constants
from hsml import client, constants, util
from hsml.constants import ARTIFACT_VERSION
from hsml.constants import INFERENCE_ENDPOINTS as IE
from hsml.core import explicit_provenance
Expand Down Expand Up @@ -544,11 +544,14 @@ def model_path(self):
def version_path(self):
"""path of the model including version folder. Resolves to /Projects/{project_name}/Models/{name}/{version}"""
return "{}/{}".format(self.model_path, str(self.version))

@property
def files_path(self):
def model_files_path(self):
"""path of the model files including version and files folder. Resolves to /Projects/{project_name}/Models/{name}/{version}/Files"""
return "{}/{}/{}".format(self.model_path, str(self.version), constants.MODEL_REGISTRY.MODEL_FILES_DIR_NAME)
return "{}/{}".format(
self.version_path,
constants.MODEL_REGISTRY.MODEL_FILES_DIR_NAME,
)

@property
def shared_registry_project_name(self):
Expand Down
12 changes: 11 additions & 1 deletion python/hsml/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import Optional, Union

import humps
from hsml import client, deployment, util
from hsml import client, constants, deployment, util
from hsml.constants import (
ARTIFACT_VERSION,
INFERENCE_ENDPOINTS,
Expand Down Expand Up @@ -395,9 +395,19 @@ def artifact_version(self):
def artifact_version(self, artifact_version: Union[int, str]):
self._artifact_version = artifact_version

@property
def artifact_files_path(self):
    """Path of the artifact files deployed by the predictor.

    Built as `{model_path}/{model_version}/{artifacts_dir}/{artifact_version}`,
    where `artifacts_dir` is `constants.MODEL_SERVING.ARTIFACTS_DIR_NAME`.
    """
    segments = (
        self._model_path,
        str(self._model_version),
        constants.MODEL_SERVING.ARTIFACTS_DIR_NAME,
        str(self._artifact_version),
    )
    return "{}/{}/{}/{}".format(*segments)

@property
def artifact_path(self):
"""Path of the model artifact deployed by the predictor. Resolves to /Projects/{project_name}/Models/{name}/{version}/Artifacts/{artifact_version}/{name}_{version}_{artifact_version}.zip"""
# TODO: Deprecated
artifact_name = "{}_{}_{}.zip".format(
self._model_name, str(self._model_version), str(self._artifact_version)
)
Expand Down

0 comments on commit 4f8604e

Please sign in to comment.