Skip to content

Commit

Permalink
Merge branch 'main' into pre-commit-ci-update-config
Browse files Browse the repository at this point in the history
  • Loading branch information
nsmith- authored Mar 5, 2024
2 parents 744c2e6 + b12503e commit ccc4e71
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pip install fsspec-xrootd
## Purpose

To allow fsspec to use XRootD accessible storage systems. Install fsspec-xrootd
alongside fsspec and have easy access to files stored on XRootD serevrs. Once
alongside fsspec and have easy access to files stored on XRootD servers. Once
installed, fsspec will be able to work with urls with the 'root' protocol. Only
tested with Linux at this time.

Expand Down
13 changes: 10 additions & 3 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ async def _ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[Any]:
else:
return [os.path.basename(item["name"].rstrip("/")) for item in listing]

async def _cat_file(self, path: str, start: int, end: int, **kwargs: Any) -> Any:
async def _cat_file(
self, path: str, start: int | None, end: int | None, **kwargs: Any
) -> Any:
_myFile = client.File()
try:
status, _n = await _async_wrap(
Expand All @@ -397,10 +399,15 @@ async def _cat_file(self, path: str, start: int, end: int, **kwargs: Any) -> Any
)
if not status.ok:
raise OSError(f"File failed to read: {status.message}")

n_bytes = end
if start is not None and end is not None:
n_bytes = end - start

status, data = await _async_wrap(
_myFile.read,
start,
end - start,
start or 0,
n_bytes or 0,
self.timeout,
)
if not status.ok:
Expand Down
24 changes: 23 additions & 1 deletion tests/test_basicio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import os
import shutil
import socket
import subprocess
import time

Expand All @@ -16,19 +17,29 @@
_vectors_to_chunks,
)

XROOTD_PORT = 1094
TESTDATA1 = "apple\nbanana\norange\ngrape"
TESTDATA2 = "red\ngreen\nyellow\nblue"
sleep_time = 0.2
expiry_time = 0.1


def require_port_availability(port: int) -> bool:
"""Raise an exception if the given port is already in use."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("localhost", port)) == 0:
raise RuntimeError(f"This test requires port {port} to be available")


@pytest.fixture(scope="module")
def localserver(tmpdir_factory):
require_port_availability(XROOTD_PORT)

srvdir = tmpdir_factory.mktemp("srv")
tempPath = os.path.join(srvdir, "Folder")
os.mkdir(tempPath)
xrdexe = shutil.which("xrootd")
proc = subprocess.Popen([xrdexe, srvdir])
proc = subprocess.Popen([xrdexe, "-p", str(XROOTD_PORT), srvdir])
time.sleep(2) # give it some startup
yield "root://localhost/" + str(tempPath), tempPath
proc.terminate()
Expand Down Expand Up @@ -158,6 +169,17 @@ def test_write_fsspec(localserver, clear_server):
assert f.read() == TESTDATA1


@pytest.mark.parametrize("start, end", [(None, None), (None, 10), (1, None), (1, 10)])
def test_read_bytes_fsspec(localserver, clear_server, start, end):
remoteurl, localpath = localserver
with open(localpath + "/testfile.txt", "w") as fout:
fout.write(TESTDATA1)

fs, _ = fsspec.core.url_to_fs(remoteurl)
data = fs.read_bytes(localpath + "/testfile.txt", start=start, end=end)
assert data == TESTDATA1.encode("utf-8")[start:end]


def test_append_fsspec(localserver, clear_server):
remoteurl, localpath = localserver
with open(localpath + "/testfile.txt", "w") as fout:
Expand Down

0 comments on commit ccc4e71

Please sign in to comment.