diff --git a/alluxiofs/client/const.py b/alluxiofs/client/const.py
index 79d169a..9de85e0 100644
--- a/alluxiofs/client/const.py
+++ b/alluxiofs/client/const.py
@@ -29,13 +29,6 @@
     "/page/{page_index}?offset={page_offset}&length={page_length}&ufsFullPath={file_path}"
 )
 WRITE_PAGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}?ufsFullPath={file_path}"
-MKDIR_URL_FORMAT = "http://{worker_host}:{http_port}/v1/mkdir/{path_id}?ufsFullPath={file_path}"
-TOUCH_URL_FORMAT = "http://{worker_host}:{http_port}/v1/touch/{path_id}?ufsFullPath={file_path}"
-MV_URL_FORMAT = "http://{worker_host}:{http_port}/v1/mv/{path_id}?srcPath={srcPath}&dstPath={dstPath}"
-RM_URL_FORMAT = "http://{worker_host}:{http_port}/v1/rm/{path_id}?ufsFullPath={file_path}"
-CP_URL_FORMAT = "http://{worker_host}:{http_port}/v1/copy/{path_id}?srcPath={srcPath}&dstPath={dstPath}"
-TAIL_URL_FORMAT = "http://{worker_host}:{http_port}/v1/tail/{path_id}?ufsFullPath={file_path}"
-HEAD_URL_FORMAT = "http://{worker_host}:{http_port}/v1/head/{path_id}?ufsFullPath={file_path}"
 PAGE_PATH_URL_FORMAT = "/v1/file/{path_id}/page/{page_index}"
 GET_FILE_STATUS_URL_FORMAT = "http://{worker_host}:{http_port}/v1/info"
 LOAD_URL_FORMAT = "http://{worker_host}:{http_port}/v1/load"
diff --git a/alluxiofs/client/core.py b/alluxiofs/client/core.py
index a3b7bac..3544eee 100644
--- a/alluxiofs/client/core.py
+++ b/alluxiofs/client/core.py
@@ -33,8 +33,7 @@
 )

 from .config import AlluxioClientConfig
-from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, MKDIR_URL_FORMAT, TOUCH_URL_FORMAT, TAIL_URL_FORMAT, \
-    HEAD_URL_FORMAT, MV_URL_FORMAT, RM_URL_FORMAT, CP_URL_FORMAT
+from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
 from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
 from .const import ALLUXIO_COMMON_EXTENSION_ENABLE
 from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
@@ -521,39 +520,6 @@ def read_range(self, file_path, offset, length):
                 f"worker_host:{worker_host}, worker_http_port:{worker_http_port}"
             ) from e

