Skip to content

Commit

Permalink
allow anndata tables streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
lorenzocerrone committed Nov 14, 2024
1 parent ebb8c1b commit 8a809ea
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 4 deletions.
14 changes: 11 additions & 3 deletions docs/notebooks/basic_usage.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@
"outputs": [],
"source": [
"from ngio.core import NgffImage\n",
"ngff_image = NgffImage(\"../../data/20200812-CardiomyocyteDifferentiation14-Cycle1_mip.zarr/B/03/0\")"
"from ngio.core.utils import get_fsspec_http_store\n",
"\n",
"# Ngio can stream data from any fsspec-compatible store\n",
"url = \"https://raw.githubusercontent.com/fractal-analytics-platform/fractal-ome-zarr-examples/refs/heads/main/v04/20200812-CardiomyocyteDifferentiation14-Cycle1_B_03_mip.zarr/\"\n",
"store = get_fsspec_http_store(url)\n",
"ngff_image = NgffImage(store, \"r\")"
]
},
{
Expand All @@ -47,8 +52,11 @@
"outputs": [],
"source": [
"# Explore object metadata\n",
"print(\"Levels: \", ngff_image.levels_paths)\n",
"print(\"Num Levels: \", ngff_image.num_levels)"
"print(ngff_image)\n",
"image = ngff_image.get_image()\n",
"print(image)\n",
"nuclei = ngff_image.label.get_label(\"nuclei\")\n",
"print(nuclei)\n"
]
},
{
Expand Down
12 changes: 12 additions & 0 deletions src/ngio/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from enum import Enum
from typing import Any

import fsspec.implementations.http

from ngio.io import Group, StoreLike
from ngio.ngff_meta import (
ImageLabelMeta,
Expand All @@ -24,6 +26,16 @@
Lock = None


def get_fsspec_http_store(
url: str, client_kwargs: dict | None = None
) -> fsspec.mapping.FSMap:
"""Simple function to get an http fsspec store from a url."""
client_kwargs = {} if client_kwargs is None else client_kwargs
fs = fsspec.implementations.http.HTTPFileSystem(client_kwargs=client_kwargs)
store = fs.get_mapper(url)
return store


class State(Enum):
"""The state of an object.
Expand Down
5 changes: 5 additions & 0 deletions src/ngio/ngff_meta/fractal_image_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ def lazy_init(
channel_visualization = ChannelVisualisation.lazy_init(
color=color, start=start, end=end, active=active, data_type=data_type
)

if wavelength_id is None:
# TODO Evaluate if a better default value can be used
wavelength_id = label

return cls(
label=label,
wavelength_id=wavelength_id,
Expand Down
78 changes: 78 additions & 0 deletions src/ngio/tables/_ad_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import zarr
from anndata import AnnData
from anndata._io.specs import read_elem
from anndata._io.utils import _read_legacy_raw
from anndata._io.zarr import read_dataframe
from anndata.compat import _clean_uns
from anndata.experimental import read_dispatched

from ngio.io import open_group_wrapper

if TYPE_CHECKING:
from collections.abc import Callable

from ngio.io import StoreOrGroup


def custom_read_zarr(store: StoreOrGroup) -> AnnData:
"""Read from a hierarchical Zarr array store.
# Implementation originally from https://github.com/scverse/anndata/blob/main/src/anndata/_io/zarr.py
# Original implementation would not work with remote storages so we had to copy it
# here and slightly modified it to work with remote storages.
Args:
store (StoreOrGroup): A store or group to read the AnnData from.
"""
group = open_group_wrapper(store=store, mode="r")

# Read with handling for backwards compat
def callback(func: Callable, elem_name: str, elem: Any, iospec: Any) -> Any:
if iospec.encoding_type == "anndata" or elem_name.endswith("/"):
ad_kwargs = {}
base_elem = [
"obs",
"var",
"obsm",
"varm",
"obsp",
"varp",
"uns",
"layers",
"X",
]
# This should make sure that the function behaves the same as the original
# implementation.
base_elem += list(elem.keys())
for k in base_elem:
v = elem.get(k)
if v is not None and not k.startswith("raw."):
ad_kwargs[k] = read_dispatched(v, callback)
return AnnData(**ad_kwargs)

elif elem_name.startswith("/raw."):
return None
elif elem_name in {"/obs", "/var"}:
return read_dataframe(elem)
elif elem_name == "/raw":
# Backwards compat
return _read_legacy_raw(group, func(elem), read_dataframe, func)
return func(elem)

adata = read_dispatched(group, callback=callback)

# Backwards compat (should figure out which version)
if "raw.X" in group:
raw = AnnData(**_read_legacy_raw(group, adata.raw, read_dataframe, read_elem))
raw.obs_names = adata.obs_names
adata.raw = raw

# Backwards compat for <0.7
if isinstance(group["obs"], zarr.Array):
_clean_uns(adata)

return adata
3 changes: 2 additions & 1 deletion src/ngio/tables/v1/_generic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pydantic import BaseModel

from ngio.core.utils import State
from ngio.tables._ad_reader import custom_read_zarr
from ngio.tables._utils import Validator, table_ad_to_df, table_df_to_ad, validate_table

REQUIRED_COLUMNS = [
Expand Down Expand Up @@ -77,7 +78,7 @@ def __init__(
self._index_type = index_type
self._validators = validators

table_ad = ad.read_zarr(self._table_group)
table_ad = custom_read_zarr(store=group)

self._table = table_ad_to_df(
table_ad=table_ad,
Expand Down

0 comments on commit 8a809ea

Please sign in to comment.