Skip to content

Commit

Permalink
File ops feature (#7)
Browse files Browse the repository at this point in the history
* file system functions added

* Passes old tests

* mkdir fixed

mkdir and mkdirs fixed
update mode removed
passes tests

* stuff

* passes broken server test

* info() and exists()

* working dir cache

* timeouts added

* invalidate cache

* exceptions changed

* cache issues fixed

* walk, find, glob, du tests added

* touch() works

* Most changes made

* lint

* all changes implemented

* lint
  • Loading branch information
ScottDemarest authored Jul 22, 2022
1 parent 3bb2bae commit c966495
Show file tree
Hide file tree
Showing 2 changed files with 380 additions and 47 deletions.
212 changes: 180 additions & 32 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,56 @@
from __future__ import annotations

import io
import os.path
import warnings
from enum import IntEnum
from typing import Any

from fsspec.dircache import DirCache # type: ignore[import]
from fsspec.spec import AbstractBufferedFile, AbstractFileSystem # type: ignore[import]
from XRootD import client # type: ignore[import]
from XRootD.client.flags import ( # type: ignore[import]
DirListFlags,
MkDirFlags,
OpenFlags,
StatInfoFlags,
)


class ErrorCodes(IntEnum):
    """Numeric XRootD status codes that this module tests for explicitly."""

    # Compared against ``status.code`` by ``exists()`` and ``makedirs()``.
    # NOTE(review): the name suggests "path does not exist / is invalid";
    # confirm the exact meaning of 400 against the XRootD status tables.
    INVALID_PATH = 400


class XRootDFileSystem(AbstractFileSystem): # type: ignore[misc]

protocol = "root"
root_marker = "/"
default_timeout = 60

def __init__(self, *args: list[Any], **storage_options: str) -> None:
def __init__(self, *args: Any, **storage_options: Any) -> None:
    """Connect to an XRootD server and set up the listing cache.

    Recognised ``storage_options`` keys:

    * ``protocol`` and ``hostid`` (required) -- joined as
      ``protocol://hostid`` to build the client URL.
    * ``path`` (required) -- stored on the instance as ``_path``.
    * ``timeout`` (optional) -- timeout passed to client requests;
      defaults to ``XRootDFileSystem.default_timeout``.
    * ``listings_expiry_time`` (optional) -- forwarded to ``DirCache``;
      defaults to 0.

    Raises:
        KeyError: if a required option is missing.
        OSError: if the server does not answer the initial ping.
    """
    # NOTE(review): AbstractFileSystem.__init__ is not invoked here --
    # confirm that skipping the base-class initialisation is intentional.
    self.timeout = storage_options.get("timeout", XRootDFileSystem.default_timeout)
    self._path = storage_options["path"]
    self._myclient = client.FileSystem(
        storage_options["protocol"] + "://" + storage_options["hostid"]
    )
    self.storage_options = storage_options

    # Fail fast when the server is unreachable. The ping uses its own fixed
    # 15 (presumably seconds) limit rather than ``self.timeout``.
    status, _n = self._myclient.ping(15)
    if not status.ok:
        raise OSError(f"Could not connect to server {storage_options['hostid']}")

    self._intrans = False
    self.dircache = DirCache(
        use_listings_cache=True,
        listings_expiry_time=storage_options.get("listings_expiry_time", 0),
    )

def invalidate_cache(self, path: str | None = None) -> None:
if path is None:
self.dircache.clear()
else:
try:
del self.dircache[path]
except KeyError:
pass

@staticmethod
def _get_kwargs_from_urls(u: str) -> dict[Any, Any]:
Expand All @@ -43,39 +69,157 @@ def _get_kwargs_from_urls(u: str) -> dict[Any, Any]:
}

@classmethod
def _strip_protocol(cls, path: str) -> Any:
url = client.URL(path)
def _strip_protocol(cls, path: str | list[str]) -> Any:
if type(path) == str:
return client.URL(path).path
elif type(path) == list:
return [client.URL(item).path for item in path]
else:
raise ValueError("Strip protocol not given string or list")

return url.path
def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
    """Ask the server to create the directory ``path``.

    When ``create_parents`` is true the request carries
    ``MkDirFlags.MAKEPATH`` (presumably so that missing intermediate
    directories are created as well -- confirm against the XRootD docs);
    otherwise no flag is sent. Extra ``kwargs`` are accepted and ignored.

    Raises:
        OSError: if the server reports a non-ok status.
    """
    # Same call either way; only the optional positional flag differs.
    flag_args = (MkDirFlags.MAKEPATH,) if create_parents else ()
    status, _ = self._myclient.mkdir(path, *flag_args, timeout=self.timeout)
    if not status.ok:
        raise OSError(f"Directory not made properly: {status.message}")

def ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[Any]:
def makedirs(self, path: str, exist_ok: bool = False) -> None:
    """Create ``path`` using ``MkDirFlags.MAKEPATH``.

    Args:
        path: directory to create.
        exist_ok: when false, an already existing ``path`` is an error.
            When true, a server reply with code
            ``ErrorCodes.INVALID_PATH`` is tolerated (presumably the
            reply for an already existing directory -- confirm).

    Raises:
        OSError: if ``path`` exists and ``exist_ok`` is false, or if the
            server reports any failure that is not tolerated.
    """
    if not exist_ok and self.exists(path):
        raise OSError("Location already exists and exist_ok arg was set to false")

    status, _ = self._myclient.mkdir(
        path, MkDirFlags.MAKEPATH, timeout=self.timeout
    )
    if status.ok:
        return
    if exist_ok and status.code == ErrorCodes.INVALID_PATH:
        return
    raise OSError(f"Directory not made properly: {status.message}")

stats, deets = self._myclient.dirlist(path, DirListFlags.STAT)
def rmdir(self, path: str) -> None:
    """Remove the directory ``path`` on the server.

    After a successful removal the cached listings for ``path`` and for
    its parent are dropped. Without this, ``exists()`` would keep
    answering True from the listing cache and the parent's cached listing
    would still show the removed directory.

    Raises:
        OSError: if the server reports a non-ok status.
    """
    status, _ = self._myclient.rmdir(path, timeout=self.timeout)
    if not status.ok:
        raise OSError(f"Directory not removed properly: {status.message}")
    self.invalidate_cache(path)
    self.invalidate_cache(self._parent(path))

listing = []
def _rm(self, path: str) -> None:
    """Remove the file ``path`` on the server.

    After a successful removal the cache entries for ``path`` and for its
    parent directory are dropped, so that cached listings no longer
    report the deleted file.

    Raises:
        OSError: if the server reports a non-ok status.
    """
    status, _ = self._myclient.rm(path, timeout=self.timeout)
    if not status.ok:
        raise OSError(f"File not removed properly: {status.message}")
    self.invalidate_cache(path)
    self.invalidate_cache(self._parent(path))

if detail:
for item in deets:
t = ""
if item.statinfo.flags and StatInfoFlags.IS_DIR:
t = "directory"
elif item.statinfo.flags and StatInfoFlags.OTHER:
t = "other"
else:
t = "file"
def touch(self, path: str, truncate: bool = False, **kwargs: Any) -> None:
    """Issue a truncate request for ``path``.

    The requested length is 0 when ``truncate`` is true or ``path`` does
    not exist yet; otherwise it is the file's current size as reported by
    ``info()``, which leaves the length unchanged (presumably just to
    refresh the file's timestamps -- confirm). Extra ``kwargs`` are
    accepted and ignored.

    Raises:
        OSError: if the server reports a non-ok status.
    """
    if truncate or not self.exists(path):
        new_size = 0
    else:
        new_size = self.info(path).get("size")

    status, _ = self._myclient.truncate(path, new_size, timeout=self.timeout)
    if not status.ok:
        raise OSError(f"File not touched properly: {status.message}")

