Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
littleEast7 committed Nov 4, 2024
1 parent 9e729a2 commit 39e1113
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 72 deletions.
4 changes: 3 additions & 1 deletion alluxiofs/client/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
MKDIR_URL_FORMAT = "http://{worker_host}:{http_port}/v1/mkdir/{path_id}?ufsFullPath={file_path}"
TOUCH_URL_FORMAT = "http://{worker_host}:{http_port}/v1/touch/{path_id}?ufsFullPath={file_path}"
MV_URL_FORMAT = "http://{worker_host}:{http_port}/v1/mv/{path_id}?srcPath={srcPath}&dstPath={dstPath}"
RM_URL_FORMAT = "http://{worker_host}:{http_port}/v1/rm/{path_id}?ufsFullPath={file_path}"
RM_URL_FORMAT = (
"http://{worker_host}:{http_port}/v1/rm/{path_id}?ufsFullPath={file_path}"
)
CP_URL_FORMAT = "http://{worker_host}:{http_port}/v1/copy/{path_id}?srcPath={srcPath}&dstPath={dstPath}"
TAIL_URL_FORMAT = "http://{worker_host}:{http_port}/v1/tail/{path_id}?ufsFullPath={file_path}"
HEAD_URL_FORMAT = "http://{worker_host}:{http_port}/v1/head/{path_id}?ufsFullPath={file_path}"
Expand Down
100 changes: 54 additions & 46 deletions alluxiofs/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@
)

from .config import AlluxioClientConfig
from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, MKDIR_URL_FORMAT, TOUCH_URL_FORMAT, TAIL_URL_FORMAT, \
HEAD_URL_FORMAT, MV_URL_FORMAT, RM_URL_FORMAT, CP_URL_FORMAT
from .const import (
ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
MKDIR_URL_FORMAT,
TOUCH_URL_FORMAT,
TAIL_URL_FORMAT,
HEAD_URL_FORMAT,
MV_URL_FORMAT,
RM_URL_FORMAT,
CP_URL_FORMAT,
)
from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
from .const import ALLUXIO_COMMON_EXTENSION_ENABLE
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
Expand Down Expand Up @@ -544,7 +552,11 @@ def write(self, file_path, file_bytes):
)
else:
return self._all_page_generator_write(
worker_host, worker_http_port, path_id, file_path, file_bytes
worker_host,
worker_http_port,
path_id,
file_path,
file_bytes,
)
except Exception as e:
raise Exception(
Expand Down Expand Up @@ -612,9 +624,7 @@ def mkdir(self, file_path):
response.raise_for_status()
return 200 <= response.status_code < 300
except requests.RequestException as e:
raise Exception(
f"Error making a directory of {file_path}: {e}"
)
raise Exception(f"Error making a directory of {file_path}: {e}")

def touch(self, file_path):
"""
Expand All @@ -641,9 +651,7 @@ def touch(self, file_path):
response.raise_for_status()
return 200 <= response.status_code < 300
except requests.RequestException as e:
raise Exception(
f"Error create a file of {file_path}: {e}"
)
raise Exception(f"Error create a file of {file_path}: {e}")

# TODO(littelEast7): complete it

Expand Down Expand Up @@ -675,9 +683,7 @@ def mv(self, path1, path2):
response.raise_for_status()
return 200 <= response.status_code < 300
except requests.RequestException as e:
raise Exception(
f"Error move a file from {path1} to {path2}: {e}"
)
raise Exception(f"Error move a file from {path1} to {path2}: {e}")

def rm(self, path, option):
"""
Expand All @@ -700,15 +706,14 @@ def rm(self, path, option):
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
file_path=path
), params=parameters
file_path=path,
),
params=parameters,
)
response.raise_for_status()
return 200 <= response.status_code < 300
except requests.RequestException as e:
raise Exception(
f"Error remove a file {path}: {e}"
)
raise Exception(f"Error remove a file {path}: {e}")

def cp(self, path1, path2, option):
"""
Expand All @@ -734,14 +739,13 @@ def cp(self, path1, path2, option):
path_id=path_id,
srcPath=path1,
dstPath=path2,
), params=parameters
),
params=parameters,
)
response.raise_for_status()
return 200 <= response.status_code < 300
except requests.RequestException as e:
raise Exception(
f"Error copy a file from {path1} to {path2}: {e}"
)
raise Exception(f"Error copy a file from {path1} to {path2}: {e}")

def tail(self, file_path, numOfBytes=None):
"""
Expand All @@ -764,13 +768,12 @@ def tail(self, file_path, numOfBytes=None):
http_port=worker_http_port,
path_id=path_id,
file_path=file_path,
), params={'numBytes': numOfBytes}
),
params={"numBytes": numOfBytes},
)
return b''.join(response.iter_content())
return b"".join(response.iter_content())
except requests.RequestException as e:
raise Exception(
f"Error show the tail of {file_path}: {e}"
)
raise Exception(f"Error show the tail of {file_path}: {e}")

