From 8d4afc4c42d30c3e407c07c83389f94738371a69 Mon Sep 17 00:00:00 2001
From: Raphael Hagen
Date: Tue, 12 Sep 2023 10:29:24 -0600
Subject: [PATCH] Revert "Adding parquet reference file storage"

---
 ci/py3.10.yml                      | 52 ------------------------------
 ci/py3.11.yml                      | 52 ------------------------------
 ci/py3.9.yml                       | 52 ------------------------------
 pangeo_forge_recipes/storage.py    |  7 ++--
 pangeo_forge_recipes/transforms.py | 12 +++----
 pangeo_forge_recipes/writers.py    | 40 ++++-------------------
 tests/test_end_to_end.py           | 24 +++-----------
 7 files changed, 18 insertions(+), 221 deletions(-)
 delete mode 100644 ci/py3.10.yml
 delete mode 100644 ci/py3.11.yml
 delete mode 100644 ci/py3.9.yml

diff --git a/ci/py3.10.yml b/ci/py3.10.yml
deleted file mode 100644
index b0a37e0c..00000000
--- a/ci/py3.10.yml
+++ /dev/null
@@ -1,52 +0,0 @@
-name: pangeo-forge-recipes
-channels:
-  - conda-forge
-dependencies:
-  - python=3.10
-  - aiohttp
-  - apache-beam
-  - black
-  - boto3
-  - cfgrib
-  - cftime
-  - codecov
-  - dask
-  - distributed
-  - fastparquet
-  - fsspec>=2022.1.0
-  - gcsfs>=2022.1.0
-  - graphviz # needed for building tutorial notebooks
-  - h5netcdf
-  - h5py>=3.3.0
-  - hdf5
-  - intake
-  - intake-xarray
-  - kerchunk>=0.1.1
-  - lxml # Optional dep of pydap
-  - matplotlib # needed for building tutorial notebooks
-  - netcdf4
-  - numcodecs
-  - numpy
-  - pandas
-  - pip
-  - pydap
-  # bring back eventually once pynio conda-forge package supports py3.9 and does not
-  # conflict with ujson, which is a depencency of kerchunk's conda-forge feedstock.
-  # See: https://github.com/conda-forge/pynio-feedstock/issues/114
-  # - pynio
-  - pytest
-  - pytest-cov
-  - pytest-lazy-fixture
-  - python-graphviz # needed for building tutorial notebooks
-  - rasterio
-  - requests
-  - rechunker>=0.4.2
-  - scipy
-  - s3fs>=2022.1.0
-  - setuptools
-  - toolz
-  - xarray>=0.18.0
-  - zarr>=2.6.0
-  - pip:
-    - nbmake>=1.3.0 # used in tutorial nb worklow
-    - pytest-timeout
diff --git a/ci/py3.11.yml b/ci/py3.11.yml
deleted file mode 100644
index c3170617..00000000
--- a/ci/py3.11.yml
+++ /dev/null
@@ -1,52 +0,0 @@
-name: pangeo-forge-recipes
-channels:
-  - conda-forge
-dependencies:
-  - python=3.11
-  - aiohttp
-  - apache-beam
-  - black
-  - boto3
-  - cfgrib
-  - cftime
-  - codecov
-  - dask
-  - distributed
-  - fastparquet
-  - fsspec>=2022.1.0
-  - gcsfs>=2022.1.0
-  - graphviz # needed for building tutorial notebooks
-  - h5netcdf
-  - h5py>=3.3.0
-  - hdf5
-  - intake
-  - intake-xarray
-  - kerchunk>=0.1.1
-  - lxml # Optional dep of pydap
-  - matplotlib # needed for building tutorial notebooks
-  - netcdf4
-  - numcodecs
-  - numpy
-  - pandas
-  - pip
-  - pydap
-  # bring back eventually once pynio conda-forge package supports py3.9 and does not
-  # conflict with ujson, which is a depencency of kerchunk's conda-forge feedstock.
-  # See: https://github.com/conda-forge/pynio-feedstock/issues/114
-  # - pynio
-  - pytest
-  - pytest-cov
-  - pytest-lazy-fixture
-  - python-graphviz # needed for building tutorial notebooks
-  - rasterio
-  - requests
-  - rechunker>=0.4.2
-  - scipy
-  - s3fs>=2022.1.0
-  - setuptools
-  - toolz
-  - xarray>=0.18.0
-  - zarr>=2.6.0
-  - pip:
-    - nbmake>=1.3.0 # used in tutorial nb worklow
-    - pytest-timeout
diff --git a/ci/py3.9.yml b/ci/py3.9.yml
deleted file mode 100644
index e354350d..00000000
--- a/ci/py3.9.yml
+++ /dev/null
@@ -1,52 +0,0 @@
-name: pangeo-forge-recipes
-channels:
-  - conda-forge
-dependencies:
-  - python=3.9
-  - aiohttp
-  - apache-beam
-  - black
-  - boto3
-  - cfgrib
-  - cftime
-  - codecov
-  - dask
-  - distributed
-  - fastparquet
-  - fsspec>=2022.1.0
-  - gcsfs>=2022.1.0
-  - graphviz # needed for building tutorial notebooks
-  - h5netcdf
-  - h5py>=3.3.0
-  - hdf5
-  - intake
-  - intake-xarray
-  - kerchunk>=0.1.1
-  - lxml # Optional dep of pydap
-  - matplotlib # needed for building tutorial notebooks
-  - netcdf4
-  - numcodecs
-  - numpy
-  - pandas
-  - pip
-  - pydap
-  # bring back eventually once pynio conda-forge package supports py3.9 and does not
-  # conflict with ujson, which is a depencency of kerchunk's conda-forge feedstock.
-  # See: https://github.com/conda-forge/pynio-feedstock/issues/114
-  # - pynio
-  - pytest
-  - pytest-cov
-  - pytest-lazy-fixture
-  - python-graphviz # needed for building tutorial notebooks
-  - rasterio
-  - requests
-  - rechunker>=0.4.2
-  - scipy
-  - s3fs>=2022.1.0
-  - setuptools
-  - toolz
-  - xarray>=0.18.0
-  - zarr>=2.6.0
-  - pip:
-    - nbmake>=1.3.0 # used in tutorial nb worklow
-    - pytest-timeout
diff --git a/pangeo_forge_recipes/storage.py b/pangeo_forge_recipes/storage.py
index 42e888f6..94cb8f5b 100644
--- a/pangeo_forge_recipes/storage.py
+++ b/pangeo_forge_recipes/storage.py
@@ -117,16 +117,13 @@ def exists(self, path: str) -> bool:
         """Check that the file is in the cache."""
         return self.fs.exists(self._full_path(path))
 
-    def rm(self, path: str, recursive: Optional[bool] = False) -> None:
+    def rm(self, path: str) -> None:
         """Remove file from the cache."""
-        self.fs.rm(self._full_path(path), recursive=recursive)
+        self.fs.rm(self._full_path(path))
 
     def size(self, path: str) -> int:
         return self.fs.size(self._full_path(path))
 
-    def makedir(self, path: str) -> None:
-        self.fs.makedir(self._full_path(path))
-
     @contextmanager
     def open(self, path: str, **kwargs) -> Iterator[OpenFileType]:
         """Open file with a context manager."""
diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py
index 23584edf..50b9d409 100644
--- a/pangeo_forge_recipes/transforms.py
+++ b/pangeo_forge_recipes/transforms.py
@@ -412,6 +412,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
 @dataclass
 class CombineReferences(beam.PTransform):
     """Combines Kerchunk references into a single reference dataset.
+
     :param concat_dims: Dimensions along which to concatenate inputs.
     :param identical_dims: Dimensions shared among all inputs.
     :mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``.
@@ -445,28 +446,25 @@ def expand(self, references: beam.PCollection) -> beam.PCollection:
 @dataclass
 class WriteCombinedReference(beam.PTransform, ZarrWriterMixin):
     """Store a singleton PCollection consisting of a ``kerchunk.combine.MultiZarrToZarr`` object.
+
     :param store_name: Name for the Zarr store. It will be created with this name
       under `target_root`.
     :param target_root: Root path the Zarr store will be created inside;
       `store_name` will be appended to this prefix to create a full path.
-    :param output_file_name: Name to give the output references file (.json or .parquet suffix.)
-    :param concat_dims: concat_dims kwarg to pass to write_combined_reference if using
-      .parquet as a storage format.
+    :param output_json_fname: Name to give the output references file. Must end in ``.json``.
     """
 
     store_name: str
     target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
         default_factory=RequiredAtRuntimeDefault
     )
-    output_file_name: str = "reference.json"
-    concat_dims: List[str] = field(default_factory=list)
+    output_json_fname: str = "reference.json"
 
     def expand(self, reference: beam.PCollection) -> beam.PCollection:
         return reference | beam.Map(
             write_combined_reference,
             full_target=self.get_full_target(),
-            concat_dims=self.concat_dims,
-            output_file_name=self.output_file_name,
+            output_json_fname=self.output_json_fname,
         )
 
 
diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py
index c2b896c2..4ce5b01e 100644
--- a/pangeo_forge_recipes/writers.py
+++ b/pangeo_forge_recipes/writers.py
@@ -1,11 +1,9 @@
 import os
-from typing import List, Protocol, Tuple, Union
+from typing import Protocol, Tuple, Union
 
-import fsspec
 import numpy as np
 import xarray as xr
 import zarr
-from fsspec.implementations.reference import LazyReferenceMapper
 from kerchunk.combine import MultiZarrToZarr
 
 from .patterns import CombineOp, Index
@@ -97,47 +95,23 @@ def store_dataset_fragment(
 def write_combined_reference(
     reference: MultiZarrToZarr,
     full_target: FSSpecTarget,
-    concat_dims: List[str],
-    output_file_name: str,
-    refs_per_component: int = 1000,
-) -> FSSpecTarget:
+    output_json_fname: str,
+):
     """Write a kerchunk combined references object to file."""
 
     import ujson  # type: ignore
 
-    file_ext = os.path.splitext(output_file_name)[-1]
-
-    outpath = full_target._full_path(output_file_name)
+    multi_kerchunk = reference.translate()
+    file_ext = os.path.splitext(output_json_fname)[-1]
 
     if file_ext == ".json":
-        multi_kerchunk = reference.translate()
+        outpath = os.path.join(full_target.root_path, output_json_fname)
         with full_target.fs.open(outpath, "wb") as f:
             f.write(ujson.dumps(multi_kerchunk).encode())
-
-    elif file_ext == ".parquet":
-
-        # Creates empty parquet store to be written to
-        if full_target.exists(output_file_name):
-            full_target.rm(output_file_name, recursive=True)
-        full_target.makedir(output_file_name)
-
-        # kwargs to pass to MultiZarrToZarr
-        fs = fsspec.filesystem("file")
-        out = LazyReferenceMapper.create(refs_per_component, outpath, fs)
-
-        # Calls MultiZarrToZarr on a MultiZarrToZarr object and adds kwargs to write to parquet.
-        MultiZarrToZarr(
-            [reference.translate()], concat_dims=concat_dims, remote_protocol="memory", out=out
-        ).translate()
-
-        # call to write reference to empty parquet store
-        out.flush()
-
     else:
+        # TODO: implement parquet writer
         raise NotImplementedError(f"{file_ext = } not supported.")
 
-    return full_target
-
 
 class ZarrWriterProtocol(Protocol):
     """Protocol for mixin typing, following best practices described in:
diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py
index d6f78222..e28c902f 100644
--- a/tests/test_end_to_end.py
+++ b/tests/test_end_to_end.py
@@ -10,7 +10,6 @@
 import xarray as xr
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
-from fsspec.implementations.reference import ReferenceFileSystem
 
 from pangeo_forge_recipes.patterns import FilePattern, pattern_from_file_sequence
 from pangeo_forge_recipes.transforms import (
@@ -82,13 +81,11 @@ def test_xarray_zarr_subpath(
     xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
 
 
-@pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"])
 def test_reference_netcdf(
     daily_xarray_dataset,
     netcdf_local_file_pattern_sequential,
     pipeline,
     tmp_target_url,
-    output_file_name,
 ):
     pattern = netcdf_local_file_pattern_sequential
     store_name = "daily-xarray-dataset"
@@ -101,25 +98,12 @@ def test_reference_netcdf(
             | WriteCombinedReference(
                 target_root=tmp_target_url,
                 store_name=store_name,
-                concat_dims=["time"],
-                output_file_name=output_file_name,
             )
         )
-
-    full_path = os.path.join(tmp_target_url, store_name, output_file_name)
-    file_ext = os.path.splitext(output_file_name)[-1]
-
-    if file_ext == ".json":
-        mapper = fsspec.get_mapper("reference://", fo=full_path)
-        ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
-        xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
-
-    elif file_ext == ".parquet":
-        fs = ReferenceFileSystem(
-            full_path, remote_protocol="file", target_protocol="file", lazy=True
-        )
-        ds = xr.open_dataset(fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False})
-        xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
+    full_path = os.path.join(tmp_target_url, store_name, "reference.json")
+    mapper = fsspec.get_mapper("reference://", fo=full_path)
+    ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
+    xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
 
 
 @pytest.mark.xfail(
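
Post-revert usage note: WriteCombinedReference once again writes only a JSON reference file, named by output_json_fname (default "reference.json"), and the parquet path raises NotImplementedError. Below is a minimal sketch of how a consumer opens the written references, mirroring the access pattern in test_reference_netcdf above; the target_root and store_name values are hypothetical placeholders, not part of the patch.

    import os

    import fsspec
    import xarray as xr

    # Hypothetical locations: in the test these come from the tmp_target_url
    # fixture and the store_name passed to WriteCombinedReference.
    target_root = "/tmp/my-target"
    store_name = "daily-xarray-dataset"

    # The transform writes the combined kerchunk references to
    # <target_root>/<store_name>/<output_json_fname>.
    full_path = os.path.join(target_root, store_name, "reference.json")

    # fsspec's reference:// protocol presents the JSON references as a zarr
    # store, which xarray can open without copying the underlying bytes.
    mapper = fsspec.get_mapper("reference://", fo=full_path)
    ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
    print(ds)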