Merge pull request #391 from martindurant/append
Direct Parquet in netCDF3 and HDF
martindurant authored Nov 9, 2023
2 parents fa74e51 + e356937 commit 3c4e9fc
Showing 4 changed files with 109 additions and 63 deletions.
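The change that runs through all four files is a new out= parameter: instead of accumulating references in a plain in-memory dict, each scanner can write them straight into fsspec's LazyReferenceMapper, which persists them as parquet and is flushed before the scanner returns. A minimal sketch of creating such a mapper follows; LazyReferenceMapper.create, its keyword names (root, fs, record_size) and the output path refs.parq are assumptions not shown in this diff.

    import fsspec
    from fsspec.implementations.reference import LazyReferenceMapper

    fs = fsspec.filesystem("file")
    # Hypothetical output directory for the parquet reference store (assumed name)
    out = LazyReferenceMapper.create(root="refs.parq", fs=fs, record_size=100_000)

Each scanner changed below accepts this object via out= and calls out.flush() itself on completion; the populated directory can later be read back through fsspec's reference filesystem.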
12 changes: 11 additions & 1 deletion kerchunk/fits.py
@@ -5,6 +5,8 @@
import numpy as np
import zarr

from fsspec.implementations.reference import LazyReferenceMapper


from kerchunk.utils import class_factory
from kerchunk.codecs import AsciiTableCodec, VarArrCodec
@@ -37,6 +39,7 @@ def process_file(
extension=None,
inline_threshold=100,
primary_attr_to_group=False,
out=None,
):
"""
Create JSON references for a single FITS file as a zarr group
@@ -55,6 +58,11 @@
primary_attr_to_group: bool
Whether the output top-level group contains the attributes of the primary extension
(which often contains no data, just a general description)
out: dict-like or None
This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper
to write out parquet as the references get filled, or some other dictionary-like class
to customise how references get stored
Returns
-------
@@ -63,7 +71,7 @@
from astropy.io import fits

storage_options = storage_options or {}
out = {}
out = out or {}
g = zarr.open(out)

with fsspec.open(url, mode="rb", **storage_options) as f:
@@ -180,6 +188,8 @@ def process_file(
if k != "COMMENT"
}
)
if isinstance(out, LazyReferenceMapper):
out.flush()
return out


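With the new keyword, the FITS scanner can target that mapper directly; when out is a LazyReferenceMapper it is flushed before being returned, otherwise behaviour is unchanged. A usage sketch, assuming a local example.fits file and the mapper out created above (neither is part of this diff):

    from kerchunk.fits import process_file

    # References stream into the parquet store as each HDU is processed;
    # process_file() calls out.flush() internally before returning the mapper.
    refs = process_file("example.fits", out=out)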
51 changes: 25 additions & 26 deletions kerchunk/hdf.py
@@ -3,6 +3,7 @@
from typing import Union, BinaryIO

import fsspec.core
from fsspec.implementations.reference import LazyReferenceMapper
import numpy as np
import zarr
from zarr.meta import encode_fill_value
@@ -64,6 +65,10 @@ class SingleHdf5ToZarr:
encode: save the ID-to-value mapping in a codec, to produce the real values at read
time; requires this library to be available. Can be efficient storage where there
are few unique values.
out: dict-like or None
This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper
to write out parquet as the references get filled, or some other dictionary-like class
to customise how references get stored
"""

def __init__(
@@ -75,6 +80,7 @@ def __init__(
storage_options=None,
error="warn",
vlen_encode="embed",
out=None,
):

# Open HDF5 file in read mode...
@@ -92,7 +98,7 @@ def __init__(
self.vlen = vlen_encode
self._h5f = h5py.File(self.input_file, mode="r")

self.store = {}
self.store = out or {}
self._zroot = zarr.group(store=self.store, overwrite=True)

self._uri = url
@@ -115,10 +121,11 @@ def translate(self):
lggr.debug("Translation begins")
self._transfer_attrs(self._h5f, self._zroot)
self._h5f.visititems(self._translator)
if self.inline > 0:
self._do_inline(self.inline)
if self.spec < 1:
return self.store
elif isinstance(self.store, LazyReferenceMapper):
self.store.flush()
return self.store
else:
store = _encode_for_JSON(self.store)
return {"version": 1, "refs": store}
@@ -127,24 +134,6 @@ def _unref(self, ref):
name = h5py.h5r.get_name(ref, self._h5f.id)
return self._h5f[name]

def _do_inline(self, threshold):
"""Replace short chunks with the value of that chunk
The chunk may need encoding with base64 if not ascii, so actual
length may be larger than threshold.
"""
# TODO: use version in utils
for k, v in self.store.copy().items():
if isinstance(v, list) and v[2] < threshold:
self.input_file.seek(v[1])
data = self.input_file.read(v[2])
try:
# easiest way to test if data is ascii
data.decode("ascii")
except UnicodeDecodeError:
data = b"base64:" + base64.b64encode(data)
self.store[k] = data

def _transfer_attrs(
self,
h5obj: Union[h5py.Dataset, h5py.Group],
@@ -461,11 +450,21 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
if h5obj.fletcher32:
logging.info("Discarding fletcher32 checksum")
v["size"] -= 4
self.store[za._chunk_key(k)] = [
self._uri,
v["offset"],
v["size"],
]
if self.inline and isinstance(v, list) and v[2] < self.inline:
self.input_file.seek(v["offset"])
data = self.input_file.read(v["size"])
try:
# easiest way to test if data is ascii
data.decode("ascii")
except UnicodeDecodeError:
data = b"base64:" + base64.b64encode(data)
self.store[k] = data
else:
self.store[za._chunk_key(k)] = [
self._uri,
v["offset"],
v["size"],
]

elif isinstance(h5obj, h5py.Group):
lggr.debug(f"HDF5 group: {h5obj.name}")
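translate() now mirrors the same pattern: a LazyReferenceMapper passed as out is flushed and returned as-is, while the default path still produces the JSON-style {"version": 1, "refs": ...} dict. Small chunks below inline_threshold are now embedded at translation time (base64-encoded when not ASCII) rather than in a separate _do_inline pass. A sketch, assuming SingleHdf5ToZarr accepts a local path such as example.h5 as its first argument:

    from kerchunk.hdf import SingleHdf5ToZarr

    # Parquet output: references are written into the mapper and flushed
    SingleHdf5ToZarr("example.h5", out=out).translate()

    # Default behaviour is unchanged: an in-memory dict encoded for JSON
    refs = SingleHdf5ToZarr("example.h5").translate()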
76 changes: 46 additions & 30 deletions kerchunk/netCDF3.py
@@ -1,8 +1,12 @@
import base64
from functools import reduce
from operator import mul

import numpy as np
from .utils import do_inline, _encode_for_JSON
from fsspec.implementations.reference import LazyReferenceMapper
import fsspec

from kerchunk.utils import _encode_for_JSON

try:
from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable
@@ -12,8 +16,6 @@
"`pip/conda install scipy`. See https://scipy.org/install/ for more details."
)

import fsspec


class NetCDF3ToZarr(netcdf_file):
"""Generate references for a netCDF3 file
@@ -26,10 +28,10 @@ class NetCDF3ToZarr(netcdf_file):
def __init__(
self,
filename,
*args,
storage_options=None,
inline_threshold=100,
max_chunk_size=0,
out=None,
**kwargs,
):
"""
Expand All @@ -47,6 +49,10 @@ def __init__(
subchunking, and there is never subchunking for coordinate/dimension arrays.
E.g., if an array contains 10,000bytes, and this value is 6000, there will
be two output chunks, split on the biggest available dimension. [TBC]
out: dict-like or None
This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper
to write out parquet as the references get filled, or some other dictionary-like class
to customise how references get stored
args, kwargs: passed to scipy superclass ``scipy.io.netcdf.netcdf_file``
"""
assert kwargs.pop("mmap", False) is False
@@ -58,22 +64,20 @@ def __init__(
self.chunks = {}
self.threshold = inline_threshold
self.max_chunk_size = max_chunk_size
self.out = {}
self.out = out or {}
self.storage_options = storage_options
with fsspec.open(filename, **(storage_options or {})) as fp:
magic = fp.read(4)
assert magic[:3] == b"CDF"
version = magic[3]
fp.seek(0)
super().__init__(
fp,
*args,
mmap=False,
mode="r",
maskandscale=False,
version=version,
**kwargs,
)
self.fp = fsspec.open(filename, **(storage_options or {})).open()
magic = self.fp.read(4)
assert magic[:3] == b"CDF"
version = kwargs.pop("version", None) or magic[3]
self.fp.seek(0)
super().__init__(
self.fp,
mmap=False,
mode="r",
maskandscale=False,
version=version,
)
self.filename = filename # this becomes an attribute, so must ignore on write

def _read_var_array(self):
@@ -197,11 +201,22 @@ def translate(self):
compression=None,
)
part = ".".join(["0"] * len(shape)) or "0"
out[f"{dim}/{part}"] = [
self.filename,
int(self.chunks[dim][0]),
int(self.chunks[dim][1]),
]
k = f"{dim}/{part}"
if self.threshold and int(self.chunks[dim][1]) < self.threshold:
self.fp.seek(int(self.chunks[dim][0]))
data = self.fp.read(int(self.chunks[dim][1]))
try:
# easiest way to test if data is ascii
data.decode("ascii")
except UnicodeDecodeError:
data = b"base64:" + base64.b64encode(data)
out[k] = data
else:
out[k] = [
self.filename,
int(self.chunks[dim][0]),
int(self.chunks[dim][1]),
]
arr.attrs.update(
{
k: v.decode() if isinstance(v, bytes) else str(v)
@@ -280,11 +295,12 @@ def translate(self):
}
)

if self.threshold > 0:
out = do_inline(out, self.threshold, remote_options=self.storage_options)
out = _encode_for_JSON(out)

return {"version": 1, "refs": out}
if isinstance(out, LazyReferenceMapper):
out.flush()
return out
else:
out = _encode_for_JSON(out)
return {"version": 1, "refs": out}


netcdf_recording_file = NetCDF3ToZarr
netcdf_recording_file = NetCDF3ToZarr # old name
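The netCDF3 class follows suit, and its constructor now keeps the fsspec file handle open on self.fp so that translate() can re-read and inline chunks smaller than the threshold (base64-encoding any non-ASCII bytes). A usage sketch, assuming a local example.nc file:

    from kerchunk.netCDF3 import NetCDF3ToZarr

    # With out= a LazyReferenceMapper, references are flushed to parquet;
    # otherwise the usual {"version": 1, "refs": ...} dict is returned.
    refs = NetCDF3ToZarr("example.nc", inline_threshold=100, out=out).translate()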
33 changes: 27 additions & 6 deletions kerchunk/zarr.py
@@ -1,31 +1,52 @@
import fsspec
from fsspec.implementations.reference import LazyReferenceMapper

from kerchunk.utils import do_inline, class_factory
from kerchunk.utils import class_factory


def single_zarr(uri_or_store, storage_options=None, inline_threshold=100, inline=None):
def single_zarr(
uri_or_store, storage_options=None, inline_threshold=100, inline=None, out=None
):
"""kerchunk-style view on zarr mapper
This is a similar process to zarr's consolidate_metadata, but does not
need to be held in the original file tree. You do not need zarr itself
to do this.
This is useful for testing, so that we can pass hand-made zarrs to combine.
Parameters
----------
uri_or_store: str or dict-like
storage_options: dict or None
given to fsspec
out: dict-like or None
This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper
to write out parquet as the references get filled, or some other dictionary-like class
to customise how references get stored
Returns
-------
reference dict like
"""
inline_threshold = inline or inline_threshold
if isinstance(uri_or_store, str):
mapper = fsspec.get_mapper(uri_or_store, **(storage_options or {}))
else:
mapper = uri_or_store

refs = {}
refs = out or {}
for k in mapper:
if k.startswith("."):
refs[k] = mapper[k]
else:
refs[k] = [fsspec.utils._unstrip_protocol(mapper._key_to_str(k), mapper.fs)]
if inline_threshold:
refs = do_inline(refs, inline_threshold, remote_options=storage_options)
# from kerchunk.utils import do_inline
# inline_threshold = inline or inline_threshold
# if inline_threshold:
# # this never does anything since we don't have the chunk sizes
# refs = do_inline(refs, inline_threshold, remote_options=storage_options)
if isinstance(refs, LazyReferenceMapper):
refs.flush()
return refs


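single_zarr gains the same out= hook; metadata keys (those starting with ".") are copied verbatim and every other key becomes a one-element [url] reference, with the inlining step commented out because chunk sizes are not known at this point. A sketch, assuming a local zarr store at example.zarr and the mapper out from the first example:

    from kerchunk.zarr import single_zarr

    # Writes references into the mapper (flushed if it is a LazyReferenceMapper),
    # or returns a plain reference dict when out is not given.
    refs = single_zarr("example.zarr", out=out)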