-
-    def write(self, file_path, file_bytes):
-        """
-        Write a byte[] content to the file.
-
-        Args:
-            file_path (str): The full ufs file path to read data from
-            file_bytes (str): The full ufs file content
-
-        Returns:
-            True if the write was successful, False otherwise.
-        """
-        self._validate_path(file_path)
-        worker_host, worker_http_port = self._get_preferred_worker_address(
-            file_path
-        )
-        path_id = self._get_path_hash(file_path)
-        try:
-            if self.data_manager:
-                return b"".join(
-                    self._all_page_generator_alluxiocommon(
-                        worker_host, worker_http_port, path_id, file_path
-                    )
-                )
-            else:
-                return self._all_page_generator_write(
-                    worker_host, worker_http_port, path_id, file_path, file_bytes
-                )
-        except Exception as e:
-            raise Exception(
-                f"Error when reading file {file_path}: error {e}"
-            ) from e
-
     def write_page(self, file_path, page_index, page_bytes):
         """
         Writes a page.
@@ -590,234 +556,6 @@
                 f"Error writing to file {file_path} at page {page_index}: {e}"
             )

-    def mkdir(self, file_path):
-        """
-        make a directory which path is 'file_path'.
-
-        Args:
-            file_path: The path of the directory to make.
-
-        Returns:
-            True if the mkdir was successful, False otherwise.
-        """
-        self._validate_path(file_path)
-        worker_host, worker_http_port = self._get_preferred_worker_address(
-            file_path
-        )
-        path_id = self._get_path_hash(file_path)
-        try:
-            response = requests.post(
-                MKDIR_URL_FORMAT.format(
-                    worker_host=worker_host,
-                    http_port=worker_http_port,
-                    path_id=path_id,
-                    file_path=file_path,
-                )
-            )
-            response.raise_for_status()
-            return 200 <= response.status_code < 300
-        except requests.RequestException as e:
-            raise Exception(
-                f"Error making a directory of {file_path}: {e}"
-            )
-
-    def touch(self, file_path):
-        """
-        create a file which path is 'file_path'.
-
-        Args:
-            file_path: The path of the file to touch.
-
-        Returns:
-            True if the touch was successful, False otherwise.
-        """
-        self._validate_path(file_path)
-        worker_host, worker_http_port = self._get_preferred_worker_address(
-            file_path
-        )
-        path_id = self._get_path_hash(file_path)
-        try:
-            response = requests.post(
-                TOUCH_URL_FORMAT.format(
-                    worker_host=worker_host,
-                    http_port=worker_http_port,
-                    path_id=path_id,
-                    file_path=file_path,
-                )
-            )
-            response.raise_for_status()
-            return 200 <= response.status_code < 300
-        except requests.RequestException as e:
-            raise Exception(
-                f"Error create a file of {file_path}: {e}"
-            )
-
-    # TODO(littelEast7): complete it
-    def mv(self, path1, path2):
-        """
-        mv a file from path1 to path2.
-
-        Args:
-            path1: The path of the file original.
-            path2: The path of the file destination.
-
-        Returns:
-            True if the mv was successful, False otherwise.
-        """
-        self._validate_path(path1)
-        self._validate_path(path2)
-        worker_host, worker_http_port = self._get_preferred_worker_address(
-            path1
-        )
-        path_id = self._get_path_hash(path1)
-        try:
-            response = requests.post(
-                MV_URL_FORMAT.format(
-                    worker_host=worker_host,
-                    http_port=worker_http_port,
-                    path_id=path_id,
-                    srcPath=path1,
-                    dstPath=path2,
-                )
-            )
-            response.raise_for_status()
-            return 200 <= response.status_code < 300
-        except requests.RequestException as e:
-            raise Exception(
-                f"Error move a file from {path1} to {path2}: {e}"
-            )
-
-    def rm(self, path, option):
-        """
-        remove a file which path is 'path'.
-
-        Args:
-            path: The path of the file.
-            option: The option to remove.
-
-        Returns:
-            True if the rm was successful, False otherwise.
-        """
-        self._validate_path(path)
-        worker_host, worker_http_port = self._get_preferred_worker_address(
-            path
-        )
-        path_id = self._get_path_hash(path)
-        parameters = option.__dict__
-        try:
-            response = requests.post(
-                RM_URL_FORMAT.format(
-                    worker_host=worker_host,
-                    http_port=worker_http_port,
-                    path_id=path_id,
-                    file_path=path
-                ), params=parameters
-            )
-            response.raise_for_status()
-            return 200 <= response.status_code < 300
-        except requests.RequestException as e:
-            raise Exception(
-                f"Error remove a file {path}: {e}"
-            )
-
-    def cp(self, path1, path2, option):
-        """
-        copy a file which path is 'path1' to 'path2'.
-
-        Args:
-            path1: The path of the file original.
-            path2: The path of the file destination.
-            option: The option to remove.
-
-        Returns:
-            True if the cp was successful, False otherwise.
-        """
-        self._validate_path(path1)
-        worker_host, worker_http_port = self._get_preferred_worker_address(
-            path1
-        )
-        path_id = self._get_path_hash(path1)
-        parameters = option.__dict__
-        try:
-            response = requests.post(
-                CP_URL_FORMAT.format(
-                    worker_host=worker_host,
-                    http_port=worker_http_port,
-                    path_id=path_id,
-                    srcPath=path1,
-                    dstPath=path2,
-                ), params=parameters
-            )
-            response.raise_for_status()
-            return 200 <= response.status_code < 300
-        except requests.RequestException as e:
-            raise Exception(
-                f"Error copy a file from {path1} to {path2}: {e}"
-            )
-
-    def tail(self, file_path, numOfBytes=None):
-        """
-        show the tail a file which path is 'file_path'.
-
-        Args:
-            path1: The ufs path of the file.
-            path2: The length of the file to show (like 1kb).
-
-        Returns:
-            The content of tail of the file.
-        """
-        self._validate_path(file_path)
-        worker_host, worker_http_port = self._get_preferred_worker_address(
-            file_path
-        )
-        path_id = self._get_path_hash(file_path)
-        try:
-            response = requests.get(
-                TAIL_URL_FORMAT.format(
-                    worker_host=worker_host,
-                    http_port=worker_http_port,
-                    path_id=path_id,
-                    file_path=file_path,
-                ), params={'numBytes': numOfBytes}
-            )
-            return b''.join(response.iter_content())
-        except requests.RequestException as e:
-            raise Exception(
-                f"Error show the tail of {file_path}: {e}"
-            )
-
-    def head(self, file_path, numOfBytes=None):
-        """
-        show the head a file which path is 'file_path'.
-
-        Args:
-            path1: The ufs path of the file.
-            path2: The length of the file to show (like 1kb).
-
-        Returns:
-            The content of head of the file.
-        """
-        self._validate_path(file_path)
-        worker_host, worker_http_port = self._get_preferred_worker_address(
-            file_path
-        )
-        path_id = self._get_path_hash(file_path)
-        try:
-            response = requests.get(
-                HEAD_URL_FORMAT.format(
-                    worker_host=worker_host,
-                    http_port=worker_http_port,
-                    path_id=path_id,
-                    file_path=file_path,
-                ), params={'numBytes': numOfBytes}
-            )
-            return b''.join(response.iter_content())
-        except requests.RequestException as e:
-            raise Exception(
-                f"Error show the head of {file_path}: {e}"
-            )
-
-
     def _all_page_generator_alluxiocommon(
         self, worker_host, worker_http_port, path_id, file_path
     ):
@@ -880,66 +618,40 @@
                 break
             page_index += 1

-    def _all_page_generator_write(self, worker_host, worker_http_port, path_id, file_path, file_bytes):
-        page_index = 0
-        page_size = self.config.page_size
-        offset = 0
-        try:
-            while True:
-                end = min(offset + page_size, len(file_bytes))
-                page_bytes = file_bytes[offset:end]
-                self._write_page(worker_host,
-                                 worker_http_port,
-                                 path_id,
-                                 file_path,
-                                 page_index,
-                                 page_bytes)
-                page_index += 1
-                offset += page_size
-                if (end >= len(file_bytes)):
-                    break
-            return True
-        except Exception as e:
-            # data_manager won't throw exception if there are any first few content retrieved
-            # hence we always propagte exception from data_manager upwards
-            raise Exception(
-                f"Error when writing all pages of {path_id}: error {e}"
-            ) from e
-
     def _range_page_generator_alluxiocommon(
-            self, worker_host, worker_http_port, path_id, file_path, offset, length
-        ):
-            read_urls = []
-            start = offset
-            while start < offset + length:
-                page_index = start // self.config.page_size
-                inpage_off = start % self.config.page_size
-                inpage_read_len = min(
-                    self.config.page_size - inpage_off, offset + length - start
+        self, worker_host, worker_http_port, path_id, file_path, offset, length
+    ):
+        read_urls = []
+        start = offset
+        while start < offset + length:
+            page_index = start // self.config.page_size
+            inpage_off = start % self.config.page_size
+            inpage_read_len = min(
+                self.config.page_size - inpage_off, offset + length - start
+            )
+            page_url = None
+            if inpage_off == 0 and inpage_read_len == self.config.page_size:
+                page_url = FULL_PAGE_URL_FORMAT.format(
+                    worker_host=worker_host,
+                    http_port=worker_http_port,
+                    path_id=path_id,
+                    file_path=file_path,
+                    page_index=page_index,
                 )
-                page_url = None
-                if inpage_off == 0 and inpage_read_len == self.config.page_size:
-                    page_url = FULL_PAGE_URL_FORMAT.format(
-                        worker_host=worker_host,
-                        http_port=worker_http_port,
-                        path_id=path_id,
-                        file_path=file_path,
-                        page_index=page_index,
-                    )
-                else:
-                    page_url = PAGE_URL_FORMAT.format(
-                        worker_host=worker_host,
-                        http_port=worker_http_port,
-                        path_id=path_id,
-                        file_path=file_path,
-                        page_index=page_index,
-                        page_offset=inpage_off,
-                        page_length=inpage_read_len,
-                    )
-                read_urls.append(page_url)
-                start += inpage_read_len
-            data = self.data_manager.make_multi_http_req(read_urls)
-            return data
+            else:
+                page_url = PAGE_URL_FORMAT.format(
+                    worker_host=worker_host,
+                    http_port=worker_http_port,
+                    path_id=path_id,
+                    file_path=file_path,
+                    page_index=page_index,
+                    page_offset=inpage_off,
+                    page_length=inpage_read_len,
+                )
+            read_urls.append(page_url)
+            start += inpage_read_len
+        data = self.data_manager.make_multi_http_req(read_urls)
+        return data

     def _range_page_generator(
         self, worker_host, worker_http_port, path_id, file_path, offset, length
@@ -1127,43 +839,6 @@
                 f"Error when requesting file {path_id} page {page_index} from {worker_host}: error {e}"
             ) from e

-    def _write_page(self,
-                    worker_host,
-                    worker_http_port,
-                    path_id,
-                    file_path,
-                    page_index,
-                    page_bytes):
-        """
-        Writes a page.
-
-        Args:
-            file_path: The path of the file where data is to be written.
-            page_index: The page index in the file to write the data.
-            page_bytes: The byte data to write to the specified page, MUST BE FULL PAGE.
-
-        Returns:
-            True if the write was successful, False otherwise.
-        """
-        try:
-            response = requests.post(
-                WRITE_PAGE_URL_FORMAT.format(
-                    worker_host=worker_host,
-                    http_port=worker_http_port,
-                    path_id=path_id,
-                    file_path=file_path,
-                    page_index=page_index,
-                ),
-                headers={"Content-Type": "application/octet-stream"},
-                data=page_bytes,
-            )
-            response.raise_for_status()
-            return 200 <= response.status_code < 300
-        except requests.RequestException as e:
-            raise Exception(
-                f"Error writing to file {file_path} at page {page_index}: {e}"
-            )
-
     def _get_path_hash(self, uri):
         hash_functions = [
             hashlib.sha256,
diff --git a/alluxiofs/core.py b/alluxiofs/core.py
index 0014939..7fb9550 100644
--- a/alluxiofs/core.py
+++ b/alluxiofs/core.py
@@ -7,7 +7,6 @@
 #
 # See the NOTICE file distributed with this work for information regarding copyright ownership.
 import inspect
-import io
 import logging
 import time
 from functools import wraps
@@ -23,38 +22,6 @@
 logger = logging.getLogger(__name__)


-class RMOption:
-    def __init__(self, *args):
-        # delete files and subdirectories recursively
-        self.recursive = args[0]
-        # copy files in subdirectories recursively
-        self.recursiveAlias = args[1]
-        # remove data and metadata from Alluxio space only
-        self.removeAlluxioOnly = args[2]
-        # remove mount points in the directory
-        self.deleteMountPoint = args[3]
-        # Marks a directory to either trigger a metadata sync or skip the metadata sync on next access.
-        self.syncParentNextTime = args[4]
-        # remove directories without checking UFS contents are in sync
-        self.removeUncheckedOption = args[5]
-
-
-class CPOption:
-    def __init__(self, *args):
-        # delete files and subdirectories recursively
-        self.recursive = args[0]
-        # copy files in subdirectories recursively
-        self.recursiveAlias = args[1]
-        # forces to overwrite the destination file if it exists
-        self.forced = args[2]
-        # Number of threads used to copy files in parallel, default value is CPU cores * 2
-        self.thread = args[3]
-        # Read buffer size in bytes, default is 8MB when copying from local, and 64MB when copying to local
-        self.bufferSize = args[4]
-        # Preserve file permission attributes when copying files. All ownership, permissions and ACLs will be preserved
-        self.preserve = args[5]
-
-
 class AlluxioErrorMetrics:
     def __init__(self):
         self.error_counts = {}
@@ -78,12 +45,12 @@ class AlluxioFileSystem(AbstractFileSystem):
     protocol_prefix = f"{protocol}://"

     def __init__(
-            self,
-            preload_path=None,
-            target_protocol=None,
-            target_options=None,
-            fs=None,
-            **kwargs,
+        self,
+        preload_path=None,
+        target_protocol=None,
+        target_options=None,
+        fs=None,
+        **kwargs,
     ):
         """
         Initializes an Alluxio filesystem on top of underlying filesystem
