Merge pull request #615 from pangeo-forge/revert-594-kerchunk-parquet
Revert "Adding parquet reference file storage"
norlandrhagen authored Sep 12, 2023
2 parents 4928835 + 8d4afc4 commit c292777
Showing 7 changed files with 18 additions and 221 deletions.
52 changes: 0 additions & 52 deletions ci/py3.10.yml

This file was deleted.

52 changes: 0 additions & 52 deletions ci/py3.11.yml

This file was deleted.

52 changes: 0 additions & 52 deletions ci/py3.9.yml

This file was deleted.

7 changes: 2 additions & 5 deletions pangeo_forge_recipes/storage.py
@@ -117,16 +117,13 @@ def exists(self, path: str) -> bool:
         """Check that the file is in the cache."""
         return self.fs.exists(self._full_path(path))

-    def rm(self, path: str, recursive: Optional[bool] = False) -> None:
+    def rm(self, path: str) -> None:
         """Remove file from the cache."""
-        self.fs.rm(self._full_path(path), recursive=recursive)
+        self.fs.rm(self._full_path(path))

     def size(self, path: str) -> int:
         return self.fs.size(self._full_path(path))

-    def makedir(self, path: str) -> None:
-        self.fs.makedir(self._full_path(path))
-
     @contextmanager
     def open(self, path: str, **kwargs) -> Iterator[OpenFileType]:
         """Open file with a context manager."""
12 changes: 5 additions & 7 deletions pangeo_forge_recipes/transforms.py
@@ -412,6 +412,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
 @dataclass
 class CombineReferences(beam.PTransform):
     """Combines Kerchunk references into a single reference dataset.
+
     :param concat_dims: Dimensions along which to concatenate inputs.
     :param identical_dims: Dimensions shared among all inputs.
     :mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``.
@@ -445,28 +446,25 @@ def expand(self, references: beam.PCollection) -> beam.PCollection:
 @dataclass
 class WriteCombinedReference(beam.PTransform, ZarrWriterMixin):
     """Store a singleton PCollection consisting of a ``kerchunk.combine.MultiZarrToZarr`` object.
+
     :param store_name: Name for the Zarr store. It will be created with
         this name under `target_root`.
     :param target_root: Root path the Zarr store will be created inside;
         `store_name` will be appended to this prefix to create a full path.
-    :param output_file_name: Name to give the output references file (.json or .parquet suffix.)
-    :param concat_dims: concat_dims kwarg to pass to write_combined_reference if using
-        .parquet as a storage format.
+    :param output_json_fname: Name to give the output references file. Must end in ``.json``.
     """

     store_name: str
     target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
         default_factory=RequiredAtRuntimeDefault
     )
-    output_file_name: str = "reference.json"
-    concat_dims: List[str] = field(default_factory=list)
+    output_json_fname: str = "reference.json"

     def expand(self, reference: beam.PCollection) -> beam.PCollection:
         return reference | beam.Map(
             write_combined_reference,
             full_target=self.get_full_target(),
-            concat_dims=self.concat_dims,
-            output_file_name=self.output_file_name,
+            output_json_fname=self.output_json_fname,
         )
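For context, here is a minimal sketch of how the restored post-revert API fits into a Beam pipeline, modeled on the end-to-end test further down in this diff. The file paths, dimension names, and target location are hypothetical placeholders, and the transform names are assumed to match this version of pangeo_forge_recipes.transforms.

import apache_beam as beam

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    CombineReferences,
    OpenURLWithFSSpec,
    OpenWithKerchunk,
    WriteCombinedReference,
)

# Hypothetical sequence of NetCDF files to be concatenated along "time".
pattern = pattern_from_file_sequence(
    ["/tmp/data/day-0.nc", "/tmp/data/day-1.nc"], concat_dim="time"
)

with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenURLWithFSSpec()
        | OpenWithKerchunk(file_type=pattern.file_type)
        | CombineReferences(concat_dims=["time"], identical_dims=["lat", "lon"])
        | WriteCombinedReference(
            target_root="/tmp/target",  # hypothetical output root
            store_name="daily-dataset",  # appended to target_root
            output_json_fname="reference.json",  # must end in .json after this revert
        )
    )

Since output_json_fname defaults to "reference.json", it could be omitted here; it is spelled out only to highlight the argument this revert renames back from output_file_name.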
40 changes: 7 additions & 33 deletions pangeo_forge_recipes/writers.py
@@ -1,11 +1,9 @@
 import os
-from typing import List, Protocol, Tuple, Union
+from typing import Protocol, Tuple, Union

-import fsspec
 import numpy as np
 import xarray as xr
 import zarr
-from fsspec.implementations.reference import LazyReferenceMapper
 from kerchunk.combine import MultiZarrToZarr

 from .patterns import CombineOp, Index
@@ -97,47 +95,23 @@ def store_dataset_fragment(
 def write_combined_reference(
     reference: MultiZarrToZarr,
     full_target: FSSpecTarget,
-    concat_dims: List[str],
-    output_file_name: str,
-    refs_per_component: int = 1000,
-) -> FSSpecTarget:
+    output_json_fname: str,
+):
     """Write a kerchunk combined references object to file."""

     import ujson  # type: ignore

-    file_ext = os.path.splitext(output_file_name)[-1]
-
-    outpath = full_target._full_path(output_file_name)
-    multi_kerchunk = reference.translate()
+    file_ext = os.path.splitext(output_json_fname)[-1]

     if file_ext == ".json":
+        multi_kerchunk = reference.translate()
+        outpath = os.path.join(full_target.root_path, output_json_fname)
         with full_target.fs.open(outpath, "wb") as f:
             f.write(ujson.dumps(multi_kerchunk).encode())
-
-    elif file_ext == ".parquet":
-
-        # Creates empty parquet store to be written to
-        if full_target.exists(output_file_name):
-            full_target.rm(output_file_name, recursive=True)
-        full_target.makedir(output_file_name)
-
-        # kwargs to pass to MultiZarrToZarr
-        fs = fsspec.filesystem("file")
-        out = LazyReferenceMapper.create(refs_per_component, outpath, fs)
-
-        # Calls MultiZarrToZarr on a MultiZarrToZarr object and adds kwargs to write to parquet.
-        MultiZarrToZarr(
-            [reference.translate()], concat_dims=concat_dims, remote_protocol="memory", out=out
-        ).translate()
-
-        # call to write reference to empty parquet store
-        out.flush()
-
     else:
+        # TODO: implement parquet writer
         raise NotImplementedError(f"{file_ext = } not supported.")
-
-    return full_target


 class ZarrWriterProtocol(Protocol):
     """Protocol for mixin typing, following best practices described in:
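Once write_combined_reference has stored the JSON references, they can be opened as a virtual Zarr store. Below is a minimal sketch mirroring the restored test at the bottom of this diff; the path is a hypothetical stand-in for the test fixture's target.

import fsspec
import xarray as xr

# Hypothetical location of the references written by write_combined_reference.
full_path = "/tmp/target/daily-dataset/reference.json"

# Interpret the kerchunk JSON references as a Zarr store and open it lazily with xarray.
mapper = fsspec.get_mapper("reference://", fo=full_path)
ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
print(ds)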
24 changes: 4 additions & 20 deletions tests/test_end_to_end.py
@@ -10,7 +10,6 @@
 import xarray as xr
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
-from fsspec.implementations.reference import ReferenceFileSystem

 from pangeo_forge_recipes.patterns import FilePattern, pattern_from_file_sequence
 from pangeo_forge_recipes.transforms import (
@@ -82,13 +81,11 @@ def test_xarray_zarr_subpath(
     xr.testing.assert_equal(ds.load(), daily_xarray_dataset)


-@pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"])
 def test_reference_netcdf(
     daily_xarray_dataset,
     netcdf_local_file_pattern_sequential,
     pipeline,
     tmp_target_url,
-    output_file_name,
 ):
     pattern = netcdf_local_file_pattern_sequential
     store_name = "daily-xarray-dataset"
@@ -101,25 +98,12 @@ def test_reference_netcdf(
             | WriteCombinedReference(
                 target_root=tmp_target_url,
                 store_name=store_name,
-                concat_dims=["time"],
-                output_file_name=output_file_name,
             )
         )
-
-    full_path = os.path.join(tmp_target_url, store_name, output_file_name)
-    file_ext = os.path.splitext(output_file_name)[-1]
-
-    if file_ext == ".json":
-        mapper = fsspec.get_mapper("reference://", fo=full_path)
-        ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
-        xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
-
-    elif file_ext == ".parquet":
-        fs = ReferenceFileSystem(
-            full_path, remote_protocol="file", target_protocol="file", lazy=True
-        )
-        ds = xr.open_dataset(fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False})
-        xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
+    full_path = os.path.join(tmp_target_url, store_name, "reference.json")
+    mapper = fsspec.get_mapper("reference://", fo=full_path)
+    ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
+    xr.testing.assert_equal(ds.load(), daily_xarray_dataset)


 @pytest.mark.xfail(
