From 1850025a3f76aea65c9e30960a73c386af25343e Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Sun, 19 May 2024 08:44:39 -0500 Subject: [PATCH 1/7] Added glob/du/info support -- Moved glob to the top level to deal with potential '//' -- Added a get_dirlist_url function that uses PROPFIND -- Removed unused get_dirlist_host from the parser module --- src/pelicanfs/core.py | 132 ++++++++++++++++++++++++++--- src/pelicanfs/dir_header_parser.py | 19 +---- 2 files changed, 120 insertions(+), 31 deletions(-) diff --git a/src/pelicanfs/core.py b/src/pelicanfs/core.py index b0ff900..9c021db 100644 --- a/src/pelicanfs/core.py +++ b/src/pelicanfs/core.py @@ -15,6 +15,7 @@ """ import cachetools +from fsspec.utils import glob_translate from fsspec.asyn import AsyncFileSystem, sync from .dir_header_parser import parse_metalink import fsspec.implementations.http as fshttp @@ -139,13 +140,12 @@ def __init__ ( self._pipe_file = self.httpFileSystem._pipe_file self._mkdir = self.httpFileSystem._mkdir self._makedirs = self.httpFileSystem._makedirs - - # TODO: These functions are to be implemented. Currently A top level call to glob/du/info will result - # in a failure - self._glob = self.httpFileSystem._glob - self._du = self.httpFileSystem._du - self._info = self.httpFileSystem._info + @classmethod + def _strip_protocol(cls, path): + """For HTTP, we always want to keep the full URL""" + return path + async def _discover_federation_metadata(self, discUrl): """ @@ -255,6 +255,21 @@ async def get_origin_url(self, fileloc: str) -> str: raise NoAvailableSource() return origin + async def get_dirlist_url(self, fileloc: str) -> str: + """ + Returns a dirlist host url for the given namespace locations + """ + url = urllib.parse.urljoin(self.directorUrl, fileloc) + + # Timeout response in seconds - the default response is 5 minutes + timeout = aiohttp.ClientTimeout(total=5) + session = await self.httpFileSystem.set_session() + async with session.request('PROPFIND', url, timeout=timeout) as resp: + dirlist_url = parse_metalink(resp.headers)[0][0][0] + if not dirlist_url: + raise NoAvailableSource() + return dirlist_url + def _get_prefix_info(self, path: str) -> _CacheManager: """ Given a path into the filesystem, return the information inthe @@ -301,11 +316,9 @@ def _dirlist_dec(func): This is for functions which need to list information in the origin directories such as "find", "isdir", "ls" """ async def wrapper(self, *args, **kwargs): - path = args[0] - path = self._check_fspath(path) - # TODO: need to have a separate get_dirlist_url - listUrl = await self.get_origin_url(path) - result = await func(self, listUrl, *args[1:], **kwargs) + path = self._check_fspath(args[0]) + dataUrl = await self.get_dirlist_url(path) + result = await func(self, dataUrl, *args[1:], **kwargs) return result return wrapper @@ -321,11 +334,104 @@ async def _isdir(self, path): async def _find(self, path, maxdepth=None, withdirs=False, **kwargs): return await self.httpFileSystem._find(path, maxdepth, withdirs, **kwargs) + async def _glob(self, path, maxdepth=None, **kwargs): + """ + Find files by glob-matching. 
+ + This implementation is based of the one in HTTPSFileSystem, + except it cleans the path url of double '//' and checks for + the dirlisthost ahead of time + """ + if maxdepth is not None and maxdepth < 1: + raise ValueError("maxdepth must be at least 1") + import re + + dirlist_path = await self.get_dirlist_url(path) + # Need to ensure the path with the any sort of special `glob` characters is added back in + parsed_path = urllib.parse.urlparse(dirlist_path) + updated_path = urllib.parse.urlunparse(parsed_path._replace(path=path)) + + ends_with_slash = updated_path.endswith("/") # _strip_protocol strips trailing slash + path = self._strip_protocol(updated_path) + append_slash_to_dirname = ends_with_slash or path.endswith(("/**", "/*")) + idx_star = path.find("*") if path.find("*") >= 0 else len(path) + idx_brace = path.find("[") if path.find("[") >= 0 else len(path) + + min_idx = min(idx_star, idx_brace) + + detail = kwargs.pop("detail", False) + + if not fshttp.has_magic(path): + if await self._exists(path, **kwargs): + if not detail: + return [path] + else: + return {path: await self._info(path, **kwargs)} + else: + if not detail: + return [] # glob of non-existent returns empty + else: + return {} + elif "/" in path[:min_idx]: + min_idx = path[:min_idx].rindex("/") + root = path[: min_idx + 1] + depth = path[min_idx + 1 :].count("/") + 1 + else: + root = "" + depth = path[min_idx + 1 :].count("/") + 1 + + if "**" in path: + if maxdepth is not None: + idx_double_stars = path.find("**") + depth_double_stars = path[idx_double_stars:].count("/") + 1 + depth = depth - depth_double_stars + maxdepth + else: + depth = None + + allpaths = await self._find( + root, maxdepth=depth, withdirs=True, detail=True, **kwargs + ) + + pattern = glob_translate(path + ("/" if ends_with_slash else "")) + pattern = re.compile(pattern) + + allpaths_cleaned = {} + for p, info in allpaths.items(): + parsed = list(urllib.parse.urlparse(p)) + parsed[2] = re.sub("/{2,}", "/", parsed[2]) + cleaned = urllib.parse.urlunparse(parsed) + allpaths_cleaned[cleaned] = info + + out = { + ( + p.rstrip("/") + if not append_slash_to_dirname + and info["type"] == "directory" + and p.endswith("/") + else p + ): info + for p, info in sorted(allpaths_cleaned.items()) + if pattern.match(p.rstrip("/")) + } + + if detail: + return out + else: + return list(out) + + + @_dirlist_dec + async def _info(self, path, **kwargs): + return await self.httpFileSystem._info(path, **kwargs) + + @_dirlist_dec + async def _du(self, path, total=True, maxdepth=None, **kwargs): + return await self.httpFileSystem._du(path, total, maxdepth, **kwargs) + # Not using a decorator because it requires a yield async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs): path = self._check_fspath(path) - # TODO: need to have a separate get_dirlist_url - listUrl = await self.get_origin_url(path) + listUrl = await self.get_dirlist_url(path) async for _ in self.httpFileSystem._walk(listUrl, maxdepth, on_error, **kwargs): yield _ diff --git a/src/pelicanfs/dir_header_parser.py b/src/pelicanfs/dir_header_parser.py index fe5c0c3..602e3e2 100644 --- a/src/pelicanfs/dir_header_parser.py +++ b/src/pelicanfs/dir_header_parser.py @@ -36,21 +36,4 @@ def parse_metalink(headers: dict[str, str]) -> tuple[list[tuple[str, int]], str] namespace = val break - return linkPrio, namespace - -def get_dirlist_loc(headers={}): - """ - Parse the headers to get the dirlist location - - This will None if there is no dirlist location - """ - if "X-Pelican-Namespace" in headers: - 
namespace = headers["X-Pelican-Namespace"] - elmts = namespace.split(", ") - for elm in elmts: - left, right = elm.split("=", 1) - if left == "collections-url": - return right - - - return None \ No newline at end of file + return linkPrio, namespace \ No newline at end of file From 4d28b24a61161505f799bbfecd1fb9777fff739b Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Sun, 19 May 2024 08:56:49 -0500 Subject: [PATCH 2/7] Add missing newline back in --- src/pelicanfs/dir_header_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pelicanfs/dir_header_parser.py b/src/pelicanfs/dir_header_parser.py index 602e3e2..f316153 100644 --- a/src/pelicanfs/dir_header_parser.py +++ b/src/pelicanfs/dir_header_parser.py @@ -36,4 +36,4 @@ def parse_metalink(headers: dict[str, str]) -> tuple[list[tuple[str, int]], str] namespace = val break - return linkPrio, namespace \ No newline at end of file + return linkPrio, namespace From c36d06b2395563a7425b9ac44325c5c2cd0b2961 Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Sun, 19 May 2024 09:35:42 -0500 Subject: [PATCH 3/7] Added stripping of osdf/pelican to the strip_protocol code --- src/pelicanfs/core.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/pelicanfs/core.py b/src/pelicanfs/core.py index 9c021db..6b01790 100644 --- a/src/pelicanfs/core.py +++ b/src/pelicanfs/core.py @@ -144,6 +144,10 @@ def __init__ ( @classmethod def _strip_protocol(cls, path): """For HTTP, we always want to keep the full URL""" + if path.startswith("osdf://"): + path = path[7:] + elif path.startswith("pelican://"): + path = path[10:] return path From 46c77af32f57390656567b3c0dde192799897218 Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Mon, 20 May 2024 11:36:14 -0500 Subject: [PATCH 4/7] Added a directoryUrl retrieval in get_dirlist_url --- src/pelicanfs/core.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/pelicanfs/core.py b/src/pelicanfs/core.py index 6b01790..43baf38 100644 --- a/src/pelicanfs/core.py +++ b/src/pelicanfs/core.py @@ -263,6 +263,18 @@ async def get_dirlist_url(self, fileloc: str) -> str: """ Returns a dirlist host url for the given namespace locations """ + + if not self.directorUrl: + metadata_json = await self._discover_federation_metadata(self.discoveryUrl) + # Ensure the director url has a '/' at the end + directorUrl = metadata_json.get('director_endpoint') + if not directorUrl: + raise InvalidMetadata() + + if not directorUrl.endswith("/"): + directorUrl = directorUrl + "/" + self.directorUrl = directorUrl + url = urllib.parse.urljoin(self.directorUrl, fileloc) # Timeout response in seconds - the default response is 5 minutes From 4be215daa0562103b0a0679187801dc099a22546 Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Tue, 21 May 2024 15:00:51 -0500 Subject: [PATCH 5/7] Removed dirlisthost from returned paths from various calls Also created a work-around for isdir and isfile --- src/pelicanfs/core.py | 86 +++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 23 deletions(-) diff --git a/src/pelicanfs/core.py b/src/pelicanfs/core.py index 43baf38..e8e4e23 100644 --- a/src/pelicanfs/core.py +++ b/src/pelicanfs/core.py @@ -329,7 +329,7 @@ def _dirlist_dec(func): Decorator function which, when given a namespace location, get the url for the dirlist location from the headers and uses that url for the given function. 
- This is for functions which need to list information in the origin directories such as "find", "isdir", "ls" + This is for functions which need to get information from origin directories "ls", "du", "info", etc. """ async def wrapper(self, *args, **kwargs): path = self._check_fspath(args[0]) @@ -338,17 +338,67 @@ async def wrapper(self, *args, **kwargs): return result return wrapper + + def _remove_dirlist_from_path(self, path): + parsed_url = urllib.parse.urlparse(path) + updated_url = parsed_url._replace(netloc="", scheme="") + return urllib.parse.urlunparse(updated_url) + + def _remove_dirlist_from_paths(self, paths): + if isinstance(paths, list): + return [self._remove_dirlist_from_paths(path) for path in paths] + + if isinstance(paths, dict): + if 'name' in paths: + path = paths['name'] + paths['name'] = self._remove_dirlist_from_path(path) + if 'url' in paths: + url = paths['url'] + paths['url'] = self._remove_dirlist_from_path(url) + return paths + else: + new_dict = {} + for key, item in paths.items(): + new_key = self._remove_dirlist_from_path(key) + new_item = self._remove_dirlist_from_paths(item) + new_dict[new_key] = new_item + return new_dict + + if isinstance(paths, str): + return self._remove_dirlist_from_path(paths) + + return paths + + def _dirlist_dec(func): + """ + Decorator function which, when given a namespace location, get the url for the dirlist location from the headers + and uses that url for the given function. It then normalizes the paths or list of paths returned by the function + + This is for functions which need to retrieve information from origin directories such as "find", "ls", "info", etc. + """ + async def wrapper(self, *args, **kwargs): + path = self._check_fspath(args[0]) + dataUrl = await self.get_dirlist_url(path) + return await func(self, dataUrl, *args[1:], **kwargs) + return wrapper + + @_dirlist_dec async def _ls(self, path, detail=True, **kwargs): - return await self.httpFileSystem._ls(path, detail, **kwargs) + results = await self.httpFileSystem._ls(path, detail, **kwargs) + return self._remove_dirlist_from_paths(results) @_dirlist_dec async def _isdir(self, path): return await self.httpFileSystem._isdir(path) - + @_dirlist_dec async def _find(self, path, maxdepth=None, withdirs=False, **kwargs): - return await self.httpFileSystem._find(path, maxdepth, withdirs, **kwargs) + results = await self.httpFileSystem._find(path, maxdepth, withdirs, **kwargs) + return self._remove_dirlist_from_paths(results) + + async def _isfile(self, path): + return not await self._isdir(path) async def _glob(self, path, maxdepth=None, **kwargs): """ @@ -408,25 +458,18 @@ async def _glob(self, path, maxdepth=None, **kwargs): root, maxdepth=depth, withdirs=True, detail=True, **kwargs ) - pattern = glob_translate(path + ("/" if ends_with_slash else "")) + pattern = glob_translate(self._remove_dirlist_from_path(path) + ("/" if ends_with_slash else "")) pattern = re.compile(pattern) - allpaths_cleaned = {} - for p, info in allpaths.items(): - parsed = list(urllib.parse.urlparse(p)) - parsed[2] = re.sub("/{2,}", "/", parsed[2]) - cleaned = urllib.parse.urlunparse(parsed) - allpaths_cleaned[cleaned] = info - out = { ( - p.rstrip("/") + self._remove_dirlist_from_path(p.rstrip("/")) if not append_slash_to_dirname and info["type"] == "directory" and p.endswith("/") - else p + else self._remove_dirlist_from_path(p) ): info - for p, info in sorted(allpaths_cleaned.items()) + for p, info in sorted(allpaths.items()) if pattern.match(p.rstrip("/")) } @@ -438,7 +481,8 @@ async 
def _glob(self, path, maxdepth=None, **kwargs): @_dirlist_dec async def _info(self, path, **kwargs): - return await self.httpFileSystem._info(path, **kwargs) + results = await self.httpFileSystem._info(path, **kwargs) + return self._remove_dirlist_from_paths(results) @_dirlist_dec async def _du(self, path, total=True, maxdepth=None, **kwargs): @@ -449,7 +493,7 @@ async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs): path = self._check_fspath(path) listUrl = await self.get_dirlist_url(path) async for _ in self.httpFileSystem._walk(listUrl, maxdepth, on_error, **kwargs): - yield _ + yield self._remove_dirlist_from_path(_) def _io_wrapper(self, func): """ @@ -500,10 +544,10 @@ def _check_fspath(self, path: str) -> str: path = pelican_url.path return path - def open(self, path, **kwargs): + def open(self, path, mode, **kwargs): path = self._check_fspath(path) data_url = sync(self.loop, self.get_origin_cache if self.directReads else self.get_working_cache, path) - fp = self.httpFileSystem.open(data_url, **kwargs) + fp = self.httpFileSystem.open(data_url, mode, **kwargs) fp.read = self._io_wrapper(fp.read) return fp @@ -591,10 +635,6 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): async def _exists(self, path, **kwargs): return await self.httpFileSystem._exists(path, **kwargs) - @_cache_dec - async def _isfile(self, path, **kwargs): - return await self.httpFileSystem._isfile(path, **kwargs) - @_cache_dec async def _get_file(self, rpath, lpath, **kwargs): return await self.httpFileSystem._get_file(rpath, lpath, **kwargs) From 7a3a002195c0a5fa7eb5b1071bcf432c35174108 Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Tue, 21 May 2024 17:25:30 -0500 Subject: [PATCH 6/7] Expanded the pytests to test all the major functions --- test/test_director.py | 224 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) diff --git a/test/test_director.py b/test/test_director.py index 0e04b5e..42945c1 100644 --- a/test/test_director.py +++ b/test/test_director.py @@ -23,6 +23,21 @@ from pytest_httpserver import HTTPServer +listing_response = ('\n' + '\n\n\n' + '\n\n' + '/foo/bar\n\n\n

+                    '</title>\n</head>\n'
+                    '<body>\n'
+                    '<h1>Listing of: /foo/bar</h1>\n'
+                    '<hr/>\n'
+                    '<table>\n'
+                    '<tr><td><a href="file1">file1</a></td></tr>\n'
+                    '<tr><td><a href="file2">file2</a></td></tr>\n'
+                    '<tr><td><a href="file3">file3</a></td></tr>\n'
+                    '</table>\n'
+                    '<hr/>\n'
+                    '<p>Request by unknown.189071:38@[::ffff:128.104.153.58] ( [::ffff:128.104.153.58] )</p>\n'
+                    '<p>Powered by XrdHTTP v5.6.8 (CERN IT-SDC)</p>\n'
+                    '</body>\n</html>
\n') + @pytest.fixture(scope="session") def ca(): return trustme.CA() @@ -66,6 +81,215 @@ async def clientFactory(**kwargs): return clientFactory +def test_ls(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.ls("/foo/bar", detail=False) == ['/foo/bar/file1', '/foo/bar/file2', '/foo/bar/file3'] + +def test_glob(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar/*").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_oneshot_request("/foo/bar/").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.glob("/foo/bar/*") == ['/foo/bar/file1', '/foo/bar/file2', '/foo/bar/file3'] + +def test_find(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.find("/foo/bar") == ['/foo/bar/file1', '/foo/bar/file2', '/foo/bar/file3'] + +def test_info(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.info("/foo/bar") == {'name': '/foo/bar', 'size': 1425, 'mimetype': 'text/plain', 'url': '/foo/bar', 'type': 'file'} + +def test_du(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + 
httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + httpserver.expect_request("/foo/bar/file1", method="HEAD").respond_with_data( + "file1", + status=307, + ) + httpserver.expect_request("/foo/bar/file2", method="HEAD").respond_with_data( + "file2!!!!", + status=307, + ) + httpserver.expect_request("/foo/bar/file3", method="HEAD").respond_with_data( + "file3-with-extra-characters-for-more-content", + status=307, + ) + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.du("/foo/bar") == 58 + + +def test_isdir(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + foo_bar_file_url = httpserver.url_for("foo/bar/file1") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + httpserver.expect_oneshot_request("/foo/bar/file1").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_file_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar/file1", method="GET").respond_with_data( + "file1", + status=307, + ) + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.isdir("/foo/bar") == True + assert pelfs.isdir("/foo/bar/file1") == False + +def test_isfile(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + foo_bar_file_url = httpserver.url_for("foo/bar/file1") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + httpserver.expect_oneshot_request("/foo/bar/file1").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_file_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar/file1", method="GET").respond_with_data( + "file1", + status=307, + ) + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.isfile("/foo/bar") == False + assert pelfs.isfile("/foo/bar/file1") == True + + +def test_walk(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( 
+ "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + for root, dirnames, filenames in pelfs.walk("/foo/bar"): + assert root == "/foo/bar" + assert dirnames == [] + assert 'file1' in filenames + assert 'file2' in filenames + assert 'file3' in filenames + assert len(filenames) == 3 + def test_open(httpserver: HTTPServer, get_client): foo_bar_url = httpserver.url_for("/foo/bar") httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) From 962bcbc96122e757a0190b765a9cc2fad9a1793c Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Thu, 23 May 2024 21:27:47 -0500 Subject: [PATCH 7/7] Addressed PR comments and removed hostname from 'cat' results and 'get_mapper' root -- Renamed and made the _remove_hostname_from_path/s methods static methods -- Added tests for _remove_hostname_from_paths -- get_mapper now works with an absolute path rather than a url to a cache Also added two example notebooks to show how things work --- examples/intake/intake-example.ipynb | 3240 +++++++++++++++++ examples/pytorch/pytorch_with_pelicanfs.ipynb | 100 + src/pelicanfs/core.py | 128 +- test/test_director.py | 16 +- test/test_utils.py | 41 + 5 files changed, 3459 insertions(+), 66 deletions(-) create mode 100644 examples/intake/intake-example.ipynb create mode 100644 examples/pytorch/pytorch_with_pelicanfs.ipynb create mode 100644 test/test_utils.py diff --git a/examples/intake/intake-example.ipynb b/examples/intake/intake-example.ipynb new file mode 100644 index 0000000..02d660a --- /dev/null +++ b/examples/intake/intake-example.ipynb @@ -0,0 +1,3240 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Intake and Pelican FS Example\n", + "\n", + "This is an example notebook that shows how to use `Intake-ESM` and `PelicanFS` together to access data that exists on a Pelican origin. It will not actually analyze the data significantly as the exercise is to load in the data. Once that's done, everything else should follow naturally.\n", + "\n", + "This is based off of the NCAR notebook [here](https://github.com/NCAR/cesm2-le-aws/blob/main/notebooks/kay_et_al_lens2.ipynb)\n", + "\n", + "## Installation\n", + "\n", + "Ensure that `PelicanFS` and `FSSpec` are both installed in the environment.\n", + "\n", + "## Imports" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline\n", + "import warnings\n", + "\n", + "warnings.filterwarnings(\"ignore\")\n", + "\n", + "import intake\n", + "import xarray as xr\n", + "from distributed import LocalCluster, Client\n", + "from ncar_jobqueue import NCARCluster" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Spin up a Cluster\n", + "\n", + "This is just leftover from the original example, not a requirement for working with PelicanFS" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n",
+       "[Dask Client / LocalCluster HTML repr omitted: 40 workers, 120 threads, 160.00 GiB total memory; dashboard at http://127.0.0.1:8787/status]\n",
+       "</div>"
+      ],
+      "text/plain": [
+       "<Client: 'tcp://127.0.0.1:53605' processes=40 threads=120, memory=160.00 GiB>"
+      ]
+     },
+     "execution_count": 2,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "# If not using NCAR HPC, use the LocalCluster\n",
+    "#cluster = LocalCluster()\n",
+    "cluster = NCARCluster()\n",
+    "cluster.scale(40)\n",
+    "\n",
+    "client = Client(cluster)\n",
+    "client"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Use the `Intake-ESM` Catalog to Access the Data\n",
+    "\n",
+    "This catalog can exist in an origin, but since objects in origins are assumed to be immutable and it may require some testing to ensure the catalog is correct, it's recommended that you keep the catalog and csv files somewhere else until you are sure of their correctness. The catalog this example uses exists in the `resources` folder.\n",
+    "\n",
+    "\n",
+    "### Intake Format for PelicanFS\n",
+    "\n",
+    "To have the catalog work with `PelicanFS`, give the namespace path with the appropriate protocol prefix. This is what a path in the `.csv` file used by the `Intake` catalog looks like:\n",
+    "\n",
+    "```\n",
+    "osdf:///chtc/PUBLIC/eturetsky/ncar-subset/ncar/monthly/cesm2LE-historical-cmip6-FLNS.zarr\n",
+    "```\n",
+    "\n",
+    "Here `osdf://` is the protocol prefix indicating that we are using the `osg` federation, and `chtc/PUBLIC/eturetsky/ncar-subset/ncar/monthly/cesm2LE-historical-cmip6-FLNS.zarr` is the file's location within the `chtc/PUBLIC` namespace."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [

pelican-test-intake catalog with 1 dataset(s) from 4 asset(s):

                   unique
variable                4
long_name               4
component               1
experiment              1
forcing_variant         1
frequency               1
vertical_levels         1
spatial_domain          1
units                   1
start_time              1
end_time                1
path                    4
derived_variable        0
\n", + "
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "catalog = intake.open_esm_datastore(\n", + " 'file://resources/pelican-test-intake.json'\n", + ")\n", + "catalog" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "catalog_subset = catalog.search(variable='FLUT', frequency='monthly')\n", + "catalog_subset\n", + "\n", + "catalog_subset2 = catalog.search(variable='FLNS', frequency='monthly')" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
variablelong_namecomponentexperimentforcing_variantfrequencyvertical_levelsspatial_domainunitsstart_timeend_timepath
0FLUTupwelling longwave flux at top of modelatmhistoricalcmip6monthly1.0globalW/m21850-01-16 12:00:002014-12-16 12:00:00osdf:///chtc/PUBLIC/eturetsky/ncar-subset/ncar...
\n", + "
" + ], + "text/plain": [ + " variable long_name component experiment \\\n", + "0 FLUT upwelling longwave flux at top of model atm historical \n", + "\n", + " forcing_variant frequency vertical_levels spatial_domain units \\\n", + "0 cmip6 monthly 1.0 global W/m2 \n", + "\n", + " start_time end_time \\\n", + "0 1850-01-16 12:00:00 2014-12-16 12:00:00 \n", + "\n", + " path \n", + "0 osdf:///chtc/PUBLIC/eturetsky/ncar-subset/ncar... " + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "catalog_subset.df" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "--> The keys in the returned dictionary of datasets are constructed as follows:\n", + "\t'component.experiment.frequency.forcing_variant'\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "\n", + "
\n", + " \n", + " 0.00% [0/1 00:00<?]\n", + "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "--> The keys in the returned dictionary of datasets are constructed as follows:\n", + "\t'component.experiment.frequency.forcing_variant'\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "\n", + "
\n", + " \n", + " 100.00% [1/1 00:02<00:00]\n", + "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "dsets = catalog_subset.to_dataset_dict()\n", + "dsets2 = catalog_subset2.to_dataset_dict()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "dict_keys(['atm.historical.monthly.cmip6'])" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dsets.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "dict_keys(['atm.historical.monthly.cmip6'])" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dsets2.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "historical_cmip6 = dsets['atm.historical.monthly.cmip6']\n", + "other_cmip6 = dsets2['atm.historical.monthly.cmip6']" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset> Size: 88GB\n",
+       "Dimensions:    (member_id: 50, time: 3960, lat: 192, lon: 288, nbnd: 2)\n",
+       "Coordinates:\n",
+       "  * lat        (lat) float64 2kB -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0\n",
+       "  * lon        (lon) float64 2kB 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8\n",
+       "  * member_id  (member_id) <U12 2kB 'r10i1181p1f1' ... 'r9i1301p1f1'\n",
+       "  * time       (time) object 32kB 1850-01-16 12:00:00 ... 2014-12-16 12:00:00\n",
+       "    time_bnds  (time, nbnd) object 63kB dask.array<chunksize=(1980, 2), meta=np.ndarray>\n",
+       "Dimensions without coordinates: nbnd\n",
+       "Data variables:\n",
+       "    FLUT       (member_id, time, lat, lon) float32 44GB dask.array<chunksize=(1, 600, 192, 288), meta=np.ndarray>\n",
+       "    FLNS       (member_id, time, lat, lon) float32 44GB dask.array<chunksize=(1, 2580, 192, 288), meta=np.ndarray>\n",
+       "Attributes: (12/22)\n",
+       "    Conventions:                       CF-1.0\n",
+       "    NCO:                               netCDF Operators version 4.9.4 (Homepa...\n",
+       "    logname:                           sunseon\n",
+       "    model_doi_url:                     https://doi.org/10.5065/D67H1H0V\n",
+       "    source:                            CAM\n",
+       "    time_period_freq:                  month_1\n",
+       "    ...                                ...\n",
+       "    intake_esm_attrs:units:            W/m2\n",
+       "    intake_esm_attrs:start_time:       1850-01-16 12:00:00\n",
+       "    intake_esm_attrs:end_time:         2014-12-16 12:00:00\n",
+       "    intake_esm_attrs:path:             osdf:///chtc/PUBLIC/eturetsky/ncar-sub...\n",
+       "    intake_esm_attrs:_data_format_:    zarr\n",
+       "    intake_esm_dataset_key:            atm.historical.monthly.cmip6
" + ], + "text/plain": [ + " Size: 88GB\n", + "Dimensions: (member_id: 50, time: 3960, lat: 192, lon: 288, nbnd: 2)\n", + "Coordinates:\n", + " * lat (lat) float64 2kB -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0\n", + " * lon (lon) float64 2kB 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8\n", + " * member_id (member_id) \n", + "Dimensions without coordinates: nbnd\n", + "Data variables:\n", + " FLUT (member_id, time, lat, lon) float32 44GB dask.array\n", + " FLNS (member_id, time, lat, lon) float32 44GB dask.array\n", + "Attributes: (12/22)\n", + " Conventions: CF-1.0\n", + " NCO: netCDF Operators version 4.9.4 (Homepa...\n", + " logname: sunseon\n", + " model_doi_url: https://doi.org/10.5065/D67H1H0V\n", + " source: CAM\n", + " time_period_freq: month_1\n", + " ... ...\n", + " intake_esm_attrs:units: W/m2\n", + " intake_esm_attrs:start_time: 1850-01-16 12:00:00\n", + " intake_esm_attrs:end_time: 2014-12-16 12:00:00\n", + " intake_esm_attrs:path: osdf:///chtc/PUBLIC/eturetsky/ncar-sub...\n", + " intake_esm_attrs:_data_format_: zarr\n", + " intake_esm_dataset_key: atm.historical.monthly.cmip6" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "merge_ds = xr.concat([historical_cmip6, other_cmip6], dim = 'time')\n", + "merge_ds" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/pytorch/pytorch_with_pelicanfs.ipynb b/examples/pytorch/pytorch_with_pelicanfs.ipynb new file mode 100644 index 0000000..0bede40 --- /dev/null +++ b/examples/pytorch/pytorch_with_pelicanfs.ipynb @@ -0,0 +1,100 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Pytorch And PelicanFS\n", + "\n", + "The following is an example of how to load in data from a `Pelican` origin for `PyTorch`\n", + "\n", + "## Installation\n", + "\n", + "Ensure both `FSSpec` and `PelicanFS` are installed in the environment. Under the hood, the IterableWrapper is using an FSSpec backend to load in data and the installation of PelicanFS registers the `osdf` protocol with FSSpec." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import torch\n", + "torch.utils.data.datapipes.utils.common.DILL_AVAILABLE = torch.utils._import_utils.dill_available()\n", + "from torchdata.datapipes.iter import IterableWrapper" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example of list_files_by_fsspec()" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['osdf:///chtc/PUBLIC/eturetsky/data/faces/0805personali01.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/1084239450_e76e00b7e7.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/10comm-decarlo.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/110276240_bec305da91.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/1198_0_861.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/137341995_e7c48e9a75.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/1383023626_8a49e4879a.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/144044282_87cf3ff76e.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/152601997_ec6429a43c.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/1549040388_b99e9fa295.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/1878519279_f905d4f34e.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2046713398_91aaa6fe1c.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2173711035_dbd53b4f9f.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2210514040_6b03ff2629.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2322901504_08122b01ba.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2327253037_66a61ea6fe.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2328398005_d328a70b4c.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2370961440_6bc8ce346c.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2382SJ8.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/252418361_440b75751b.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/262007783_943bbcf613.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2633371780_45b740b670.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2647088981_60e9fe40cd.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2711409561_a0786a3d3d.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2722779845_7fcb64a096.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2795838930_0cc5aa5f41.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2902323565_100017b63c.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2902760364_89c50bde40.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/2956581526_cd803f2daa.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/297448785_b2dda4b2c0.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/299733036_fff5ea6f8e.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/303808204_1f744bc407.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3074791551_baee7fa0c1.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3152653555_68322314f3.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3264867945_fe18d442c1.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3273658251_b95f65c244.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3298715079_5af7c78fcb.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3325611505_ddc7beffa1.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3362762930_24f76cb89c.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/343583208_e986824d77.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3461016494_56cce9c984.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/348272697_832ce65324.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3534188114_2108895291.jpg', 
'osdf:///chtc/PUBLIC/eturetsky/data/faces/3534189272_8ef88ba368.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3555944509_7b477069c6.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3574737496_6ee8207045.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/362167809_d5a5dcbfdb.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/363149951_8be04dc6c0.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3638950581_3387685d3a.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3646828311_bfeb429ef7.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3689162471_5f9ffb5aa0.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3718903026_c1bf5dfcf8.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3790616528_297c0ac935.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3855944735_e252959937.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3856149136_d4595ffdd4.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/3872768751_e60d7fdbd5.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/529447797_0f9d2fb756.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/57635685_d41c98f8ca.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/809285949_6889026b53.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/92053278_be61a225d2.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/96063776_bdb3617b64.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/97308305_4b737d0873.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/britney-bald.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/create_landmark_dataset.py', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/deeny.peggy.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/face_landmarks.csv', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/matt-mathes.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/person-7.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/person.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/person_TjahjonoDGondhowiardjo.jpg', 'osdf:///chtc/PUBLIC/eturetsky/data/faces/personalpic.jpg']\n" + ] + } + ], + "source": [ + "dp = IterableWrapper([\"osdf:///chtc/PUBLIC/eturetsky/data/faces/\"]).list_files_by_fsspec()\n", + "print(list(dp))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example of open_files_by_fsspec()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "osdf:///chtc/PUBLIC/eturetsky/data/faces/0805personali01.jpg StreamWrapper<<_io.TextIOWrapper encoding='UTF-8'>>\n" + ] + } + ], + "source": [ + "dp = IterableWrapper([\"osdf:///chtc/PUBLIC/eturetsky/data/faces/0805personali01.jpg\"]).open_files_by_fsspec()\n", + "for path, filestream in dp:\n", + " print(path, filestream)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/src/pelicanfs/core.py b/src/pelicanfs/core.py index e8e4e23..ffe6135 100644 --- a/src/pelicanfs/core.py +++ b/src/pelicanfs/core.py @@ -32,6 +32,11 @@ class PelicanException(RuntimeError): Base class for all Pelican-related failures """ +class BadDirectorResponse(PelicanException): + """ + The director response did not include Link Headers + """ + class NoAvailableSource(PelicanException): """ No source endpoint is currently available for the requested object @@ -141,6 +146,7 @@ 
def __init__ ( self._mkdir = self.httpFileSystem._mkdir self._makedirs = self.httpFileSystem._makedirs + # Note this is a class method because it's overwriting a class method for the AbstractFileSystem @classmethod def _strip_protocol(cls, path): """For HTTP, we always want to keep the full URL""" @@ -150,6 +156,38 @@ def _strip_protocol(cls, path): path = path[10:] return path + @staticmethod + def _remove_host_from_path(path): + parsed_url = urllib.parse.urlparse(path) + updated_url = parsed_url._replace(netloc="", scheme="") + return urllib.parse.urlunparse(updated_url) + + @staticmethod + def _remove_host_from_paths(paths): + if isinstance(paths, list): + return [PelicanFileSystem._remove_host_from_paths(path) for path in paths] + + + if isinstance(paths, dict): + if 'name' in paths: + path = paths['name'] + paths['name'] = PelicanFileSystem._remove_host_from_path(path) + if 'url' in paths: + url = paths['url'] + paths['url'] = PelicanFileSystem._remove_host_from_path(url) + return paths + else: + new_dict = {} + for key, item in paths.items(): + new_key = PelicanFileSystem._remove_host_from_path(key) + new_item = PelicanFileSystem._remove_host_from_paths(item) + new_dict[new_key] = new_item + return new_dict + + if isinstance(paths, str): + return PelicanFileSystem._remove_host_from_path(paths) + + return paths async def _discover_federation_metadata(self, discUrl): """ @@ -281,6 +319,8 @@ async def get_dirlist_url(self, fileloc: str) -> str: timeout = aiohttp.ClientTimeout(total=5) session = await self.httpFileSystem.set_session() async with session.request('PROPFIND', url, timeout=timeout) as resp: + if 'Link' not in resp.headers: + raise BadDirectorResponse() dirlist_url = parse_metalink(resp.headers)[0][0][0] if not dirlist_url: raise NoAvailableSource() @@ -338,37 +378,6 @@ async def wrapper(self, *args, **kwargs): return result return wrapper - - def _remove_dirlist_from_path(self, path): - parsed_url = urllib.parse.urlparse(path) - updated_url = parsed_url._replace(netloc="", scheme="") - return urllib.parse.urlunparse(updated_url) - - def _remove_dirlist_from_paths(self, paths): - if isinstance(paths, list): - return [self._remove_dirlist_from_paths(path) for path in paths] - - if isinstance(paths, dict): - if 'name' in paths: - path = paths['name'] - paths['name'] = self._remove_dirlist_from_path(path) - if 'url' in paths: - url = paths['url'] - paths['url'] = self._remove_dirlist_from_path(url) - return paths - else: - new_dict = {} - for key, item in paths.items(): - new_key = self._remove_dirlist_from_path(key) - new_item = self._remove_dirlist_from_paths(item) - new_dict[new_key] = new_item - return new_dict - - if isinstance(paths, str): - return self._remove_dirlist_from_path(paths) - - return paths - def _dirlist_dec(func): """ Decorator function which, when given a namespace location, get the url for the dirlist location from the headers @@ -382,11 +391,10 @@ async def wrapper(self, *args, **kwargs): return await func(self, dataUrl, *args[1:], **kwargs) return wrapper - @_dirlist_dec async def _ls(self, path, detail=True, **kwargs): results = await self.httpFileSystem._ls(path, detail, **kwargs) - return self._remove_dirlist_from_paths(results) + return self._remove_host_from_paths(results) @_dirlist_dec async def _isdir(self, path): @@ -395,7 +403,7 @@ async def _isdir(self, path): @_dirlist_dec async def _find(self, path, maxdepth=None, withdirs=False, **kwargs): results = await self.httpFileSystem._find(path, maxdepth, withdirs, **kwargs) - return 
self._remove_dirlist_from_paths(results) + return self._remove_host_from_paths(results) async def _isfile(self, path): return not await self._isdir(path) @@ -412,13 +420,13 @@ async def _glob(self, path, maxdepth=None, **kwargs): raise ValueError("maxdepth must be at least 1") import re - dirlist_path = await self.get_dirlist_url(path) + #dirlist_path = await self.get_dirlist_url(path) # Need to ensure the path with the any sort of special `glob` characters is added back in - parsed_path = urllib.parse.urlparse(dirlist_path) - updated_path = urllib.parse.urlunparse(parsed_path._replace(path=path)) + #parsed_path = urllib.parse.urlparse(dirlist_path) + #updated_path = urllib.parse.urlunparse(parsed_path._replace(path=path)) - ends_with_slash = updated_path.endswith("/") # _strip_protocol strips trailing slash - path = self._strip_protocol(updated_path) + ends_with_slash = path.endswith("/") # _strip_protocol strips trailing slash + path = self._strip_protocol(path) append_slash_to_dirname = ends_with_slash or path.endswith(("/**", "/*")) idx_star = path.find("*") if path.find("*") >= 0 else len(path) idx_brace = path.find("[") if path.find("[") >= 0 else len(path) @@ -458,16 +466,16 @@ async def _glob(self, path, maxdepth=None, **kwargs): root, maxdepth=depth, withdirs=True, detail=True, **kwargs ) - pattern = glob_translate(self._remove_dirlist_from_path(path) + ("/" if ends_with_slash else "")) + pattern = glob_translate(path + ("/" if ends_with_slash else "")) pattern = re.compile(pattern) out = { ( - self._remove_dirlist_from_path(p.rstrip("/")) + p.rstrip("/") if not append_slash_to_dirname and info["type"] == "directory" and p.endswith("/") - else self._remove_dirlist_from_path(p) + else p ): info for p, info in sorted(allpaths.items()) if pattern.match(p.rstrip("/")) @@ -482,7 +490,7 @@ async def _glob(self, path, maxdepth=None, **kwargs): @_dirlist_dec async def _info(self, path, **kwargs): results = await self.httpFileSystem._info(path, **kwargs) - return self._remove_dirlist_from_paths(results) + return self._remove_host_from_paths(results) @_dirlist_dec async def _du(self, path, total=True, maxdepth=None, **kwargs): @@ -493,7 +501,7 @@ async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs): path = self._check_fspath(path) listUrl = await self.get_dirlist_url(path) async for _ in self.httpFileSystem._walk(listUrl, maxdepth, on_error, **kwargs): - yield self._remove_dirlist_from_path(_) + yield self._remove_host_from_path(_) def _io_wrapper(self, func): """ @@ -524,24 +532,19 @@ async def io_wrapper(*args, **kwargs): def _check_fspath(self, path: str) -> str: """ - Given a path (either absolute, a pelican://-style URL, or an https://-style), + Given a path (either absolute or a pelican://-style URL), check that the pelican://-style URL is compatible with the current filesystem object and return the path. 
""" if not path.startswith("/"): - # This can potentially be an https:// or http:// path if it comes from a get_mapper call - if path.startswith("https://") or path.startswith("http://"): - http_url = urllib.parse.urlparse(path) - path = http_url.path - else: - pelican_url = urllib.parse.urlparse("pelican://" + path) - discovery_url = pelican_url._replace(path="/", fragment="", query="", params="") - discovery_str = discovery_url.geturl() - if not self.discoveryUrl: - self.discoveryUrl = discovery_str - elif self.discoveryUrl != discovery_str: - raise InvalidMetadata() - path = pelican_url.path + pelican_url = urllib.parse.urlparse("pelican://" + path) + discovery_url = pelican_url._replace(path="/", fragment="", query="", params="") + discovery_str = discovery_url.geturl() + if not self.discoveryUrl: + self.discoveryUrl = discovery_str + elif self.discoveryUrl != discovery_str: + raise InvalidMetadata() + path = pelican_url.path return path def open(self, path, mode, **kwargs): @@ -642,7 +645,8 @@ async def _get_file(self, rpath, lpath, **kwargs): @_cache_multi_dec async def _cat(self, path, recursive=False, on_error="raise", batch_size=None, **kwargs): - return await self.httpFileSystem._cat(path, recursive, on_error, batch_size, **kwargs) + results = await self.httpFileSystem._cat(path, recursive, on_error, batch_size, **kwargs) + return self._remove_host_from_paths(results) @_cache_multi_dec async def _expand_path(self, path, recursive=False, maxdepth=None): @@ -661,9 +665,5 @@ def __init__(self, **kwargs): def PelicanMap(root, pelfs: PelicanFileSystem, check=False, create=False): """ Returns and FSMap object assigning creating a mutable mapper at the root location - - TODO: This currently assigns a cache or origin to the mapper at creation. If that cache fails, - this doesn't change to a new cache (the mapping can become complex). 
This should be fixed in the future """ - dataUrl = sync(pelfs.loop, pelfs.get_origin_url if pelfs.directReads else pelfs.get_working_cache, root) - return pelfs.get_mapper(dataUrl, check=check, create=create) + return pelfs.get_mapper(root, check=check, create=create) diff --git a/test/test_director.py b/test/test_director.py index 42945c1..f0beaf4 100644 --- a/test/test_director.py +++ b/test/test_director.py @@ -412,6 +412,7 @@ def test_open_preferred_plus(httpserver: HTTPServer, httpserver2: HTTPServer, ge def test_open_mapper(httpserver: HTTPServer, get_client): foo_url = httpserver.url_for("/foo") + foo_bar_url = httpserver.url_for("/foo/bar") httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) httpserver.expect_oneshot_request("/foo", method="GET").respond_with_data( "", @@ -421,8 +422,19 @@ def test_open_mapper(httpserver: HTTPServer, get_client): "X-Pelican-Namespace": "namespace=/foo" }, ) - httpserver.expect_oneshot_request("/foo", method="HEAD").respond_with_data("hello, world!") - httpserver.expect_oneshot_request("/foo/bar", method="GET").respond_with_data("hello, world!") + httpserver.expect_request("/foo", method="HEAD").respond_with_data("hello, world!") + + httpserver.expect_oneshot_request("/foo/bar", method="GET").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "Location": foo_bar_url, + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + + httpserver.expect_request("/foo/bar", method="HEAD").respond_with_data("hello, world!") + httpserver.expect_request("/foo/bar", method="GET").respond_with_data("hello, world!") pelfs = pelicanfs.core.PelicanFileSystem( httpserver.url_for("/"), diff --git a/test/test_utils.py b/test/test_utils.py new file mode 100644 index 0000000..3e5052c --- /dev/null +++ b/test/test_utils.py @@ -0,0 +1,41 @@ +""" +Copyright (C) 2024, Pelican Project, Morgridge Institute for Research + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You may +obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +import pytest +import pelicanfs.core +from pelicanfs.core import PelicanFileSystem + +def test_remove_hostname(): + # Test a single string + paths = "https://test-url.org/namespace/path" + assert PelicanFileSystem._remove_host_from_paths(paths) == "/namespace/path" + + # Test a list + paths = ["https://test-url.org/namespace/path", "osdf://test-url.org/namespace/path2"] + PelicanFileSystem._remove_host_from_paths(paths) == ["/namespace/path", "namespace/pathe2"] + + # Test an info-return + paths = [{"name": "https://test-url.org/namespace/path", "other": "https://body-remains.test"}, {"name": "pelican://test-url.org/namespace/path2", "size": "42"}] + expected_result = [{"name": "/namespace/path", "other": "https://body-remains.test"},{"name": "/namespace/path2", "size": "42"}] + assert PelicanFileSystem._remove_host_from_paths(paths) == expected_result + + # Test a find-return + paths = {"https://test-url.org/namespace/path": "https://test-url2.org/namespace/path", "https://test-url.org/namespace/path2": "/namespace/path3"} + expected_result = {"/namespace/path": "/namespace/path", "/namespace/path2": "/namespace/path3"} + assert PelicanFileSystem._remove_host_from_paths(paths) == expected_result + + # Test a a non-list | string | dict + assert PelicanFileSystem._remove_host_from_paths(22) == 22 +