Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Truncate file through EOS using XRootD File interface #69

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

Check warning on line 1 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing module docstring

import asyncio
import io
Expand All @@ -15,18 +15,19 @@
from fsspec.asyn import AsyncFileSystem, _run_coros_in_chunks, sync, sync_wrapper
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from XRootD import client

Check failure on line 18 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD'
from XRootD.client import File

Check failure on line 19 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client'
from XRootD.client.flags import (

Check failure on line 20 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.flags'
DirListFlags,
MkDirFlags,
OpenFlags,
QueryCode,
StatInfoFlags,
)
from XRootD.client.responses import HostList, XRootDStatus

Check failure on line 27 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.responses'


class ErrorCodes(IntEnum):

Check warning on line 30 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring
INVALID_PATH = 400


Expand All @@ -51,13 +52,13 @@
asyncio.get_running_loop().create_future()
)

def callback(status: XRootDStatus, content: T, servers: HostList) -> None:

Check warning on line 55 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unused argument 'servers'
if future.cancelled():
return
loop = future.get_loop()
try:
loop.call_soon_threadsafe(future.set_result, (status, content))
except Exception as exc:

Check warning on line 61 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Catching too general exception Exception
loop.call_soon_threadsafe(future.set_exception, exc)

async def wrapped(*args: Any, **kwargs: Any) -> tuple[XRootDStatus, T]:
Expand Down Expand Up @@ -142,7 +143,7 @@
handle: client.File


class ReadonlyFileHandleCache:

Check warning on line 146 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring
def __init__(self, loop: Any, max_items: int | None, ttl: int):
self.loop = loop
self._max_items = max_items
Expand Down Expand Up @@ -170,7 +171,7 @@
item.handle.close(callback=lambda *args: None)
cache.clear()

def close_all(self) -> None:

Check warning on line 174 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing function or method docstring
self._close_all(self.loop, self._cache)

async def _close(self, url: str, timeout: int) -> None:
Expand All @@ -183,7 +184,7 @@
close = sync_wrapper(_close)

async def _start_pruner(self) -> None:
self._prune_task = asyncio.create_task(self._pruner())

Check warning on line 187 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Attribute '_prune_task' defined outside __init__

async def _pruner(self) -> None:
while True:
Expand Down Expand Up @@ -220,7 +221,7 @@
return handle


class XRootDFileSystem(AsyncFileSystem): # type: ignore[misc]

Check warning on line 224 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring

Check warning on line 224 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_cp_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 224 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_pipe_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'
protocol = "root"
root_marker = "/"
default_timeout = 60
Expand Down Expand Up @@ -372,21 +373,36 @@
raise OSError(f"File not removed properly: {status.message}")

async def _touch(self, path: str, truncate: bool = False, **kwargs: Any) -> None:
if truncate or not await self._exists(path):
status, _ = await _async_wrap(self._myclient.truncate)(
path, size=0, timeout=self.timeout
)
# necessary to write in EOS, see https://github.com/xrootd/xrootd/issues/2304
if "eos" in self.hostid:
eos_full_path = f"{self.protocol}://{self.hostid}/{path}"
f = File()
status, _ = f.open(eos_full_path, OpenFlags.NEW, timeout=self.timeout)
if not status.ok:
raise OSError(f"File not touched properly: {status.message}")
raise OSError(f"Impossible to create file in EOS: {status.message}")
if truncate or not await self._exists(path):
status, _ = await _async_wrap(f.truncate)(size=0, timeout=self.timeout)
else:
len = await self._info(path)
status, _ = await _async_wrap(f.truncate)(
size=len.get("size"),
timeout=self.timeout,
)
f.close()
else:
len = await self._info(path)
status, _ = await _async_wrap(self._myclient.truncate)(
path,
size=len.get("size"),
timeout=self.timeout,
)
if not status.ok:
raise OSError(f"File not touched properly: {status.message}")
if truncate or not await self._exists(path):
status, _ = await _async_wrap(self._myclient.truncate)(
path, size=0, timeout=self.timeout
)
else:
len = await self._info(path)
status, _ = await _async_wrap(self._myclient.truncate)(
path,
size=len.get("size"),
timeout=self.timeout,
)
if not status.ok:
raise OSError(f"File not touched properly: {status.message}")

touch = sync_wrapper(_touch)

Expand Down Expand Up @@ -834,7 +850,7 @@

self.kwargs = kwargs

if mode not in {"ab", "rb", "wb"}:
if mode not in {"ab", "rb", "wb", "a+b", "r+b", "w+b"}:
raise NotImplementedError("File mode not supported")
if mode == "rb":
if size is not None:
Expand Down
Loading