[FSTORE-1522] Deduplicate most of hsml (logicalclocks#368)
* Add missing init in hopsworks_common.engine

* Add missing FeatureStoreEncoder to hsfs.util for backwards compatibility

* Deduplicate util

* Deduplicate kafka_topic

* Use direct imports instead of aliases

* Deduplicate tag

* Deduplicate dataset_api

* Fix tag import

* Fix KafkaTopic

* Fix mocking of get_obj_from_json

* Fix mocking

* Fix mocking of hsml.util

* Fix mocking and imports of util

* Fix KafkaTopic.from_response_json

* Fix pandas typechecking in util

* Fix mocking in test_predictor

* Fix mocking in test_resources

* Fix mocking in test_transformer
aversey authored and davitbzh committed Dec 5, 2024
1 parent 824329a commit 656aa01
Showing 35 changed files with 1,038 additions and 1,286 deletions.
312 changes: 299 additions & 13 deletions python/hopsworks_common/core/dataset_api.py
@@ -17,14 +17,16 @@
from __future__ import annotations

import copy
import json
import logging
import math
import os
import shutil
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Literal, Optional, Union

from hopsworks_common import client, usage, util
from hopsworks_common import client, tag, usage, util
from hopsworks_common.client.exceptions import DatasetException, RestAPIError
from hopsworks_common.core import inode
from tqdm.auto import tqdm
@@ -42,11 +44,25 @@ class DatasetApi:
def __init__(self):
self._log = logging.getLogger(__name__)

DEFAULT_FLOW_CHUNK_SIZE = 1048576
DEFAULT_UPLOAD_FLOW_CHUNK_SIZE = 10 * 1024 * 1024
DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS = 3
DEFAULT_UPLOAD_SIMULTANEOUS_CHUNKS = 3
DEFAULT_UPLOAD_MAX_CHUNK_RETRIES = 1

DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE = 1024 * 1024
FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]

# alias for backwards-compatibility:
DEFAULT_FLOW_CHUNK_SIZE = DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE

@usage.method_logger
def download(self, path: str, local_path: str = None, overwrite: bool = False):
def download(
self,
path: str,
local_path: Optional[str] = None,
overwrite: Optional[bool] = False,
chunk_size: int = DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE,
):
"""Download file from Hopsworks Filesystem to the current working directory.
```python
@@ -64,6 +80,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
path: path in Hopsworks filesystem to the file
local_path: path where to download the file in the local filesystem
overwrite: overwrite local file if exists
chunk_size: download chunk size in bytes. Default 1 MB
# Returns
`str`: Path to downloaded file
# Raises
@@ -122,9 +139,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
self._log.exception("Failed to initialize progress bar.")
self._log.info("Starting download")

for chunk in response.iter_content(
chunk_size=self.DEFAULT_FLOW_CHUNK_SIZE
):
for chunk in response.iter_content(chunk_size=chunk_size):
f.write(chunk)

if pbar is not None:
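
For illustration, a minimal sketch of how the new `chunk_size` parameter could be used through the public API; the login flow and file path are assumptions for the example, not part of this diff:

```python
import hopsworks

# Assumed setup: authenticate and fetch the project's dataset API handle.
project = hopsworks.login()
dataset_api = project.get_dataset_api()

# Hypothetical file; download in 4 MiB chunks instead of the
# DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE of 1 MiB.
local_file = dataset_api.download(
    "Resources/data.csv",
    overwrite=True,
    chunk_size=4 * 1024 * 1024,
)
print(f"Downloaded to {local_file}")
```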
@@ -143,11 +158,11 @@ def upload(
local_path: str,
upload_path: str,
overwrite: bool = False,
chunk_size=DEFAULT_FLOW_CHUNK_SIZE,
simultaneous_uploads=3,
simultaneous_chunks=3,
max_chunk_retries=1,
chunk_retry_interval=1,
chunk_size: int = DEFAULT_UPLOAD_FLOW_CHUNK_SIZE,
simultaneous_uploads: int = DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
simultaneous_chunks: int = DEFAULT_UPLOAD_SIMULTANEOUS_CHUNKS,
max_chunk_retries: int = DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
chunk_retry_interval: int = 1,
):
"""Upload a file or directory to the Hopsworks filesystem.
@@ -170,7 +185,7 @@ def upload(
local_path: local path to file or directory to upload, can be relative or absolute
upload_path: path to directory where to upload the file in Hopsworks Filesystem
overwrite: overwrite file or directory if exists
chunk_size: upload chunk size in bytes. Default 1048576 bytes
chunk_size: upload chunk size in bytes. Default 10 MB
simultaneous_chunks: number of simultaneous chunks to upload for each file upload. Default 3
simultaneous_uploads: number of simultaneous files to be uploaded for directories. Default 3
max_chunk_retries: maximum retry for a chunk. Default is 1
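
A similar sketch for the upload side, using the renamed defaults; the local and remote paths are illustrative assumptions:

```python
import hopsworks
from hopsworks_common.core.dataset_api import DatasetApi

project = hopsworks.login()
dataset_api = project.get_dataset_api()

# Hypothetical directory upload, tuned beyond the defaults.
dataset_api.upload(
    "data/local_dir",
    "Resources",
    overwrite=True,
    chunk_size=DatasetApi.DEFAULT_UPLOAD_FLOW_CHUNK_SIZE,  # 10 MiB
    simultaneous_uploads=5,  # more parallel file uploads than the default 3
    max_chunk_retries=3,     # tolerate a few transient chunk failures
)
```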
@@ -399,6 +414,16 @@ def _get(self, path: str):
headers = {"content-type": "application/json"}
return _client._send_request("GET", path_params, headers=headers)

def get(self, path: str):
"""Get dataset.
:param path: path to the dataset
:type path: str
:return: dataset metadata
:rtype: dict
"""
return self._get(path)

def exists(self, path: str):
"""Check if a file exists in the Hopsworks Filesystem.
@@ -415,6 +440,16 @@ def exists(self, path: str):
except RestAPIError:
return False

def path_exists(self, remote_path: str):
"""Check if a path exists in datasets.
:param remote_path: path to check
:type remote_path: str
:return: boolean whether path exists
:rtype: bool
"""
return self.exists(remote_path)

@usage.method_logger
def remove(self, path: str):
"""Remove a path in the Hopsworks Filesystem.
@@ -426,7 +461,17 @@ def remove(self, path: str):
"""
_client = client.get_instance()
path_params = ["project", _client._project_id, "dataset", path]
_client._send_request("DELETE", path_params)
return _client._send_request("DELETE", path_params)

def rm(self, remote_path: str):
"""Remove a path in the Hopsworks Filesystem.
# Arguments
remote_path: path to remove
# Raises
`RestAPIError`: If unable to remove the path
"""
return self.remove(remote_path)
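
The new `get`, `path_exists`, and `rm` wrappers exist purely so that hsml-era call sites keep working; a short sketch, reusing the `dataset_api` handle from the examples above, with illustrative paths:

```python
# Old hsml-style names delegate to the hopsworks-native implementations.
if dataset_api.path_exists("Resources/data.csv"):      # alias of exists()
    metadata = dataset_api.get("Resources/data.csv")   # wraps _get()
    dataset_api.rm("Resources/data.csv")               # alias of remove()
```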

@usage.method_logger
def mkdir(self, path: str):
@@ -601,6 +646,27 @@ def list_files(self, path, offset, limit):

return inode_lst["count"], inode.Inode.from_response_json(inode_lst)

@usage.method_logger
def list(self, remote_path, sort_by=None, limit=1000):
"""List all files in a directory in datasets.
:param remote_path: path to list
:type remote_path: str
:param sort_by: sort string
:type sort_by: str
:param limit: max number of returned files
:type limit: int
"""
# This method will probably be merged with list_files eventually.
# They handle paths differently and return different results, which,
# given the backwards-compatibility requirement, prevents the merge
# for now (2024-09-03).
_client = client.get_instance()
path_params = ["project", _client._project_id, "dataset", remote_path]
query_params = {"action": "listing", "sort_by": sort_by, "limit": limit}
headers = {"content-type": "application/json"}
return _client._send_request(
"GET", path_params, headers=headers, query_params=query_params
)
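
A hedged example of the new `list` call; the sort-key format and the response shape are assumptions based on the REST "listing" action, not spelled out in this diff:

```python
# Reusing the dataset_api handle from the examples above.
listing = dataset_api.list("Resources", sort_by="ID:asc", limit=100)
for item in listing.get("items", []):  # assumed response shape
    print(item)
```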

@usage.method_logger
def read_content(self, path: str, dataset_type: str = "DATASET"):
_client = client.get_instance()
@@ -619,3 +685,223 @@ def read_content(self, path: str, dataset_type: str = "DATASET"):
}

return _client._send_request("GET", path_params, query_params, stream=True)

def chmod(self, remote_path, permissions):
"""Chmod operation on file or directory in datasets.
:param remote_path: path to chmod
:type remote_path: str
:param permissions: permissions string, for example u+x
:type permissions: str
"""
_client = client.get_instance()
path_params = ["project", _client._project_id, "dataset", remote_path]
headers = {"content-type": "application/json"}
query_params = {"action": "PERMISSION", "permissions": permissions}
return _client._send_request(
"PUT", path_params, headers=headers, query_params=query_params
)
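
For completeness, a one-line sketch of `chmod` using the permissions string format the docstring suggests; the path is illustrative:

```python
# Grant the owner execute permission on a hypothetical script.
dataset_api.chmod("Resources/script.sh", "u+x")
```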

# region Archiving

def _archive(
self,
remote_path: str,
destination_path: Optional[str] = None,
block: bool = False,
timeout: Optional[int] = 120,
action: Union[Literal["unzip"], Literal["zip"]] = "unzip",
):
"""Internal (de)compression logic.
# Arguments
remote_path: path to file or directory to unzip.
destination_path: path to upload the zip, defaults to None; is used only if action is zip.
block: if the operation should be blocking until complete, defaults to False.
timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
action: zip or unzip, defaults to unzip.
# Returns
`bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
"""

_client = client.get_instance()
path_params = ["project", _client._project_id, "dataset", remote_path]

query_params = {"action": action}

if destination_path is not None:
query_params["destination_path"] = destination_path
query_params["destination_type"] = "DATASET"

headers = {"content-type": "application/json"}

_client._send_request(
"POST", path_params, headers=headers, query_params=query_params
)

if not block:
# the call is successful at this point if we don't want to block
return True

# Wait for zip file to appear. When it does, check that parent dir zipState is not set to CHOWNING
count = 0
while True:
if action == "zip":
zip_path = remote_path + ".zip"
# Get the status of the zipped file
if destination_path is None:
zip_exists = self.path_exists(zip_path)
else:
zip_exists = self.path_exists(
destination_path + "/" + os.path.split(zip_path)[1]
)
# Get the zipState of the directory being zipped
dir_status = self.get(remote_path)
zip_state = dir_status["zipState"] if "zipState" in dir_status else None
if zip_exists and zip_state == "NONE":
return True
elif action == "unzip":
# Get the status of the unzipped dir
unzipped_dir_exists = self.path_exists(
remote_path[: remote_path.index(".")]
)
# Get the zipState of the zip being extracted
dir_status = self.get(remote_path)
zip_state = dir_status["zipState"] if "zipState" in dir_status else None
if unzipped_dir_exists and zip_state == "NONE":
return True
time.sleep(1)
count += 1
if timeout is not None and count >= timeout:
self._log.info(
f"Timeout of {timeout} seconds exceeded while {action} {remote_path}."
)
return False

def unzip(
self, remote_path: str, block: bool = False, timeout: Optional[int] = 120
):
"""Unzip an archive in the dataset.
# Arguments
remote_path: path to file or directory to unzip.
block: if the operation should be blocking until complete, defaults to False.
timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
# Returns
`bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
"""
return self._archive(remote_path, block=block, timeout=timeout, action="unzip")

def zip(
self,
remote_path: str,
destination_path: Optional[str] = None,
block: bool = False,
timeout: Optional[int] = 120,
):
"""Zip a file or directory in the dataset.
# Arguments
remote_path: path to file or directory to zip.
destination_path: path to upload the zip, defaults to None.
block: if the operation should be blocking until complete, defaults to False.
timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
# Returns
`bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
"""
return self._archive(
remote_path,
destination_path=destination_path,
block=block,
timeout=timeout,
action="zip",
)
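
The blocking semantics implemented in `_archive` surface through both public helpers; a sketch with illustrative paths:

```python
# Fire-and-forget: returns True once the server accepts the request.
dataset_api.zip("Resources/logs", destination_path="Resources/archives")

# Block (polling once per second) until the archive exists and its
# zipState settles, or 300 seconds pass; False signals a timeout.
if not dataset_api.zip("Resources/logs", block=True, timeout=300):
    print("zip did not finish within 300 seconds")

# timeout=None blocks without bound.
dataset_api.unzip("Resources/archives/logs.zip", block=True, timeout=None)
```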

# region Dataset Tags

def add(self, path, name, value):
"""Attach a name/value tag to a model.
A tag consists of a name/value pair. Tag names are unique identifiers.
The value of a tag can be any valid json - primitives, arrays or json objects.
:param path: path to add the tag
:type path: str
:param name: name of the tag to be added
:type name: str
:param value: value of the tag to be added
:type value: str
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"dataset",
"tags",
"schema",
name,
path,
]
headers = {"content-type": "application/json"}
json_value = json.dumps(value)
_client._send_request("PUT", path_params, headers=headers, data=json_value)

def delete(self, path, name):
"""Delete a tag.
Tag names are unique identifiers.
:param path: path to delete the tags
:type path: str
:param name: name of the tag to be removed
:type name: str
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"dataset",
"tags",
"schema",
name,
path,
]
_client._send_request("DELETE", path_params)

def get_tags(self, path, name: str = None):
"""Get the tags.
Gets all tags if no tag name is specified.
:param path: path to get the tags
:type path: str
:param name: tag name
:type name: str
:return: dict of tag name/values
:rtype: dict
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"dataset",
"tags",
]

if name is not None:
path_params.append("schema")
path_params.append(name)
else:
path_params.append("all")

path_params.append(path)

return {
tag._name: json.loads(tag._value)
for tag in tag.Tag.from_response_json(
_client._send_request("GET", path_params)
)
}
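
Finally, a round-trip sketch of the tag helpers; it assumes a tag schema named "quality" has already been registered in the project, which Hopsworks requires before a tag can be attached:

```python
# Attach, read back, and remove a tag on a hypothetical file.
dataset_api.add("Resources/data.csv", "quality", {"validated": True, "rows": 1000})

all_tags = dataset_api.get_tags("Resources/data.csv")  # {name: value, ...}
quality = dataset_api.get_tags("Resources/data.csv", name="quality")["quality"]

dataset_api.delete("Resources/data.csv", "quality")
```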