listing.append(
{
"name": path + "/" + item.name,
"size": item.statinfo.size,
"type": t,
def modified(self, path: str) -> Any:
    """Return the ``modtime`` field of the server's stat reply for ``path``.

    Raises:
        OSError: if the stat request fails. Previously the status was
            ignored and a failure only showed up as an attribute error on
            the missing stat result.
    """
    status, stat_info = self._myclient.stat(path, timeout=self.timeout)
    if not status.ok:
        raise OSError(f"File stat request failed: {status.message}")
    return stat_info.modtime

def exists(self, path: str, **kwargs: Any) -> bool:
    """Report whether ``path`` exists.

    A path that has an entry in the listing cache counts as existing
    without contacting the server. Otherwise the server is asked to stat
    the path: a reply with code ``ErrorCodes.INVALID_PATH`` means "does
    not exist"; any other failure is raised.

    Raises:
        OSError: if the stat request fails for another reason.
    """
    if path in self.dircache:
        return True

    status, _ = self._myclient.stat(path, timeout=self.timeout)
    if status.code == ErrorCodes.INVALID_PATH:
        return False
    if status.ok:
        return True
    raise OSError(f"status check failed with message: {status.message}")

def info(self, path: str, **kwargs: Any) -> dict[str, Any]:
    """Return a ``{"name", "size", "type"}`` record for ``path``.

    The cached listing of the parent directory is consulted first; if one
    is available, the entry whose name equals ``path`` is returned.
    Without a cached listing the server is asked to stat ``path`` and the
    type is derived from the stat flags: ``"directory"``, ``"other"`` or
    ``"file"``.

    Raises:
        OSError: if a cached parent listing exists but holds no entry for
            ``path``, or if the stat request fails.
    """
    parent_dir = os.path.split(path)[0]
    cached = self._ls_from_cache(parent_dir)
    if cached is not None:
        for entry in cached:
            if entry["name"] == path:
                return {
                    "name": path,
                    "size": entry["size"],
                    "type": entry["type"],
                }
        raise OSError("_ls_from_cache() failed to function")

    status, stat_info = self._myclient.stat(path, timeout=self.timeout)
    if not status.ok:
        raise OSError(f"File stat request failed: {status.message}")

    # Directory takes precedence over "other"; everything else is a file.
    if stat_info.flags & StatInfoFlags.IS_DIR:
        kind = "directory"
    elif stat_info.flags & StatInfoFlags.OTHER:
        kind = "other"
    else:
        kind = "file"
    return {"name": path, "size": stat_info.size, "type": kind}

return listing
def ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[Any]:
    """List the entries of directory ``path``.

    Args:
        path: directory to list.
        detail: when true, return a list of ``{"name", "size", "type"}``
            dicts; when false, return only the base name of each entry.
        **kwargs: ``force_update=True`` bypasses the listing cache.

    A listing already held in the cache is served from there unless
    ``force_update`` is set. Otherwise the server is queried and the
    result is stored in the cache under ``path``.

    Raises:
        OSError: if the server fails to provide the directory listing.
    """
    served_from_cache = path in self.dircache and not kwargs.get(
        "force_update", False
    )
    if served_from_cache:
        cached = self._ls_from_cache(path)
        if detail:
            return cached
        return [os.path.basename(entry["name"]) for entry in cached]

    status, entries = self._myclient.dirlist(
        path, DirListFlags.STAT, timeout=self.timeout
    )
    if not status.ok:
        raise OSError(f"Server failed to provide directory info: {status.message}")

    listing = []
    for entry in entries:
        flags = entry.statinfo.flags
        # Directory takes precedence over "other"; the rest are files.
        if flags & StatInfoFlags.IS_DIR:
            kind = "directory"
        elif flags & StatInfoFlags.OTHER:
            kind = "other"
        else:
            kind = "file"
        listing.append(
            {
                "name": path + "/" + entry.name,
                "size": entry.statinfo.size,
                "type": kind,
            }
        )
    self.dircache[path] = listing

    if detail:
        return listing
    return [os.path.basename(record["name"].rstrip("/")) for record in listing]

def _open(
self,
Expand Down Expand Up @@ -166,13 +310,13 @@ def __init__(
) -> None:
from fsspec.core import caches

self.timeout = fs.timeout
# by this point, mode will have a "b" in it
# update "+" mode removed for now since seek() is read only
if "x" in mode:
self.mode = OpenFlags.NEW
elif "a" in mode:
self.mode = OpenFlags.UPDATE
elif "+" in mode:
self.mode = OpenFlags.UPDATE
elif "w" in mode:
self.mode = OpenFlags.DELETE
elif "r" in mode:
Expand All @@ -187,14 +331,15 @@ def __init__(
+ "/"
+ path,
self.mode,
timeout=self.timeout,
)

if not status.ok:
raise OSError(f"File did not open properly: {status.message}")

self.metaOffset = 0
if "a" in mode:
_stats, _deets = self._myFile.stat()
_stats, _deets = self._myFile.stat(timeout=self.timeout)
self.metaOffset = _deets.size

self.path = path
Expand Down Expand Up @@ -241,7 +386,7 @@ def __init__(

def _fetch_range(self, start: int, end: int) -> Any:
status, data = self._myFile.read(
self.metaOffset + start, self.metaOffset + end - start
self.metaOffset + start, self.metaOffset + end - start, timeout=self.timeout
)
if not status.ok:
raise OSError(f"File did not read properly: {status.message}")
Expand Down Expand Up @@ -269,7 +414,10 @@ def flush(self, force: bool = False) -> None:

def _upload_chunk(self, final: bool = False) -> Any:
status, _n = self._myFile.write(
self.buffer.getvalue(), self.offset + self.metaOffset, self.buffer.tell()
self.buffer.getvalue(),
self.offset + self.metaOffset,
self.buffer.tell(),
timeout=self.timeout,
)
if final:
self.closed
Expand All @@ -292,7 +440,7 @@ def close(self) -> None:
if self.fs is not None:
self.fs.invalidate_cache(self.path)
self.fs.invalidate_cache(self.fs._parent(self.path))
status, _n = self._myFile.close()
status, _n = self._myFile.close(timeout=self.timeout)
if not status.ok:
raise OSError(f"File did not close properly: {status.message}")
self.closed = True
Loading

0 comments on commit c966495

Please sign in to comment.