Commit
Added logging to src/pelicanfs/core.py
turetske committed Sep 23, 2024
1 parent b4ac8ef commit 349bf7a
Showing 1 changed file with 26 additions and 3 deletions.
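Note: the logging statements in this diff use a module-level logger that is defined elsewhere in core.py and does not appear here. As a rough sketch of how an application could surface the new messages (the logger name "pelicanfs.core" is an assumption based on the usual logging.getLogger(__name__) pattern, not something shown in this commit):

import logging

# Assumption: core.py creates its logger with logging.getLogger(__name__),
# so the debug calls added in this commit would be emitted under the
# "pelicanfs.core" name. Adjust the name if the library defines it differently.
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
logging.getLogger("pelicanfs.core").setLevel(logging.DEBUG)

# With debug enabled, operations such as open() or glob() on a PelicanFileSystem
# instance will emit the messages added below (federation discovery, director
# headers, cache selection, and so on).

Leaving the root logger at its default level keeps third-party libraries quiet while still surfacing the pelicanfs debug output.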
29 changes: 26 additions & 3 deletions src/pelicanfs/core.py
@@ -194,6 +194,7 @@ async def _discover_federation_metadata(self, discUrl):
Returns the json response from a GET call to the metadata discovery url of the federation
"""
# Parse the url for federation discovery
logger.debug("Running federation discovery")
discoveryUrl = urllib.parse.urlparse(discUrl)
discoveryUrl = discoveryUrl._replace(scheme="https", path="/.well-known/pelican-configuration")
session = await self.httpFileSystem.set_session()
@@ -206,10 +207,12 @@ async def get_director_headers(self, fileloc, origin=False) -> dict[str, str]:
"""
Returns the header response from a GET call to the director
"""
logger.debug("Getting the director headers")
if fileloc[0] == "/":
fileloc = fileloc[1:]

if not self.directorUrl:
logger.debug("Director URL not set, geting from disocyr url")
metadata_json = await self._discover_federation_metadata(self.discoveryUrl)
# Ensure the director url has a '/' at the end
directorUrl = metadata_json.get('director_endpoint')
@@ -220,6 +223,7 @@ async def get_director_headers(self, fileloc, origin=False) -> dict[str, str]:
directorUrl = directorUrl + "/"
self.directorUrl = directorUrl

logger.debug(f"Getting headers from director: {self.directorUrl}")
if origin:
url = urllib.parse.urljoin(self.directorUrl, "/api/v1.0/director/origin/") + fileloc
else:
@@ -232,15 +236,18 @@ async def get_working_cache(self, fileloc: str) -> str:
"""
Returns the highest priority cache for the namespace that appears to be working
"""
logger.debug(f"Get a working cache for {fileloc}")
fparsed = urllib.parse.urlparse(fileloc)
# Removing the query if need be
cacheUrl = self._match_namespace(fparsed.path)
if cacheUrl:
logger.debug(f"Found previously working cache: {cacheUrl}")
return cacheUrl

# Calculate the list of applicable caches; this takes into account the
# preferredCaches for the filesystem. If '+' is a preferred cache, we
# add all the director-provided caches to the list (doing a round of de-dup)
logger.debug("Getting a cache from the director")
cache_list = []
if self.preferredCaches:
cache_list = [urllib.parse.urlparse(urllib.parse.urljoin(cache, fileloc))._replace(query=fparsed.query).geturl() if cache != "+" else "+" for cache in self.preferredCaches]
@@ -267,12 +274,16 @@ async def get_working_cache(self, fileloc: str) -> str:
updatedUrl = cache_list[0]
# Timeout response in seconds - the default response is 5 minutes
timeout = aiohttp.ClientTimeout(total=5)
logger.debug("Setting cache session")
session = await self.httpFileSystem.set_session()
if self.token:
logger.debug("Adding Authorization to session header")
session.headers["Authorization"] = self.token
try:
logger.debug(f"Testing cacheurl {updatedUrl}")
async with session.head(updatedUrl, timeout=timeout) as resp:
if resp.status >= 200 and resp.status < 400:
logger.debug("Cache found")
break
except (aiohttp.client_exceptions.ClientConnectorError, FileNotFoundError, asyncio.TimeoutError, asyncio.exceptions.TimeoutError):
pass
@@ -301,7 +312,7 @@ async def get_dirlist_url(self, fileloc: str) -> str:
"""
Returns a dirlist host url for the given namespace locations
"""

logger.debug(f"Finding the dirlist url for {fileloc}")
if not self.directorUrl:
metadata_json = await self._discover_federation_metadata(self.discoveryUrl)
# Ensure the director url has a '/' at the end
@@ -328,7 +339,7 @@ async def get_dirlist_url(self, fileloc: str) -> str:

def _get_prefix_info(self, path: str) -> _CacheManager:
"""
Given a path into the filesystem, return the information inthe
Given a path into the filesystem, return the information in the
namespace cache (if any)
"""
namespace_info = None
@@ -342,10 +353,12 @@ def _get_prefix_info(self, path: str) -> _CacheManager:
return namespace_info

def _match_namespace(self, fileloc: str):
logger.debug(f"Matching namespace for {fileloc}")
namespace_info = self._get_prefix_info(fileloc)
if not namespace_info:
return


logger.debug(f"Match found")
return namespace_info.get_url(fileloc)

def _bad_cache(self, url: str):
@@ -354,6 +367,7 @@ def _bad_cache(self, url: str):
the corresponding cache as a "bad cache" in the namespace
cache.
"""
logger.debug(f"Bad cache found: {url}")
cache_url = urllib.parse.urlparse(url)
path = cache_url.path
cache_url = cache_url._replace(query="", path="", fragment="")
@@ -374,6 +388,7 @@ def _dirlist_dec(func):
async def wrapper(self, *args, **kwargs):
path = self._check_fspath(args[0])
dataUrl = await self.get_dirlist_url(path)
logger.debug(f"Running {func} with url: {dataUrl}")
return await func(self, dataUrl, *args[1:], **kwargs)
return wrapper

@@ -402,6 +417,7 @@ async def _glob(self, path, maxdepth=None, **kwargs):
except it cleans the path url of double '//' and checks for
the dirlisthost ahead of time
"""
logger.debug("Starting glob...")
if maxdepth is not None and maxdepth < 1:
raise ValueError("maxdepth must be at least 1")
import re
@@ -443,6 +459,7 @@ async def _glob(self, path, maxdepth=None, **kwargs):
else:
depth = None

logger.debug(f"Running find within glob with root={root}")
allpaths = await self._find(
root, maxdepth=depth, withdirs=True, detail=True, **kwargs
)
@@ -511,6 +528,7 @@ def _check_fspath(self, path: str) -> str:
check that the pelican://-style URL is compatible with the current
filesystem object and return the path.
"""
logger.debug(f"Checking path: {path}")
if not path.startswith("/"):
pelican_url = urllib.parse.urlparse("pelican://" + path)
discovery_url = pelican_url._replace(path="/", fragment="", query="", params="")
@@ -520,11 +538,13 @@ def _check_fspath(self, path: str) -> str:
elif self.discoveryUrl != discovery_str:
raise InvalidMetadata()
path = pelican_url.path
logger.debug(f"Compatible path: {path}")
return path

def open(self, path, mode, **kwargs):
path = self._check_fspath(path)
data_url = sync(self.loop, self.get_origin_url if self.directReads else self.get_working_cache, path)
logger.debug(f"Running open on {data_url}")
fp = self.httpFileSystem.open(data_url, mode, **kwargs)
fp.read = self._io_wrapper(fp.read)
return fp
@@ -535,6 +555,7 @@ async def open_async(self, path, **kwargs):
data_url = await self.get_origin_url(path)
else:
data_url = self.get_working_cache(path)
logger.debug(f"Running open_aync on {data_url}")
fp = await self.httpFileSystem.open_async(data_url, **kwargs)
fp.read = self._async_io_wrapper(fp.read)
return fp
@@ -556,6 +577,7 @@ async def wrapper(self, *args, **kwargs):
else:
dataUrl = await self.get_working_cache(path)
try:
logger.debug(f"Calling {func} using the following url: {dataUrl}")
result = await func(self, dataUrl, *args[1:], **kwargs)
except:
self._bad_cache(dataUrl)
@@ -590,6 +612,7 @@ async def wrapper(self, *args, **kwargs):
dUrl = await self.get_working_cache(p)
dataUrl.append(dUrl)
try:
logger.debug(f"Calling {func} using the following urls: {dataUrl}")
result = await func(self, dataUrl, *args[1:], **kwargs)
except:
if isinstance(dataUrl, list):
