Skip to content

Commit

Permalink
Add first pass at grib zarr 3 codec
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci committed Oct 26, 2024
1 parent 5019b15 commit d96cf46
Showing 1 changed file with 84 additions and 3 deletions.
87 changes: 84 additions & 3 deletions kerchunk/codecs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
import ast
from dataclasses import dataclass
import io
from typing import TYPE_CHECKING

import numcodecs
from numcodecs.abc import Codec
import numpy as np
import threading
import zlib
from zarr.abc.codec import ArrayBytesCodec
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
from zarr.core.common import JSON, parse_enum, parse_named_configuration
from zarr.registry import register_codec

if TYPE_CHECKING:
from typing import Self

from zarr.core.array_spec import ArraySpec


class FillStringsCodec(Codec):
Expand Down Expand Up @@ -115,6 +126,78 @@ def decode(self, buf, out=None):
numcodecs.register_codec(GRIBCodec, "grib")


@dataclass(frozen=True)
class GRIBZarrCodec(ArrayBytesCodec):
eclock = threading.RLock()

var: str
dtype: np.dtype

def __init__(self, *, var: str, dtype: np.dtype) -> None:
object.__setattr__(self, "var", var)
object.__setattr__(self, "dtype", dtype)

@classmethod
def from_dict(cls, data: dict[str, JSON]) -> Self:
_, configuration_parsed = parse_named_configuration(
data, "bytes", require_configuration=True
)
configuration_parsed = configuration_parsed or {}
return cls(**configuration_parsed) # type: ignore[arg-type]

def to_dict(self) -> dict[str, JSON]:
if self.endian is None:
return {"name": "grib"}
else:
return {
"name": "grib",
"configuration": {"var": self.var, "dtype": self.dtype},
}

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
assert isinstance(chunk_bytes, Buffer)
import eccodes

if self.var in ["latitude", "longitude"]:
var = self.var + "s"
dt = self.dtype or "float64"
else:
var = "values"
dt = self.dtype or "float32"

with self.eclock:
mid = eccodes.codes_new_from_message(chunk_bytes.to_bytes())
try:
data = eccodes.codes_get_array(mid, var)
missingValue = eccodes.codes_get_string(mid, "missingValue")
if var == "values" and missingValue:
data[data == float(missingValue)] = np.nan
return data.astype(dt, copy=False)

finally:
eccodes.codes_release(mid)

async def _encode_single(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
# This is a one way codec
raise NotImplementedError

def compute_encoded_size(
self, input_byte_length: int, _chunk_spec: ArraySpec
) -> int:
raise NotImplementedError


register_codec("grib", GRIBZarrCodec)


class AsciiTableCodec(numcodecs.abc.Codec):
"""Decodes ASCII-TABLE extensions in FITS files"""

Expand Down Expand Up @@ -166,7 +249,6 @@ def decode(self, buf, out=None):
arr2 = np.empty((self.nrow,), dtype=dt_out)
heap = buf[arr.nbytes :]
for name in dt_out.names:

if dt_out[name] == "O":
dt = np.dtype(self.ftypes[self.types[name]])
counts = arr[name][:, 0]
Expand Down Expand Up @@ -244,8 +326,7 @@ def encode(self, buf):
class ZlibCodec(Codec):
codec_id = "zlib"

def __init__(self):
...
def __init__(self): ...

def decode(self, data, out=None):
if out:
Expand Down

0 comments on commit d96cf46

Please sign in to comment.