Skip to content

Commit

Permalink
[HWORKS-829] Register models without compressing files (logicalclocks…
Browse files Browse the repository at this point in the history
…#206)

* [HWORKS-829] Upload uncompressed files, copy_from_local if internal, move if hopsfs-mount

* Set keep_original_files default value to False

* Reuse local/hopsworks engine for copy, move and download ops
  • Loading branch information
javierdlrm authored Nov 6, 2023
1 parent eee4f5b commit 8c99438
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 69 deletions.
4 changes: 4 additions & 0 deletions python/hsml/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class MODEL:
FRAMEWORK_SKLEARN = "SKLEARN"


class MODEL_REGISTRY:
HOPSFS_MOUNT_PREFIX = "/home/yarnapp/hopsfs/"


class MODEL_SERVING:
MODELS_DATASET = "Models"

Expand Down
21 changes: 19 additions & 2 deletions python/hsml/core/native_hdfs_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,22 @@ def chmod(self, hdfs_path, mode):
def mkdir(self, path):
return hdfs.mkdir(path)

def delete(self, path):
hdfs.rm(path, recursive=True)
def rm(self, path, recursive=True):
hdfs.rm(path, recursive=recursive)

def upload(self, local_path: str, remote_path: str):
# copy from local fs to hdfs
hdfs.put(local_path, remote_path)

def download(self, remote_path: str, local_path: str):
# copy from hdfs to local fs
print("Downloading file ...", end=" ")
hdfs.get(remote_path, local_path)

def copy(self, source_path: str, destination_path: str):
# both paths are hdfs paths
hdfs.cp(source_path, destination_path)

def move(self, source_path: str, destination_path: str):
# both paths are hdfs paths
hdfs.rename(source_path, destination_path)
61 changes: 42 additions & 19 deletions python/hsml/engine/hopsworks_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,52 @@
# limitations under the License.
#

import os

from hsml.core import native_hdfs_api
from hsml import constants
from hsml import client


class HopsworksEngine:
def __init__(self):
self._native_hdfs_api = native_hdfs_api.NativeHdfsApi()

def mkdir(self, model_instance):
model_version_dir_hdfs = "/Projects/{}/{}/{}/{}".format(
model_instance.project_name,
constants.MODEL_SERVING.MODELS_DATASET,
model_instance.name,
str(model_instance.version),
)
self._native_hdfs_api.mkdir(model_version_dir_hdfs)
self._native_hdfs_api.chmod(model_version_dir_hdfs, "ug+rwx")

def delete(self, model_instance):
model_version_dir_hdfs = "/Projects/{}/{}/{}/{}".format(
model_instance.project_name,
constants.MODEL_SERVING.MODELS_DATASET,
model_instance.name,
str(model_instance.version),
)
self._native_hdfs_api.delete(model_version_dir_hdfs)
def mkdir(self, remote_path: str):
remote_path = self._prepend_project_path(remote_path)
self._native_hdfs_api.mkdir(remote_path)
self._native_hdfs_api.chmod(remote_path, "ug+rwx")

def delete(self, remote_path: str):
remote_path = self._prepend_project_path(remote_path)
self._native_hdfs_api.rm(remote_path)

def upload(self, local_path: str, remote_path: str):
local_path = self._get_abs_path(local_path)
remote_path = self._prepend_project_path(remote_path)
self._native_hdfs_api.upload(local_path, remote_path)
self._native_hdfs_api.chmod(remote_path, "ug+rwx")

def download(self, remote_path: str, local_path: str):
local_path = self._get_abs_path(local_path)
remote_path = self._prepend_project_path(remote_path)
self._native_hdfs_api.download(remote_path, local_path)

def copy(self, source_path: str, destination_path: str):
# both paths are hdfs paths
source_path = self._prepend_project_path(source_path)
destination_path = self._prepend_project_path(destination_path)
self._native_hdfs_api.copy(source_path, destination_path)

def move(self, source_path: str, destination_path: str):
source_path = self._prepend_project_path(source_path)
destination_path = self._prepend_project_path(destination_path)
self._native_hdfs_api.move(source_path, destination_path)

def _get_abs_path(self, local_path: str):
return local_path if os.path.isabs(local_path) else os.path.abspath(local_path)

def _prepend_project_path(self, remote_path: str):
if not remote_path.startswith("/Projects/"):
_client = client.get_instance()
remote_path = "/Projects/{}/{}".format(_client._project_name, remote_path)
return remote_path
42 changes: 38 additions & 4 deletions python/hsml/engine/local_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,49 @@
# limitations under the License.
#

import os

from hsml.core import dataset_api
from hsml import client


class LocalEngine:
def __init__(self):
self._dataset_api = dataset_api.DatasetApi()

def mkdir(self, model_instance):
self._dataset_api.mkdir(model_instance.version_path)
def mkdir(self, remote_path: str):
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.mkdir(remote_path)

def delete(self, remote_path: str):
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.rm(remote_path)

def upload(self, local_path: str, remote_path: str):
local_path = self._get_abs_path(local_path)
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.upload(local_path, remote_path)

def download(self, remote_path: str, local_path: str):
local_path = self._get_abs_path(local_path)
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.download(remote_path, local_path)

def copy(self, source_path, destination_path):
source_path = self._prepend_project_path(source_path)
destination_path = self._prepend_project_path(destination_path)
self._dataset_api.copy(source_path, destination_path)

def move(self, source_path, destination_path):
source_path = self._prepend_project_path(source_path)
destination_path = self._prepend_project_path(destination_path)
self._dataset_api.move(source_path, destination_path)

def _get_abs_path(self, local_path: str):
return local_path if os.path.isabs(local_path) else os.path.abspath(local_path)

def delete(self, model_instance):
self._dataset_api.rm(model_instance.version_path)
def _prepend_project_path(self, remote_path: str):
if not remote_path.startswith("/Projects/"):
_client = client.get_instance()
remote_path = "/Projects/{}/{}".format(_client._project_name, remote_path)
return remote_path
Loading

0 comments on commit 8c99438

Please sign in to comment.