diff --git a/python/hsml/core/hdfs_api.py b/python/hsml/core/hdfs_api.py
new file mode 100644
index 000000000..d786bce37
--- /dev/null
+++ b/python/hsml/core/hdfs_api.py
@@ -0,0 +1,93 @@
+#
+#   Copyright 2024 Hopsworks AB
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+from __future__ import annotations
+
+import os
+
+
+class HdfsApi:
+    def __init__(self):
+
+        import fsspec.implementations.arrow as pfs
+
+        host, port = os.environ["LIBHDFS_DEFAULT_FS"].split(":")
+
+        self._hopsfs = pfs.HadoopFileSystem(
+            host=host,
+            port=int(port),
+            user=os.environ["LIBHDFS_DEFAULT_USER"],
+        )
+
+    DEFAULT_BUFFER_SIZE = 0
+
+    def upload(
+        self,
+        local_path: str,
+        upload_path: str,
+        overwrite: bool = False,
+        buffer_size: int = DEFAULT_BUFFER_SIZE,
+    ):
+        """Upload a file/directory to the Hopsworks filesystem.
+        :param local_path: local path to the file or directory to upload
+        :type local_path: str
+        :param upload_path: path to the directory in the Hopsworks filesystem to upload to
+        :type upload_path: str
+        :param overwrite: overwrite the file if it already exists
+        :type overwrite: bool
+        :param buffer_size: size of the temporary read and write buffer. Defaults to 0.
+        :type buffer_size: int
+        """
+        # local path could be absolute or relative
+        if not os.path.isabs(local_path) and os.path.exists(
+            os.path.join(os.getcwd(), local_path)
+        ):
+            local_path = os.path.join(os.getcwd(), local_path)
+
+        _, file_name = os.path.split(local_path)
+
+        destination_path = upload_path + "/" + file_name
+
+        if self._hopsfs.exists(destination_path):
+            if overwrite:
+                self._hopsfs.rm(destination_path, recursive=True)
+            else:
+                raise Exception(
+                    "{} already exists, set overwrite=True to overwrite it".format(
+                        destination_path
+                    )
+                )
+
+        self._hopsfs.upload(
+            lpath=local_path,
+            rpath=destination_path,
+            recursive=True,
+            buffer_size=buffer_size,
+        )
+
+        return upload_path + "/" + os.path.basename(local_path)
+
+    def download(self, path, local_path, buffer_size=DEFAULT_BUFFER_SIZE):
+        """Download a file/directory from the Hopsworks filesystem.
+        :param path: path in the Hopsworks filesystem to download
+        :type path: str
+        :param local_path: local path to download to
+        :type local_path: str
+        :param buffer_size: size of the temporary read and write buffer. Defaults to 0.
+        :type buffer_size: int
+        """
+
+        self._hopsfs.download(path, local_path, recursive=True, buffer_size=buffer_size)
diff --git a/python/hsml/engine/local_engine.py b/python/hsml/engine/local_engine.py
index 7b669a249..d64d3379e 100644
--- a/python/hsml/engine/local_engine.py
+++ b/python/hsml/engine/local_engine.py
@@ -17,7 +17,7 @@
 import os
 
 from hsml import client
-from hsml.core import dataset_api, model_api
+from hsml.core import dataset_api, model_api, hdfs_api
 
 
 class LocalEngine:
@@ -25,6 +25,11 @@ def __init__(self):
         self._dataset_api = dataset_api.DatasetApi()
         self._model_api = model_api.ModelApi()
 
+        try:
+            self._hdfs_api = hdfs_api.HdfsApi()
+        except Exception:
+            self._hdfs_api = None
+
     def mkdir(self, remote_path: str):
         remote_path = self._prepend_project_path(remote_path)
         self._dataset_api.mkdir(remote_path)
@@ -38,26 +43,55 @@ def upload(self, local_path: str, remote_path: str, upload_configuration=None):
 
         # Initialize the upload configuration to empty dictionary if is None
         upload_configuration = upload_configuration if upload_configuration else {}
-        self._dataset_api.upload(
-            local_path,
-            remote_path,
-            chunk_size=upload_configuration.get(
-                "chunk_size", self._dataset_api.DEFAULT_UPLOAD_FLOW_CHUNK_SIZE
-            ),
-            simultaneous_uploads=upload_configuration.get(
-                "simultaneous_uploads",
-                self._dataset_api.DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
-            ),
-            max_chunk_retries=upload_configuration.get(
-                "max_chunk_retries",
-                self._dataset_api.DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
-            ),
-        )
-
-    def download(self, remote_path: str, local_path: str):
+
+        if self._hdfs_api is not None:
+            # use the hdfs client if available
+            self._hdfs_api.upload(
+                local_path=local_path,
+                upload_path=remote_path,
+                buffer_size=upload_configuration.get(
+                    "buffer_size", self._hdfs_api.DEFAULT_BUFFER_SIZE
+                ),
+            )
+        else:
+            # otherwise, use the REST API
+            self._dataset_api.upload(
+                local_path,
+                remote_path,
+                chunk_size=upload_configuration.get(
+                    "chunk_size", self._dataset_api.DEFAULT_UPLOAD_FLOW_CHUNK_SIZE
+                ),
+                simultaneous_uploads=upload_configuration.get(
+                    "simultaneous_uploads",
+                    self._dataset_api.DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
+                ),
+                max_chunk_retries=upload_configuration.get(
+                    "max_chunk_retries",
+                    self._dataset_api.DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
+                ),
+            )
+
+    def download(self, remote_path: str, local_path: str, download_configuration=None):
         local_path = self._get_abs_path(local_path)
         remote_path = self._prepend_project_path(remote_path)
-        self._dataset_api.download(remote_path, local_path)
+
+        # Initialize the download configuration to empty dictionary if is None
+        download_configuration = (
+            download_configuration if download_configuration else {}
+        )
+
+        if self._hdfs_api is not None:
+            # use the hdfs client if available
+            self._hdfs_api.download(
+                path=remote_path,
+                local_path=local_path,
+                buffer_size=download_configuration.get(
+                    "buffer_size", self._hdfs_api.DEFAULT_BUFFER_SIZE
+                ),
+            )
+        else:
+            # otherwise, use the REST API
+            self._dataset_api.download(remote_path, local_path)
 
     def copy(self, source_path, destination_path):
         source_path = self._prepend_project_path(source_path)
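
For context, a minimal usage sketch of how the new fallback behaves. It is only a sketch: it assumes an hsml project connection is already established (for example inside a Hopsworks job, where the platform also sets the LIBHDFS_DEFAULT_FS and LIBHDFS_DEFAULT_USER environment variables read by HdfsApi.__init__), and the file and dataset paths are hypothetical. When those variables are missing, HdfsApi() raises, self._hdfs_api becomes None, and LocalEngine keeps using the REST-based DatasetApi exactly as before.

    from hsml.engine import local_engine

    # Assumes hsml is already connected to a project (e.g. inside a Hopsworks job),
    # so that relative dataset paths can be resolved by _prepend_project_path.
    engine = local_engine.LocalEngine()

    # Goes through HdfsApi (pyarrow/libhdfs via fsspec) when it could be constructed,
    # otherwise through the existing chunked REST upload of DatasetApi.
    # "buffer_size" is only read on the HDFS path; "chunk_size" etc. only on the REST path.
    engine.upload(
        "model.pkl",          # hypothetical local file
        "Models/my_model/1",  # hypothetical target directory in the project
        upload_configuration={"buffer_size": 0},
    )

    engine.download(
        "Models/my_model/1/model.pkl",  # hypothetical remote file
        "model_copy.pkl",
        download_configuration={"buffer_size": 0},
    )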