Skip to content

Commit

Permalink
Fix the bug after introducing ufs full path as page id (#53)
Browse files Browse the repository at this point in the history
* Fix the bug that appeared after introducing the UFS full path as the page ID.

* try to fix test_fake_server.py

* add the code in #51

* fix again.

* fix checkstyle

* Fix checkstyle again. It's so tricky.

* Revert "add the code in #51"

This reverts commit 689cdcf.
  • Loading branch information
YichuanSun authored Jul 31, 2024
1 parent ce10281 commit 7f9641f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 25 deletions.
12 changes: 6 additions & 6 deletions alluxiofs/client/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
ALLUXIO_COMMON_EXTENSION_ENABLE = "alluxio.common.extension.enable"
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE = "alluxio.common.ondemandpool.disable"
LIST_URL_FORMAT = "http://{worker_host}:{http_port}/v1/files"
FULL_PAGE_URL_FORMAT = (
"http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}"
)
PAGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}?offset={page_offset}&length={page_length}"
WRITE_PAGE_URL_FORMAT = (
"http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}"
FULL_PAGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}?ufsFullPath={file_path}"
PAGE_URL_FORMAT = (
"http://{worker_host}:{http_port}/v1/file/{path_id}"
"/page/{page_index}?offset={page_offset}&length={page_length}&ufsFullPath={file_path}"
)
WRITE_PAGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}?ufsFullPath={file_path}"
PAGE_PATH_URL_FORMAT = "/v1/file/{path_id}/page/{page_index}"
GET_FILE_STATUS_URL_FORMAT = "http://{worker_host}:{http_port}/v1/info"
LOAD_URL_FORMAT = "http://{worker_host}:{http_port}/v1/load"
# TODO (chunxu): Remove the concrete types of LOAD formats. Keep them for asyncio.
Expand Down
65 changes: 52 additions & 13 deletions alluxiofs/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,13 @@ def read(self, file_path):
if self.data_manager:
return b"".join(
self._all_page_generator_alluxiocommon(
worker_host, worker_http_port, path_id
worker_host, worker_http_port, path_id, file_path
)
)
else:
return b"".join(
self._all_page_generator(
worker_host, worker_http_port, path_id
worker_host, worker_http_port, path_id, file_path
)
)
except Exception as e:
Expand Down Expand Up @@ -496,12 +496,22 @@ def read_range(self, file_path, offset, length):
try:
if self.data_manager:
return self._range_page_generator_alluxiocommon(
worker_host, worker_http_port, path_id, offset, length
worker_host,
worker_http_port,
path_id,
file_path,
offset,
length,
)
else:
return b"".join(
self._range_page_generator(
worker_host, worker_http_port, path_id, offset, length
worker_host,
worker_http_port,
path_id,
file_path,
offset,
length,
)
)
except Exception as e:
Expand Down Expand Up @@ -533,6 +543,7 @@ def write_page(self, file_path, page_index, page_bytes):
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
),
headers={"Content-Type": "application/octet-stream"},
Expand All @@ -546,7 +557,7 @@ def write_page(self, file_path, page_index, page_bytes):
)

def _all_page_generator_alluxiocommon(
self, worker_host, worker_http_port, path_id
self, worker_host, worker_http_port, path_id, file_path
):
page_index = 0
fetching_pages_num_each_round = 4
Expand All @@ -558,6 +569,7 @@ def _all_page_generator_alluxiocommon(
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
)
read_urls.append(page_url)
Expand All @@ -578,12 +590,18 @@ def _all_page_generator_alluxiocommon(
f"Error when reading all pages of {path_id}: error {e}"
) from e

def _all_page_generator(self, worker_host, worker_http_port, path_id):
def _all_page_generator(
self, worker_host, worker_http_port, path_id, file_path
):
page_index = 0
while True:
try:
page_content = self._read_page(
worker_host, worker_http_port, path_id, page_index
worker_host,
worker_http_port,
path_id,
file_path,
page_index,
)
except Exception as e:
if page_index == 0:
Expand All @@ -601,7 +619,7 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id):
page_index += 1

