PROPFIND and glob fix #45

Merged
merged 7 commits on May 24, 2024
Changes from 6 commits
210 changes: 186 additions & 24 deletions src/pelicanfs/core.py
@@ -15,6 +15,7 @@
"""

import cachetools
from fsspec.utils import glob_translate
from fsspec.asyn import AsyncFileSystem, sync
from .dir_header_parser import parse_metalink
import fsspec.implementations.http as fshttp
@@ -139,13 +140,16 @@ def __init__ (
self._pipe_file = self.httpFileSystem._pipe_file
self._mkdir = self.httpFileSystem._mkdir
self._makedirs = self.httpFileSystem._makedirs


# TODO: These functions are to be implemented. Currently a top-level call to glob/du/info will result
# in a failure
self._glob = self.httpFileSystem._glob
self._du = self.httpFileSystem._du
self._info = self.httpFileSystem._info
@classmethod
def _strip_protocol(cls, path):
"""For HTTP, we always want to keep the full URL"""
if path.startswith("osdf://"):
path = path[7:]
elif path.startswith("pelican://"):
path = path[10:]
return path
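Since `_strip_protocol` is a classmethod, here is a quick sketch of its behavior (assuming the enclosing class is `PelicanFileSystem`; the paths are placeholders):

```python
from pelicanfs.core import PelicanFileSystem

# osdf:// and pelican:// prefixes are dropped; anything else passes through.
PelicanFileSystem._strip_protocol("osdf:///ns/file.txt")     # -> "/ns/file.txt"
PelicanFileSystem._strip_protocol("pelican:///ns/file.txt")  # -> "/ns/file.txt"
PelicanFileSystem._strip_protocol("https://host/ns/file")    # -> unchanged
```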


async def _discover_federation_metadata(self, discUrl):
"""
@@ -255,6 +259,33 @@ async def get_origin_url(self, fileloc: str) -> str:
raise NoAvailableSource()
return origin

async def get_dirlist_url(self, fileloc: str) -> str:
"""
Returns a dirlist host url for the given namespace location
"""

if not self.directorUrl:
metadata_json = await self._discover_federation_metadata(self.discoveryUrl)
# Ensure the director url has a '/' at the end
directorUrl = metadata_json.get('director_endpoint')
if not directorUrl:
raise InvalidMetadata()

if not directorUrl.endswith("/"):
directorUrl = directorUrl + "/"
self.directorUrl = directorUrl

url = urllib.parse.urljoin(self.directorUrl, fileloc)

# Request timeout in seconds - aiohttp's default is 5 minutes
timeout = aiohttp.ClientTimeout(total=5)
session = await self.httpFileSystem.set_session()
async with session.request('PROPFIND', url, timeout=timeout) as resp:
dirlist_url = parse_metalink(resp.headers)[0][0][0]
if not dirlist_url:
raise NoAvailableSource()
return dirlist_url
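For reference, a minimal standalone sketch of the same PROPFIND exchange (the director URL and file location are placeholders; error handling is elided):

```python
import urllib.parse

import aiohttp

from pelicanfs.dir_header_parser import parse_metalink

async def propfind_dirlist(director_url: str, fileloc: str) -> str:
    # Ask the director where the collection can be listed; the answer
    # comes back in headers that parse_metalink turns into a list of
    # (url, priority) tuples plus the namespace.
    url = urllib.parse.urljoin(director_url, fileloc)
    timeout = aiohttp.ClientTimeout(total=5)  # don't wait aiohttp's default 5 minutes
    async with aiohttp.ClientSession() as session:
        async with session.request("PROPFIND", url, timeout=timeout) as resp:
            links, _namespace = parse_metalink(resp.headers)
            return links[0][0]  # first link's URL, matching the [0][0][0] indexing above
```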

def _get_prefix_info(self, path: str) -> _CacheManager:
"""
Given a path into the filesystem, return the information in the
Expand Down Expand Up @@ -298,36 +329,171 @@ def _dirlist_dec(func):
Decorator function which, when given a namespace location, gets the url for the dirlist location from the headers
and uses that url for the given function.

This is for functions which need to list information in the origin directories such as "find", "isdir", "ls"
This is for functions which need to get information from origin directories, such as "ls", "du", "info", etc.
"""
async def wrapper(self, *args, **kwargs):
path = args[0]
path = self._check_fspath(path)
# TODO: need to have a separate get_dirlist_url
listUrl = await self.get_origin_url(path)
result = await func(self, listUrl, *args[1:], **kwargs)
path = self._check_fspath(args[0])
dataUrl = await self.get_dirlist_url(path)
result = await func(self, dataUrl, *args[1:], **kwargs)
return result
return wrapper


def _remove_dirlist_from_path(self, path):
parsed_url = urllib.parse.urlparse(path)
updated_url = parsed_url._replace(netloc="", scheme="")
return urllib.parse.urlunparse(updated_url)

def _remove_dirlist_from_paths(self, paths):
if isinstance(paths, list):
return [self._remove_dirlist_from_paths(path) for path in paths]

if isinstance(paths, dict):
if 'name' in paths:
path = paths['name']
paths['name'] = self._remove_dirlist_from_path(path)
if 'url' in paths:
url = paths['url']
paths['url'] = self._remove_dirlist_from_path(url)
return paths
else:
new_dict = {}
for key, item in paths.items():
new_key = self._remove_dirlist_from_path(key)
new_item = self._remove_dirlist_from_paths(item)
new_dict[new_key] = new_item
return new_dict

if isinstance(paths, str):
return self._remove_dirlist_from_path(paths)

return paths
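Illustrative inputs and outputs for these normalizers (values hypothetical): detail dicts have their `name`/`url` fields rewritten, while lists and plain strings recurse.

```python
fs._remove_dirlist_from_path("https://dirlist.example.org/ns/a.txt")
# -> "/ns/a.txt"

fs._remove_dirlist_from_paths([
    {"name": "https://dirlist.example.org/ns/a.txt", "type": "file"},
    "https://dirlist.example.org/ns/b.txt",
])
# -> [{"name": "/ns/a.txt", "type": "file"}, "/ns/b.txt"]
```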

def _dirlist_dec(func):
"""
Decorator function which, when given a namespace location, gets the url for the dirlist location from the headers
and uses that url for the given function. It then normalizes the paths or list of paths returned by the function.

This is for functions which need to retrieve information from origin directories such as "find", "ls", "info", etc.
"""
async def wrapper(self, *args, **kwargs):
path = self._check_fspath(args[0])
dataUrl = await self.get_dirlist_url(path)
return await func(self, dataUrl, *args[1:], **kwargs)
return wrapper
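Roughly, the decorated `_ls` below expands to the following (an illustrative expansion combining the decorator and the method body, not code from this PR):

```python
async def _ls_expanded(self, path, detail=True, **kwargs):
    path = self._check_fspath(path)              # strip pelican:// or osdf:// prefixes
    data_url = await self.get_dirlist_url(path)  # resolve the dirlist host via PROPFIND
    results = await self.httpFileSystem._ls(data_url, detail, **kwargs)
    return self._remove_dirlist_from_paths(results)  # hide the dirlist host from callers
```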


@_dirlist_dec
async def _ls(self, path, detail=True, **kwargs):
return await self.httpFileSystem._ls(path, detail, **kwargs)
results = await self.httpFileSystem._ls(path, detail, **kwargs)
return self._remove_dirlist_from_paths(results)

@_dirlist_dec
async def _isdir(self, path):
return await self.httpFileSystem._isdir(path)

@_dirlist_dec
async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
return await self.httpFileSystem._find(path, maxdepth, withdirs, **kwargs)
results = await self.httpFileSystem._find(path, maxdepth, withdirs, **kwargs)
return self._remove_dirlist_from_paths(results)

async def _isfile(self, path):
return not await self._isdir(path)

async def _glob(self, path, maxdepth=None, **kwargs):
"""
Find files by glob-matching.

This implementation is based on the one in HTTPFileSystem,
except it cleans the path url of double '//' and checks for
the dirlist host ahead of time
"""
if maxdepth is not None and maxdepth < 1:
raise ValueError("maxdepth must be at least 1")
import re

dirlist_path = await self.get_dirlist_url(path)
# Ensure the path, including any special `glob` characters, is added back in
parsed_path = urllib.parse.urlparse(dirlist_path)
updated_path = urllib.parse.urlunparse(parsed_path._replace(path=path))

ends_with_slash = updated_path.endswith("/") # _strip_protocol strips trailing slash
path = self._strip_protocol(updated_path)
append_slash_to_dirname = ends_with_slash or path.endswith(("/**", "/*"))
idx_star = path.find("*") if path.find("*") >= 0 else len(path)
idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

min_idx = min(idx_star, idx_brace)

detail = kwargs.pop("detail", False)

if not fshttp.has_magic(path):
if await self._exists(path, **kwargs):
if not detail:
return [path]
else:
return {path: await self._info(path, **kwargs)}
else:
if not detail:
return [] # glob of non-existent returns empty
else:
return {}
elif "/" in path[:min_idx]:
min_idx = path[:min_idx].rindex("/")
root = path[: min_idx + 1]
depth = path[min_idx + 1 :].count("/") + 1
else:
root = ""
depth = path[min_idx + 1 :].count("/") + 1

if "**" in path:
if maxdepth is not None:
idx_double_stars = path.find("**")
depth_double_stars = path[idx_double_stars:].count("/") + 1
depth = depth - depth_double_stars + maxdepth
else:
depth = None

allpaths = await self._find(
root, maxdepth=depth, withdirs=True, detail=True, **kwargs
)

pattern = glob_translate(self._remove_dirlist_from_path(path) + ("/" if ends_with_slash else ""))
pattern = re.compile(pattern)

out = {
(
self._remove_dirlist_from_path(p.rstrip("/"))
if not append_slash_to_dirname
and info["type"] == "directory"
and p.endswith("/")
else self._remove_dirlist_from_path(p)
): info
for p, info in sorted(allpaths.items())
if pattern.match(p.rstrip("/"))
}

if detail:
return out
else:
return list(out)
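A usage sketch via fsspec's synchronous wrapper (the discovery URL and namespace below are placeholders, and the constructor argument is an assumption about how the filesystem is built):

```python
from pelicanfs.core import PelicanFileSystem

pelfs = PelicanFileSystem("pelican://osg-htc.org")  # placeholder federation

paths = pelfs.glob("/ns/data/*.txt")            # list of matching paths
info = pelfs.glob("/ns/data/**", detail=True)   # dict: path -> info dict
```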


@_dirlist_dec
async def _info(self, path, **kwargs):
results = await self.httpFileSystem._info(path, **kwargs)
return self._remove_dirlist_from_paths(results)

@_dirlist_dec
async def _du(self, path, total=True, maxdepth=None, **kwargs):
return await self.httpFileSystem._du(path, total, maxdepth, **kwargs)

# Not using the decorator because this function is an async generator (it yields)
async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
path = self._check_fspath(path)
# TODO: need to have a separate get_dirlist_url
listUrl = await self.get_origin_url(path)
listUrl = await self.get_dirlist_url(path)
async for _ in self.httpFileSystem._walk(listUrl, maxdepth, on_error, **kwargs):
yield _
yield self._remove_dirlist_from_path(_)

def _io_wrapper(self, func):
"""
@@ -378,10 +544,10 @@ def _check_fspath(self, path: str) -> str:
path = pelican_url.path
return path

def open(self, path, **kwargs):
def open(self, path, mode, **kwargs):
path = self._check_fspath(path)
data_url = sync(self.loop, self.get_origin_cache if self.directReads else self.get_working_cache, path)
fp = self.httpFileSystem.open(data_url, **kwargs)
fp = self.httpFileSystem.open(data_url, mode, **kwargs)
fp.read = self._io_wrapper(fp.read)
return fp
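With the new explicit `mode` parameter, callers pass the mode positionally (a usage sketch, reusing the placeholder filesystem from the glob example above):

```python
with pelfs.open("/ns/data/file.txt", "rb") as f:
    head = f.read(1024)  # reads are routed through _io_wrapper
```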

@@ -469,10 +635,6 @@ async def _cat_file(self, path, start=None, end=None, **kwargs):
async def _exists(self, path, **kwargs):
return await self.httpFileSystem._exists(path, **kwargs)

@_cache_dec
async def _isfile(self, path, **kwargs):
return await self.httpFileSystem._isfile(path, **kwargs)

@_cache_dec
async def _get_file(self, rpath, lpath, **kwargs):
return await self.httpFileSystem._get_file(rpath, lpath, **kwargs)
17 changes: 0 additions & 17 deletions src/pelicanfs/dir_header_parser.py
@@ -37,20 +37,3 @@ def parse_metalink(headers: dict[str, str]) -> tuple[list[tuple[str, int]], str]
break

return linkPrio, namespace

def get_dirlist_loc(headers={}):
"""
Parse the headers to get the dirlist location

This will return None if there is no dirlist location
"""
if "X-Pelican-Namespace" in headers:
namespace = headers["X-Pelican-Namespace"]
elmts = namespace.split(", ")
for elm in elmts:
left, right = elm.split("=", 1)
if left == "collections-url":
return right


return None