diff --git a/src/nwp_consumer/cmd/main.py b/src/nwp_consumer/cmd/main.py
index 12f1b4ea..d33967a4 100644
--- a/src/nwp_consumer/cmd/main.py
+++ b/src/nwp_consumer/cmd/main.py
@@ -155,7 +155,7 @@ def main():
         log.error("nwp-consumer error", error=str(e))
         raise e
     finally:
-        _ = [p.unlink() for p in TMP_DIR.glob("nwpc_*")]
+        _ = [p.unlink(missing_ok=True) for p in TMP_DIR.glob("nwpc_*")]
     elapsedTime = dt.datetime.now() - programStartTime
     log.info(
         "nwp-consumer finished",
diff --git a/src/nwp_consumer/internal/inputs/ceda/client.py b/src/nwp_consumer/internal/inputs/ceda/client.py
index b6e612d3..5552d1e3 100644
--- a/src/nwp_consumer/internal/inputs/ceda/client.py
+++ b/src/nwp_consumer/internal/inputs/ceda/client.py
@@ -69,14 +69,22 @@ def downloadToTemp(self, *, fi: internal.FileInfoModel) \
             -> tuple[internal.FileInfoModel, pathlib.Path]:
         anonUrl: str = f"{self.dataUrl}/{fi.initTime():%Y/%m/%d}/{fi.fname()}"
-        log.debug(f"requesting download of file", filename=fi.fname(), path=anonUrl)
+        log.debug(
+            event=f"requesting download of file",
+            filename=fi.fname(),
+            path=anonUrl
+        )

         url: str = f'{self.__ftpBase}/{anonUrl}'
         try:
             response = urllib.request.urlopen(url=url)
         except Exception as e:
-            raise ConnectionError(
-                f"error calling url {url} for {fi.fname()}: {e}"
-            ) from e
+            log.warn(
+                event="error calling url for file",
+                url=url,
+                filename=fi.fname(),
+                error=e
+            )
+            return fi, pathlib.Path()

         # Stream the filedata into a temporary file
         tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
@@ -84,12 +92,13 @@ def downloadToTemp(self, *, fi: internal.FileInfoModel) \
             for chunk in iter(lambda: response.read(16 * 1024), b''):
                 f.write(chunk)

-        log.debug(f"fetched all data from file", filename=fi.fname(), path=anonUrl, tempfile=tfp.as_posix())
-
-        # Check the file is not empty
-        if tfp.stat().st_size == 0:
-            # File is empty. Fail hard
-            raise ValueError(f"downloaded file {fi.fname()} is empty")
+        log.debug(
+            event=f"fetched all data from file",
+            filename=fi.fname(),
+            url=anonUrl,
+            filepath=tfp.as_posix(),
+            nbytes=tfp.stat().st_size
+        )

         return fi, tfp

@@ -112,17 +121,23 @@ def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoM
             return []
         if not response.ok:
             # Something else has gone wrong. Fail hard
-            raise ConnectionError(
-                f"error calling url {response.url}: {response.status_code}"
-            ) from None
+            log.warn(
+                event="error response from filelist endpoint",
+                url=response.url,
+                response=response.json()
+            )
+            return []

         # Map the response to a CEDAResponse object to ensure it looks as expected
         try:
             responseObj: CEDAResponse = CEDAResponse.Schema().load(response.json())
         except Exception as e:
-            raise TypeError(
-                f"error marshalling json to CedaResponse object: {e}, response: {response.json()}"
-            ) from e
+            log.warn(
+                event="response from ceda does not match expected schema",
+                error=e,
+                response=response.json()
+            )
+            return []

         # Filter the files for the desired init time
         wantedFiles: list[CEDAFileInfo] = [
@@ -145,7 +160,12 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:
                 backend_kwargs={"indexpath": ""}
             )
         except Exception as e:
-            raise Exception(f"error loading wholesale file as dataset: {e}") from e
+            log.warn(
+                event="error converting raw file to dataset",
+                filepath=p.as_posix(),
+                error=e
+            )
+            return xr.Dataset()

         for i, ds in enumerate(datasets):
             # Rename the parameters to the OCF names
@@ -190,9 +210,17 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:
         )

         # Add in x and y coordinates