def head(self, file_path, numOfBytes=None):
"""
Expand All @@ -793,13 +796,12 @@ def head(self, file_path, numOfBytes=None):
http_port=worker_http_port,
path_id=path_id,
file_path=file_path,
), params={'numBytes': numOfBytes}
),
params={"numBytes": numOfBytes},
)
return b''.join(response.iter_content())
return b"".join(response.iter_content())
except requests.RequestException as e:
raise Exception(
f"Error show the head of {file_path}: {e}"
)
raise Exception(f"Error show the head of {file_path}: {e}")

def _all_page_generator_alluxiocommon(
self, worker_host, worker_http_port, path_id, file_path
Expand Down Expand Up @@ -863,23 +865,27 @@ def _all_page_generator(
break
page_index += 1

def _all_page_generator_write(self, worker_host, worker_http_port, path_id, file_path, file_bytes):
def _all_page_generator_write(
self, worker_host, worker_http_port, path_id, file_path, file_bytes
):
page_index = 0
page_size = self.config.page_size
offset = 0
try:
while True:
end = min(offset + page_size, len(file_bytes))
page_bytes = file_bytes[offset:end]
self._write_page(worker_host,
worker_http_port,
path_id,
file_path,
page_index,
page_bytes)
self._write_page(
worker_host,
worker_http_port,
path_id,
file_path,
page_index,
page_bytes,
)
page_index += 1
offset += page_size
if (end >= len(file_bytes)):
if end >= len(file_bytes):
break
return True
except Exception as e:
Expand Down Expand Up @@ -1110,13 +1116,15 @@ def _read_page(
f"Error when requesting file {path_id} page {page_index} from {worker_host}: error {e}"
) from e

def _write_page(self,
worker_host,
worker_http_port,
path_id,
file_path,
page_index,
page_bytes):
def _write_page(
self,
worker_host,
worker_http_port,
path_id,
file_path,
page_index,
page_bytes,
):
"""
Writes a page.
Args:
Expand Down
51 changes: 26 additions & 25 deletions alluxiofs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def __init__(self, *args):
self.preserve = args[5]



class AlluxioErrorMetrics:
def __init__(self):
self.error_counts = {}
Expand Down Expand Up @@ -197,7 +196,9 @@ def unstrip_protocol(self, path):
# return self.fs.unstrip_protocol(path[1:])
# else:
path = self.fs.unstrip_protocol(path)
if not (path.startswith("file") or path.startswith("alluxiofs")) and path.contain("///"):
if not (
path.startswith("file") or path.startswith("alluxiofs")
) and path.contain("///"):
path.replace("///", "//", 1)
return path

Expand Down Expand Up @@ -362,22 +363,24 @@ def makedirs(self, path, *args, **kwargs):
raise NotImplementedError

@fallback_handler
def rm(self, path,
recursive=False,
recursiveAlias=False,
removeAlluxioOnly=False,
deleteMountPoint=False,
syncParentNextTime=False,
removeUncheckedOptionChar=False
):
def rm(
self,
path,
recursive=False,
recursiveAlias=False,
removeAlluxioOnly=False,
deleteMountPoint=False,
syncParentNextTime=False,
removeUncheckedOptionChar=False,
):
path = self.unstrip_protocol(path)
option = RMOption(
recursive,
recursiveAlias,
removeAlluxioOnly,
deleteMountPoint,
syncParentNextTime,
removeUncheckedOptionChar
removeUncheckedOptionChar,
)
return self.alluxio.rm(path, option)

Expand Down Expand Up @@ -438,23 +441,21 @@ def mv(self, path1, path2, *args, **kwargs):
return self.alluxio.mv(path1, path2)

@fallback_handler
def copy(self, path1,
path2,
recursive=False,
recursiveAlias=False,
force=False,
thread=None,
bufferSize=None,
preserve=None):
def copy(
self,
path1,
path2,
recursive=False,
recursiveAlias=False,
force=False,
thread=None,
bufferSize=None,
preserve=None,
):
path1 = self.unstrip_protocol(path1)
path2 = self.unstrip_protocol(path2)
option = CPOption(
recursive,
recursiveAlias,
force,
thread,
bufferSize,
preserve
recursive, recursiveAlias, force, thread, bufferSize, preserve
)
return self.alluxio.cp(path1, path2, option)

Expand Down

0 comments on commit 39e1113

Please sign in to comment.