@@ -164,7 +131,7 @@ def _strip_individual_path(p):
                 raise TypeError(
                     f"Filesystem instance(fs) or target_protocol should be provided to use {self.protocol_prefix} schema"
                 )
-                return p[len(self.protocol_prefix):]
+                return p[len(self.protocol_prefix) :]
             return p

         if isinstance(path, str):
@@ -191,10 +158,7 @@ def _strip_protocol(path):
     def unstrip_protocol(self, path):
         if self.fs:
             # avoid adding Alluxiofs protocol to the full ufs url
-            if path.startswith('/'):
-                return self.fs.unstrip_protocol(path[1:])
-            else:
-                return self.fs.unstrip_protocol(path)
+            return self.fs.unstrip_protocol(path)
         return path

     def get_error_metrics(self):
@@ -316,13 +280,13 @@ def isdir(self, path, **kwargs):

     @fallback_handler
     def _open(
-            self,
-            path,
-            mode="rb",
-            block_size=None,
-            autocommit=True,
-            cache_options=None,
-            **kwargs,
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        autocommit=True,
+        cache_options=None,
+        **kwargs,
     ):
         path = self.unstrip_protocol(path)
         return AlluxioFile(
@@ -336,7 +300,7 @@ def _open(
         )

     @fallback_handler
-    def cat_file(self, path, start=0, end=None, **kwargs):
+    def cat_file(self, path, start=None, end=None, **kwargs):
         if end is None:
             length = -1
         else:
@@ -350,32 +314,15 @@ def ukey(self, path, *args, **kwargs):

     @fallback_handler
     def mkdir(self, path, *args, **kwargs):
-        path = self.unstrip_protocol(path)
-        return self.alluxio.mkdir(path)
+        raise NotImplementedError

     @fallback_handler
     def makedirs(self, path, *args, **kwargs):
         raise NotImplementedError

     @fallback_handler
-    def rm(self, path,
-           recursive=False,
-           recursiveAlias=False,
-           removeAlluxioOnly=False,
-           deleteMountPoint=False,
-           syncParentNextTime=False,
-           removeUncheckedOptionChar=False
-           ):
-        path = self.unstrip_protocol(path)
-        option = RMOption(
-            recursive,
-            recursiveAlias,
-            removeAlluxioOnly,
-            deleteMountPoint,
-            syncParentNextTime,
-            removeUncheckedOptionChar
-        )
-        return self.alluxio.rm(path, option)
+    def rm(self, path, *args, **kwargs):
+        raise NotImplementedError

     @fallback_handler
     def rmdir(self, path, *args, **kwargs):
@@ -395,8 +342,7 @@ def rm_file(self, path, *args, **kwargs):

     @fallback_handler
     def touch(self, path, *args, **kwargs):
-        path = self.unstrip_protocol(path)
-        return self.alluxio.touch(path)
+        raise NotImplementedError

     @fallback_handler
     def created(self, path, *args, **kwargs):
@@ -408,15 +354,11 @@ def modified(self, path, *args, **kwargs):

     @fallback_handler
     def head(self, path, *args, **kwargs):
-        path = self.unstrip_protocol(path)
-        numOfBytes = args[0]
-        return self.alluxio.head(path, numOfBytes)
+        raise NotImplementedError

     @fallback_handler
     def tail(self, path, *args, **kwargs):
-        path = self.unstrip_protocol(path)
-        numOfBytes = args[0]
-        return self.alluxio.tail(path, numOfBytes)
+        raise NotImplementedError

     @fallback_handler
     def expand_path(self, path, *args, **kwargs):
@@ -429,30 +371,11 @@ def expand_path(self, path, *args, **kwargs):

     @fallback_handler
     def mv(self, path1, path2, *args, **kwargs):
-        path1 = self.unstrip_protocol(path1)
-        path2 = self.unstrip_protocol(path2)
-        return self.alluxio.mv(path1, path2)
+        raise NotImplementedError

     @fallback_handler
-    def copy(self, path1,
-             path2,
-             recursive=False,
-             recursiveAlias=False,
-             force=False,
-             thread=None,
-             bufferSize=None,
-             preserve=None):
-        path1 = self.unstrip_protocol(path1)
-        path2 = self.unstrip_protocol(path2)
-        option = CPOption(
-            recursive,
-            recursiveAlias,
-            force,
-            thread,
-            bufferSize,
-            preserve
-        )
-        return self.alluxio.cp(path1, path2, option)
+    def copy(self, path1, path2, *args, **kwargs):
+        raise NotImplementedError

     @fallback_handler
     def cp_file(self, path1, path2, *args, **kwargs):
@@ -474,10 +397,6 @@ def put_file(self, lpath, rpath, *args, **kwargs):
     def put(self, lpath, rpath, *args, **kwargs):
         raise NotImplementedError

-    def write_bytes(self, path, value, **kwargs):
-        path = self.unstrip_protocol(path)
-        return self.alluxio.write(path, value)
-
     @fallback_handler
     def upload(self, lpath, rpath, *args, **kwargs):
         raise NotImplementedError
@@ -503,10 +422,10 @@ def read_block(self, *args, **kwargs):

 class AlluxioFile(AbstractBufferedFile):
     def __init__(self, fs, path, mode="rb", **kwargs):
-        # if mode != "rb":
-        #     raise ValueError(
-        #         'Remote Alluxio files can only be opened in "rb" mode'
-        #     )
+        if mode != "rb":
+            raise ValueError(
+                'Remote Alluxio files can only be opened in "rb" mode'
+            )
         super().__init__(fs, path, mode, **kwargs)

     def _fetch_range(self, start, end):
@@ -514,37 +433,7 @@ def _fetch_range(self, start, end):
         return self.fs.cat_file(path=self.path, start=start, end=end)

     def _upload_chunk(self, final=False):
-        data = self.buffer.getvalue()
-        if not data:
-            return False
-        if self.fs.write_bytes(path=self.path, value=data):
-            return True
-        return False
+        pass

     def _initiate_upload(self):
         pass
-
-    def flush(self, force=False):
-        if self.closed:
-            raise ValueError("Flush on closed file")
-        if force and self.forced:
-            raise ValueError("Force flush cannot be called more than once")
-        if force:
-            self.forced = True
-
-        if self.mode not in {"wb", "ab"}:
-            # no-op to flush on read-mode
-            return
-
-        if self.offset is None:
-            # Initialize a multipart upload
-            self.offset = 0
-            try:
-                self._initiate_upload()
-            except:
-                self.closed = True
-                raise
-
-        if self._upload_chunk(final=force) is not False:
-            self.offset += self.buffer.seek(0, 2)
-            self.buffer = io.BytesIO()
diff --git a/setup.py b/setup.py
index 9abf5c7..fb4af2d 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@

 setup(
     name="alluxiofs",
-    version="1.0.4",
+    version="1.0.3",
     description="Alluxio Fsspec provides Alluxio filesystem spec implementation.",
     long_description=open("README.md").read(),
     long_description_content_type="text/markdown",