-        wholesaleDataset = _reshapeTo2DGrid(
-            ds=wholesaleDataset
-        )
+        try:
+            wholesaleDataset = _reshapeTo2DGrid(
+                ds=wholesaleDataset
+            )
+        except Exception as e:
+            log.warn(
+                event="error reshaping to 2D grid",
+                filepath=p.as_posix(),
+                error=e
+            )
+            return xr.Dataset()

         # Create a chunked Dask Dataset from the input multi-variate Dataset.
         # * Converts the input multivariate DataSet (with different DataArrays for
diff --git a/src/nwp_consumer/internal/inputs/metoffice/client.py b/src/nwp_consumer/internal/inputs/metoffice/client.py
index c4fddbb4..76bdb141 100644
--- a/src/nwp_consumer/internal/inputs/metoffice/client.py
+++ b/src/nwp_consumer/internal/inputs/metoffice/client.py
@@ -59,10 +59,12 @@ def __init__(self, *, orderID: str, clientID: str, clientSecret: str):
             "X-IBM-Client-Secret": clientSecret,
         }

-    def downloadToTemp(self, *, fi: MetOfficeFileInfo) \
-            -> tuple[internal.FileInfoModel, pathlib.Path]:
+    def downloadToTemp(self, *, fi: MetOfficeFileInfo) -> tuple[internal.FileInfoModel, pathlib.Path]:

-        log.debug(f"requesting download of file", filename=fi.fname())
+        log.debug(
+            event=f"requesting download of file",
+            filename=fi.fname()
+        )
         url: str = f"{self.baseurl}/{fi.fname()}/data"
         try:
             opener = urllib.request.build_opener()
@@ -72,11 +74,20 @@ def downloadToTemp(self, *, fi: MetOfficeFileInfo) \
             urllib.request.install_opener(opener)
             response = urllib.request.urlopen(url=url)
             if not response.status == 200:
-                raise ConnectionError(
-                    f"error response code {response.status} for url {url}: {response.read()}"
+                log.warn(
+                    event="error response received for download file request",
+                    response=response.read(),
+                    url=url
                 )
+                return fi, pathlib.Path()
         except Exception as e:
-            raise ConnectionError(f"error calling url {url} for {fi.fname()}: {e}") from e
+            log.warn(
+                event="error calling url for file",
+                url=url,
+                filename=fi.fname(),
+                error=e
+            )
+            return fi, pathlib.Path()

         # Stream the filedata into a temporary file
         tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
@@ -84,12 +95,13 @@ def downloadToTemp(self, *, fi: MetOfficeFileInfo) \
             for chunk in iter(lambda: response.read(16 * 1024), b''):
                 f.write(chunk)

-        log.debug(f"fetched all data from file", filename=fi.fname(), path=url, tempfile=tfp.as_posix())
-
-        # Check the file is not empty
-        if tfp.stat().st_size == 0:
-            # File is empty. Fail hard
-            raise ValueError(f"downloaded file {fi.fname()} is empty")
+        log.debug(
+            event="fetched all data from file",
+            filename=fi.fname(),
+            url=url,
+            filepath=tfp.as_posix(),
+            nbytes=tfp.stat().st_size
+        )

         return fi, tfp

@@ -107,18 +119,23 @@ def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoM
             params=self.querystring
         )
         if not response.ok:
-            raise AssertionError(
-                f"response did not return with an ok status: {response.content}"
-            ) from None
+            log.warn(
+                event="error response from filelist endpoint",
+                url=response.url,
+                response=response.json()
+            )
+            return []

         # Map the response to a MetOfficeResponse object
         try:
             responseObj: MetOfficeResponse = MetOfficeResponse.Schema().load(response.json())
         except Exception as e:
