diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 90d8bb9d..7f3acec2 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -23,4 +23,5 @@ jobs:
       - name: Test with pytest
         shell: bash -l {0}
         run: |
+          export ZARR_V3_EXPERIMENTAL_API=1
           pytest -v --cov
diff --git a/kerchunk/fits.py b/kerchunk/fits.py
index 18729a9b..042a7849 100644
--- a/kerchunk/fits.py
+++ b/kerchunk/fits.py
@@ -3,12 +3,10 @@
 import numcodecs
 import numcodecs.abc
 import numpy as np
-import zarr
 
 from fsspec.implementations.reference import LazyReferenceMapper
 
-
-from kerchunk.utils import class_factory
+from kerchunk.utils import class_factory, _zarr_open
 from kerchunk.codecs import AsciiTableCodec, VarArrCodec
 
 try:
@@ -40,6 +38,7 @@ def process_file(
     inline_threshold=100,
     primary_attr_to_group=False,
     out=None,
+    zarr_version=None,
 ):
     """
     Create JSON references for a single FITS file as a zarr group
@@ -62,7 +61,9 @@ def process_file(
         This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper
         to write out parquet as the references get filled, or some other dictionary-like class
         to customise how references get stored
-
+    zarr_version: int
+        The desired zarr spec version to target (currently 2 or 3). The default
+        of None will use the default zarr version.
 
     Returns
     -------
@@ -72,7 +73,7 @@ def process_file(
     storage_options = storage_options or {}
     out = out or {}
 
-    g = zarr.open(out)
+    g = _zarr_open(out, zarr_version=zarr_version)
 
     with fsspec.open(url, mode="rb", **storage_options) as f:
         infile = fits.open(f, do_not_scale_image_data=True)
diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py
index e3a927b3..6025a710 100644
--- a/kerchunk/grib2.py
+++ b/kerchunk/grib2.py
@@ -20,11 +20,10 @@
 )
 
 import fsspec
-import zarr
 import xarray
 import numpy as np
 
-from kerchunk.utils import class_factory, _encode_for_JSON
+from kerchunk.utils import class_factory, _encode_for_JSON, _zarr_init_group_and_store
 from kerchunk.codecs import GRIBCodec
 from kerchunk.combine import MultiZarrToZarr, drop
 
@@ -113,6 +112,7 @@ def scan_grib(
     inline_threshold=100,
     skip=0,
     filter={},
+    zarr_version=None,
 ):
     """
     Generate references for a GRIB2 file
