Dynamic chunking interface for StoreToZarr #595

Merged: 28 commits from branch dynamic-chunks-interface, merged Nov 15, 2023

Changes from 18 commits

Commits (28)
14e4fa2
add dynamic_chunking_fn field to storetozarr
cisaacstern Aug 31, 2023
1eda370
test dynamic chunking kws
cisaacstern Aug 31, 2023
1765f67
test dynamic chunking with target chunks raises
cisaacstern Aug 31, 2023
1f16d83
Merge remote-tracking branch 'origin/main' into dynamic-chunks-interface
cisaacstern Sep 1, 2023
74b8ebf
xr.Datasets are more familiar than XarraySchemas, so let users genera…
cisaacstern Sep 1, 2023
952ea57
Merge branch 'main' into dynamic-chunks-interface
cisaacstern Sep 19, 2023
147cd20
fix transforms test for attrs
cisaacstern Sep 19, 2023
9e7dd67
Added integration test for dynamic chunking plugin
jbusecke Nov 15, 2023
9770cf5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 15, 2023
f7aee4e
Expanded docstring
jbusecke Nov 15, 2023
4918ffd
Merge branch 'dynamic-chunks-interface' of https://github.com/pangeo-…
jbusecke Nov 15, 2023
81350ac
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 15, 2023
6d3e965
Test patching runner to not depend on recipes
jbusecke Nov 15, 2023
6403002
Merge branch 'dynamic-chunks-interface' of https://github.com/pangeo-…
jbusecke Nov 15, 2023
0c0819e
Merge remote-tracking branch 'upstream/main' into dynamic-chunks-inte…
jbusecke Nov 15, 2023
ad2e237
Did we need that patch at all?
jbusecke Nov 15, 2023
7983643
reverse install order
jbusecke Nov 15, 2023
c7c5206
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 15, 2023
d177dc4
Add chunking test
jbusecke Nov 15, 2023
cbbfe40
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 15, 2023
dcb1aa7
reduce chunking
jbusecke Nov 15, 2023
9e15a25
Merge branch 'dynamic-chunks-interface' of https://github.com/pangeo-…
jbusecke Nov 15, 2023
7dc9be7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 15, 2023
ff5fa4c
Added checks to non-chunked gpcp recipe and reduced dynamic chunks to 2
jbusecke Nov 15, 2023
caef6c0
Merge branch 'dynamic-chunks-interface' of https://github.com/pangeo-…
jbusecke Nov 15, 2023
5ee3f8a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 15, 2023
f19acfb
Add release notes
jbusecke Nov 15, 2023
2afd727
Merge branch 'dynamic-chunks-interface' of https://github.com/pangeo-…
jbusecke Nov 15, 2023
5 changes: 4 additions & 1 deletion .github/workflows/test-integration.yaml
@@ -57,8 +57,11 @@ jobs:
       - name: 🌈 Install pangeo-forge-recipes & pangeo-forge-runner
         shell: bash -l {0}
         run: |
-          python -m pip install -e ".[test,minio]"
           python -m pip install ${{ matrix.runner-version }}
+          python -m pip install -e ".[test,minio]"
+
+      # order reversed to fix https://github.com/pangeo-forge/pangeo-forge-recipes/pull/595#issuecomment-1811630921
+      # this should however be fixed in the runner itself
       - name: 🏄‍♂️ Run Tests
         shell: bash -l {0}
         run: |
52 changes: 52 additions & 0 deletions examples/feedstock/gpcp_from_gcs_dynamic_chunks.py
@@ -0,0 +1,52 @@
from typing import Dict

import apache_beam as beam
import pandas as pd
import xarray as xr
import zarr

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr

dates = [
    d.to_pydatetime().strftime("%Y%m%d")
    for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
]


def make_url(time):
    url_base = "https://storage.googleapis.com/pforge-test-data"
    return f"{url_base}/gpcp/v01r03_daily_d{time}.nc"


concat_dim = ConcatDim("time", dates, nitems_per_file=1)
pattern = FilePattern(make_url, concat_dim)


def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
    # This fails integration test if not imported here
    # TODO: see if --setup-file option for runner fixes this
    import xarray as xr

    ds = xr.open_dataset(store, engine="zarr", chunks={})
    assert ds.title == (
        "Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3"
    )
Review comment (cisaacstern, Member, Author): @jbusecke let's assert that the chunking of the opened dataset is in fact what we set dynamically with the dynamic chunking function :)

    return store


def chunk_func(ds: xr.Dataset) -> Dict[str, int]:
    return {"time": 3}


recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
    | StoreToZarr(
        dynamic_chunking_fn=chunk_func,
        store_name="gpcp.zarr",
        combine_dims=pattern.combine_dim_keys,
    )
    | "Test dataset" >> beam.Map(test_ds)
)
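
The inline review comment above asks for an assertion that the dynamically set chunking actually made it into the store. A minimal sketch of what that check could look like, not part of this diff (the helper name test_chunks is hypothetical; since chunk_func returns {"time": 3}, every time chunk except possibly the trailing one should have length 3):

def test_chunks(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
    import xarray as xr

    ds = xr.open_dataset(store, engine="zarr", chunks={})
    # chunk_func requested {"time": 3}, so all but the last chunk should have length 3
    assert all(size == 3 for size in ds.chunks["time"][:-1])
    return store

It could be appended to the pipeline the same way as the existing check, e.g. | "Test chunks" >> beam.Map(test_chunks).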
2 changes: 2 additions & 0 deletions examples/feedstock/meta.yaml
@@ -1,6 +1,8 @@
 recipes:
   - id: "gpcp-from-gcs"
     object: "gpcp_from_gcs:recipe"
+  - id: "gpcp-from-gcs-dynamic-chunks"
+    object: "gpcp_from_gcs_dynamic_chunks:recipe"
   - id: "noaa-oisst"
     object: "noaa_oisst:recipe"
   - id: "terraclimate"
32 changes: 27 additions & 5 deletions pangeo_forge_recipes/transforms.py
@@ -17,7 +17,7 @@
 import xarray as xr
 import zarr
 
-from .aggregation import XarraySchema, dataset_to_schema, schema_to_zarr
+from .aggregation import XarraySchema, dataset_to_schema, schema_to_template_ds, schema_to_zarr
 from .combiners import CombineMultiZarrToZarr, CombineXarraySchemas
 from .openers import open_url, open_with_kerchunk, open_with_xarray
 from .patterns import CombineOp, Dimension, FileType, Index, augment_index_with_start_stop
@@ -505,7 +505,14 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
     :param target_root: Root path the Zarr store will be created inside;
         `store_name` will be appended to this prefix to create a full path.
     :param target_chunks: Dictionary mapping dimension names to chunk sizes.
         If a dimension is not named, the chunks will be inferred from the data.
+    :param dynamic_chunking_fn: Optionally provide a function that takes an ``xarray.Dataset``
+        template dataset as its first argument and returns a dynamically generated chunking dict.
+        If provided, ``target_chunks`` cannot also be passed. You can use this to determine
+        chunking based on the full dataset (e.g. divide along a certain dimension based on a
+        desired chunk size in memory). For more advanced chunking strategies, check out
+        https://github.com/jbusecke/dynamic_chunks
+    :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``.
     :param attrs: Extra group-level attributes to inject into the dataset.
     """

@@ -517,19 +524,34 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
         default_factory=RequiredAtRuntimeDefault
     )
     target_chunks: Dict[str, int] = field(default_factory=dict)
+    dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None
+    dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
     attrs: Dict[str, str] = field(default_factory=dict)
 
+    def __post_init__(self):
+        if self.target_chunks and self.dynamic_chunking_fn:
+            raise ValueError("Passing both `target_chunks` and `dynamic_chunking_fn` not allowed.")
+
     def expand(
         self,
         datasets: beam.PCollection[Tuple[Index, xr.Dataset]],
     ) -> beam.PCollection[zarr.storage.FSStore]:
         schema = datasets | DetermineSchema(combine_dims=self.combine_dims)
         indexed_datasets = datasets | IndexItems(schema=schema)
-        rechunked_datasets = indexed_datasets | Rechunk(
-            target_chunks=self.target_chunks, schema=schema
-        )
+        target_chunks = (
+            self.target_chunks
+            if not self.dynamic_chunking_fn
+            else beam.pvalue.AsSingleton(
+                schema
+                | beam.Map(schema_to_template_ds)
+                | beam.Map(self.dynamic_chunking_fn, **self.dynamic_chunking_fn_kwargs)
+            )
+        )
+        rechunked_datasets = indexed_datasets | Rechunk(target_chunks=target_chunks, schema=schema)
         target_store = schema | PrepareZarrTarget(
-            target=self.get_full_target(), target_chunks=self.target_chunks, attrs=self.attrs
+            target=self.get_full_target(),
+            target_chunks=target_chunks,
+            attrs=self.attrs,
         )
         n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
         singleton_target_store = (
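
As the docstring above suggests, a dynamic_chunking_fn can divide a dimension based on a desired chunk size in memory. A minimal sketch under that reading, assuming a time-concatenated dataset (the function name and the 100 MiB default are illustrative only, not part of this PR; see the linked dynamic_chunks package for production-grade strategies):

import xarray as xr


def chunk_by_target_nbytes(template_ds: xr.Dataset, target_nbytes: int = 100 * 2**20) -> dict:
    # bytes contributed by one timestep, summed over all time-dependent variables;
    # nbytes here reflects the shape and dtype of the schema-derived template,
    # so no data needs to be loaded
    bytes_per_step = sum(
        var.nbytes / template_ds.sizes["time"]
        for var in template_ds.data_vars.values()
        if "time" in var.dims
    )
    steps_per_chunk = max(1, int(target_nbytes // bytes_per_step))
    return {"time": min(steps_per_chunk, template_ds.sizes["time"])}

It would be wired in like chunk_func in the example recipe, e.g. StoreToZarr(..., dynamic_chunking_fn=chunk_by_target_nbytes, dynamic_chunking_fn_kwargs={"target_nbytes": 50 * 2**20}). Because expand wraps the function in beam.pvalue.AsSingleton, the chunking dict is computed once from the schema-derived template and then broadcast as a side input to both Rechunk and PrepareZarrTarget.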
85 changes: 78 additions & 7 deletions tests/test_transforms.py
@@ -234,18 +234,20 @@ def _check_chunks(actual):
     assert_that(rechunked, correct_chunks())
 
 
+class OpenZarrStore(beam.PTransform):
+    @staticmethod
+    def _open_zarr(store):
+        return xr.open_dataset(store, engine="zarr", chunks={})
+
+    def expand(self, pcoll):
+        return pcoll | beam.Map(self._open_zarr)
+
+
 def test_StoreToZarr_emits_openable_fsstore(
     pipeline,
     netcdf_local_file_pattern_sequential,
     tmp_target_url,
 ):
-    def _open_zarr(store):
-        return xr.open_dataset(store, engine="zarr")
-
-    class OpenZarrStore(beam.PTransform):
-        def expand(self, pcoll):
-            return pcoll | beam.Map(_open_zarr)
-
     def is_xrdataset():
         def _is_xr_dataset(actual):
             assert len(actual) == 1
@@ -266,6 +268,75 @@ def _is_xr_dataset(actual):
     assert_that(open_store, is_xrdataset())
 
 
+@pytest.mark.parametrize("with_kws", [True, False])
+def test_StoreToZarr_dynamic_chunking_interface(
+    pipeline: beam.Pipeline,
+    netcdf_local_file_pattern_sequential: FilePattern,
+    tmp_target_url: str,
+    daily_xarray_dataset: xr.Dataset,
+    with_kws: bool,
+):
+    def has_dynamically_set_chunks():
+        def _has_dynamically_set_chunks(actual):
+            assert len(actual) == 1
+            item = actual[0]
+            assert isinstance(item, xr.Dataset)
+            if not with_kws:
+                # we've dynamically set the number of timesteps per chunk to be equal to
+                # the length of the full time dimension of the aggregate dataset, therefore
+                # if this worked, there should only be one chunk
+                assert len(item.chunks["time"]) == 1
+            else:
+                # in this case, we've passed the kws {"divisor": 2}, so we expect two time chunks
+                assert len(item.chunks["time"]) == 2
+
+        return _has_dynamically_set_chunks
+
+    pattern: FilePattern = netcdf_local_file_pattern_sequential
+
+    time_len = len(daily_xarray_dataset.time)
+
+    def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1):
+        assert isinstance(template_ds, xr.Dataset)
+        return {"time": int(time_len / divisor)}
+
+    kws = {} if not with_kws else {"dynamic_chunking_fn_kwargs": {"divisor": 2}}
+
+    with pipeline as p:
+        datasets = p | beam.Create(pattern.items()) | OpenWithXarray()
+        target_store = datasets | StoreToZarr(
+            target_root=tmp_target_url,
+            store_name="test.zarr",
+            combine_dims=pattern.combine_dim_keys,
+            attrs={},
+            dynamic_chunking_fn=dynamic_chunking_fn,
+            **kws,
+        )
+        open_store = target_store | OpenZarrStore()
+        assert_that(open_store, has_dynamically_set_chunks())
+
+
+def test_StoreToZarr_dynamic_chunking_with_target_chunks_raises(
+    netcdf_local_file_pattern_sequential: FilePattern,
+):
+    def fn(template_ds):
+        pass
+
+    pattern: FilePattern = netcdf_local_file_pattern_sequential
+
+    with pytest.raises(
+        ValueError,
+        match="Passing both `target_chunks` and `dynamic_chunking_fn` not allowed",
+    ):
+        _ = StoreToZarr(
+            target_root="target_root",
+            store_name="test.zarr",
+            combine_dims=pattern.combine_dim_keys,
+            target_chunks={"time": 1},
+            dynamic_chunking_fn=fn,
+        )
+
+
 def test_StoreToZarr_target_root_default_unrunnable(
     pipeline,
     netcdf_local_file_pattern_sequential,