From 1b17f0fb740fc21cacffe3e69e4cc0cd0e5c3b73 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Mon, 4 Nov 2024 11:48:45 +0000 Subject: [PATCH] fix(tensorstore): Save path and store size corrections --- src/nwp_consumer/__init__.py | 1 + src/nwp_consumer/{cmd.tmp => cmd}/__init__.py | 0 src/nwp_consumer/{cmd.tmp => cmd}/main.py | 0 .../internal/entities/parameters.py | 2 +- .../internal/entities/tensorstore.py | 24 ++++++++++--------- .../internal/entities/test_tensorstore.py | 3 ++- .../internal/ports/repositories.py | 6 ++--- .../model_repositories/ecmwf_realtime.py | 6 ++++- .../model_repositories/noaa_gfs.py | 24 +++++++++++++++---- .../model_repositories/test_noaa_gfs.py | 2 +- .../internal/services/archiver_service.py | 5 ++-- .../internal/services/consumer_service.py | 10 ++++---- 12 files changed, 55 insertions(+), 28 deletions(-) rename src/nwp_consumer/{cmd.tmp => cmd}/__init__.py (100%) rename src/nwp_consumer/{cmd.tmp => cmd}/main.py (100%) diff --git a/src/nwp_consumer/__init__.py b/src/nwp_consumer/__init__.py index b6f4d45..4aaa887 100644 --- a/src/nwp_consumer/__init__.py +++ b/src/nwp_consumer/__init__.py @@ -149,6 +149,7 @@ "gribapi", "aiobotocore", "s3fs", + "fsspec", "asyncio", "botocore", "cfgrib", diff --git a/src/nwp_consumer/cmd.tmp/__init__.py b/src/nwp_consumer/cmd/__init__.py similarity index 100% rename from src/nwp_consumer/cmd.tmp/__init__.py rename to src/nwp_consumer/cmd/__init__.py diff --git a/src/nwp_consumer/cmd.tmp/main.py b/src/nwp_consumer/cmd/main.py similarity index 100% rename from src/nwp_consumer/cmd.tmp/main.py rename to src/nwp_consumer/cmd/main.py diff --git a/src/nwp_consumer/internal/entities/parameters.py b/src/nwp_consumer/internal/entities/parameters.py index 1735755..c5c0e39 100644 --- a/src/nwp_consumer/internal/entities/parameters.py +++ b/src/nwp_consumer/internal/entities/parameters.py @@ -146,7 +146,7 @@ def metadata(self) -> ParameterData: "incident on the surface expected over the next hour.", units="W/m^2", limits=ParameterLimits(upper=500, lower=0), - alternate_shortnames=["strd", "dlwrf"] + alternate_shortnames=["strd", "dlwrf"], ) case self.RELATIVE_HUMIDITY_SL.name: return ParameterData( diff --git a/src/nwp_consumer/internal/entities/tensorstore.py b/src/nwp_consumer/internal/entities/tensorstore.py index 7b857ee..1451db1 100644 --- a/src/nwp_consumer/internal/entities/tensorstore.py +++ b/src/nwp_consumer/internal/entities/tensorstore.py @@ -70,8 +70,8 @@ class TensorStore: coordinate_map: NWPDimensionCoordinateMap """The coordinates of the store.""" - size_mb: int - """The size of the store in megabytes.""" + size_kb: int + """The size of the store in kilobytes.""" encoding: dict[str, Any] """The encoding passed to Zarr whilst writing.""" @@ -79,7 +79,8 @@ class TensorStore: @classmethod def initialize_empty_store( cls, - name: str, + model: str, + repository: str, coords: NWPDimensionCoordinateMap, overwrite_existing: bool = True, ) -> ResultE["TensorStore"]: @@ -112,7 +113,9 @@ def initialize_empty_store( - As above for the init_time dimension. Args: - name: The name of the tensor. + model: The name of the model providing the tensor data. + This is also used as the name of the tensor. + repository: The name of the repository providing the tensor data. coords: The coordinates of the store. overwrite_existing: Whether to overwrite an existing store. @@ -140,8 +143,8 @@ def initialize_empty_store( store_range = f"{coords.init_time[0]:%Y%m%d%H}-{coords.init_time[-1]:%Y%m%d%H}" store_path = pathlib.Path( - f"{os.getenv('ZARRDIR', f'~/.local/cache/nwp/{name}/data')}/{store_range}.zarr", - ) + os.getenv("ZARRDIR", f"~/.local/cache/nwp/{repository}/{model}/data") + ) / f"{store_range}.zarr" # * Define a set of chunks allowing for intermediate parallel writes # NOTE: This is not the same as the final chunking of the dataset! @@ -175,7 +178,7 @@ def initialize_empty_store( } # Create a DataArray object with the given coordinates and dummy values da: xr.DataArray = xr.DataArray( - name=name, + name=model, data=dummy_values, coords=coords.to_pandas(), attrs=attrs, @@ -240,10 +243,10 @@ def initialize_empty_store( return Success( cls( - name=name, + name=model, path=store_path, coordinate_map=coordinate_map_result.unwrap(), - size_mb=0, + size_kb=0, encoding=encoding, ), ) @@ -291,7 +294,7 @@ def write_to_region( # Calculate the number of bytes written nbytes: int = da.nbytes del da - self.size_mb += nbytes // (1024**2) + self.size_kb += nbytes // 1024 return Success(nbytes) def validate_store(self) -> ResultE[bool]: @@ -367,7 +370,6 @@ def scan_parameter_values(self, p: Parameter) -> ResultE[ParameterScanResult]: ), ) - def postprocess(self, options: PostProcessOptions) -> ResultE[pathlib.Path]: """Post-process the store. diff --git a/src/nwp_consumer/internal/entities/test_tensorstore.py b/src/nwp_consumer/internal/entities/test_tensorstore.py index c7c9d14..84db810 100644 --- a/src/nwp_consumer/internal/entities/test_tensorstore.py +++ b/src/nwp_consumer/internal/entities/test_tensorstore.py @@ -29,7 +29,8 @@ def setUp(self) -> None: ) init_result = TensorStore.initialize_empty_store( - name="test_da", + model="test_da", + repository="dummy_repository", coords=self.test_coords, ) match init_result: diff --git a/src/nwp_consumer/internal/ports/repositories.py b/src/nwp_consumer/internal/ports/repositories.py index aea603a..858d065 100644 --- a/src/nwp_consumer/internal/ports/repositories.py +++ b/src/nwp_consumer/internal/ports/repositories.py @@ -87,9 +87,9 @@ def fetch_init_data(self, it: dt.datetime) \ ``_download_and_convert`` in the example above. This is to allow for parallelization of the download and processing. - .. note:: It is however, worth considering the most efficient way to download and process the data. - The above assumes that the data comes in many files, but there is a possibility of the - case where the source provides one large file with many underlying datasets within. + .. note:: It is however, worth considering the most efficient way to download and process + the data. The above assumes that the data comes in many files, but there is a possibility + of the case where the source provides one large file with many underlying datasets within. In this case, it may be more efficient to download the large file in the `fetch_init_data` method and then process the datasets within via the yielded functions. diff --git a/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py b/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py index c8c36cd..24f5995 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py @@ -118,7 +118,6 @@ def model() -> entities.ModelMetadata: @override def fetch_init_data(self, it: dt.datetime) \ -> Iterator[Callable[..., ResultE[list[xr.DataArray]]]]: - # List relevant files in the S3 bucket try: urls: list[str] = [ @@ -145,6 +144,10 @@ def fetch_init_data(self, it: dt.datetime) \ "named with the expected pattern, e.g. 'A2S10250000102603001.", )) + log.debug( + f"Found {len(urls)} files for init time '{it.strftime('%Y-%m-%d %H:%M')}' " + f"in bucket path '{self.bucket}/ecmwf'.", + ) for url in urls: yield delayed(self._download_and_convert)(url=url) @@ -167,6 +170,7 @@ def authenticate(cls) -> ResultE["ECMWFRealTimeS3ModelRepository"]: f"Credentials may be wrong or undefined. Encountered error: {e}", )) + log.debug(f"Successfully authenticated with S3 instance '{bucket}'") return Success(cls(bucket=bucket, fs=_fs)) diff --git a/src/nwp_consumer/internal/repositories/model_repositories/noaa_gfs.py b/src/nwp_consumer/internal/repositories/model_repositories/noaa_gfs.py index 08fc70a..5eea48b 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/noaa_gfs.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/noaa_gfs.py @@ -1,10 +1,15 @@ +"""Repository implementation for NOAA GFS data stored in S3. + +This module contains the implementation of the model repository for the +NOAA GFS data stored in an S3 bucket. +""" + import datetime as dt import logging import os import pathlib import re from collections.abc import Callable, Iterator -from readline import backend from typing import override import cfgrib @@ -190,11 +195,22 @@ def _convert(self, path: pathlib.Path) -> ResultE[list[xr.DataArray]]: path: The path to the local grib file. """ try: - # Squeeze reduces length-1- dimensions to scalar coordinates, - # Thus single-level variables should not have any extra dimensions + # Use some options when opening the datasets: + # * 'squeeze' reduces length-1- dimensions to scalar coordinates, + # thus single-level variables should not have any extra dimensions + # * 'filter_by_keys' reduces the number of variables loaded to only those + # in the expected list dss: list[xr.Dataset] = cfgrib.open_datasets( path.as_posix(), - backend_kwargs={"squeeze": True}, + backend_kwargs={ + "squeeze": True, + "filter_by_keys": { + "shortName": [ + x for v in self.model().expected_coordinates.variable + for x in v.metadata().alternate_shortnames + ], + }, + }, ) except Exception as e: return Failure(ValueError( diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_gfs.py b/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_gfs.py index 9994e58..2c60c32 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_gfs.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_gfs.py @@ -28,7 +28,7 @@ def test__download_and_convert(self) -> None: c: NOAAGFSS3ModelRepository = NOAAGFSS3ModelRepository.authenticate().unwrap() - test_it: dt.datetime = dt.datetime(2024, 10, 25, 0, tzinfo=dt.UTC) + test_it: dt.datetime = dt.datetime(2024, 10, 24, 12, tzinfo=dt.UTC) test_coordinates: entities.NWPDimensionCoordinateMap = dataclasses.replace( c.model().expected_coordinates, init_time=[test_it], diff --git a/src/nwp_consumer/internal/services/archiver_service.py b/src/nwp_consumer/internal/services/archiver_service.py index 160e251..1cbe3d7 100644 --- a/src/nwp_consumer/internal/services/archiver_service.py +++ b/src/nwp_consumer/internal/services/archiver_service.py @@ -45,7 +45,8 @@ def archive(self, year: int, month: int) -> ResultE[pathlib.Path]: # Create a store for the archive init_store_result: ResultE[entities.TensorStore] = \ entities.TensorStore.initialize_empty_store( - name=self.mr.repository().name, + model=self.mr.model().name, + repository=self.mr.repository().name, coords=dataclasses.replace( self.mr.model().expected_coordinates, init_time=init_times, @@ -116,7 +117,7 @@ def archive(self, year: int, month: int) -> ResultE[pathlib.Path]: notify_result = self.nr().notify( message=entities.StoreCreatedNotification( filename=store.path.name, - size_mb=store.size_mb, + size_mb=store.size_kb // 1024, performance=entities.PerformanceMetadata( duration_seconds=monitor.get_runtime(), memory_mb=max(monitor.memory_buffer) / 1e6, diff --git a/src/nwp_consumer/internal/services/consumer_service.py b/src/nwp_consumer/internal/services/consumer_service.py index dd47084..4ac4139 100644 --- a/src/nwp_consumer/internal/services/consumer_service.py +++ b/src/nwp_consumer/internal/services/consumer_service.py @@ -6,7 +6,7 @@ import pathlib from typing import override -from joblib import Parallel +from joblib import Parallel, cpu_count from returns.result import Failure, ResultE, Success from nwp_consumer.internal import entities, ports @@ -50,7 +50,8 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]: # Create a store for the init time init_store_result: ResultE[entities.TensorStore] = \ entities.TensorStore.initialize_empty_store( - name=self.mr.model().name, + model=self.mr.model().name, + repository=self.mr.repository().name, coords=dataclasses.replace(self.mr.model().expected_coordinates, init_time=[it]), ) @@ -72,7 +73,8 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]: amr = amr_result.unwrap() fetch_result_generator = Parallel( - n_jobs=1, # TODO - fix segfault when using multiple threads + # TODO - fix segfault when using multiple threads + n_jobs=max(cpu_count() - 1, self.mr.repository().max_connections), prefer="threads", return_as="generator_unordered", )(amr.fetch_init_data(it=it)) @@ -118,7 +120,7 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]: notify_result = self.nr().notify( message=entities.StoreCreatedNotification( filename=store.path.name, - size_mb=store.size_mb, + size_mb=store.size_kb // 1024, performance=entities.PerformanceMetadata( duration_seconds=monitor.get_runtime(), memory_mb=max(monitor.memory_buffer) / 1e6,