Skip to content

Commit

Permalink
Fix fsspec.open for pelican:// URLs
Browse files Browse the repository at this point in the history
When `fsspec.open` is used, the path provided to the filesystem calls
is the full `pelican://` URL _minus_ `pelican://`.  This ensures the
path is properly parsed.
  • Loading branch information
bbockelm committed May 11, 2024
1 parent 9996cc0 commit 2dce3e4
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
65 changes: 38 additions & 27 deletions src/pelicanfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class PelicanFileSystem(AsyncFileSystem):

def __init__ (
self,
federationDiscoveryUrl,
federationDiscoveryUrl="",
direct_reads = False,
preferred_caches = [],
asynchronous = False,
Expand Down Expand Up @@ -299,8 +299,9 @@ def _dirlist_dec(func):
"""
async def wrapper(self, *args, **kwargs):
path = args[0]
parsedUrl = urllib.parse.urlparse(path)
listUrl = await self.get_origin_url(parsedUrl.path)
path = self._check_fspath(path)
# TODO: need to have a separate get_dirlist_url
listUrl = await self.get_origin_url(path)
result = await func(self, listUrl, *args[1:], **kwargs)
return result
return wrapper
Expand All @@ -319,8 +320,9 @@ async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):

# Not using a decorator because it requires a yield
async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
parsedUrl = urllib.parse.urlparse(path)
listUrl = await self.get_origin_url(parsedUrl.path)
path = self._check_fspath(path)
# TODO: need to have a separate get_dirlist_url
listUrl = await self.get_origin_url(path)
async for _ in self.httpFileSystem._walk(listUrl, maxdepth, on_error, **kwargs):
yield _

Expand Down Expand Up @@ -351,13 +353,32 @@ async def io_wrapper(*args, **kwargs):

return io_wrapper

def _check_fspath(self, path: str) -> str:
"""
Given a path (either absolute or a pelican://-style URL),
check that the pelican://-style URL is compatible with the current
filesystem object and return the path.
"""
if not path.startswith("/"):
pelican_url = urllib.parse.urlparse("pelican://" + path)
discovery_url = pelican_url._replace(path="/", fragment="", query="", params="")
discovery_str = discovery_url.geturl()
if not self.discoveryUrl:
self.discoveryUrl = discovery_str
elif self.discoveryUrl != discovery_str:
raise InvalidMetadata()
path = pelican_url.path
return path

def open(self, path, **kwargs):
path = self._check_fspath(path)
data_url = sync(self.loop, self.get_origin_cache if self.directReads else self.get_working_cache, path)
fp = self.httpFileSystem.open(data_url, **kwargs)
fp.read = self._io_wrapper(fp.read)
return fp

async def open_async(self, path, **kwargs):
path = self._check_fspath(path)
if self.directReads:
data_url = await self.get_origin_cache(path)
else:
Expand All @@ -377,15 +398,11 @@ def _cache_dec(func):
a cache
"""
async def wrapper(self, *args, **kwargs):
path = args[0]
parsedUrl = urllib.parse.urlparse(path)
if parsedUrl.scheme == "http" or parsedUrl.scheme == "https":
dataUrl = path
path = self._check_fspath(args[0])
if self.directReads:
dataUrl = await self.get_origin_url(path)
else:
if self.directReads:
dataUrl = await self.get_origin_url(parsedUrl.path)
else:
dataUrl = await self.get_working_cache(parsedUrl.path)
dataUrl = await self.get_working_cache(path)
try:
result = await func(self, dataUrl, *args[1:], **kwargs)
except:
Expand All @@ -406,25 +423,19 @@ def _cache_multi_dec(func):
async def wrapper(self, *args, **kwargs):
path = args[0]
if isinstance(path, str):
parsedUrl = urllib.parse.urlparse(path)
if parsedUrl.scheme == "http" or parsedUrl.scheme == "https":
dataUrl = path
path = self._check_fspath(args[0])
if self.directReads:
dataUrl = await self.get_origin_url(path)
else:
if self.directReads:
dataUrl = await self.get_origin_url(parsedUrl.path)
else:
dataUrl = await self.get_working_cache(parsedUrl.path)
dataUrl = await self.get_working_cache(path)
else:
dataUrl = []
for p in path:
parsedUrl = urllib.parse.urlparse(p)
if parsedUrl.scheme == "http" or parsedUrl.scheme == "https":
dUrl = p
p = self._check_fspath(p)
if self.directReads:
dUrl = await self.get_origin_url(p)
else:
if self.directReads:
dUrl = await self.get_origin_url(parsedUrl.path)
else:
dUrl = await self.get_working_cache(parsedUrl.path)
dUrl = await self.get_working_cache(p)
dataUrl.append(dUrl)
try:
result = await func(self, dataUrl, *args[1:], **kwargs)
Expand Down
5 changes: 5 additions & 0 deletions test/test_osdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def test_osdf():
data = of.read()
assert data == b'Hello, World!\n'

def test_osdf_pelicanurl():
with fsspec.open("pelican://osg-htc.org/ospool/uc-shared/public/OSG-Staff/validation/test.txt") as of:
data = of.read()
assert data == b'Hello, World!\n'

def test_osdf_direct():
pelfs = pelicanfs.core.PelicanFileSystem("pelican://osg-htc.org", direct_reads=True)
data = pelfs.cat("/ospool/uc-shared/public/OSG-Staff/validation/test.txt")
Expand Down

0 comments on commit 2dce3e4

Please sign in to comment.