def _range_page_generator_alluxiocommon(
self, worker_host, worker_http_port, path_id, offset, length
self, worker_host, worker_http_port, path_id, file_path, offset, length
):
read_urls = []
start = offset
Expand All @@ -617,13 +635,15 @@ def _range_page_generator_alluxiocommon(
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
)
else:
page_url = PAGE_URL_FORMAT.format(
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
page_offset=inpage_off,
page_length=inpage_read_len,
Expand All @@ -634,7 +654,7 @@ def _range_page_generator_alluxiocommon(
return data

def _range_page_generator(
self, worker_host, worker_http_port, path_id, offset, length
self, worker_host, worker_http_port, path_id, file_path, offset, length
):
start_page_index = offset // self.config.page_size
start_page_offset = offset % self.config.page_size
Expand All @@ -660,6 +680,7 @@ def _range_page_generator(
worker_host,
worker_http_port,
path_id,
file_path,
page_index,
read_offset,
read_length,
Expand Down Expand Up @@ -778,6 +799,7 @@ def _read_page(
worker_host,
worker_http_port,
path_id,
file_path,
page_index,
offset=None,
length=None,
Expand All @@ -793,6 +815,7 @@ def _read_page(
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
)
logger.debug(f"Reading full page request {page_url}")
Expand All @@ -801,6 +824,7 @@ def _read_page(
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
page_offset=offset,
page_length=length,
Expand Down Expand Up @@ -1112,7 +1136,7 @@ async def read_range(
worker_host = self._get_preferred_worker_host(file_path)
path_id = self._get_path_hash(file_path)
page_contents = await self._range_page_generator(
worker_host, path_id, offset, length
worker_host, path_id, file_path, offset, length
)
return b"".join(await page_contents)

Expand Down Expand Up @@ -1140,6 +1164,7 @@ async def write_page(
worker_host=worker_host,
http_port=self.http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
),
headers={"Content-Type": "application/octet-stream"},
Expand All @@ -1148,7 +1173,12 @@ async def write_page(
return 200 <= status < 300

async def _range_page_generator(
self, worker_host: str, path_id: str, offset: float, length: float
self,
worker_host: str,
path_id: str,
file_path: str,
offset: float,
length: float,
):
start_page_index = offset // self.page_size
start_page_offset = offset % self.page_size
Expand All @@ -1171,19 +1201,25 @@ async def _range_page_generator(
page_content = self._read_page(
worker_host,
path_id,
file_path,
page_index,
start_page_offset,
read_length,
)
page_contents.append(page_content)
elif page_index == end_page_index:
page_content = self._read_page(
worker_host, path_id, page_index, 0, end_page_read_to
worker_host,
path_id,
file_path,
page_index,
0,
end_page_read_to,
)
page_contents.append(page_content)
else:
page_content = self._read_page(
worker_host, path_id, page_index
worker_host, path_id, file_path, page_index
)
page_contents.append(page_content)

Expand Down Expand Up @@ -1252,6 +1288,7 @@ async def _read_page(
self,
worker_host,
path_id: str,
file_path: str,
page_index: int,
offset=None,
length=None,
Expand All @@ -1266,13 +1303,15 @@ async def _read_page(
worker_host=worker_host,
http_port=self.http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
)
else:
page_url = PAGE_URL_FORMAT.format(
worker_host=worker_host,
http_port=self.http_port,
path_id=path_id,
file_path=file_path,
page_index=page_index,
page_offset=offset,
page_length=length,
Expand Down
1 change: 1 addition & 0 deletions benchmark/bench/AlluxioRESTBench.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def testGetPage(self):
worker_host=self.worker_host,
http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
path_id=self.args.fileid,
file_path=self.args.path,
page_index=page_idx,
)
)
Expand Down
9 changes: 3 additions & 6 deletions tests/client/test_fake_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from aiohttp.test_utils import TestServer

from alluxiofs.client import AlluxioAsyncFileSystem
from alluxiofs.client.const import PAGE_PATH_URL_FORMAT

pytestmark = pytest.mark.asyncio

Expand Down Expand Up @@ -41,12 +42,8 @@ async def startup(app: web.Application):

app = web.Application()
app.on_startup.append(startup)
app.router.add_get(
"/v1/file/{path_id}/page/{page_index}", get_file_handler
)
app.router.add_post(
"/v1/file/{path_id}/page/{page_index}", put_file_handler
)
app.router.add_get(PAGE_PATH_URL_FORMAT, get_file_handler)
app.router.add_post(PAGE_PATH_URL_FORMAT, put_file_handler)
server = TestServer(app)
event_loop.run_until_complete(server.start_server())
return server
Expand Down

0 comments on commit 7f9641f

Please sign in to comment.