@@ -134,6 +134,9 @@ def scan_grib(
         the exact value or is in the given set, are processed. E.g., the cf-style
         filter ``{'typeOfLevel': 'heightAboveGround', 'level': 2}`` only keeps
         messages where heightAboveGround==2.
+    zarr_version: int
+        The desired zarr spec version to target (currently 2 or 3). The default
+        of None will use the default zarr version.
 
     Returns
     -------
@@ -192,7 +195,7 @@
             if good is False:
                 continue
 
-            z = zarr.open_group(store)
+            z, store = _zarr_init_group_and_store(store, zarr_version=zarr_version)
             global_attrs = {
                 f"GRIB_{k}": m[k]
                 for k in cfgrib.dataset.GLOBAL_ATTRIBUTES_KEYS
@@ -399,7 +402,7 @@
 
     # TODO allow passing a LazyReferenceMapper as output?
     zarr_store = {}
-    zroot = zarr.open_group(store=zarr_store)
+    zroot, zarr_store = _zarr_init_group_and_store(zarr_store, overwrite=False)
 
     aggregations: Dict[str, List] = defaultdict(list)
     aggregation_dims: Dict[str, Set] = defaultdict(set)
diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py
index 549923d4..05831864 100644
--- a/kerchunk/hdf.py
+++ b/kerchunk/hdf.py
@@ -10,7 +10,7 @@
 import numcodecs
 
 from .codecs import FillStringsCodec
-from .utils import _encode_for_JSON
+from .utils import _encode_for_JSON, encode_fill_value, _zarr_init_group_and_store
 
 try:
     import h5py
@@ -21,12 +21,6 @@
         "for more details."
     )
 
-try:
-    from zarr.meta import encode_fill_value
-except ModuleNotFoundError:
-    # https://github.com/zarr-developers/zarr-python/issues/2021
-    from zarr.v2.meta import encode_fill_value
-
 lggr = logging.getLogger("h5-to-zarr")
 _HIDDEN_ATTRS = {  # from h5netcdf.attrs
     "REFERENCE_LIST",
@@ -71,10 +65,10 @@ class SingleHdf5ToZarr:
         encode: save the ID-to-value mapping in a codec, to produce the real values at read
             time; requires this library to be available. Can be efficient storage where there
             are few unique values.
-    out: dict-like or None
+    out: dict-like, StoreLike, or None
         This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper
-        to write out parquet as the references get filled, or some other dictionary-like class
-        to customise how references get stored
+        or a ZarrV3 StoreLike to write out parquet as the references get filled,
+        or some other dictionary-like class to customise how references get stored
     """
 
     def __init__(
@@ -87,6 +81,7 @@ def __init__(
         error="warn",
         vlen_encode="embed",
         out=None,
+        zarr_version=None,
     ):
 
         # Open HDF5 file in read mode...
@@ -111,9 +106,9 @@ def __init__(
         if vlen_encode not in ["embed", "null", "leave", "encode"]:
             raise NotImplementedError
         self.vlen = vlen_encode
-        self.store = out or {}
-        self._zroot = zarr.group(store=self.store, overwrite=True)
-
+        self._zroot, self.store = _zarr_init_group_and_store(
+            out or {}, zarr_version=zarr_version or 2
+        )
         self._uri = url
         self.error = error
         lggr.debug(f"HDF5 file URI: {self._uri}")
diff --git a/kerchunk/netCDF3.py b/kerchunk/netCDF3.py
index d43b6b97..a0b1f262 100644
--- a/kerchunk/netCDF3.py
+++ b/kerchunk/netCDF3.py
@@ -5,7 +5,11 @@
 from fsspec.implementations.reference import LazyReferenceMapper
 import fsspec
 
-from kerchunk.utils import _encode_for_JSON, inline_array
+from kerchunk.utils import (
+    _encode_for_JSON,
+    inline_array,
+    _zarr_open,
+)
 
 try:
     from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable
@@ -31,6 +35,7 @@ def __init__(
         inline_threshold=100,
         max_chunk_size=0,
         out=None,
+        zarr_version=None,
         **kwargs,
     ):
         """
@@ -52,6 +57,9 @@ def __init__(
            This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper
            to write out parquet as the references get filled, or some other dictionary-like class
            to customise how references get stored
+        zarr_version: int
+            The desired zarr spec version to target (currently 2 or 3). The default
+            of None will use the default zarr version.
         args, kwargs: passed to scipy superclass ``scipy.io.netcdf.netcdf_file``
         """
         assert kwargs.pop("mmap", False) is False
@@ -63,6 +71,7 @@ def __init__(
         self.chunks = {}
         self.threshold = inline_threshold
         self.max_chunk_size = max_chunk_size
+        self.zarr_version = zarr_version
         self.out = out or {}
         self.storage_options = storage_options
         self.fp = fsspec.open(filename, **(storage_options or {})).open()
@@ -164,10 +173,9 @@ def translate(self):
         Parameters
         ----------
         """
-        import zarr
 
         out = self.out
-        z = zarr.open(out, mode="w")
+        zroot = _zarr_open(out, mode="w")
         for dim, var in self.variables.items():
             if dim in self.chunks:
                 shape = self.chunks[dim][-1]
@@ -191,18 +199,25 @@ def translate(self):
                     fill = float(fill)
                 if fill is not None and var.data.dtype.kind == "i":
                     fill = int(fill)
-                arr = z.create_dataset(
+                arr = zroot.create_dataset(
                     name=dim,
                     shape=shape,
                     dtype=var.data.dtype,
                     fill_value=fill,
                     chunks=shape,
                     compression=None,
+                    overwrite=True,
                 )
-                part = ".".join(["0"] * len(shape)) or "0"
-                k = f"{dim}/{part}"
-                out[k] = [
-                    self.filename,
+
+                if self.zarr_version == 3:
+                    part = "/".join(["0"] * len(shape)) or "0"
+                    key = f"data/root/{dim}/c{part}"
+                else:
+                    part = ".".join(["0"] * len(shape)) or "0"
+
+                    key = f"{dim}/{part}"
+
+                self.out[key] = [self.filename] + [
                     int(self.chunks[dim][0]),
                     int(self.chunks[dim][1]),
                 ]
@@ -245,13 +260,14 @@ def translate(self):
                     fill = float(fill)
                 if fill is not None and base.kind == "i":
                     fill = int(fill)
-                arr = z.create_dataset(
+                arr = zroot.create_dataset(
                     name=name,
                     shape=shape,
                     dtype=base,
                     fill_value=fill,
                     chunks=(1,) + dtype.shape,
                     compression=None,
+                    overwrite=True,
                 )
                 arr.attrs.update(
                     {
@@ -266,18 +282,33 @@ def translate(self):
 
                 arr.attrs["_ARRAY_DIMENSIONS"] = list(var.dimensions)
 
-                suffix = (
-                    ("." + ".".join("0" for _ in dtype.shape)) if dtype.shape else ""
-                )
+                if self.zarr_version == 3:
+                    suffix = (
+                        ("/" + "/".join("0" for _ in dtype.shape))
+                        if dtype.shape
+                        else ""
+                    )
+                else:
+                    suffix = (
+                        ("." + ".".join("0" for _ in dtype.shape))
+                        if dtype.shape
+                        else ""
+                    )
+
                 for i in range(outer_shape):
-                    out[f"{name}/{i}{suffix}"] = [
+                    if self.zarr_version == 3:
+                        key = f"data/root/{name}/c{i}{suffix}"
+                    else:
+                        key = f"{name}/{i}{suffix}"
+
+                    self.out[key] = [
                         self.filename,
                         int(offset + i * dt.itemsize),
                         int(dtype.itemsize),
                     ]
                 offset += dtype.itemsize
 
-        z.attrs.update(
+        zroot.attrs.update(
             {
                 k: v.decode() if isinstance(v, bytes) else str(v)
                 for k, v in self._attributes.items()
diff --git a/kerchunk/tests/test_fits.py b/kerchunk/tests/test_fits.py
index 14ea6fc0..d06e68d4 100644
--- a/kerchunk/tests/test_fits.py
+++ b/kerchunk/tests/test_fits.py
@@ -13,12 +13,13 @@
 var = os.path.join(testdir, "variable_length_table.fits")
 
 
-def test_ascii_table():
+@pytest.mark.parametrize("zarr_version", [2])
+def test_ascii_table(zarr_version):
     # this one directly hits a remote server - should cache?
url = "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" - out = kerchunk.fits.process_file(url, extension=1) + out = kerchunk.fits.process_file(url, extension=1, zarr_version=zarr_version) m = fsspec.get_mapper("reference://", fo=out, remote_protocol="https") - g = zarr.open(m) + g = zarr.open(m, zarr_version=zarr_version) arr = g["u5780205r_cvt.c0h.tab"][:] with fsspec.open( "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" @@ -28,10 +29,11 @@ def test_ascii_table(): assert list(hdu.data.astype(arr.dtype) == arr) == [True, True, True, True] -def test_binary_table(): - out = kerchunk.fits.process_file(btable, extension=1) +@pytest.mark.parametrize("zarr_version", [2, 3]) +def test_binary_table(zarr_version): + out = kerchunk.fits.process_file(btable, extension=1, zarr_version=zarr_version) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=zarr_version) arr = z["1"] with open(btable, "rb") as f: hdul = fits.open(f) @@ -45,10 +47,11 @@ def test_binary_table(): ).all() # string come out as bytes -def test_cube(): - out = kerchunk.fits.process_file(range_im) +@pytest.mark.parametrize("zarr_version", [2]) +def test_cube(zarr_version): + out = kerchunk.fits.process_file(range_im, zarr_version=zarr_version) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=zarr_version) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -56,12 +59,13 @@ def test_cube(): assert (arr[:] == expected).all() -def test_with_class(): +@pytest.mark.parametrize("zarr_version", [2]) +def test_with_class(zarr_version): ftz = kerchunk.fits.FitsToZarr(range_im) out = ftz.translate() assert "fits" in repr(ftz) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=zarr_version) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -69,14 +73,15 @@ def test_with_class(): assert (arr[:] == expected).all() -def test_var(): +@pytest.mark.parametrize("zarr_version", [2]) +def test_var(zarr_version): data = fits.open(var)[1].data expected = [_.tolist() for _ in data["var"]] ftz = kerchunk.fits.FitsToZarr(var) out = ftz.translate() m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=zarr_version) arr = z["1"] vars = [_.tolist() for _ in arr["var"]] diff --git a/kerchunk/tests/test_grib.py b/kerchunk/tests/test_grib.py index f24f2974..20ee2e89 100644 --- a/kerchunk/tests/test_grib.py +++ b/kerchunk/tests/test_grib.py @@ -21,14 +21,19 @@ here = os.path.dirname(__file__) -def test_one(): +@pytest.mark.parametrize("zarr_version", [2]) +def test_one(zarr_version): # from https://dd.weather.gc.ca/model_gem_regional/10km/grib2/00/000 fn = os.path.join(here, "CMC_reg_DEPR_ISBL_10_ps10km_2022072000_P000.grib2") - out = scan_grib(fn) + out = scan_grib(fn, zarr_version=zarr_version) ds = xr.open_dataset( "reference://", engine="zarr", - backend_kwargs={"consolidated": False, "storage_options": {"fo": out[0]}}, + backend_kwargs={ + "consolidated": False, + "zarr_version": zarr_version, + "storage_options": {"fo": out[0]}, + }, ) assert ds.attrs["GRIB_centre"] == "cwao" diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index a20be2ad..a92dbbc0 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -14,18 +14,29 @@ here = osp.dirname(__file__) -def test_single(): +@pytest.mark.parametrize("zarr_version", [2, 3]) +def test_single(zarr_version): """Test creating 
references for a single HDF file""" url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" so = dict(anon=True, default_fill_cache=False, default_cache_type="none") with fsspec.open(url, **so) as f: - h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) + h5chunks = SingleHdf5ToZarr( + f, url, storage_options=so, zarr_version=zarr_version + ) test_dict = h5chunks.translate() m = fsspec.get_mapper( "reference://", fo=test_dict, remote_protocol="s3", remote_options=so ) - ds = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) + if zarr_version == 2: + assert ".zgroup" in test_dict["refs"] + elif zarr_version == 3: + assert "zarr.json" in test_dict["refs"] + assert "meta/root.group.json" in test_dict["refs"] + + backend_kwargs = {"zarr_version": zarr_version, "consolidated": False} + # TODO: drop consolidated kwarg for v3 stores + ds = xr.open_dataset(m, engine="zarr", backend_kwargs=backend_kwargs) with fsspec.open(url, **so) as f: expected = xr.open_dataset(f, engine="h5netcdf") diff --git a/kerchunk/tests/test_netcdf.py b/kerchunk/tests/test_netcdf.py index 43b6021b..652746d4 100644 --- a/kerchunk/tests/test_netcdf.py +++ b/kerchunk/tests/test_netcdf.py @@ -24,15 +24,17 @@ ) -def test_one(m): +@pytest.mark.parametrize("zarr_version", [2]) +def test_one(m, zarr_version): m.pipe("data.nc3", bdata) - h = netCDF3.netcdf_recording_file("memory://data.nc3") + h = netCDF3.netcdf_recording_file("memory://data.nc3", zarr_version=zarr_version) out = h.translate() ds = xr.open_dataset( "reference://", engine="zarr", backend_kwargs={ "consolidated": False, + "zarr_version": zarr_version, "storage_options": {"fo": out, "remote_protocol": "memory"}, }, ) @@ -76,16 +78,18 @@ def unlimited_dataset(tmpdir): return fn -def test_unlimited(unlimited_dataset): +@pytest.mark.parametrize("zarr_version", [2]) +def test_unlimited(unlimited_dataset, zarr_version): fn = unlimited_dataset expected = xr.open_dataset(fn, engine="scipy") - h = netCDF3.NetCDF3ToZarr(fn) + h = netCDF3.NetCDF3ToZarr(fn, zarr_version=zarr_version) out = h.translate() ds = xr.open_dataset( "reference://", engine="zarr", backend_kwargs={ "consolidated": False, + "zarr_version": zarr_version, "storage_options": {"fo": out}, }, ) diff --git a/kerchunk/tests/test_utils.py b/kerchunk/tests/test_utils.py index a1bb094d..a5791168 100644 --- a/kerchunk/tests/test_utils.py +++ b/kerchunk/tests/test_utils.py @@ -2,6 +2,7 @@ import fsspec import json + import kerchunk.utils import kerchunk.zarr import numpy as np @@ -170,3 +171,21 @@ def test_deflate_zip_archive(m): fs = fsspec.filesystem("reference", fo=refs2) assert dec.decode(fs.cat("b")) == data + + +@pytest.mark.parametrize("zarr_version", [2, 3]) +def test_zarr_open(zarr_version): + zarr_store = kerchunk.utils._zarr_open({}, zarr_version=zarr_version, mode="w") + assert isinstance(zarr_store, zarr.hierarchy.Group) + zarr_store = kerchunk.utils._zarr_open({}, zarr_version=zarr_version, mode="a") + assert isinstance(zarr_store, zarr.hierarchy.Group) + + +@pytest.mark.parametrize("zarr_version", [2, 3]) +def test_init_group_and_store(zarr_version): + store = {} + zroot, store_returned = kerchunk.utils._zarr_init_group_and_store( + store, zarr_version=zarr_version + ) + assert isinstance(zroot, zarr.hierarchy.Group) + assert store_returned is store diff --git a/kerchunk/tiff.py b/kerchunk/tiff.py index 7c6eccdb..2ecbeb95 100644 --- a/kerchunk/tiff.py +++ b/kerchunk/tiff.py @@ -14,7 +14,9 @@ import kerchunk.utils -def tiff_to_zarr(urlpath, 
remote_options=None, target=None, target_options=None): +def tiff_to_zarr( + urlpath, remote_options=None, target=None, target_options=None, zarr_version=None +): """Wraps TIFFFile's fsspec writer to extract metadata as attributes Parameters @@ -27,12 +29,16 @@ def tiff_to_zarr(urlpath, remote_options=None, target=None, target_options=None) Write JSON to this location. If not given, no file is output target_options: dict pass these to fsspec when opening target + zarr_version: int Returns ------- references dict """ + if zarr_version not in [2, None]: + raise ValueError("zarr_version not implemented for tiff_to_zarr") + with fsspec.open(urlpath, **(remote_options or {})) as of: url, name = urlpath.rsplit("/", 1) diff --git a/kerchunk/utils.py b/kerchunk/utils.py index 838c3cb1..b9ebe883 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -3,11 +3,22 @@ import itertools import warnings +import fsspec.asyn import ujson import fsspec import zarr +try: + from zarr.store import StorePath, MemoryStore + from zarr.v2.hierarchy import group + import zarr.array + from zarr.buffer import default_buffer_prototype + + _ZARR_VERSION = 3 +except ModuleNotFoundError: + _ZARR_VERSION = 2 + def class_factory(func): """Experimental uniform API across function-based file scanners""" @@ -53,6 +64,20 @@ def consolidate(refs): return {"version": 1, "refs": out} +def encode_fill_value(v, dtype, object_codec=None): + if _ZARR_VERSION == 3: + # Precarious use of this function + # https://github.com/zarr-developers/zarr-python/issues/2021 + # https://github.com/zarr-developers/VirtualiZarr/pull/182#discussion_r1673096418 + from zarr.v2.meta import Metadata2 + + return Metadata2.encode_fill_value(v, dtype, object_codec) + else: + from zarr.meta import encode_fill_value as _encode_fill_value + + return _encode_fill_value(v, dtype, object_codec) + + def rename_target(refs, renames): """Utility to change URLs in a reference set in a predictable way @@ -116,9 +141,37 @@ def rename_target_files( ujson.dump(new, f) -def _encode_for_JSON(store): +def _zarr_init_group_and_store(store=None, zarr_version=None, overwrite=True): + """ """ + zarr_version = _default_zarr_version(zarr_version) + if _ZARR_VERSION == 3 and zarr_version == 2: + return group(store, overwrite=True), store + elif _ZARR_VERSION == 3 and zarr_version == 3: + store = store or StorePath(MemoryStore(mode="w")) + return zarr.group(store, overwrite=True), store + else: + return zarr.group(store, overwrite=overwrite, zarr_version=zarr_version), store + + +def _zarr_open(store, zarr_version=None, mode=None): + zarr_version = _default_zarr_version(zarr_version) + if _ZARR_VERSION == 3: + store = store or StorePath(MemoryStore(mode=mode or "w")) + return zarr.open(store, zarr_format=zarr_version) + else: + return zarr.open(store, zarr_version=zarr_version, mode=mode or "a") + + +def _encode_for_JSON(store, zarr_version=None): """Make store JSON encodable""" - for k, v in store.copy().items(): + zarr_version = _default_zarr_version(zarr_version) + if _ZARR_VERSION == 2 or zarr_version == 2: + store = store.copy() + else: + store = fsspec.asyn.sync( + fsspec.asyn.get_loop(), _store_to_dict_with_copy(store.store) + ) + for k, v in store.items(): if isinstance(v, list): continue else: @@ -134,6 +187,18 @@ def _encode_for_JSON(store): return store +async def _store_to_dict_with_copy(store): + """Only works for ZarrV3 stores.""" + result = {} + async for k in store.list(): + result[k] = await store.get(k, default_buffer_prototype) + return result + + +def 
_default_zarr_version(zarr_version=None): + return zarr_version or 2 + + def do_inline(store, threshold, remote_options=None, remote_protocol=None): """Replace short chunks with the value of that chunk and inline metadata