Commit
[HWORKS-1037][Append] Add HdfsApi with pyarrow-hdfs client
javierdlrm committed Oct 22, 2024
1 parent 3ef94f8 commit a999fac
Showing 2 changed files with 145 additions and 18 deletions.
93 changes: 93 additions & 0 deletions python/hsml/core/hdfs_api.py
@@ -0,0 +1,93 @@
#
# Copyright 2024 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

import os


class HdfsApi:
def __init__(self):

import fsspec.implementations.arrow as pfs

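        # LIBHDFS_DEFAULT_FS is expected to hold the namenode endpoint as "host:port"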
host, port = os.environ["LIBHDFS_DEFAULT_FS"].split(":")

self._hopsfs = pfs.HadoopFileSystem(
host=host,
port=int(port),
user=os.environ["LIBHDFS_DEFAULT_USER"],
)

DEFAULT_BUFFER_SIZE = 0

def upload(
self,
local_path: str,
upload_path: str,
overwrite: bool = False,
buffer_size: int = DEFAULT_BUFFER_SIZE,
):
"""Upload file/directory to the Hopsworks filesystem.
:param local_path: local path to file to upload
:type local_path: str
        :param upload_path: path to the directory in the Hopsworks filesystem where the file will be uploaded
:type upload_path: str
:param overwrite: overwrite file if exists
:type overwrite: bool
:param buffer_size: size of the temporary read and write buffer. Defaults to 0.
:type buffer_size: int
"""
        # local_path may be absolute or relative; resolve relative paths against the current working directory
if not os.path.isabs(local_path) and os.path.exists(
os.path.join(os.getcwd(), local_path)
):
local_path = os.path.join(os.getcwd(), local_path)

_, file_name = os.path.split(local_path)

destination_path = upload_path + "/" + file_name

if self._hopsfs.exists(destination_path):
if overwrite:
self._hopsfs.rm(destination_path, recursive=True)
else:
                raise Exception(
                    "{} already exists, set overwrite=True to overwrite it".format(
                        destination_path
                    )
                )

self._hopsfs.upload(
lpath=local_path,
rpath=destination_path,
recursive=True,
buffer_size=buffer_size,
)

return upload_path + "/" + os.path.basename(local_path)

def download(self, path, local_path, buffer_size=DEFAULT_BUFFER_SIZE):
"""Download file/directory on a path in datasets.
:param path: path to download
:type path: str
:param local_path: path to download in datasets
:type local_path: str
:param buffer_size: size of the temporary read and write buffer. Defaults to 0.
:type buffer_size: int
"""

self._hopsfs.download(path, local_path, recursive=True, buffer_size=buffer_size)
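
For context, here is a minimal usage sketch of the new client (not part of the commit). It assumes a Hopsworks environment where libhdfs is available and where LIBHDFS_DEFAULT_FS and LIBHDFS_DEFAULT_USER are already exported; the endpoint, user, and paths below are purely hypothetical.

import os

from hsml.core.hdfs_api import HdfsApi

# HdfsApi.__init__ reads these variables; inside a Hopsworks job they are
# already set. The values here are placeholders.
os.environ.setdefault("LIBHDFS_DEFAULT_FS", "namenode:8020")
os.environ.setdefault("LIBHDFS_DEFAULT_USER", "demo_user")

api = HdfsApi()

# upload() appends the local file name to upload_path and returns the
# resulting remote path; overwrite=True replaces an existing copy.
remote_file = api.upload(
    local_path="model.pkl",
    upload_path="/Projects/demo/Models/my_model/1",
    overwrite=True,
)

# download() copies a remote file or directory to a local destination.
api.download(path=remote_file, local_path="model_copy.pkl")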
70 changes: 52 additions & 18 deletions python/hsml/engine/local_engine.py
@@ -17,14 +17,19 @@
import os

from hsml import client
from hsml.core import dataset_api, model_api
from hsml.core import dataset_api, model_api, hdfs_api


class LocalEngine:
def __init__(self):
self._dataset_api = dataset_api.DatasetApi()
self._model_api = model_api.ModelApi()

try:
self._hdfs_api = hdfs_api.HdfsApi()
        except Exception:
self._hdfs_api = None

def mkdir(self, remote_path: str):
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.mkdir(remote_path)
@@ -38,26 +43,55 @@ def upload(self, local_path: str, remote_path: str, upload_configuration=None):

        # Initialize the upload configuration to an empty dictionary if it is None
upload_configuration = upload_configuration if upload_configuration else {}
self._dataset_api.upload(
local_path,
remote_path,
chunk_size=upload_configuration.get(
"chunk_size", self._dataset_api.DEFAULT_UPLOAD_FLOW_CHUNK_SIZE
),
simultaneous_uploads=upload_configuration.get(
"simultaneous_uploads",
self._dataset_api.DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
),
max_chunk_retries=upload_configuration.get(
"max_chunk_retries",
self._dataset_api.DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
),
)

def download(self, remote_path: str, local_path: str):
if self._hdfs_api is not None:
# use the hdfs client if available
self._hdfs_api.upload(
local_path=local_path,
upload_path=remote_path,
buffer_size=upload_configuration.get(
"buffer_size", self._hdfs_api.DEFAULT_BUFFER_SIZE
),
)
else:
# otherwise, use the REST API
self._dataset_api.upload(
local_path,
remote_path,
chunk_size=upload_configuration.get(
"chunk_size", self._dataset_api.DEFAULT_UPLOAD_FLOW_CHUNK_SIZE
),
simultaneous_uploads=upload_configuration.get(
"simultaneous_uploads",
self._dataset_api.DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
),
max_chunk_retries=upload_configuration.get(
"max_chunk_retries",
self._dataset_api.DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
),
)

def download(self, remote_path: str, local_path: str, download_configuration=None):
local_path = self._get_abs_path(local_path)
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.download(remote_path, local_path)

        # Initialize the download configuration to an empty dictionary if it is None
download_configuration = (
download_configuration if download_configuration else {}
)

if self._hdfs_api is not None:
# use the hdfs client if available
self._hdfs_api.download(
path=remote_path,
local_path=local_path,
buffer_size=download_configuration.get(
"buffer_size", self._hdfs_api.DEFAULT_BUFFER_SIZE
),
)
else:
# otherwise, use the REST API
self._dataset_api.download(remote_path, local_path)

def copy(self, source_path, destination_path):
source_path = self._prepend_project_path(source_path)

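For illustration, a sketch (not part of the commit) of how the fallback plays out from the caller's side. LocalEngine is normally constructed internally by hsml rather than by user code, and the paths and the 4 MiB buffer size below are hypothetical.

from hsml.engine.local_engine import LocalEngine

engine = LocalEngine()

# If HdfsApi() could be constructed, transfers go through libhdfs and only
# the "buffer_size" key of the configuration is consulted; otherwise the
# REST dataset API is used with its chunked-upload settings.
engine.upload(
    local_path="model.pkl",
    remote_path="Models/my_model/1",
    upload_configuration={"buffer_size": 4 * 1024 * 1024},
)

engine.download(
    remote_path="Models/my_model/1/model.pkl",
    local_path="model.pkl",
    download_configuration={"buffer_size": 4 * 1024 * 1024},
)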