From ddd6973a161689c97c41c5ec6daba28e0a5bd35d Mon Sep 17 00:00:00 2001
From: Emma Turetsky
Date: Thu, 11 Apr 2024 12:44:59 -0500
Subject: [PATCH] Initial implementation of the pelican fsspec code

---
 src/__init__.py                    |   0
 src/pelicanfs/__init__.py          |   0
 src/pelicanfs/core.py              | 258 +++++++++++++++++++++++++++++
 src/pelicanfs/dir_header_parser.py |  43 +++++
 4 files changed, 301 insertions(+)
 create mode 100644 src/__init__.py
 create mode 100644 src/pelicanfs/__init__.py
 create mode 100644 src/pelicanfs/core.py
 create mode 100644 src/pelicanfs/dir_header_parser.py

diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/pelicanfs/__init__.py b/src/pelicanfs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/pelicanfs/core.py b/src/pelicanfs/core.py
new file mode 100644
index 0000000..527ba08
--- /dev/null
+++ b/src/pelicanfs/core.py
@@ -0,0 +1,258 @@
+"""
+Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You may
+obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import asyncio
+import urllib.parse
+
+import aiohttp
+import fsspec
+import fsspec.implementations.http as fshttp
+from fsspec.asyn import AsyncFileSystem
+
+import pelicanfs.dir_header_parser as dir_header_parser
+
+
+class PelicanFileSystem(AsyncFileSystem):
+    """
+    Access a pelican namespace as if it were a file system.
+
+    This exposes a filesystem-like API (ls, cp, open, etc.) on top of pelican.
+
+    It works by composing with an http fsspec. Whenever a function call is
+    made to the PelicanFileSystem, it calls out to the director to get an
+    appropriate url for the given call. That url is then passed on to the
+    http fsspec, which handles the actual logic of the function.
+
+    NOTE: Once a url is passed on to the http fsspec, that url is the one
+    used for all sub-calls within the http fsspec.
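+
+    Example (the director url and namespace path here are illustrative,
+    not real endpoints):
+
+        pelfs = PelicanFileSystem("https://some-director.example.org/")
+        pelfs.ls("/namespace/path")
+        data = pelfs.cat("/namespace/path/file.txt")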
+    """
+
+    protocol = "pelican"
+
+    def __init__(
+        self,
+        directorUrl,
+        asynchronous=False,
+        loop=None,
+    ):
+        # The internal filesystem
+        self.httpFileSystem = fshttp.HTTPFileSystem(asynchronous=asynchronous, loop=loop)
+
+        # Ensure the director url ends with a "/"
+        if directorUrl[-1] != "/":
+            directorUrl = directorUrl + "/"
+        self.directorUrl = directorUrl
+
+        super().__init__(asynchronous=asynchronous, loop=loop)
+
+        # These are all not implemented in the http fsspec and, as such, are
+        # not implemented in the pelican fsspec. They will raise
+        # NotImplementedErrors when called.
+        self._rm_file = self.httpFileSystem._rm_file
+        self._cp_file = self.httpFileSystem._cp_file
+        self._pipe_file = self.httpFileSystem._pipe_file
+        self._mkdir = self.httpFileSystem._mkdir
+        self._makedirs = self.httpFileSystem._makedirs
+
+        # TODO: These functions are to be implemented. Currently, a top-level
+        # call to glob/du/info will result in a failure.
+        self._glob = self.httpFileSystem._glob
+        self._du = self.httpFileSystem._du
+        self._info = self.httpFileSystem._info
+
+    async def get_director_headers(self, fileloc):
+        """
+        Returns the header response from a GET call to the director.
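+
+        For reference, the director's redirect response is expected to carry
+        headers along these lines (values illustrative):
+
+            Link: <https://cache.example.org/ns/file>; rel="duplicate"; pri=1
+            X-Pelican-Namespace: namespace=/ns, collections-url=https://origin.example.org:8444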
+        """
+        if fileloc[0] == "/":
+            fileloc = fileloc[1:]
+        url = self.directorUrl + fileloc
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url, allow_redirects=False) as resp:
+                return resp.headers
+
+    async def get_working_cache(self, fileloc):
+        """
+        Returns the highest priority cache for the namespace that appears to be working.
+        """
+        headers = await self.get_director_headers(fileloc)
+        metalist = dir_header_parser.parse_metalink(headers)[1:]
+        # Timeout for each probe, in seconds - aiohttp's default is 5 minutes
+        timeout = aiohttp.ClientTimeout(total=2)
+        async with aiohttp.ClientSession() as session:
+            for updatedUrl, _ in metalist:
+                try:
+                    async with session.get(updatedUrl, timeout=timeout) as resp:
+                        resp.status
+                except (aiohttp.ClientConnectorError, FileNotFoundError, asyncio.TimeoutError):
+                    continue
+                return updatedUrl
+        # No working cache was found
+        raise RuntimeError(f"No working cache found for {fileloc}")
+
+    def _dirlist_dec(func):
+        """
+        Decorator which, given a namespace location, gets the url for the
+        dirlist location from the director's headers and uses that url for the
+        wrapped function.
+
+        This is for functions which need to list information in the origin
+        directories, such as "find", "isdir" and "ls".
+        """
+        async def wrapper(self, *args, **kwargs):
+            path = args[0]
+            parsedUrl = urllib.parse.urlparse(path)
+            headers = await self.get_director_headers(parsedUrl.path)
+            dirlistloc = dir_header_parser.get_dirlist_loc(headers)
+            if dirlistloc is None:
+                raise RuntimeError("No collections-url found in director response")
+            listUrl = dirlistloc + "/" + parsedUrl.path
+            return await func(self, listUrl, *args[1:], **kwargs)
+        return wrapper
+
+    @_dirlist_dec
+    async def _ls(self, path, detail=True, **kwargs):
+        return await self.httpFileSystem._ls(path, detail, **kwargs)
+
+    @_dirlist_dec
+    async def _isdir(self, path):
+        return await self.httpFileSystem._isdir(path)
+
+    @_dirlist_dec
+    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
+        return await self.httpFileSystem._find(path, maxdepth, withdirs, **kwargs)
+
+    # Not using the decorator because this function needs to yield
+    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
+        parsedUrl = urllib.parse.urlparse(path)
+        headers = await self.get_director_headers(parsedUrl.path)
+        dirlistloc = dir_header_parser.get_dirlist_loc(headers)
+        if dirlistloc is None:
+            raise RuntimeError("No collections-url found in director response")
+        listUrl = dirlistloc + "/" + parsedUrl.path
+        async for _ in self.httpFileSystem._walk(listUrl, maxdepth, on_error, **kwargs):
+            yield _
+
+    def _open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        autocommit=None,  # XXX: This differs from the base class.
+        cache_type=None,
+        cache_options=None,
+        size=None,
+        **kwargs,
+    ):
+        loop = asyncio.get_event_loop()
+        cache_url = loop.run_until_complete(self.get_working_cache(path))
+        return self.httpFileSystem._open(cache_url, mode, block_size, autocommit, cache_type, cache_options, size, **kwargs)
+
+    def _cache_dec(func):
+        """
+        Decorator which, given a namespace location, finds the best working
+        cache that serves the namespace, then calls the wrapped function with
+        that cache url.
+
+        Note: If a valid http url is provided, it will not call the director
+        to get a cache. This does mean that if a url was created/retrieved via
+        "ls" and then used for another function, the url will be an origin url
+        and not a cache url. This should be fixed in the future.
+        """
+        async def wrapper(self, *args, **kwargs):
+            path = args[0]
+            parsedUrl = urllib.parse.urlparse(path)
+            if parsedUrl.scheme in ("http", "https"):
+                cacheUrl = path
+            else:
+                cacheUrl = await self.get_working_cache(parsedUrl.path)
+            return await func(self, cacheUrl, *args[1:], **kwargs)
+        return wrapper
+
+    def _cache_multi_dec(func):
+        """
+        Decorator which, given a list of namespace locations, finds the best
+        working cache that serves each namespace, then calls the wrapped
+        function with those cache urls.
+
+        Note: If a valid http url is provided, it will not call the director
+        to get a cache. This does mean that if a url was created/retrieved via
+        "ls" and then used for another function, the url will be an origin url
+        and not a cache url. This should be fixed in the future.
+        """
+        async def wrapper(self, *args, **kwargs):
+            path = args[0]
+            if isinstance(path, str):
+                parsedUrl = urllib.parse.urlparse(path)
+                if parsedUrl.scheme in ("http", "https"):
+                    cacheUrl = path
+                else:
+                    cacheUrl = await self.get_working_cache(parsedUrl.path)
+            else:
+                cacheUrl = []
+                for p in path:
+                    parsedUrl = urllib.parse.urlparse(p)
+                    if parsedUrl.scheme in ("http", "https"):
+                        cUrl = p
+                    else:
+                        cUrl = await self.get_working_cache(parsedUrl.path)
+                    cacheUrl.append(cUrl)
+            return await func(self, cacheUrl, *args[1:], **kwargs)
+        return wrapper
+
+    @_cache_dec
+    async def _cat_file(self, path, start=None, end=None, **kwargs):
+        return await self.httpFileSystem._cat_file(path, start, end, **kwargs)
+
+    @_cache_dec
+    async def _exists(self, path, **kwargs):
+        return await self.httpFileSystem._exists(path, **kwargs)
+
+    @_cache_dec
+    async def _isfile(self, path, **kwargs):
+        return await self.httpFileSystem._isfile(path, **kwargs)
+
+    @_cache_dec
+    async def _get_file(self, rpath, lpath, **kwargs):
+        return await self.httpFileSystem._get_file(rpath, lpath, **kwargs)
+
+    @_cache_multi_dec
+    async def _cat(self, path, recursive=False, on_error="raise", batch_size=None, **kwargs):
+        return await self.httpFileSystem._cat(path, recursive, on_error, batch_size, **kwargs)
+
+    @_cache_multi_dec
+    async def _expand_path(self, path, recursive=False, maxdepth=None):
+        return await self.httpFileSystem._expand_path(path, recursive, maxdepth)
+
+
+fsspec.register_implementation(PelicanFileSystem.protocol, PelicanFileSystem)
+
+
+def PelicanMap(root, pelfs, check=False, create=False):
+    """
+    Returns a key/value mapper over the given namespace location, backed by
+    the best working cache for that namespace.
+    """
+    loop = asyncio.get_event_loop()
+    cache_url = loop.run_until_complete(pelfs.get_working_cache(root))
+    return pelfs.get_mapper(cache_url, check=check, create=create)
diff --git a/src/pelicanfs/dir_header_parser.py b/src/pelicanfs/dir_header_parser.py
new file mode 100644
index 0000000..0fbfa77
--- /dev/null
+++ b/src/pelicanfs/dir_header_parser.py
@@ -0,0 +1,43 @@
+def parse_metalink(headers={}):
+    """
+    Parse the metalink headers to get a list of caches to attempt, in
+    priority order.
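+
+    For example, a Link header of the (illustrative) form
+
+        <https://cache1.example.org/foo>; rel="duplicate"; pri=1,
+        <https://cache2.example.org/foo>; rel="duplicate"; pri=2
+
+    parses to:
+
+        [["https://cache1.example.org/foo", 1], ["https://cache2.example.org/foo", 2]]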
+    """
+    linkPrio = []
+
+    if "Link" in headers:
+        links = headers["Link"].split(",")
+        for mlink in links:
+            elmts = mlink.split(";")
+            mdict = {}
+            for elm in elmts[1:]:
+                left, right = elm.split("=", 1)
+                mdict[left.strip()] = right.strip()
+
+            # Default priority if none is specified in the header
+            priority = len(headers)
+            if "pri" in mdict:
+                priority = int(mdict["pri"])
+
+            link = elmts[0].strip(" <>")
+
+            linkPrio.append([link, priority])
+
+    linkPrio.sort(key=lambda x: x[1])
+    return linkPrio
+
+
+def get_dirlist_loc(headers={}):
+    """
+    Parse the headers to get the dirlist location.
+
+    This will return None if there is no dirlist location.
+    """
+    if "X-Pelican-Namespace" in headers:
+        namespace = headers["X-Pelican-Namespace"]
+        elmts = namespace.split(", ")
+        for elm in elmts:
+            left, right = elm.split("=", 1)
+            if left == "collections-url":
+                return right
+
+    return None
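
--
Example end-to-end usage (illustrative; the director url and namespace path
below are placeholders, not real endpoints):

    import fsspec
    import pelicanfs.core  # importing registers the "pelican" protocol
    from pelicanfs.core import PelicanFileSystem, PelicanMap

    # Open through the fsspec registry...
    fs = fsspec.filesystem("pelican", directorUrl="https://some-director.example.org/")
    data = fs.cat("/namespace/path/file.txt")

    # ...or build a key/value mapper over a namespace location
    pelfs = PelicanFileSystem("https://some-director.example.org/")
    store = PelicanMap("/namespace/data", pelfs)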