Fixed integration tests
devsjc committed Jul 20, 2023
1 parent c5ab8fb commit 25a91d4
Showing 6 changed files with 149 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/nwp_consumer/cmd/main.py
@@ -155,7 +155,7 @@ def main():
log.error("nwp-consumer error", error=str(e))
raise e
finally:
_ = [p.unlink() for p in TMP_DIR.glob("nwpc_*")]
_ = [p.unlink(missing_ok=True) for p in TMP_DIR.glob("nwpc_*")]
elapsedTime = dt.datetime.now() - programStartTime
log.info(
"nwp-consumer finished",
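
The only functional change in this file swaps a bare unlink() for unlink(missing_ok=True). A minimal sketch of the difference, using a throwaway path rather than the consumer's TMP_DIR:

import pathlib
import tempfile

tmp = pathlib.Path(tempfile.gettempdir()) / "nwpc_example_file"   # hypothetical path
tmp.unlink(missing_ok=True)   # succeeds silently if the file is already gone (Python 3.8+)
# tmp.unlink()                # would raise FileNotFoundError for a missing file

With missing_ok=True the cleanup in the finally block no longer fails when a matched temporary file has already been removed.
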
68 changes: 48 additions & 20 deletions src/nwp_consumer/internal/inputs/ceda/client.py
@@ -69,27 +69,36 @@ def downloadToTemp(self, *, fi: internal.FileInfoModel) \
-> tuple[internal.FileInfoModel, pathlib.Path]:

anonUrl: str = f"{self.dataUrl}/{fi.initTime():%Y/%m/%d}/{fi.fname()}"
log.debug(f"requesting download of file", filename=fi.fname(), path=anonUrl)
log.debug(
event=f"requesting download of file",
filename=fi.fname(),
path=anonUrl
)
url: str = f'{self.__ftpBase}/{anonUrl}'
try:
response = urllib.request.urlopen(url=url)
except Exception as e:
raise ConnectionError(
f"error calling url {url} for {fi.fname()}: {e}"
) from e
log.warn(
event="error calling url for file",
url=url,
filename=fi.fname(),
error=e
)
return fi, pathlib.Path()

# Stream the filedata into a temporary file
tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
with tfp.open("wb") as f:
for chunk in iter(lambda: response.read(16 * 1024), b''):
f.write(chunk)

log.debug(f"fetched all data from file", filename=fi.fname(), path=anonUrl, tempfile=tfp.as_posix())

# Check the file is not empty
if tfp.stat().st_size == 0:
# File is empty. Fail hard
raise ValueError(f"downloaded file {fi.fname()} is empty")
log.debug(
event=f"fetched all data from file",
filename=fi.fname(),
url=anonUrl,
filepath=tfp.as_posix(),
nbytes=tfp.stat().st_size
)

return fi, tfp
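
The download above streams the response body into a temporary file in 16 KiB chunks using the two-argument form of iter(), which keeps calling read() until it returns the empty-bytes sentinel. A standalone sketch of the same idiom, with a hypothetical URL in place of the CEDA FTP address:

import pathlib
import tempfile
import urllib.request

response = urllib.request.urlopen(url="https://example.invalid/data/file.grib")  # hypothetical URL
tfp = pathlib.Path(tempfile.mkdtemp()) / "nwpc_example"
with tfp.open("wb") as f:
    # iter(callable, sentinel) yields chunks until read() returns b"" at end of stream
    for chunk in iter(lambda: response.read(16 * 1024), b""):
        f.write(chunk)
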

@@ -112,17 +121,23 @@ def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoM
return []
if not response.ok:
# Something else has gone wrong. Fail hard
raise ConnectionError(
f"error calling url {response.url}: {response.status_code}"
) from None
log.warn(
event="error response from filelist endpoint",
url=response.url,
response=response.json()
)
return []

# Map the response to a CEDAResponse object to ensure it looks as expected
try:
responseObj: CEDAResponse = CEDAResponse.Schema().load(response.json())
except Exception as e:
raise TypeError(
f"error marshalling json to CedaResponse object: {e}, response: {response.json()}"
) from e
log.warn(
event="response from ceda does not match expected schema",
error=e,
response=response.json()
)
return []
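
CEDAResponse.Schema().load(...) follows the dataclass-plus-generated-schema pattern (the marshmallow_dataclass library is assumed here; the actual model is defined elsewhere in the package and not shown in this diff). A minimal sketch with made-up fields:

from typing import List

import marshmallow_dataclass

@marshmallow_dataclass.dataclass
class ExampleResponse:          # hypothetical stand-in for the real CEDAResponse model
    path: str
    items: List[str]

# .Schema is generated by the decorator; load() raises a ValidationError when the
# payload does not match the declared fields, which the client above now catches
# and downgrades to a warning plus an empty file list.
obj = ExampleResponse.Schema().load({"path": "2023/07/20", "items": []})
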

# Filter the files for the desired init time
wantedFiles: list[CEDAFileInfo] = [
@@ -145,7 +160,12 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:
backend_kwargs={"indexpath": ""}
)
except Exception as e:
raise Exception(f"error loading wholesale file as dataset: {e}") from e
log.warn(
event="error converting raw file to dataset",
filepath=p.as_posix(),
error=e
)
return xr.Dataset()

for i, ds in enumerate(datasets):
# Rename the parameters to the OCF names
@@ -190,9 +210,17 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:
)

# Add in x and y coordinates
wholesaleDataset = _reshapeTo2DGrid(
ds=wholesaleDataset
)
try:
wholesaleDataset = _reshapeTo2DGrid(
ds=wholesaleDataset
)
except Exception as e:
log.warn(
event="error reshaping to 2D grid",
filepath=p.as_posix(),
error=e
)
return xr.Dataset()

# Create a chunked Dask Dataset from the input multi-variate Dataset.
# * Converts the input multivariate DataSet (with different DataArrays for
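
The hunk is cut off here, but the comment above describes collapsing the per-parameter DataArrays into a single chunked, Dask-backed dataset. A rough sketch of what such a step can look like in xarray (variable names, dimensions and chunk sizes below are illustrative, not the module's actual values):

import numpy as np
import xarray as xr

# Hypothetical multivariate dataset standing in for wholesaleDataset
ds = xr.Dataset({
    "t": (("step", "y", "x"), np.zeros((2, 3, 4))),
    "dswrf": (("step", "y", "x"), np.zeros((2, 3, 4))),
})

# to_array() stacks the per-parameter DataArrays along a new "variable" dimension,
# and chunk() makes the result dask-backed so it can be written out lazily.
stacked = ds.to_array(dim="variable", name="UKV").to_dataset().chunk({"step": 1})
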
68 changes: 47 additions & 21 deletions src/nwp_consumer/internal/inputs/metoffice/client.py
@@ -59,10 +59,12 @@ def __init__(self, *, orderID: str, clientID: str, clientSecret: str):
"X-IBM-Client-Secret": clientSecret,
}

def downloadToTemp(self, *, fi: MetOfficeFileInfo) \
-> tuple[internal.FileInfoModel, pathlib.Path]:
def downloadToTemp(self, *, fi: MetOfficeFileInfo) -> tuple[internal.FileInfoModel, pathlib.Path]:

log.debug(f"requesting download of file", filename=fi.fname())
log.debug(
event=f"requesting download of file",
filename=fi.fname()
)
url: str = f"{self.baseurl}/{fi.fname()}/data"
try:
opener = urllib.request.build_opener()
@@ -72,24 +74,34 @@ def downloadToTemp(self, *, fi: MetOfficeFileInfo) \
urllib.request.install_opener(opener)
response = urllib.request.urlopen(url=url)
if not response.status == 200:
raise ConnectionError(
f"error response code {response.status} for url {url}: {response.read()}"
log.warn(
event="error response received for download file request",
response=response.read(),
url=url
)
return fi, pathlib.Path()
except Exception as e:
raise ConnectionError(f"error calling url {url} for {fi.fname()}: {e}") from e
log.warn(
event="error calling url for file",
url=url,
filename=fi.fname(),
error=e
)
return fi, pathlib.Path()

# Stream the filedata into a temporary file
tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
with tfp.open("wb") as f:
for chunk in iter(lambda: response.read(16 * 1024), b''):
f.write(chunk)

log.debug(f"fetched all data from file", filename=fi.fname(), path=url, tempfile=tfp.as_posix())

# Check the file is not empty
if tfp.stat().st_size == 0:
# File is empty. Fail hard
raise ValueError(f"downloaded file {fi.fname()} is empty")
log.debug(
event="fetched all data from file",
filename=fi.fname(),
url=url,
filepath=tfp.as_posix(),
nbytes=tfp.stat().st_size
)

return fi, tfp
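
Authentication for these requests relies on installing a urllib opener whose headers carry the Met Office credentials built in __init__ (the header dict and the addheaders assignment are only partly visible in this diff, so the reconstruction below is an assumption). A minimal sketch with placeholder values:

import urllib.request

headers = {
    "X-IBM-Client-Id": "<clientID>",          # assumed counterpart to the secret shown above
    "X-IBM-Client-Secret": "<clientSecret>",  # placeholder, not a real credential
}
opener = urllib.request.build_opener()
opener.addheaders = list(headers.items())
urllib.request.install_opener(opener)
# Every subsequent urllib.request.urlopen() call goes through the installed opener
# and therefore sends these headers with the request.
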

@@ -107,18 +119,23 @@ def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoM
params=self.querystring
)
if not response.ok:
raise AssertionError(
f"response did not return with an ok status: {response.content}"
) from None
log.warn(
event="error response from filelist endpoint",
url=response.url,
response=response.json()
)
return []

# Map the response to a MetOfficeResponse object
try:
responseObj: MetOfficeResponse = MetOfficeResponse.Schema().load(response.json())
except Exception as e:
raise TypeError(
f"error marshalling json to MetOfficeResponse object: {e}, "
f"response: {response.json()}"
log.warn(
event="response from metoffice does not match expected schema",
error=e,
response=response.json()
)
return []

# Filter the file infos for the desired init time
wantedFileInfos: list[MetOfficeFileInfo] = [
@@ -139,7 +156,12 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:
backend_kwargs={'read_keys': ['name', 'parameterNumber'], 'indexpath': ''}
)
except Exception as e:
raise ValueError(f"Failed to load GRIB file as a cube: {e}") from e
log.warn(
event="error loading raw file as dataset",
error=e,
filepath=p.as_posix()
)
return xr.Dataset()

# Make the DataArray OCF-compliant
# 1. Rename the parameter to the OCF short name
@@ -161,8 +183,12 @@ def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:
parameterDataset = parameterDataset.rename({
x: PARAMETER_RENAME_MAP[x]})
case _, _:
log.warn(f"encountered unknown parameter {currentName}; ignoring file",
parameterName=currentName, parameterNumber=parameterNumber)
log.warn(
event=f"encountered unknown parameter; ignoring file",
unknownparamname=currentName,
unknownparamnumber=parameterNumber,
filepath=p.as_posix()
)
return xr.Dataset()

# 2. Remove unneeded variables
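
The (currentName, parameterNumber) match statement is only partly visible in the hunk above. A hypothetical reconstruction of the dispatch it belongs to; the real PARAMETER_RENAME_MAP entries live in the metoffice package, so the mapping here is made up for illustration:

import xarray as xr

PARAMETER_RENAME_MAP: dict[str, str] = {"ssrd": "dswrf"}   # illustrative entry only

def renameToOCF(ds: xr.Dataset, currentName: str, parameterNumber: int) -> xr.Dataset | None:
    match currentName, parameterNumber:
        case x, _ if x in PARAMETER_RENAME_MAP:
            # Known parameter: rename it to the OCF short name
            return ds.rename({x: PARAMETER_RENAME_MAP[x]})
        case _, _:
            # Unknown parameter: the caller logs a warning and returns an empty Dataset
            return None
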
30 changes: 20 additions & 10 deletions src/nwp_consumer/internal/outputs/localfs/client.py
@@ -2,6 +2,7 @@
import os
import pathlib
import shutil
import time

import structlog
from typeid import TypeID
@@ -56,9 +57,12 @@ def copyITFolderToTemp(self, *, prefix: pathlib.Path, it: dt.datetime) \
log.debug(f"copying init time folder to temp", initTime=it, path=initTimeDirPath.as_posix())

if not initTimeDirPath.exists():
raise FileNotFoundError(
f"folder does not exist for init time {it} at {initTimeDirPath.as_posix()}"
log.warn(
event="folder does not exist for init time",
inittime=f"{it:%Y%/m/%d %H:%M}",
directorypath=initTimeDirPath.as_posix()
)
return it, []

paths: list[pathlib.Path] = list(initTimeDirPath.iterdir())

@@ -67,14 +71,20 @@
# Read all files into temporary files
tempPaths: list[pathlib.Path] = []
for path in paths:
with path.open("rb") as infile:
tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
with tfp.open("wb") as tmpfile:
for chunk in iter(lambda: infile.read(16 * 1024), b""):
tmpfile.write(chunk)
if tfp.stat().st_size == 0:
raise ValueError(f"downloaded file {path} is empty")
tempPaths.append(tfp)
if not path.exists() or path.stat().st_size == 0:
log.warn(
event="source file for init time is missing or empty",
filepath=path.as_posix()
)
continue
tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
shutil.copy2(src=path, dst=tfp)
tempPaths.append(tfp)

log.debug(
event="copied it folder to temporary files",
nbytes=[p.stat().st_size for p in tempPaths]
)

return it, tempPaths
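
For local files the manual 16 KiB read/write loop is replaced by shutil.copy2, which performs the copy in a single call and preserves file metadata. A small sketch of the new copy step, with hypothetical paths:

import pathlib
import shutil
import tempfile

src = pathlib.Path("raw/2023/07/20/0000/example.grib")      # hypothetical source file
dst = pathlib.Path(tempfile.gettempdir()) / "nwpc_example"
if src.exists() and src.stat().st_size > 0:
    shutil.copy2(src=src, dst=dst)    # one call replaces the chunked copy loop
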

22 changes: 14 additions & 8 deletions src/nwp_consumer/internal/outputs/s3/client.py
@@ -73,25 +73,31 @@ def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:
)
return sortedInitTimes

def copyITFolderToTemp(self, *, prefix: pathlib.Path, it: dt.datetime) \
-> tuple[dt.datetime, list[pathlib.Path]]:
initTimeDirPath = self.__bucket / prefix \
/ it.strftime(internal.IT_FOLDER_FMTSTR)
paths = [pathlib.Path(p)
for p in self.__fs.ls(initTimeDirPath.as_posix())]
def copyITFolderToTemp(self, *, prefix: pathlib.Path, it: dt.datetime) -> tuple[dt.datetime, list[pathlib.Path]]:
initTimeDirPath = self.__bucket / prefix / it.strftime(internal.IT_FOLDER_FMTSTR)
paths = [pathlib.Path(p) for p in self.__fs.ls(initTimeDirPath.as_posix())]

# Read all files into temporary files
tempPaths: list[pathlib.Path] = []
for path in paths:
if not self.__fs.exists(path.as_posix()) or self.__fs.size(path.as_posix()) == 0:
log.warn(
event="source file in init time folder is missing or empty",
filepath=path.as_posix()
)
continue
with self.__fs.open(path=path.as_posix(), mode="rb") as infile:
tfp: pathlib.Path = internal.TMP_DIR / str(TypeID(prefix='nwpc'))
with tfp.open("wb") as tmpfile:
for chunk in iter(lambda: infile.read(16 * 1024), b""):
tmpfile.write(chunk)
if tfp.stat().st_size == 0:
raise ValueError(f"downloaded file {path} is empty")
tempPaths.append(tfp)

log.debug(
event="copied it folder to temporary files",
nbytes=[p.stat().st_size for p in tempPaths]
)

return it, tempPaths

def delete(self, *, p: pathlib.Path) -> None:
22 changes: 19 additions & 3 deletions src/nwp_consumer/internal/service/service.py
@@ -84,6 +84,8 @@ def DownloadRawDataset(self, *, start: dt.date, end: dt.date) -> int:
# * This deletes the temporary files
for future in concurrent.futures.as_completed(futures):
fileInfo, tempFile = future.result()
if tempFile == pathlib.Path():
continue
nbytes += self.storer.store(
src=tempFile,
dst=self.rawdir / fileInfo.initTime().strftime(internal.IT_FOLDER_FMTSTR) / (fileInfo.fname() + ".grib")
@@ -134,18 +136,32 @@ def ConvertRawDatasetToZarr(self, *, start: dt.date, end: dt.date) -> int:
for future in concurrent.futures.as_completed(futures):
initTime, tempPaths = future.result()

if not tempPaths:
log.warn(
event=f"no files for initTime",
initTime=initTime.strftime("%Y/%m/%d %H:%M")
)
continue

log.debug(
f"creating zarr for initTime",
event=f"creating zarr for initTime",
initTime=initTime.strftime("%Y/%m/%d %H:%M")
)

# Create a pipeline to convert the raw files and merge them as a dataset
bag: dask.bag.Bag = dask.bag.from_sequence(tempPaths)
dataset = bag.map(lambda tfp: self.fetcher.mapTemp(p=tfp)) \
.fold(lambda ds1, ds2: xr.merge([ds1, ds2], combine_attrs="drop_conflicts")) \
dataset = bag.map(func=lambda tfp: self.fetcher.mapTemp(p=tfp)) \
.fold(binop=lambda ds1, ds2: xr.merge([ds1, ds2], combine_attrs="drop_conflicts")) \
.compute()
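
The fold merges the per-file datasets pairwise with xr.merge. Since merging with an empty Dataset contributes nothing to the result, the empty-Dataset sentinel returned by mapTemp on failure simply drops out of the combined dataset; a tiny illustration with made-up data:

import dask.bag
import xarray as xr

ds = xr.Dataset({"t": ("x", [1.0, 2.0])})            # stand-in for one converted file
merged = dask.bag.from_sequence([ds, xr.Dataset()]) \
    .fold(binop=lambda a, b: xr.merge([a, b], combine_attrs="drop_conflicts")) \
    .compute()
assert merged.equals(ds)                              # the empty sentinel contributed nothing
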

# Carry out a basic data quality check
if not dataset.data_vars:
log.warn(
event="dataset for initTime is empty",
initTime=initTime.strftime("%Y/%m/%d %H:%M")
)
continue

for var in dataset.coords['variable'].values:
if True in dataset.sel(variable=var).isnull():
log.warn(
