diff --git a/alluxiofs/client/const.py b/alluxiofs/client/const.py index 79d169a..146b950 100644 --- a/alluxiofs/client/const.py +++ b/alluxiofs/client/const.py @@ -32,7 +32,9 @@ MKDIR_URL_FORMAT = "http://{worker_host}:{http_port}/v1/mkdir/{path_id}?ufsFullPath={file_path}" TOUCH_URL_FORMAT = "http://{worker_host}:{http_port}/v1/touch/{path_id}?ufsFullPath={file_path}" MV_URL_FORMAT = "http://{worker_host}:{http_port}/v1/mv/{path_id}?srcPath={srcPath}&dstPath={dstPath}" -RM_URL_FORMAT = "http://{worker_host}:{http_port}/v1/rm/{path_id}?ufsFullPath={file_path}" +RM_URL_FORMAT = ( + "http://{worker_host}:{http_port}/v1/rm/{path_id}?ufsFullPath={file_path}" +) CP_URL_FORMAT = "http://{worker_host}:{http_port}/v1/copy/{path_id}?srcPath={srcPath}&dstPath={dstPath}" TAIL_URL_FORMAT = "http://{worker_host}:{http_port}/v1/tail/{path_id}?ufsFullPath={file_path}" HEAD_URL_FORMAT = "http://{worker_host}:{http_port}/v1/head/{path_id}?ufsFullPath={file_path}" diff --git a/alluxiofs/client/core.py b/alluxiofs/client/core.py index 55c67b9..713af8e 100644 --- a/alluxiofs/client/core.py +++ b/alluxiofs/client/core.py @@ -33,8 +33,16 @@ ) from .config import AlluxioClientConfig -from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, MKDIR_URL_FORMAT, TOUCH_URL_FORMAT, TAIL_URL_FORMAT, \ - HEAD_URL_FORMAT, MV_URL_FORMAT, RM_URL_FORMAT, CP_URL_FORMAT +from .const import ( + ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, + MKDIR_URL_FORMAT, + TOUCH_URL_FORMAT, + TAIL_URL_FORMAT, + HEAD_URL_FORMAT, + MV_URL_FORMAT, + RM_URL_FORMAT, + CP_URL_FORMAT, +) from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE from .const import ALLUXIO_COMMON_EXTENSION_ENABLE from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE @@ -544,7 +552,11 @@ def write(self, file_path, file_bytes): ) else: return self._all_page_generator_write( - worker_host, worker_http_port, path_id, file_path, file_bytes + worker_host, + worker_http_port, + path_id, + file_path, + file_bytes, ) except Exception as e: raise Exception( @@ -612,9 +624,7 @@ def mkdir(self, file_path): response.raise_for_status() return 200 <= response.status_code < 300 except requests.RequestException as e: - raise Exception( - f"Error making a directory of {file_path}: {e}" - ) + raise Exception(f"Error making a directory of {file_path}: {e}") def touch(self, file_path): """ @@ -641,9 +651,7 @@ def touch(self, file_path): response.raise_for_status() return 200 <= response.status_code < 300 except requests.RequestException as e: - raise Exception( - f"Error create a file of {file_path}: {e}" - ) + raise Exception(f"Error create a file of {file_path}: {e}") # TODO(littelEast7): complete it @@ -675,9 +683,7 @@ def mv(self, path1, path2): response.raise_for_status() return 200 <= response.status_code < 300 except requests.RequestException as e: - raise Exception( - f"Error move a file from {path1} to {path2}: {e}" - ) + raise Exception(f"Error move a file from {path1} to {path2}: {e}") def rm(self, path, option): """ @@ -700,15 +706,14 @@ def rm(self, path, option): worker_host=worker_host, http_port=worker_http_port, path_id=path_id, - file_path=path - ), params=parameters + file_path=path, + ), + params=parameters, ) response.raise_for_status() return 200 <= response.status_code < 300 except requests.RequestException as e: - raise Exception( - f"Error remove a file {path}: {e}" - ) + raise Exception(f"Error remove a file {path}: {e}") def cp(self, path1, path2, option): """ @@ -734,14 +739,13 @@ def cp(self, path1, path2, option): path_id=path_id, srcPath=path1, dstPath=path2, - ), params=parameters + ), + params=parameters, ) response.raise_for_status() return 200 <= response.status_code < 300 except requests.RequestException as e: - raise Exception( - f"Error copy a file from {path1} to {path2}: {e}" - ) + raise Exception(f"Error copy a file from {path1} to {path2}: {e}") def tail(self, file_path, numOfBytes=None): """ @@ -764,13 +768,12 @@ def tail(self, file_path, numOfBytes=None): http_port=worker_http_port, path_id=path_id, file_path=file_path, - ), params={'numBytes': numOfBytes} + ), + params={"numBytes": numOfBytes}, ) - return b''.join(response.iter_content()) + return b"".join(response.iter_content()) except requests.RequestException as e: - raise Exception( - f"Error show the tail of {file_path}: {e}" - ) + raise Exception(f"Error show the tail of {file_path}: {e}") def head(self, file_path, numOfBytes=None): """ @@ -793,13 +796,12 @@ def head(self, file_path, numOfBytes=None): http_port=worker_http_port, path_id=path_id, file_path=file_path, - ), params={'numBytes': numOfBytes} + ), + params={"numBytes": numOfBytes}, ) - return b''.join(response.iter_content()) + return b"".join(response.iter_content()) except requests.RequestException as e: - raise Exception( - f"Error show the head of {file_path}: {e}" - ) + raise Exception(f"Error show the head of {file_path}: {e}") def _all_page_generator_alluxiocommon( self, worker_host, worker_http_port, path_id, file_path @@ -863,7 +865,9 @@ def _all_page_generator( break page_index += 1 - def _all_page_generator_write(self, worker_host, worker_http_port, path_id, file_path, file_bytes): + def _all_page_generator_write( + self, worker_host, worker_http_port, path_id, file_path, file_bytes + ): page_index = 0 page_size = self.config.page_size offset = 0 @@ -871,15 +875,17 @@ def _all_page_generator_write(self, worker_host, worker_http_port, path_id, file while True: end = min(offset + page_size, len(file_bytes)) page_bytes = file_bytes[offset:end] - self._write_page(worker_host, - worker_http_port, - path_id, - file_path, - page_index, - page_bytes) + self._write_page( + worker_host, + worker_http_port, + path_id, + file_path, + page_index, + page_bytes, + ) page_index += 1 offset += page_size - if (end >= len(file_bytes)): + if end >= len(file_bytes): break return True except Exception as e: @@ -1110,13 +1116,15 @@ def _read_page( f"Error when requesting file {path_id} page {page_index} from {worker_host}: error {e}" ) from e - def _write_page(self, - worker_host, - worker_http_port, - path_id, - file_path, - page_index, - page_bytes): + def _write_page( + self, + worker_host, + worker_http_port, + path_id, + file_path, + page_index, + page_bytes, + ): """ Writes a page. Args: diff --git a/alluxiofs/core.py b/alluxiofs/core.py index 284de5e..31e234f 100644 --- a/alluxiofs/core.py +++ b/alluxiofs/core.py @@ -55,7 +55,6 @@ def __init__(self, *args): self.preserve = args[5] - class AlluxioErrorMetrics: def __init__(self): self.error_counts = {} @@ -197,7 +196,9 @@ def unstrip_protocol(self, path): # return self.fs.unstrip_protocol(path[1:]) # else: path = self.fs.unstrip_protocol(path) - if not (path.startswith("file") or path.startswith("alluxiofs")) and path.contain("///"): + if not ( + path.startswith("file") or path.startswith("alluxiofs") + ) and path.contain("///"): path.replace("///", "//", 1) return path @@ -362,14 +363,16 @@ def makedirs(self, path, *args, **kwargs): raise NotImplementedError @fallback_handler - def rm(self, path, - recursive=False, - recursiveAlias=False, - removeAlluxioOnly=False, - deleteMountPoint=False, - syncParentNextTime=False, - removeUncheckedOptionChar=False - ): + def rm( + self, + path, + recursive=False, + recursiveAlias=False, + removeAlluxioOnly=False, + deleteMountPoint=False, + syncParentNextTime=False, + removeUncheckedOptionChar=False, + ): path = self.unstrip_protocol(path) option = RMOption( recursive, @@ -377,7 +380,7 @@ def rm(self, path, removeAlluxioOnly, deleteMountPoint, syncParentNextTime, - removeUncheckedOptionChar + removeUncheckedOptionChar, ) return self.alluxio.rm(path, option) @@ -438,23 +441,21 @@ def mv(self, path1, path2, *args, **kwargs): return self.alluxio.mv(path1, path2) @fallback_handler - def copy(self, path1, - path2, - recursive=False, - recursiveAlias=False, - force=False, - thread=None, - bufferSize=None, - preserve=None): + def copy( + self, + path1, + path2, + recursive=False, + recursiveAlias=False, + force=False, + thread=None, + bufferSize=None, + preserve=None, + ): path1 = self.unstrip_protocol(path1) path2 = self.unstrip_protocol(path2) option = CPOption( - recursive, - recursiveAlias, - force, - thread, - bufferSize, - preserve + recursive, recursiveAlias, force, thread, bufferSize, preserve ) return self.alluxio.cp(path1, path2, option)