Skip to content

Commit

Permalink
feat: support simplecache url chaining (#42)
Browse files: browse the repository at this point in the history
* test for simplecache

* implementing _get_file

* read in chunks

* update test

* working _get_file

* add additional cache test

* fix problem with chunking

* repeat on FileNotFoundError to fix flaky glob

* use unstrip_protocol to build url

* match parent signature for _rm_file

* increase chunk size to 256 KiB
  • Loading branch information
lobis authored Dec 22, 2023
1 parent 851adaa commit f8c57cd
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 4 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
run: python -m pip install .[test]
- name: Test package
run: |
python -m pytest -vv tests --reruns 10 --reruns-delay 30 --only-rerun "(?i)OSError|timeout|expired|connection|socket"
python -m pytest -vv tests --reruns 10 --reruns-delay 30 --only-rerun "(?i)OSError|FileNotFoundError|timeout|expired|connection|socket"
- name: Run fsspec-xrootd tests from uproot latest release
run: |
Expand All @@ -75,7 +75,7 @@ jobs:
python -m pip install ./uproot[test]
# Install xrootd-fsspec again because it may have been overwritten by uproot
python -m pip install .[test]
python -m pytest -vv -k "xrootd" uproot/tests --reruns 10 --reruns-delay 30 --only-rerun "(?i)OSError|timeout|expired|connection|socket"
python -m pytest -vv -k "xrootd" uproot/tests --reruns 10 --reruns-delay 30 --only-rerun "(?i)OSError|FileNotFoundError|timeout|expired|connection|socket"
dist:
name: Distribution build
Expand Down
43 changes: 41 additions & 2 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ async def _rmdir(self, path: str) -> None:

rmdir = sync_wrapper(_rmdir)

async def _rm_file(self, path: str) -> None:
async def _rm_file(self, path: str, **kwargs: Any) -> None:
    """Remove a single remote file through the XRootD client.

    Extra keyword arguments are accepted but not used by this method.

    Raises:
        OSError: if the status returned by the client's ``rm`` call is not ok.
    """
    status, _response = await _async_wrap(self._myclient.rm, path, self.timeout)
    if status.ok:
        return
    raise OSError(f"File not removed properly: {status.message}")
Expand Down Expand Up @@ -391,7 +391,7 @@ async def _cat_file(self, path: str, start: int, end: int, **kwargs: Any) -> Any
try:
status, _n = await _async_wrap(
_myFile.open,
self.protocol + "://" + self.storage_options["hostid"] + "/" + path,
self.unstrip_protocol(path),
OpenFlags.READ,
self.timeout,
)
Expand All @@ -412,6 +412,45 @@ async def _cat_file(self, path: str, start: int, end: int, **kwargs: Any) -> Any
self.timeout,
)

async def _get_file(
    self, rpath: str, lpath: str, chunk_size: int = 262_144, **kwargs: Any
) -> None:
    """Copy a remote file to a local path, reading it in chunks.

    Args:
        rpath: Remote path; expanded to a full URL with ``unstrip_protocol``.
        lpath: Local destination, opened in ``"wb"`` mode.
        chunk_size: Number of bytes requested per remote read
            (default 262_144, i.e. 256 KiB).
        **kwargs: Accepted but not used by this method.

    Raises:
        OSError: if the remote file fails to open or a read reports a
            non-ok status.
    """
    # Open the remote file for reading
    remote_file = client.File()

    try:
        status, _n = await _async_wrap(
            remote_file.open,
            self.unstrip_protocol(rpath),
            OpenFlags.READ,
            self.timeout,
        )
        if not status.ok:
            raise OSError(f"Remote file failed to open: {status.message}")

        with open(lpath, "wb") as local_file:
            offset: int = 0
            while True:
                # Read a chunk of content from the remote file
                status, chunk = await _async_wrap(
                    remote_file.read, offset, chunk_size, self.timeout
                )

                if not status.ok:
                    raise OSError(f"Remote file failed to read: {status.message}")

                # Break if there is no more content
                if not chunk:
                    break

                # Write the chunk to the local file
                local_file.write(chunk)

                # Advance by the number of bytes actually received rather
                # than the number requested, so a short read cannot leave
                # a gap in the local copy.
                offset += len(chunk)

    finally:
        # Close the remote file (also attempted when the open failed)
        await _async_wrap(remote_file.close, self.timeout)

async def _get_max_chunk_info(self, file: Any) -> tuple[int, int]:
"""Queries the XRootD server for info required for pyxrootd vector_read() function.
Queries for maximum number of chunks and the maximum chunk size allowed by the server.
Expand Down
32 changes: 32 additions & 0 deletions tests/test_basicio.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,3 +412,35 @@ def test_glob_full_names(localserver, clear_server):
for name in full_names:
with fsspec.open(name) as f:
assert f.read() in [bytes(data, "utf-8") for data in [TESTDATA1, TESTDATA2]]


@pytest.mark.parametrize("protocol_prefix", ["", "simplecache::"])
def test_cache(localserver, clear_server, protocol_prefix):
    """Read back a large file through fsspec, with and without the simplecache prefix."""
    # bigger than the chunk size
    repeats = int(1e7 / len(TESTDATA1))
    payload = TESTDATA1 * repeats

    remoteurl, localpath = localserver
    with open(localpath + "/testfile.txt", "w") as sink:
        sink.write(payload)

    url = protocol_prefix + remoteurl + "/testfile.txt"
    with fsspec.open(url, "rb") as source:
        received = source.read()
        assert received == payload.encode("utf-8")


def test_cache_directory(localserver, clear_server, tmp_path):
    """Check that simplecache leaves exactly one matching file in ``cache_storage``."""
    remoteurl, localpath = localserver
    with open(localpath + "/testfile.txt", "w") as sink:
        sink.write(TESTDATA1)

    cache_directory = tmp_path / "cache"
    cache_options = {"cache_storage": str(cache_directory)}
    with fsspec.open(
        "simplecache::" + remoteurl + "/testfile.txt",
        "rb",
        simplecache=cache_options,
    ) as remote:
        fetched = remote.read()
        assert fetched == TESTDATA1.encode("utf-8")

    # The cache directory should hold a single entry whose bytes match the source
    assert len(os.listdir(cache_directory)) == 1
    cached_name = os.listdir(cache_directory)[0]
    with open(cache_directory / cached_name, "rb") as cached:
        stored = cached.read()
        assert stored == TESTDATA1.encode("utf-8")

0 comments on commit f8c57cd

Please sign in to comment.