
fix(tensorstore): Save path and store size corrections
devsjc committed Nov 4, 2024
1 parent 0f6c84b commit 1b17f0f
Showing 12 changed files with 55 additions and 28 deletions.
1 change: 1 addition & 0 deletions src/nwp_consumer/__init__.py
@@ -149,6 +149,7 @@
"gribapi",
"aiobotocore",
"s3fs",
"fsspec",
"asyncio",
"botocore",
"cfgrib",
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/nwp_consumer/internal/entities/parameters.py
@@ -146,7 +146,7 @@ def metadata(self) -> ParameterData:
"incident on the surface expected over the next hour.",
units="W/m^2",
limits=ParameterLimits(upper=500, lower=0),
-alternate_shortnames=["strd", "dlwrf"]
+alternate_shortnames=["strd", "dlwrf"],
)
case self.RELATIVE_HUMIDITY_SL.name:
return ParameterData(
24 changes: 13 additions & 11 deletions src/nwp_consumer/internal/entities/tensorstore.py
@@ -70,16 +70,17 @@ class TensorStore:
coordinate_map: NWPDimensionCoordinateMap
"""The coordinates of the store."""

-size_mb: int
-"""The size of the store in megabytes."""
+size_kb: int
+"""The size of the store in kilobytes."""

encoding: dict[str, Any]
"""The encoding passed to Zarr whilst writing."""

@classmethod
def initialize_empty_store(
cls,
-name: str,
+model: str,
+repository: str,
coords: NWPDimensionCoordinateMap,
overwrite_existing: bool = True,
) -> ResultE["TensorStore"]:
@@ -112,7 +113,9 @@ def initialize_empty_store(
- As above for the init_time dimension.
Args:
-name: The name of the tensor.
+model: The name of the model providing the tensor data.
+    This is also used as the name of the tensor.
+repository: The name of the repository providing the tensor data.
coords: The coordinates of the store.
overwrite_existing: Whether to overwrite an existing store.
@@ -140,8 +143,8 @@
store_range = f"{coords.init_time[0]:%Y%m%d%H}-{coords.init_time[-1]:%Y%m%d%H}"

store_path = pathlib.Path(
f"{os.getenv('ZARRDIR', f'~/.local/cache/nwp/{name}/data')}/{store_range}.zarr",
)
os.getenv("ZARRDIR", f"~/.local/cache/nwp/{repository}/{model}/data")

Check failure on line 146 in src/nwp_consumer/internal/entities/tensorstore.py

View workflow job for this annotation

GitHub Actions / lint-typecheck

Ruff (COM812)

src/nwp_consumer/internal/entities/tensorstore.py:146:82: COM812 Trailing comma missing
) / f"{store_range}.zarr"

# * Define a set of chunks allowing for intermediate parallel writes
# NOTE: This is not the same as the final chunking of the dataset!
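The path change above means the store location is now namespaced by repository and model rather than by tensor name, with ZARRDIR overriding the whole base directory and the range-stamped filename joined on afterwards. A minimal sketch of the resolution, using hypothetical repository, model, and range values:

    import os
    import pathlib

    # Hypothetical values for illustration; the real ones come from the
    # model repository metadata and the coordinate init times.
    repository = "ecmwf-realtime-s3"
    model = "hres-ifs-uk"
    store_range = "2024110400-2024110412"

    # With ZARRDIR unset, the store lands under the user cache dir;
    # setting ZARRDIR replaces the entire base directory.
    store_path = pathlib.Path(
        os.getenv("ZARRDIR", f"~/.local/cache/nwp/{repository}/{model}/data"),
    ) / f"{store_range}.zarr"

    print(store_path)
    # ~/.local/cache/nwp/ecmwf-realtime-s3/hres-ifs-uk/data/2024110400-2024110412.zarr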
@@ -175,7 +178,7 @@ def initialize_empty_store(
}
# Create a DataArray object with the given coordinates and dummy values
da: xr.DataArray = xr.DataArray(
-name=name,
+name=model,
data=dummy_values,
coords=coords.to_pandas(),
attrs=attrs,
@@ -240,10 +243,10 @@ def initialize_empty_store(

return Success(
cls(
-name=name,
+name=model,
path=store_path,
coordinate_map=coordinate_map_result.unwrap(),
-size_mb=0,
+size_kb=0,
encoding=encoding,
),
)
@@ -291,7 +294,7 @@ def write_to_region(
# Calculate the number of bytes written
nbytes: int = da.nbytes
del da
-self.size_mb += nbytes // (1024**2)
+self.size_kb += nbytes // 1024
return Success(nbytes)

def validate_store(self) -> ResultE[bool]:
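Tracking the size in kilobytes matters because the counter is accumulated with integer division on every region write: with megabyte granularity, each sub-megabyte write floors to zero and is never counted. A small sketch with hypothetical write sizes:

    # Three sub-megabyte region writes, in bytes (hypothetical values).
    writes = [400_000, 700_000, 900_000]

    size_mb = sum(n // (1024**2) for n in writes)  # floors each write to 0
    size_kb = sum(n // 1024 for n in writes)       # 390 + 683 + 878 = 1951

    print(size_mb, size_kb, size_kb // 1024)  # 0 1951 1

The notification sites in archiver_service.py and consumer_service.py (further down) convert back to megabytes with size_kb // 1024 when reporting.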
@@ -367,7 +370,6 @@ def scan_parameter_values(self, p: Parameter) -> ResultE[ParameterScanResult]:
),
)

-
def postprocess(self, options: PostProcessOptions) -> ResultE[pathlib.Path]:
"""Post-process the store.
3 changes: 2 additions & 1 deletion src/nwp_consumer/internal/entities/test_tensorstore.py
@@ -29,7 +29,8 @@ def setUp(self) -> None:
)

init_result = TensorStore.initialize_empty_store(
name="test_da",
model="test_da",
repository="dummy_repository",
coords=self.test_coords,
)
match init_result:
6 changes: 3 additions & 3 deletions src/nwp_consumer/internal/ports/repositories.py
@@ -87,9 +87,9 @@ def fetch_init_data(self, it: dt.datetime) \
``_download_and_convert`` in the example above.
This is to allow for parallelization of the download and processing.
-.. note:: It is however, worth considering the most efficient way to download and process the data.
-    The above assumes that the data comes in many files, but there is a possibility of the
-    case where the source provides one large file with many underlying datasets within.
+.. note:: It is however, worth considering the most efficient way to download and process
+    the data. The above assumes that the data comes in many files, but there is a possibility
+    of the case where the source provides one large file with many underlying datasets within.
In this case, it may be more efficient to download the large file in the
`fetch_init_data` method and then process the datasets within via the yielded functions.
@@ -118,7 +118,6 @@ def model() -> entities.ModelMetadata:
@override
def fetch_init_data(self, it: dt.datetime) \
-> Iterator[Callable[..., ResultE[list[xr.DataArray]]]]:
-
# List relevant files in the S3 bucket
try:
urls: list[str] = [
@@ -145,6 +144,10 @@ def fetch_init_data(self, it: dt.datetime) \
named with the expected pattern, e.g. 'A2S10250000102603001'.
))

+log.debug(
+    f"Found {len(urls)} files for init time '{it.strftime('%Y-%m-%d %H:%M')}' "
+    f"in bucket path '{self.bucket}/ecmwf'.",
+)
for url in urls:
yield delayed(self._download_and_convert)(url=url)

@@ -167,6 +170,7 @@ def authenticate(cls) -> ResultE["ECMWFRealTimeS3ModelRepository"]:
f"Credentials may be wrong or undefined. Encountered error: {e}",
))

log.debug(f"Successfully authenticated with S3 instance '{bucket}'")
return Success(cls(bucket=bucket, fs=_fs))


@@ -1,10 +1,15 @@
"""Repository implementation for NOAA GFS data stored in S3.
This module contains the implementation of the model repository for the
NOAA GFS data stored in an S3 bucket.
"""

import datetime as dt
+import logging
+import os
+import pathlib
+import re
from collections.abc import Callable, Iterator
+from readline import backend
from typing import override

import cfgrib
@@ -190,11 +195,22 @@ def _convert(self, path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
path: The path to the local grib file.
"""
try:
-# Squeeze reduces length-1- dimensions to scalar coordinates,
-# Thus single-level variables should not have any extra dimensions
+# Use some options when opening the datasets:
+# * 'squeeze' reduces length-1 dimensions to scalar coordinates,
+#   thus single-level variables should not have any extra dimensions
+# * 'filter_by_keys' reduces the number of variables loaded to only those
+#   in the expected list
dss: list[xr.Dataset] = cfgrib.open_datasets(
path.as_posix(),
backend_kwargs={"squeeze": True},
backend_kwargs={
"squeeze": True,
"filter_by_keys": {
"shortName": [
x for v in self.model().expected_coordinates.variable
for x in v.metadata().alternate_shortnames
],
},
},
)
except Exception as e:
return Failure(ValueError(
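The added filter_by_keys option restricts cfgrib to the expected variables instead of hydrating every dataset in the grib file, keyed on the parameters' alternate shortnames. A rough sketch of the same pattern, using a hypothetical shortname map and file path:

    import cfgrib  # requires the ecCodes library

    # Hypothetical alternate shortnames per parameter, mirroring what
    # Parameter.metadata().alternate_shortnames provides.
    shortnames_by_parameter = {
        "temperature_sl": ["t", "t2m"],
        "downward_longwave_radiation_flux_gl": ["strd", "dlwrf"],
    }
    wanted = [s for names in shortnames_by_parameter.values() for s in names]

    # Only variables whose grib shortName is in the list get loaded.
    dss = cfgrib.open_datasets(
        "gfs.t00z.pgrb2.1p00.f003",  # hypothetical local grib path
        backend_kwargs={"squeeze": True, "filter_by_keys": {"shortName": wanted}},
    )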
@@ -28,7 +28,7 @@ def test__download_and_convert(self) -> None:

c: NOAAGFSS3ModelRepository = NOAAGFSS3ModelRepository.authenticate().unwrap()

-test_it: dt.datetime = dt.datetime(2024, 10, 25, 0, tzinfo=dt.UTC)
+test_it: dt.datetime = dt.datetime(2024, 10, 24, 12, tzinfo=dt.UTC)
test_coordinates: entities.NWPDimensionCoordinateMap = dataclasses.replace(
c.model().expected_coordinates,
init_time=[test_it],
5 changes: 3 additions & 2 deletions src/nwp_consumer/internal/services/archiver_service.py
@@ -45,7 +45,8 @@ def archive(self, year: int, month: int) -> ResultE[pathlib.Path]:
# Create a store for the archive
init_store_result: ResultE[entities.TensorStore] = \
entities.TensorStore.initialize_empty_store(
-name=self.mr.repository().name,
+model=self.mr.model().name,
+repository=self.mr.repository().name,
coords=dataclasses.replace(
self.mr.model().expected_coordinates,
init_time=init_times,
@@ -116,7 +117,7 @@ def archive(self, year: int, month: int) -> ResultE[pathlib.Path]:
notify_result = self.nr().notify(
message=entities.StoreCreatedNotification(
filename=store.path.name,
-size_mb=store.size_mb,
+size_mb=store.size_kb // 1024,
performance=entities.PerformanceMetadata(
duration_seconds=monitor.get_runtime(),
memory_mb=max(monitor.memory_buffer) / 1e6,
10 changes: 6 additions & 4 deletions src/nwp_consumer/internal/services/consumer_service.py
@@ -6,7 +6,7 @@
import pathlib
from typing import override

-from joblib import Parallel
+from joblib import Parallel, cpu_count
from returns.result import Failure, ResultE, Success

from nwp_consumer.internal import entities, ports
@@ -50,7 +50,8 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]:
# Create a store for the init time
init_store_result: ResultE[entities.TensorStore] = \
entities.TensorStore.initialize_empty_store(
-name=self.mr.model().name,
+model=self.mr.model().name,
+repository=self.mr.repository().name,
coords=dataclasses.replace(self.mr.model().expected_coordinates, init_time=[it]),
)

@@ -72,7 +73,8 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]:
amr = amr_result.unwrap()

fetch_result_generator = Parallel(
-n_jobs=1, # TODO - fix segfault when using multiple threads
+# TODO - fix segfault when using multiple threads
+n_jobs=max(cpu_count() - 1, self.mr.repository().max_connections),
prefer="threads",
return_as="generator_unordered",
)(amr.fetch_init_data(it=it))
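The fetch generator hands joblib a set of delayed download calls and consumes results as they complete; the changed n_jobs line sizes the thread pool from the machine and the repository's connection limit rather than pinning it to one. A minimal sketch of the same Parallel configuration, with a stand-in fetch function and hypothetical inputs:

    from joblib import Parallel, cpu_count, delayed

    def fetch(url: str) -> str:
        return f"fetched {url}"  # stand-in for a download-and-convert call

    urls = [f"s3://bucket/file-{i}" for i in range(8)]  # hypothetical inputs

    # Threads rather than processes, with results yielded as they finish,
    # so downstream writing can begin before every fetch completes.
    results = Parallel(
        n_jobs=max(cpu_count() - 1, 1),
        prefer="threads",
        return_as="generator_unordered",
    )(delayed(fetch)(u) for u in urls)

    for r in results:
        print(r)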
@@ -118,7 +120,7 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]:
notify_result = self.nr().notify(
message=entities.StoreCreatedNotification(
filename=store.path.name,
-size_mb=store.size_mb,
+size_mb=store.size_kb // 1024,
performance=entities.PerformanceMetadata(
duration_seconds=monitor.get_runtime(),
memory_mb=max(monitor.memory_buffer) / 1e6,