-            raise TypeError(
-                f"error marshalling json to MetOfficeResponse object: {e}, "
-                f"response: {response.json()}"
+            log.warn(
+                event="response from metoffice does not match expected schema",
+                error=e,
+                response=response.json()
             )
+            return []

         # Filter the file infos for the desired init time
         wantedFileInfos: list[MetOfficeFileInfo] = [
@@ -139,7 +156,12 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:
                 backend_kwargs={'read_keys': ['name', 'parameterNumber'], 'indexpath': ''}
             )
         except Exception as e:
-            raise ValueError(f"Failed to load GRIB file as a cube: {e}") from e
+            log.warn(
+                event="error loading raw file as dataset",
+                error=e,
+                filepath=p.as_posix()
+            )
+            return xr.Dataset()

         # Make the DataArray OCF-compliant
         # 1. Rename the parameter to the OCF short name
@@ -161,8 +183,12 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:
                 parameterDataset = parameterDataset.rename({
                     x: PARAMETER_RENAME_MAP[x]})
             case _, _:
-                log.warn(f"encountered unknown parameter {currentName}; ignoring file",
-                         parameterName=currentName, parameterNumber=parameterNumber)
+                log.warn(
+                    event=f"encountered unknown parameter; ignoring file",
+                    unknownparamname=currentName,
+                    unknownparamnumber=parameterNumber,
+                    filepath=p.as_posix()
+                )
                 return xr.Dataset()

         # 2. Remove unneeded variables
diff --git a/src/nwp_consumer/internal/outputs/localfs/client.py b/src/nwp_consumer/internal/outputs/localfs/client.py
index 8aaa4048..0b48b954 100644
--- a/src/nwp_consumer/internal/outputs/localfs/client.py
+++ b/src/nwp_consumer/internal/outputs/localfs/client.py
@@ -2,6 +2,7 @@
 import os
 import pathlib
 import shutil
+import time

 import structlog
 from typeid import TypeID
@@ -56,9 +57,12 @@ def copyITFolderToTemp(self, *, prefix: pathlib.Path, it: dt.datetime) \
         log.debug(f"copying init time folder to temp", initTime=it, path=initTimeDirPath.as_posix())

         if not initTimeDirPath.exists():
-            raise FileNotFoundError(
-                f"folder does not exist for init time {it} at {initTimeDirPath.as_posix()}"
+            log.warn(
+                event="folder does not exist for init time",
+                inittime=f"{it:%Y/%m/%d %H:%M}",
+                directorypath=initTimeDirPath.as_posix()
             )
+            return it, []

         paths: list[pathlib.Path] = list(initTimeDirPath.iterdir())

@@ -67,14 +71,20 @@
         # Read all files into temporary files
         tempPaths: list[pathlib.Path] = []
         for path in paths:
-            with path.open("rb") as infile:
-                tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
-                with tfp.open("wb") as tmpfile:
-                    for chunk in iter(lambda: infile.read(16 * 1024), b""):
-                        tmpfile.write(chunk)
-                if tfp.stat().st_size == 0:
-                    raise ValueError(f"downloaded file {path} is empty")
-                tempPaths.append(tfp)
+            if path.exists() is False or path.stat().st_size == 0:
+                log.warn(
+                    event="temp file is empty",
+                    filepath=path.as_posix()
+                )
+                continue
+            tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
+            shutil.copy2(src=path, dst=tfp)
+            tempPaths.append(tfp)
+
+        log.debug(
+            event="copied it folder to temporary files",
+            nbytes=[p.stat().st_size for p in tempPaths]
+        )

         return it, tempPaths

diff --git a/src/nwp_consumer/internal/outputs/s3/client.py b/src/nwp_consumer/internal/outputs/s3/client.py
index 8c83a058..f95110bf 100644
--- a/src/nwp_consumer/internal/outputs/s3/client.py
+++ b/src/nwp_consumer/internal/outputs/s3/client.py
@@ -73,25 +73,31 @@ def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:
         )
         return sortedInitTimes

-    def copyITFolderToTemp(self, *, prefix: pathlib.Path, it: dt.datetime) \
-            -> tuple[dt.datetime, list[pathlib.Path]]:
-        initTimeDirPath = self.__bucket / prefix \
-            / it.strftime(internal.IT_FOLDER_FMTSTR)
-        paths = [pathlib.Path(p)
-                 for p in self.__fs.ls(initTimeDirPath.as_posix())]
+    def copyITFolderToTemp(self, *, prefix: pathlib.Path, it: dt.datetime) -> tuple[dt.datetime, list[pathlib.Path]]:
+        initTimeDirPath = self.__bucket / prefix / it.strftime(internal.IT_FOLDER_FMTSTR)
+        paths = [pathlib.Path(p) for p in self.__fs.ls(initTimeDirPath.as_posix())]

         # Read all files into temporary files
         tempPaths: list[pathlib.Path] = []
         for path in paths:
+            if not self.__fs.exists(path.as_posix()) or self.__fs.size(path.as_posix()) == 0:
+                log.warn(
+                    event="temporary file is empty",
+                    filepath=path.as_posix()
+                )
+                continue
             with self.__fs.open(path=path.as_posix(), mode="rb") as infile:
                 tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
                 with tfp.open("wb") as tmpfile:
                     for chunk in iter(lambda: infile.read(16 * 1024), b""):
                         tmpfile.write(chunk)
-                if tfp.stat().st_size == 0:
-                    raise ValueError(f"downloaded file {path} is empty")
                 tempPaths.append(tfp)

+        log.debug(
+            event="copied it folder to temporary files",
+            nbytes=[p.stat().st_size for p in tempPaths]
+        )
+
         return it, tempPaths

     def delete(self, *, p: pathlib.Path) -> None:
diff --git a/src/nwp_consumer/internal/service/service.py b/src/nwp_consumer/internal/service/service.py
index d345d4da..19886c06 100644
--- a/src/nwp_consumer/internal/service/service.py
+++ b/src/nwp_consumer/internal/service/service.py
@@ -84,6 +84,8 @@ def DownloadRawDataset(self, *, start: dt.date, end: dt.date) -> int:
         # * This deletes the temporary files
         for future in concurrent.futures.as_completed(futures):
             fileInfo, tempFile = future.result()
+            if tempFile == pathlib.Path():
+                continue
             nbytes += self.storer.store(
                 src=tempFile,
                 dst=self.rawdir / fileInfo.initTime().strftime(internal.IT_FOLDER_FMTSTR) / (fileInfo.fname() + ".grib")
@@ -134,18 +136,32 @@ def ConvertRawDatasetToZarr(self, *, start: dt.date, end: dt.date) -> int:
         for future in concurrent.futures.as_completed(futures):
             initTime, tempPaths = future.result()
+            if not tempPaths:
+                log.warn(
+                    event=f"no files for initTime",
+                    initTime=initTime.strftime("%Y/%m/%d %H:%M")
+                )
+                continue
+

             log.debug(
-                f"creating zarr for initTime",
+                event=f"creating zarr for initTime",
                 initTime=initTime.strftime("%Y/%m/%d %H:%M")
             )

             # Create a pipeline to convert the raw files and merge them as a dataset
             bag: dask.bag.Bag = dask.bag.from_sequence(tempPaths)
-            dataset = bag.map(lambda tfp: self.fetcher.mapTemp(p=tfp)) \
-                .fold(lambda ds1, ds2: xr.merge([ds1, ds2], combine_attrs="drop_conflicts")) \
+            dataset = bag.map(func=lambda tfp: self.fetcher.mapTemp(p=tfp)) \
+                .fold(binop=lambda ds1, ds2: xr.merge([ds1, ds2], combine_attrs="drop_conflicts")) \
                 .compute()

             # Carry out a basic data quality check
+            if len(dataset.data_vars) == 0:
+                log.warn(
+                    event=f"Dataset for initTime is empty",
+                    initTime=initTime.strftime("%Y/%m/%d %H:%M")
+                )
+                continue
+
             for var in dataset.coords['variable'].values:
                 if True in dataset.sel(variable=var).isnull():
                     log.warn(