
[FSTORE-1522] Deduplicate most of hsml #368

Merged · 23 commits · Dec 4, 2024
312 changes: 299 additions & 13 deletions python/hopsworks_common/core/dataset_api.py
@@ -17,14 +17,16 @@
from __future__ import annotations

import copy
import json
import logging
import math
import os
import shutil
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Literal, Optional, Union

-from hopsworks_common import client, usage, util
+from hopsworks_common import client, tag, usage, util
from hopsworks_common.client.exceptions import DatasetException, RestAPIError
from hopsworks_common.core import inode
from tqdm.auto import tqdm
@@ -42,11 +44,25 @@ class DatasetApi:
def __init__(self):
self._log = logging.getLogger(__name__)

-DEFAULT_FLOW_CHUNK_SIZE = 1048576
+DEFAULT_UPLOAD_FLOW_CHUNK_SIZE = 10 * 1024 * 1024
+DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS = 3
+DEFAULT_UPLOAD_SIMULTANEOUS_CHUNKS = 3
+DEFAULT_UPLOAD_MAX_CHUNK_RETRIES = 1
+
+DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE = 1024 * 1024
+FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]
+
+# alias for backwards-compatibility:
+DEFAULT_FLOW_CHUNK_SIZE = DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE

@usage.method_logger
-def download(self, path: str, local_path: str = None, overwrite: bool = False):
+def download(
+    self,
+    path: str,
+    local_path: Optional[str] = None,
+    overwrite: Optional[bool] = False,
+    chunk_size: int = DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE,
+):
"""Download file from Hopsworks Filesystem to the current working directory.

```python
@@ -64,6 +80,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
path: path in Hopsworks filesystem to the file
local_path: path where to download the file in the local filesystem
overwrite: overwrite local file if exists
chunk_size: download chunk size in bytes. Default 1 MB
# Returns
`str`: Path to downloaded file
# Raises
@@ -122,9 +139,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
self._log.exception("Failed to initialize progress bar.")
self._log.info("Starting download")

-for chunk in response.iter_content(
-    chunk_size=self.DEFAULT_FLOW_CHUNK_SIZE
-):
+for chunk in response.iter_content(chunk_size=chunk_size):
f.write(chunk)

if pbar is not None:
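
For illustration, a minimal sketch of the new `chunk_size` knob on download — assuming a reachable Hopsworks cluster and an existing remote file; the paths and sizes below are made up:

```python
import hopsworks

project = hopsworks.login()  # connection details are assumed
dataset_api = project.get_dataset_api()

# Fetch with 8 MiB chunks instead of the 1 MiB default.
local_file = dataset_api.download(
    "Resources/my_data.csv",     # remote path, assumed to exist
    overwrite=True,              # replace a previous local copy if present
    chunk_size=8 * 1024 * 1024,  # download chunk size in bytes
)
print(local_file)  # local path of the downloaded file
```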
@@ -143,11 +158,11 @@ def upload(
local_path: str,
upload_path: str,
overwrite: bool = False,
-chunk_size=DEFAULT_FLOW_CHUNK_SIZE,
-simultaneous_uploads=3,
-simultaneous_chunks=3,
-max_chunk_retries=1,
-chunk_retry_interval=1,
+chunk_size: int = DEFAULT_UPLOAD_FLOW_CHUNK_SIZE,
+simultaneous_uploads: int = DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
+simultaneous_chunks: int = DEFAULT_UPLOAD_SIMULTANEOUS_CHUNKS,
+max_chunk_retries: int = DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
+chunk_retry_interval: int = 1,
):
"""Upload a file or directory to the Hopsworks filesystem.

@@ -170,7 +185,7 @@ def upload(
local_path: local path to file or directory to upload, can be relative or absolute
upload_path: path to directory where to upload the file in Hopsworks Filesystem
overwrite: overwrite file or directory if exists
-chunk_size: upload chunk size in bytes. Default 1048576 bytes
+chunk_size: upload chunk size in bytes. Default 10 MB
simultaneous_chunks: number of simultaneous chunks to upload for each file upload. Default 3
simultaneous_uploads: number of simultaneous files to be uploaded for directories. Default 3
max_chunk_retries: maximum retry for a chunk. Default is 1
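
A similar sketch for upload with the now explicitly typed keyword parameters — again assuming a connected project; the local file name is illustrative:

```python
import hopsworks

project = hopsworks.login()  # connection details are assumed
dataset_api = project.get_dataset_api()

# Upload one file, overriding a few of the new defaults.
remote_path = dataset_api.upload(
    "train.parquet",              # local file, illustrative
    "Resources",                  # target directory in the Hopsworks filesystem
    overwrite=True,
    chunk_size=10 * 1024 * 1024,  # same as DEFAULT_UPLOAD_FLOW_CHUNK_SIZE
    simultaneous_chunks=3,
    max_chunk_retries=2,          # one retry more than the default
)
print(remote_path)  # path of the uploaded file in the Hopsworks filesystem
```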
@@ -375,6 +390,16 @@ def _get(self, path: str):
headers = {"content-type": "application/json"}
return _client._send_request("GET", path_params, headers=headers)

def get(self, path: str):
"""Get dataset.

:param path: path to check
:type path: str
:return: dataset metadata
:rtype: dict
"""
return self._get(path)
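
A quick sketch of this backwards-compatibility wrapper in use, continuing the illustrative `dataset_api` handle from the snippets above; the exact keys of the returned dict are a backend detail:

```python
meta = dataset_api.get("Resources/my_data.csv")  # illustrative path
print(meta.get("zipState"))  # archive status, also consulted by zip/unzip below
```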

def exists(self, path: str):
"""Check if a file exists in the Hopsworks Filesystem.

@@ -391,6 +416,16 @@ def exists(self, path: str):
except RestAPIError:
return False

def path_exists(self, remote_path: str):
"""Check if a path exists in datasets.

:param remote_path: path to check
:type remote_path: str
:return: boolean whether path exists
:rtype: bool
"""
return self.exists(remote_path)

@usage.method_logger
def remove(self, path: str):
"""Remove a path in the Hopsworks Filesystem.
@@ -402,7 +437,17 @@ def remove(self, path: str):
"""
_client = client.get_instance()
path_params = ["project", _client._project_id, "dataset", path]
_client._send_request("DELETE", path_params)
return _client._send_request("DELETE", path_params)

def rm(self, remote_path: str):
"""Remove a path in the Hopsworks Filesystem.

# Arguments
remote_path: path to remove
# Raises
`RestAPIError`: If unable to remove the path
"""
return self.remove(remote_path)
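
The hsml-era aliases simply delegate to their hopsworks counterparts, so both spellings now hit the same code path (illustrative paths, `dataset_api` as above):

```python
# Both calls return the same boolean:
dataset_api.exists("Resources/a.csv")
dataset_api.path_exists("Resources/a.csv")

# Both remove a path; remove() now also returns the REST response:
dataset_api.remove("Resources/old_dir")
dataset_api.rm("Resources/other_old_dir")
```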

@usage.method_logger
def mkdir(self, path: str):
@@ -573,6 +618,27 @@ def list_files(self, path, offset, limit):

return inode_lst["count"], inode.Inode.from_response_json(inode_lst)

@usage.method_logger
def list(self, remote_path, sort_by=None, limit=1000):
"""List all files in a directory in datasets.

:param remote_path: path to list
:type remote_path: str
:param sort_by: sort string
:type sort_by: str
:param limit: max number of returned files
:type limit: int
"""
# This method should probably be merged with list_files.
# They handle paths differently and return different results, which, together with
# the backwards-compatibility requirement, prevents the merge at the moment (2024-09-03).
_client = client.get_instance()
path_params = ["project", _client._project_id, "dataset", remote_path]
query_params = {"action": "listing", "sort_by": sort_by, "limit": limit}
headers = {"content-type": "application/json"}
return _client._send_request(
"GET", path_params, headers=headers, query_params=query_params
)
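
A usage sketch, with the caveat that the accepted `sort_by` format and the shape of the returned JSON are backend details (the `"items"` key below is an assumption):

```python
listing = dataset_api.list("Resources", sort_by="ID:asc", limit=50)
for entry in listing.get("items", []):  # "items" key assumed from the REST response
    print(entry)
```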

@usage.method_logger
def read_content(self, path: str, dataset_type: str = "DATASET"):
_client = client.get_instance()
@@ -591,3 +657,223 @@ def read_content(self, path: str, dataset_type: str = "DATASET"):
}

return _client._send_request("GET", path_params, query_params, stream=True)

def chmod(self, remote_path, permissions):
"""Chmod operation on file or directory in datasets.

:param remote_path: path to chmod
:type remote_path: str
:param permissions: permissions string, for example u+x
:type permissions: str
"""
_client = client.get_instance()
path_params = ["project", _client._project_id, "dataset", remote_path]
headers = {"content-type": "application/json"}
query_params = {"action": "PERMISSION", "permissions": permissions}
return _client._send_request(
"PUT", path_params, headers=headers, query_params=query_params
)
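
For example (illustrative path; which permission strings the backend accepts, e.g. "u+x" versus octal forms like "777", is a server-side detail):

```python
# Make a script in the dataset executable for its owner.
dataset_api.chmod("Resources/run_job.sh", "u+x")
```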

# region Archiving

def _archive(
self,
remote_path: str,
destination_path: Optional[str] = None,
block: bool = False,
timeout: Optional[int] = 120,
action: Union[Literal["unzip"], Literal["zip"]] = "unzip",
):
"""Internal (de)compression logic.

# Arguments
remote_path: path to the file or directory to zip or unzip.
destination_path: path where to place the resulting zip, defaults to None; used only if action is zip.
block: if the operation should be blocking until complete, defaults to False.
timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
action: zip or unzip, defaults to unzip.

# Returns
`bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
"""

_client = client.get_instance()
path_params = ["project", _client._project_id, "dataset", remote_path]

query_params = {"action": action}

if destination_path is not None:
query_params["destination_path"] = destination_path
query_params["destination_type"] = "DATASET"

headers = {"content-type": "application/json"}

_client._send_request(
"POST", path_params, headers=headers, query_params=query_params
)

if not block:
# the call is successful at this point if we don't want to block
return True

# Wait for the (un)zipped result to appear; once it does, make sure the zipState
# of the source has settled back to NONE before reporting success.
count = 0
while True:
if action == "zip":
zip_path = remote_path + ".zip"
# Get the status of the zipped file
if destination_path is None:
zip_exists = self.path_exists(zip_path)
else:
zip_exists = self.path_exists(
destination_path + "/" + os.path.split(zip_path)[1]
)
# Get the zipState of the directory being zipped
dir_status = self.get(remote_path)
zip_state = dir_status["zipState"] if "zipState" in dir_status else None
if zip_exists and zip_state == "NONE":
return True
elif action == "unzip":
# Get the status of the unzipped dir
unzipped_dir_exists = self.path_exists(
remote_path[: remote_path.index(".")]
)
# Get the zipState of the zip being extracted
dir_status = self.get(remote_path)
zip_state = dir_status["zipState"] if "zipState" in dir_status else None
if unzipped_dir_exists and zip_state == "NONE":
return True
time.sleep(1)
count += 1
if timeout is not None and count >= timeout:
self._log.info(
f"Timeout of {timeout} seconds exceeded while waiting for {action} of {remote_path}."
)
return False

def unzip(
self, remote_path: str, block: bool = False, timeout: Optional[int] = 120
):
"""Unzip an archive in the dataset.

# Arguments
remote_path: path to file or directory to unzip.
block: if the operation should be blocking until complete, defaults to False.
timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.

# Returns
`bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
"""
return self._archive(remote_path, block=block, timeout=timeout, action="unzip")

def zip(
self,
remote_path: str,
destination_path: Optional[str] = None,
block: bool = False,
timeout: Optional[int] = 120,
):
"""Zip a file or directory in the dataset.

# Arguments
remote_path: path to file or directory to zip.
destination_path: path to upload the zip, defaults to None.
block: if the operation should be blocking until complete, defaults to False.
timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.

# Returns
`bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
"""
return self._archive(
remote_path,
destination_path=destination_path,
block=block,
timeout=timeout,
action="zip",
)
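
Putting the two public entry points together — a sketch that blocks on both operations; the paths are made up and assume the directory exists:

```python
# Zip a directory and wait up to 5 minutes for the archive to materialize.
ok = dataset_api.zip("Resources/experiments", block=True, timeout=300)

# Extract it again, this time waiting without bound (timeout=None).
if ok:
    dataset_api.unzip("Resources/experiments.zip", block=True, timeout=None)
```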

# region Dataset Tags

def add(self, path, name, value):
"""Attach a name/value tag to a model.

A tag consists of a name/value pair. Tag names are unique identifiers.
The value of a tag can be any valid JSON: primitives, arrays, or JSON objects.

:param path: path to add the tag
:type path: str
:param name: name of the tag to be added
:type name: str
:param value: value of the tag to be added
:type value: any JSON-serializable value
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"dataset",
"tags",
"schema",
name,
path,
]
headers = {"content-type": "application/json"}
json_value = json.dumps(value)
_client._send_request("PUT", path_params, headers=headers, data=json_value)

def delete(self, path, name):
"""Delete a tag.

Tag names are unique identifiers.

:param path: path to delete the tag from
:type path: str
:param name: name of the tag to be removed
:type name: str
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"dataset",
"tags",
"schema",
name,
path,
]
_client._send_request("DELETE", path_params)

def get_tags(self, path, name: Optional[str] = None):
"""Get the tags.

Gets all tags if no tag name is specified.

:param path: path to get the tags
:type path: str
:param name: tag name
:type name: str
:return: dict of tag name/values
:rtype: dict
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"dataset",
"tags",
]

if name is not None:
path_params.append("schema")
path_params.append(name)
else:
path_params.append("all")

path_params.append(path)

return {
tag._name: json.loads(tag._value)
for tag in tag.Tag.from_response_json(
_client._send_request("GET", path_params)
)
}
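
A round-trip sketch for the tag helpers — note that a tag schema with the given name must already be defined on the cluster; the schema name and path here are made up:

```python
path = "Resources/my_data.csv"

# Attach a tag (any JSON-serializable value works).
dataset_api.add(path, "sensitivity", {"level": "high"})

# Read back one tag, or all tags on the path.
print(dataset_api.get_tags(path, "sensitivity"))
print(dataset_api.get_tags(path))

# Remove it again.
dataset_api.delete(path, "sensitivity")
```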