[python] Remove some dead code (#3699)
* [python] Remove some dead code

* unit-test `to_json` and `from_json` methods which were lacking coverage

* docstrings per @jp-dark
johnkerl authored Feb 13, 2025
1 parent 6a0a6ad commit 9e27275
Showing 4 changed files with 25 additions and 333 deletions.
@@ -93,10 +93,14 @@ def get_shape(self) -> int:
        return 1 + max(self.data.values())

    def to_json(self) -> str:
+        """The ``to_json`` and ``from_json`` methods allow you to persist
+        the registration mappings to disk."""
        return json.dumps(self, default=attrs.asdict, sort_keys=True, indent=4)

    @classmethod
    def from_json(cls, s: str) -> Self:
+        """The ``to_json`` and ``from_json`` methods allow you to persist
+        the registration mappings to disk."""
        dikt = json.loads(s)
        return cls(**dikt)

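For context, here is the round-trip those new docstrings describe, as exercised by the updated tests further down. A minimal sketch; the import alias is an assumption, since the diff does not show the tests' import lines:

    from tiledbsoma.io import _registration as registration  # import path assumed

    mapping = registration.AxisAmbientLabelMapping(
        data={"a": 10, "b": 20, "c": 30},
        field_name="obs_id",
    )
    json_text = mapping.to_json()  # serialized via attrs.asdict, sort_keys=True, indent=4
    restored = registration.AxisAmbientLabelMapping.from_json(json_text)
    assert restored.data == mapping.data
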
@@ -528,10 +532,14 @@ def get_var_shapes(self) -> Dict[str, int]:
        return retval

    def to_json(self) -> str:
+        """The ``to_json`` and ``from_json`` methods allow you to persist
+        the registration mappings to disk."""
        return json.dumps(self, default=attrs.asdict, sort_keys=True, indent=4)

    @classmethod
    def from_json(cls, s: str) -> Self:
+        """The ``to_json`` and ``from_json`` methods allow you to persist
+        the registration mappings to disk."""
        dikt = json.loads(s)
        obs_axis = AxisAmbientLabelMapping(
            data=dikt["obs_axis"]["data"], field_name=dikt["obs_axis"]["field_name"]
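The experiment-level mapping round-trips the same way; its ``from_json`` rebuilds each axis from the parsed dict, as the constructor call above shows. A sketch, with ``rd`` an ``ExperimentAmbientLabelMapping`` such as the updated tests build:

    json_text = rd.to_json()  # persistable to disk, per the docstring
    rd2 = registration.ExperimentAmbientLabelMapping.from_json(json_text)
    assert rd2.obs_axis.data == rd.obs_axis.data
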
254 changes: 3 additions & 251 deletions apis/python/src/tiledbsoma/io/_registration/signatures.py
@@ -1,23 +1,15 @@
-## Copyright (c) TileDB, Inc. and The Chan Zuckerberg Initiative Foundation
-##
-## Licensed under the MIT License.
+# Copyright (c) TileDB, Inc. and The Chan Zuckerberg Initiative Foundation
+#
+# Licensed under the MIT License.

from __future__ import annotations

import json
from typing import Dict, Union

import anndata as ad
import attrs
import pandas as pd
import pyarrow as pa
from typing_extensions import Self

import tiledbsoma
import tiledbsoma.logging
from tiledbsoma.io._util import read_h5ad # Allow us to read over S3 in backed mode
from tiledbsoma.io.conversions import df_to_arrow
from tiledbsoma.options import SOMATileDBContext

_EQUIVALENCES = {
    "large_string": "string",
@@ -159,243 +151,3 @@ def _prepare_df_for_ingest(
    df.rename(columns={"index": id_column_name}, inplace=True)

    return original_index_name


@attrs.define(kw_only=True)
class Signature:
    """
    This is support for a compatibility pre-check for append-mode SOMA ingestion.

    If a SOMA experiment already exists and the user wants to append another AnnData/H5AD to it ---
    or, if no SOMA experiment exists yet but the user has two or more AnnData/H5AD inputs --- we
    provide a fail-fast schema-compatibility check. Use of this pre-check ensures that we flag
    non-appendable data before any data writes start. In particular, we avoid leaving a SOMA
    experiment half-appended to.

    At present we require that all schemas are identical, both within the ingestor and this
    pre-checker. This includes ``obs`` and ``var`` field names and exact dtypes, as well as ``X``,
    ``obsm``, and ``varm`` dtypes.

    Later, we can relax constraints: if, say, the SOMA experiment has ``float64`` dtype for ``X``
    and an H5AD append has ``float32``, we can do the coercion in the ingestor as well as allow
    it within this pre-checker. Thus, this pre-checker logic will evolve as the ingestor
    logic evolves.
    """

    # Note: string/string dicts are easier to serialize/deserialize than pa.Schema
    obs_schema: Dict[str, str]
    var_schema: Dict[str, str]
    raw_var_schema: Dict[str, str] | None

    # TODO include 'raw' in X_dtypes or no? Different for AnnData and for SOMA. When in doubt,
    # lean SOMA.
    X_dtypes: Dict[str, str]
    raw_X_dtype: str | None

    obsm_dtypes: Dict[str, str]
    varm_dtypes: Dict[str, str]

    @classmethod
    def from_anndata(
        cls,
        adata: ad.AnnData,
        *,
        default_obs_field_name: str = "obs_id",
        default_var_field_name: str = "var_id",
        default_X_layer_name: str = "data",
    ) -> Self:
        """
        Constructs a pre-check signature from AnnData/H5AD input, which can be compared
        against another signature from AnnData/H5AD or SOMA experiment.

        AnnData inputs have a column offered as the index. This can be named explicitly
        ("obs_id", "var_id", etc.), unnamed (``adata.obs.index.name`` is ``None``), or named
        "index". In the latter two cases the ingestor allows a rename to the user's choice,
        such as "obs_id" and "var_id". Here in the appender pre-check logic, we allow the same.
        """

        obs_schema = _string_dict_from_pandas_dataframe(
            adata.obs, default_obs_field_name
        )
        var_schema = _string_dict_from_pandas_dataframe(
            adata.var, default_var_field_name
        )

        X_dtypes = {}
        X_dtypes[default_X_layer_name] = str(
            tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(adata.X.dtype)
        )
        for X_layer_name, X_layer in adata.layers.items():
            X_dtypes[X_layer_name] = str(
                tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(X_layer.dtype)
            )

        raw_X_dtype = None
        raw_var_schema = None
        if adata.raw is not None:
            raw_X_dtype = str(
                tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(adata.raw.X.dtype)
            )
            raw_var_schema = _string_dict_from_pandas_dataframe(
                adata.raw.var, default_var_field_name
            )

        obsm_dtypes = {
            k: str(tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(v.dtype))
            for k, v in adata.obsm.items()
        }
        varm_dtypes = {
            k: str(tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(v.dtype))
            for k, v in adata.varm.items()
        }

        return cls(
            obs_schema=obs_schema,
            var_schema=var_schema,
            X_dtypes=X_dtypes,
            raw_X_dtype=raw_X_dtype,
            raw_var_schema=raw_var_schema,
            obsm_dtypes=obsm_dtypes,
            varm_dtypes=varm_dtypes,
        )

    @classmethod
    def from_h5ad(
        cls,
        h5ad_file_name: str,
        *,
        default_obs_field_name: str = "obs_id",
        default_var_field_name: str = "var_id",
        default_X_layer_name: str = "data",
    ) -> Self:
        """
        See ``from_anndata``.
        """
        with read_h5ad(h5ad_file_name, mode="r") as adata:
            return cls.from_anndata(
                adata,
                default_X_layer_name=default_X_layer_name,
                default_obs_field_name=default_obs_field_name,
                default_var_field_name=default_var_field_name,
            )

    @classmethod
    def from_soma_experiment(
        cls,
        uri: str,
        measurement_name: str = "RNA",
        context: SOMATileDBContext | None = None,
    ) -> Self:
        """
        Constructs a pre-check signature from a SOMA experiment, which can be compared against
        another signature from AnnData/H5AD or SOMA experiment.
        """

        with tiledbsoma.Experiment.open(uri, context=context) as exp:
            obs_schema = _string_dict_from_arrow_schema(exp.obs.schema)

            var_schema = _string_dict_from_arrow_schema(
                exp.ms[measurement_name].var.schema
            )

            X_dtypes = {}
            for X_layer_name in exp.ms[measurement_name].X.keys():
                X = exp.ms[measurement_name].X[X_layer_name]
                X_dtypes[X_layer_name] = str(X.schema.field("soma_data").type)

            raw_X_dtype = None
            raw_var_schema = None
            if "raw" in exp.ms:
                raw_var_schema = _string_dict_from_arrow_schema(
                    exp.ms["raw"].var.schema
                )

                X = exp.ms["raw"].X[X_layer_name]
                raw_X_dtype = str(X.schema.field("soma_data").type)

            obsm_dtypes: Dict[str, str] = {}
            if "obsm" in exp.ms[measurement_name]:
                for obsm_layer_name in exp.ms[measurement_name].obsm.keys():
                    obsm = exp.ms[measurement_name].obsm[obsm_layer_name]
                    obsm_dtypes[obsm_layer_name] = str(
                        obsm.schema.field("soma_data").type
                    )

            varm_dtypes: Dict[str, str] = {}
            if "varm" in exp.ms[measurement_name]:
                for varm_layer_name in exp.ms[measurement_name].varm.keys():
                    varm = exp.ms[measurement_name].varm[varm_layer_name]
                    varm_dtypes[varm_layer_name] = str(
                        varm.schema.field("soma_data").type
                    )

        return cls(
            obs_schema=obs_schema,
            var_schema=var_schema,
            X_dtypes=X_dtypes,
            raw_X_dtype=raw_X_dtype,
            raw_var_schema=raw_var_schema,
            obsm_dtypes=obsm_dtypes,
            varm_dtypes=varm_dtypes,
        )

    @classmethod
    def check_compatible(cls, signatures: Dict[str, Self]) -> None:
        """
        Determines if any number of signatures from SOMA experiments or AnnData/H5AD inputs
        will be safe from schema incompatibility at ingestion time. On failure, a
        ``ValueError`` is raised with user-suitable cause details.
        """
        if len(signatures) < 2:
            return
        items = list(signatures.items())
        namea, siga = items[0]
        for nameb, sigb in items[1:]:
            if not siga._compatible_with(sigb):
                raise ValueError(
                    f"Incompatible signatures {namea!r}, {nameb!r}:\n{siga.to_json()}\n{sigb.to_json()}"
                )

    def _compatible_with(self, other: Self) -> bool:
        """
        Pairwise helper method for ``check_compatible``. Reasons for incompatibility are
        currently handled a level up, simply by showing the user the failed signature pair.
        """

        # Implementation note: _at present_ this could be implemented as `self == other`. But
        # "coming soon" we'll allow people the ability to do things like coercing one input's
        # float64 to another's float32, and the evolution will be toward more iffing, not less.
        # In that case, we'll also need to fine-grain the error reporting to clearly spell out
        # what we couldn't handle -- rather than just showing the user the two signatures'
        # .to_json() output as we do today.

        if self.obs_schema != other.obs_schema:
            return False

        if self.var_schema != other.var_schema:
            return False

        if self.X_dtypes != other.X_dtypes:
            return False

        if self.raw_X_dtype != other.raw_X_dtype:
            return False
        if self.raw_var_schema != other.raw_var_schema:
            return False

        if self.obsm_dtypes != other.obsm_dtypes:
            return False
        if self.varm_dtypes != other.varm_dtypes:
            return False

        return True

    def to_json(self) -> str:
        """Presents a signature as JSON which is suitable for distributed logging."""
        return json.dumps(self, default=attrs.asdict, sort_keys=True, indent=4)

    @classmethod
    def from_json(cls, s: str) -> Self:
        dikt = json.loads(s)
        return cls(**dikt)
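
For reference, the removed ``Signature`` class was driven through the classmethods above. A minimal sketch of the fail-fast pre-check flow it supported; the file names and URI here are hypothetical:

    siga = Signature.from_h5ad("first.h5ad")
    sigb = Signature.from_h5ad("second.h5ad")
    sigc = Signature.from_soma_experiment("file:///tmp/exp", measurement_name="RNA")

    # Each signature is checked against the first; on any mismatch this raises
    # ValueError showing both signatures' to_json() output. No data is written.
    Signature.check_compatible({"first.h5ad": siga, "second.h5ad": sigb, "experiment": sigc})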
17 changes: 14 additions & 3 deletions apis/python/tests/test_registration_mappings.py
@@ -271,9 +271,15 @@ def test_axis_mappings(obs_field_name, var_field_name):
        data={"a": 10, "b": 20, "c": 30},
        field_name=obs_field_name,
    )
-    assert dictionary.id_mapping_from_values(["a", "b", "c"]).data == (10, 20, 30)
-    assert dictionary.id_mapping_from_values(["c", "a"]).data == (30, 10)
-    assert dictionary.id_mapping_from_values([]).data == ()

+    for reload in [False, True]:
+        if reload:
+            dictionary = registration.AxisAmbientLabelMapping.from_json(
+                dictionary.to_json()
+            )
+        assert dictionary.id_mapping_from_values(["a", "b", "c"]).data == (10, 20, 30)
+        assert dictionary.id_mapping_from_values(["c", "a"]).data == (30, 10)
+        assert dictionary.id_mapping_from_values([]).data == ()

    d = registration.AxisAmbientLabelMapping.from_isolated_dataframe(
        anndata1.obs,
@@ -292,6 +298,11 @@ def test_isolated_anndata_mappings(obs_field_name, var_field_name):
    rd = registration.ExperimentAmbientLabelMapping.from_isolated_anndata(
        anndata1, measurement_name="measname"
    )

+    for reload in [False, True]:
+        if reload:
+            rd = registration.ExperimentAmbientLabelMapping.from_json(rd.to_json())

    assert rd.obs_axis.id_mapping_from_values([]).data == ()
    assert rd.obs_axis.id_mapping_from_values(["AGAG", "ACTG"]).data == (2, 1)
    assert rd.var_axes["measname"].id_mapping_from_values(["TP53", "VEGFA"]).data == (