diff --git a/alluxiofs/client/const.py b/alluxiofs/client/const.py index a699944..9de85e0 100644 --- a/alluxiofs/client/const.py +++ b/alluxiofs/client/const.py @@ -23,13 +23,13 @@ ALLUXIO_COMMON_EXTENSION_ENABLE = "alluxio.common.extension.enable" ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE = "alluxio.common.ondemandpool.disable" LIST_URL_FORMAT = "http://{worker_host}:{http_port}/v1/files" -FULL_PAGE_URL_FORMAT = ( - "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}" -) -PAGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}?offset={page_offset}&length={page_length}" -WRITE_PAGE_URL_FORMAT = ( - "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}" +FULL_PAGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}?ufsFullPath={file_path}" +PAGE_URL_FORMAT = ( + "http://{worker_host}:{http_port}/v1/file/{path_id}" + "/page/{page_index}?offset={page_offset}&length={page_length}&ufsFullPath={file_path}" ) +WRITE_PAGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}?ufsFullPath={file_path}" +PAGE_PATH_URL_FORMAT = "/v1/file/{path_id}/page/{page_index}" GET_FILE_STATUS_URL_FORMAT = "http://{worker_host}:{http_port}/v1/info" LOAD_URL_FORMAT = "http://{worker_host}:{http_port}/v1/load" # TODO (chunxu): Remove the concrete types of LOAD formats. Keep them for asyncio. diff --git a/alluxiofs/client/core.py b/alluxiofs/client/core.py index 72f7473..3544eee 100644 --- a/alluxiofs/client/core.py +++ b/alluxiofs/client/core.py @@ -443,13 +443,13 @@ def read(self, file_path): if self.data_manager: return b"".join( self._all_page_generator_alluxiocommon( - worker_host, worker_http_port, path_id + worker_host, worker_http_port, path_id, file_path ) ) else: return b"".join( self._all_page_generator( - worker_host, worker_http_port, path_id + worker_host, worker_http_port, path_id, file_path ) ) except Exception as e: @@ -496,12 +496,22 @@ def read_range(self, file_path, offset, length): try: if self.data_manager: return self._range_page_generator_alluxiocommon( - worker_host, worker_http_port, path_id, offset, length + worker_host, + worker_http_port, + path_id, + file_path, + offset, + length, ) else: return b"".join( self._range_page_generator( - worker_host, worker_http_port, path_id, offset, length + worker_host, + worker_http_port, + path_id, + file_path, + offset, + length, ) ) except Exception as e: @@ -533,6 +543,7 @@ def write_page(self, file_path, page_index, page_bytes): worker_host=worker_host, http_port=worker_http_port, path_id=path_id, + file_path=file_path, page_index=page_index, ), headers={"Content-Type": "application/octet-stream"}, @@ -546,7 +557,7 @@ def write_page(self, file_path, page_index, page_bytes): ) def _all_page_generator_alluxiocommon( - self, worker_host, worker_http_port, path_id + self, worker_host, worker_http_port, path_id, file_path ): page_index = 0 fetching_pages_num_each_round = 4 @@ -558,6 +569,7 @@ def _all_page_generator_alluxiocommon( worker_host=worker_host, http_port=worker_http_port, path_id=path_id, + file_path=file_path, page_index=page_index, ) read_urls.append(page_url) @@ -578,12 +590,18 @@ def _all_page_generator_alluxiocommon( f"Error when reading all pages of {path_id}: error {e}" ) from e - def _all_page_generator(self, worker_host, worker_http_port, path_id): + def _all_page_generator( + self, worker_host, worker_http_port, path_id, file_path + ): page_index = 0 while True: try: page_content = self._read_page( - worker_host, worker_http_port, path_id, page_index + worker_host, + worker_http_port, + path_id, + file_path, + page_index, ) except Exception as e: if page_index == 0: @@ -601,7 +619,7 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id): page_index += 1 def _range_page_generator_alluxiocommon( - self, worker_host, worker_http_port, path_id, offset, length + self, worker_host, worker_http_port, path_id, file_path, offset, length ): read_urls = [] start = offset @@ -617,6 +635,7 @@ def _range_page_generator_alluxiocommon( worker_host=worker_host, http_port=worker_http_port, path_id=path_id, + file_path=file_path, page_index=page_index, ) else: @@ -624,6 +643,7 @@ def _range_page_generator_alluxiocommon( worker_host=worker_host, http_port=worker_http_port, path_id=path_id, + file_path=file_path, page_index=page_index, page_offset=inpage_off, page_length=inpage_read_len, @@ -634,7 +654,7 @@ def _range_page_generator_alluxiocommon( return data def _range_page_generator( - self, worker_host, worker_http_port, path_id, offset, length + self, worker_host, worker_http_port, path_id, file_path, offset, length ): start_page_index = offset // self.config.page_size start_page_offset = offset % self.config.page_size @@ -660,6 +680,7 @@ def _range_page_generator( worker_host, worker_http_port, path_id, + file_path, page_index, read_offset, read_length, @@ -778,6 +799,7 @@ def _read_page( worker_host, worker_http_port, path_id, + file_path, page_index, offset=None, length=None, @@ -793,6 +815,7 @@ def _read_page( worker_host=worker_host, http_port=worker_http_port, path_id=path_id, + file_path=file_path, page_index=page_index, ) logger.debug(f"Reading full page request {page_url}") @@ -801,6 +824,7 @@ def _read_page( worker_host=worker_host, http_port=worker_http_port, path_id=path_id, + file_path=file_path, page_index=page_index, page_offset=offset, page_length=length, @@ -1112,7 +1136,7 @@ async def read_range( worker_host = self._get_preferred_worker_host(file_path) path_id = self._get_path_hash(file_path) page_contents = await self._range_page_generator( - worker_host, path_id, offset, length + worker_host, path_id, file_path, offset, length ) return b"".join(await page_contents) @@ -1140,6 +1164,7 @@ async def write_page( worker_host=worker_host, http_port=self.http_port, path_id=path_id, + file_path=file_path, page_index=page_index, ), headers={"Content-Type": "application/octet-stream"}, @@ -1148,7 +1173,12 @@ async def write_page( return 200 <= status < 300 async def _range_page_generator( - self, worker_host: str, path_id: str, offset: float, length: float + self, + worker_host: str, + path_id: str, + file_path: str, + offset: float, + length: float, ): start_page_index = offset // self.page_size start_page_offset = offset % self.page_size @@ -1171,6 +1201,7 @@ async def _range_page_generator( page_content = self._read_page( worker_host, path_id, + file_path, page_index, start_page_offset, read_length, @@ -1178,12 +1209,17 @@ async def _range_page_generator( page_contents.append(page_content) elif page_index == end_page_index: page_content = self._read_page( - worker_host, path_id, page_index, 0, end_page_read_to + worker_host, + path_id, + file_path, + page_index, + 0, + end_page_read_to, ) page_contents.append(page_content) else: page_content = self._read_page( - worker_host, path_id, page_index + worker_host, path_id, file_path, page_index ) page_contents.append(page_content) @@ -1252,6 +1288,7 @@ async def _read_page( self, worker_host, path_id: str, + file_path: str, page_index: int, offset=None, length=None, @@ -1266,6 +1303,7 @@ async def _read_page( worker_host=worker_host, http_port=self.http_port, path_id=path_id, + file_path=file_path, page_index=page_index, ) else: @@ -1273,6 +1311,7 @@ async def _read_page( worker_host=worker_host, http_port=self.http_port, path_id=path_id, + file_path=file_path, page_index=page_index, page_offset=offset, page_length=length, diff --git a/benchmark/bench/AlluxioRESTBench.py b/benchmark/bench/AlluxioRESTBench.py index 90cf3c9..627031e 100644 --- a/benchmark/bench/AlluxioRESTBench.py +++ b/benchmark/bench/AlluxioRESTBench.py @@ -139,6 +139,7 @@ def testGetPage(self): worker_host=self.worker_host, http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE, path_id=self.args.fileid, + file_path=self.args.path, page_index=page_idx, ) ) diff --git a/tests/client/test_fake_server.py b/tests/client/test_fake_server.py index a0cd7be..c8af5e4 100644 --- a/tests/client/test_fake_server.py +++ b/tests/client/test_fake_server.py @@ -5,6 +5,7 @@ from aiohttp.test_utils import TestServer from alluxiofs.client import AlluxioAsyncFileSystem +from alluxiofs.client.const import PAGE_PATH_URL_FORMAT pytestmark = pytest.mark.asyncio @@ -41,12 +42,8 @@ async def startup(app: web.Application): app = web.Application() app.on_startup.append(startup) - app.router.add_get( - "/v1/file/{path_id}/page/{page_index}", get_file_handler - ) - app.router.add_post( - "/v1/file/{path_id}/page/{page_index}", put_file_handler - ) + app.router.add_get(PAGE_PATH_URL_FORMAT, get_file_handler) + app.router.add_post(PAGE_PATH_URL_FORMAT, put_file_handler) server = TestServer(app) event_loop.run_until_complete(server.start_server()) return server