From 8c99438fe4fbe08433d38f18b133dff9b9e440e0 Mon Sep 17 00:00:00 2001
From: Javier de la Rúa Martínez
Date: Mon, 6 Nov 2023 23:52:46 +0100
Subject: [PATCH] [HWORKS-829] Register models without compressing files
 (#206)

* [HWORKS-829] Upload uncompressed files, copy_from_local if internal, move
  if hopsfs-mount

* Set keep_original_files default value to False

* Reuse local/hopsworks engine for copy, move and download ops
---
 python/hsml/constants.py               |   4 +
 python/hsml/core/native_hdfs_api.py    |  21 +++-
 python/hsml/engine/hopsworks_engine.py |  61 +++++++----
 python/hsml/engine/local_engine.py     |  42 ++++++-
 python/hsml/engine/model_engine.py     | 146 ++++++++++++++++++-------
 python/hsml/model.py                   |   8 +-
 6 files changed, 213 insertions(+), 69 deletions(-)

diff --git a/python/hsml/constants.py b/python/hsml/constants.py
index d6f5204c9..682c7884b 100644
--- a/python/hsml/constants.py
+++ b/python/hsml/constants.py
@@ -25,6 +25,10 @@ class MODEL:
     FRAMEWORK_SKLEARN = "SKLEARN"
 
 
+class MODEL_REGISTRY:
+    HOPSFS_MOUNT_PREFIX = "/home/yarnapp/hopsfs/"
+
+
 class MODEL_SERVING:
     MODELS_DATASET = "Models"
 
diff --git a/python/hsml/core/native_hdfs_api.py b/python/hsml/core/native_hdfs_api.py
index 90bdaa2b4..1fc5c90ef 100644
--- a/python/hsml/core/native_hdfs_api.py
+++ b/python/hsml/core/native_hdfs_api.py
@@ -39,5 +39,22 @@ def chmod(self, hdfs_path, mode):
     def mkdir(self, path):
         return hdfs.mkdir(path)
 
-    def delete(self, path):
-        hdfs.rm(path, recursive=True)
+    def rm(self, path, recursive=True):
+        hdfs.rm(path, recursive=recursive)
+
+    def upload(self, local_path: str, remote_path: str):
+        # copy from local fs to hdfs
+        hdfs.put(local_path, remote_path)
+
+    def download(self, remote_path: str, local_path: str):
+        # copy from hdfs to local fs
+        print("Downloading file ...", end=" ")
+        hdfs.get(remote_path, local_path)
+
+    def copy(self, source_path: str, destination_path: str):
+        # both paths are hdfs paths
+        hdfs.cp(source_path, destination_path)
+
+    def move(self, source_path: str, destination_path: str):
+        # both paths are hdfs paths
+        hdfs.rename(source_path, destination_path)
diff --git a/python/hsml/engine/hopsworks_engine.py b/python/hsml/engine/hopsworks_engine.py
index 919bd34aa..d65f3bdcd 100644
--- a/python/hsml/engine/hopsworks_engine.py
+++ b/python/hsml/engine/hopsworks_engine.py
@@ -14,29 +14,52 @@
 # limitations under the License.
 #
 
+import os
+
 from hsml.core import native_hdfs_api
-from hsml import constants
+from hsml import client
 
 
 class HopsworksEngine:
     def __init__(self):
         self._native_hdfs_api = native_hdfs_api.NativeHdfsApi()
 
-    def mkdir(self, model_instance):
-        model_version_dir_hdfs = "/Projects/{}/{}/{}/{}".format(
-            model_instance.project_name,
-            constants.MODEL_SERVING.MODELS_DATASET,
-            model_instance.name,
-            str(model_instance.version),
-        )
-        self._native_hdfs_api.mkdir(model_version_dir_hdfs)
-        self._native_hdfs_api.chmod(model_version_dir_hdfs, "ug+rwx")
 
-    def delete(self, model_instance):
-        model_version_dir_hdfs = "/Projects/{}/{}/{}/{}".format(
-            model_instance.project_name,
-            constants.MODEL_SERVING.MODELS_DATASET,
-            model_instance.name,
-            str(model_instance.version),
-        )
-        self._native_hdfs_api.delete(model_version_dir_hdfs)
+    def mkdir(self, remote_path: str):
+        remote_path = self._prepend_project_path(remote_path)
+        self._native_hdfs_api.mkdir(remote_path)
+        self._native_hdfs_api.chmod(remote_path, "ug+rwx")
+
+    def delete(self, remote_path: str):
+        remote_path = self._prepend_project_path(remote_path)
+        self._native_hdfs_api.rm(remote_path)
+
+    def upload(self, local_path: str, remote_path: str):
+        local_path = self._get_abs_path(local_path)
+        remote_path = self._prepend_project_path(remote_path)
+        self._native_hdfs_api.upload(local_path, remote_path)
+        self._native_hdfs_api.chmod(remote_path, "ug+rwx")
+
+    def download(self, remote_path: str, local_path: str):
+        local_path = self._get_abs_path(local_path)
+        remote_path = self._prepend_project_path(remote_path)
+        self._native_hdfs_api.download(remote_path, local_path)
+
+    def copy(self, source_path: str, destination_path: str):
+        # both paths are hdfs paths
+        source_path = self._prepend_project_path(source_path)
+        destination_path = self._prepend_project_path(destination_path)
+        self._native_hdfs_api.copy(source_path, destination_path)
+
+    def move(self, source_path: str, destination_path: str):
+        source_path = self._prepend_project_path(source_path)
+        destination_path = self._prepend_project_path(destination_path)
+        self._native_hdfs_api.move(source_path, destination_path)
+
+    def _get_abs_path(self, local_path: str):
+        return local_path if os.path.isabs(local_path) else os.path.abspath(local_path)
+
+    def _prepend_project_path(self, remote_path: str):
+        if not remote_path.startswith("/Projects/"):
+            _client = client.get_instance()
+            remote_path = "/Projects/{}/{}".format(_client._project_name, remote_path)
+        return remote_path
diff --git a/python/hsml/engine/local_engine.py b/python/hsml/engine/local_engine.py
index 1e7614cef..36ccffb84 100644
--- a/python/hsml/engine/local_engine.py
+++ b/python/hsml/engine/local_engine.py
@@ -14,15 +14,49 @@
 # limitations under the License.
 #
 
+import os
+
 from hsml.core import dataset_api
+from hsml import client
 
 
 class LocalEngine:
     def __init__(self):
         self._dataset_api = dataset_api.DatasetApi()
 
-    def mkdir(self, model_instance):
-        self._dataset_api.mkdir(model_instance.version_path)
+    def mkdir(self, remote_path: str):
+        remote_path = self._prepend_project_path(remote_path)
+        self._dataset_api.mkdir(remote_path)
+
+    def delete(self, remote_path: str):
+        remote_path = self._prepend_project_path(remote_path)
+        self._dataset_api.rm(remote_path)
+
+    def upload(self, local_path: str, remote_path: str):
+        local_path = self._get_abs_path(local_path)
+        remote_path = self._prepend_project_path(remote_path)
+        self._dataset_api.upload(local_path, remote_path)
+
+    def download(self, remote_path: str, local_path: str):
+        local_path = self._get_abs_path(local_path)
+        remote_path = self._prepend_project_path(remote_path)
+        self._dataset_api.download(remote_path, local_path)
+
+    def copy(self, source_path, destination_path):
+        source_path = self._prepend_project_path(source_path)
+        destination_path = self._prepend_project_path(destination_path)
+        self._dataset_api.copy(source_path, destination_path)
+
+    def move(self, source_path, destination_path):
+        source_path = self._prepend_project_path(source_path)
+        destination_path = self._prepend_project_path(destination_path)
+        self._dataset_api.move(source_path, destination_path)
+
+    def _get_abs_path(self, local_path: str):
+        return local_path if os.path.isabs(local_path) else os.path.abspath(local_path)
 
-    def delete(self, model_instance):
-        self._dataset_api.rm(model_instance.version_path)
+    def _prepend_project_path(self, remote_path: str):
+        if not remote_path.startswith("/Projects/"):
+            _client = client.get_instance()
+            remote_path = "/Projects/{}/{}".format(_client._project_name, remote_path)
+        return remote_path
diff --git a/python/hsml/engine/model_engine.py b/python/hsml/engine/model_engine.py
index dc2863a48..e4701217e 100644
--- a/python/hsml/engine/model_engine.py
+++ b/python/hsml/engine/model_engine.py
@@ -73,7 +73,7 @@ def _upload_additional_resources(self, model_instance):
             with open(input_example_path, "w+") as out:
                 json.dump(input_example, out, cls=util.NumpyEncoder)
 
-            self._dataset_api.upload(input_example_path, model_instance.version_path)
+            self._engine.upload(input_example_path, model_instance.version_path)
             os.remove(input_example_path)
             model_instance.input_example = None
         if model_instance._model_schema is not None:
@@ -83,45 +83,86 @@ def _upload_additional_resources(self, model_instance):
             with open(model_schema_path, "w+") as out:
                 out.write(model_schema.json())
 
-            self._dataset_api.upload(model_schema_path, model_instance.version_path)
+            self._engine.upload(model_schema_path, model_instance.version_path)
             os.remove(model_schema_path)
             model_instance.model_schema = None
         return model_instance
 
-    def _copy_hopsfs_model(self, existing_model_path, model_version_path):
+    def _copy_or_move_hopsfs_model(
+        self,
+        from_hdfs_model_path,
+        to_model_version_path,
+        keep_original_files,
+        update_upload_progress,
+    ):
+        """Copy or move model files from an HDFS path to the model version folder in the Models dataset."""
         # Strip hdfs prefix
-        if existing_model_path.startswith("hdfs:/"):
-            projects_index = existing_model_path.find("/Projects", 0)
-            existing_model_path = existing_model_path[projects_index:]
+        if from_hdfs_model_path.startswith("hdfs:/"):
+            projects_index = from_hdfs_model_path.find("/Projects", 0)
+            from_hdfs_model_path = from_hdfs_model_path[projects_index:]
 
-        for entry in self._dataset_api.list(existing_model_path, sort_by="NAME:desc")[
+        n_dirs, n_files = 0, 0
+        for entry in self._dataset_api.list(from_hdfs_model_path, sort_by="NAME:desc")[
             "items"
         ]:
             path = entry["attributes"]["path"]
             _, file_name = os.path.split(path)
-            self._dataset_api.copy(path, model_version_path + "/" + file_name)
+            if keep_original_files:
+                self._engine.copy(path, to_model_version_path + "/" + file_name)
+            else:
+                self._engine.move(path, to_model_version_path + "/" + file_name)
+            if "." in path:
+                n_files += 1
+            else:
+                n_dirs += 1
+        update_upload_progress(n_dirs=n_dirs, n_files=n_files)
 
     def _upload_local_model(
-        self, local_model_path, model_version, dataset_model_name_path
+        self,
+        from_local_model_path,
+        to_model_version_path,
+        update_upload_progress,
     ):
-        archive_out_dir = None
-        uploaded_archive_path = None
-        try:
-            archive_out_dir = tempfile.TemporaryDirectory(dir=os.getcwd())
-            archive_path = util.compress(
-                archive_out_dir.name, str(model_version), local_model_path
+        """Copy or upload model files from a local path to the model version folder in the Models dataset."""
+        n_dirs, n_files = 0, 0
+        for root, dirs, files in os.walk(from_local_model_path):
+            # os.walk(from_local_model_path), where from_local_model_path is expected to be an absolute path
+            # - root is the absolute path of the directory being walked
+            # - dirs is the list of directory names present in the root dir
+            # - files is the list of file names present in the root dir
+            # we need to replace the local path prefix with the hdfs path prefix (i.e., /srv/hops/....../root with /Projects/.../)
+            remote_base_path = root.replace(
+                from_local_model_path, to_model_version_path
+            )
+            for d_name in dirs:
+                self._engine.mkdir(remote_base_path + "/" + d_name)
+                n_dirs += 1
+                update_upload_progress(n_dirs, n_files)
+            for f_name in files:
+                self._engine.upload(root + "/" + f_name, remote_base_path)
+                n_files += 1
+                update_upload_progress(n_dirs, n_files)
+
+    def _save_model_from_local_or_hopsfs_mount(
+        self, model_instance, model_path, keep_original_files, update_upload_progress
+    ):
+        """Save model files from a local path. The local path can be on a HopsFS mount."""
+        # check hopsfs mount
+        if model_path.startswith(constants.MODEL_REGISTRY.HOPSFS_MOUNT_PREFIX):
+            self._copy_or_move_hopsfs_model(
+                from_hdfs_model_path=model_path.replace(
+                    constants.MODEL_REGISTRY.HOPSFS_MOUNT_PREFIX, ""
+                ),
+                to_model_version_path=model_instance.version_path,
+                keep_original_files=keep_original_files,
+                update_upload_progress=update_upload_progress,
             )
-            uploaded_archive_path = (
-                dataset_model_name_path + "/" + os.path.basename(archive_path)
+        else:
+            self._upload_local_model(
+                from_local_model_path=model_path,
+                to_model_version_path=model_instance.version_path,
+                update_upload_progress=update_upload_progress,
             )
-            self._dataset_api.upload(archive_path, dataset_model_name_path)
-            self._dataset_api.unzip(uploaded_archive_path, block=True, timeout=600)
-        except RestAPIError:
-            raise
-        finally:
-            if archive_out_dir is not None:
-                archive_out_dir.cleanup()
-            self._dataset_api.rm(uploaded_archive_path)
 
     def _set_model_version(
         self, model_instance, dataset_models_root_path, dataset_model_path
@@ -162,7 +203,13 @@ def _build_resource_path(self, model_instance, artifact):
         artifact_path = "{}/{}".format(model_instance.version_path, artifact)
         return artifact_path
 
-    def save(self, model_instance, model_path, await_registration=480):
+    def save(
+        self,
+        model_instance,
+        model_path,
+        await_registration=480,
+        keep_original_files=False,
+    ):
         _client = client.get_instance()
 
         is_shared_registry = model_instance.shared_registry_project_name is not None
@@ -189,7 +236,7 @@ def save(self, model_instance, model_path, await_registration=480):
         # Create /Models/{model_instance._name} folder
         dataset_model_name_path = dataset_models_root_path + "/" + model_instance._name
         if not self._dataset_api.path_exists(dataset_model_name_path):
-            self._dataset_api.mkdir(dataset_model_name_path)
+            self._engine.mkdir(dataset_model_name_path)
 
         model_instance = self._set_model_version(
             model_instance, dataset_models_root_path, dataset_model_name_path
@@ -224,30 +271,45 @@ def save(self, model_instance, model_path, await_registration=480):
                 pbar.set_description("%s" % step["desc"])
                 if step["id"] == 0:
                     # Create folders
-                    self._engine.mkdir(model_instance)
+                    self._engine.mkdir(model_instance.version_path)
                 if step["id"] == 1:
+
+                    def update_upload_progress(n_dirs=0, n_files=0):
+                        pbar.set_description(
+                            "%s (%s dirs, %s files)" % (step["desc"], n_dirs, n_files)
+                        )
+
+                    update_upload_progress(n_dirs=0, n_files=0)
+
                     # Upload Model files from local path to /Models/{model_instance._name}/{model_instance._version}
                     # check local absolute
                     if os.path.isabs(model_path) and os.path.exists(model_path):
-                        self._upload_local_model(
-                            model_path,
-                            model_instance.version,
-                            dataset_model_name_path,
+                        self._save_model_from_local_or_hopsfs_mount(
+                            model_instance=model_instance,
+                            model_path=model_path,
+                            keep_original_files=keep_original_files,
+                            update_upload_progress=update_upload_progress,
                         )
                     # check local relative
                     elif os.path.exists(
                         os.path.join(os.getcwd(), model_path)
                     ):  # check local relative
-                        self._upload_local_model(
-                            os.path.join(os.getcwd(), model_path),
-                            model_instance.version,
-                            dataset_model_name_path,
+                        self._save_model_from_local_or_hopsfs_mount(
+                            model_instance=model_instance,
+                            model_path=os.path.join(os.getcwd(), model_path),
+                            keep_original_files=keep_original_files,
+                            update_upload_progress=update_upload_progress,
                         )
                     # check project relative
                     elif self._dataset_api.path_exists(
                         model_path
                     ):  # check hdfs relative and absolute
-                        self._copy_hopsfs_model(model_path, model_instance.version_path)
+                        self._copy_or_move_hopsfs_model(
+                            from_hdfs_model_path=model_path,
+                            to_model_version_path=model_instance.version_path,
+                            keep_original_files=keep_original_files,
+                            update_upload_progress=update_upload_progress,
+                        )
                     else:
                         raise IOError(
                             "Could not find path {} in the local filesystem or in Hopsworks File System".format(
@@ -284,14 +346,14 @@ def download(self, model_instance):
 
         temp_download_dir = "/Resources" + "/" + str(uuid.uuid4())
         try:
-            self._dataset_api.mkdir(temp_download_dir)
+            self._engine.mkdir(temp_download_dir)
             self._dataset_api.zip(
                 model_instance.version_path,
                 destination_path=temp_download_dir,
                 block=True,
                 timeout=600,
             )
-            self._dataset_api.download(
+            self._engine.download(
                 temp_download_dir + "/" + str(model_instance._version) + ".zip",
                 zip_path,
             )
@@ -316,7 +378,7 @@ def read_file(self, model_instance, resource):
             resource = os.path.basename(resource)
             tmp_dir = tempfile.TemporaryDirectory(dir=os.getcwd())
             local_resource_path = os.path.join(tmp_dir.name, resource)
-            self._dataset_api.download(
+            self._engine.download(
                 hdfs_resource_path,
                 local_resource_path,
             )
@@ -332,7 +394,7 @@ def read_json(self, model_instance, resource):
         try:
             tmp_dir = tempfile.TemporaryDirectory(dir=os.getcwd())
             local_resource_path = os.path.join(tmp_dir.name, resource)
-            self._dataset_api.download(
+            self._engine.download(
                 hdfs_resource_path,
                 local_resource_path,
             )
@@ -343,7 +405,7 @@ def read_json(self, model_instance, resource):
             tmp_dir.cleanup()
 
     def delete(self, model_instance):
-        self._engine.delete(model_instance)
+        self._engine.delete(model_instance.version_path)
 
     def set_tag(self, model_instance, name, value):
         """Attach a name/value tag to a model."""
diff --git a/python/hsml/model.py b/python/hsml/model.py
index 2f9c4e476..11faafec3 100644
--- a/python/hsml/model.py
+++ b/python/hsml/model.py
@@ -83,18 +83,22 @@ def __init__(
 
         self._model_engine = model_engine.ModelEngine()
 
-    def save(self, model_path, await_registration=480):
+    def save(self, model_path, await_registration=480, keep_original_files=False):
         """Persist this model including model files and metadata to the model registry.
 
         # Arguments
             model_path: Local or remote (Hopsworks file system) path to the folder where the model files are located, or path to a specific model file.
             await_registration: Awaiting time for the model to be registered in Hopsworks.
+            keep_original_files: Whether to keep the original files when the model files are already located in HopsFS, copying them into the Models dataset instead of moving them. Default is False (i.e., the model files are moved).
 
         # Returns
             `Model`: The model metadata object.
         """
         return self._model_engine.save(
-            self, model_path, await_registration=await_registration
+            self,
+            model_path,
+            await_registration=await_registration,
+            keep_original_files=keep_original_files,
         )
 
     def download(self):
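
Usage sketch (reviewer note, not part of the patch): a minimal example of how the new keep_original_files flag surfaces in the public API. The project, model name, metrics, and "model_dir" path are hypothetical; hsml.connection(), get_model_registry(), and the framework create_model() helpers are the existing hsml entry points.

    import hsml

    # Connect to Hopsworks and get the project's model registry handle.
    connection = hsml.connection()
    mr = connection.get_model_registry()

    # Register a model; metadata only, no files are transferred yet.
    model = mr.python.create_model(name="my_model", metrics={"accuracy": 0.92})

    # With this patch, files under "model_dir" are uploaded uncompressed,
    # file by file, instead of being zipped first. If the path already lives
    # in HopsFS (e.g. under the hopsfs mount /home/yarnapp/hopsfs/), the files
    # are moved into the Models dataset by default; pass
    # keep_original_files=True to copy them and leave the originals in place.
    model.save("model_dir", keep_original_files=True)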