Skip to content

Commit

Permalink
Added more functions
Browse files Browse the repository at this point in the history
  • Loading branch information
timtreis committed Jun 26, 2023
1 parent 818beca commit 708cb5b
Show file tree
Hide file tree
Showing 4 changed files with 450 additions and 72 deletions.
359 changes: 340 additions & 19 deletions examples/gefslim.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/gefslim/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from importlib.metadata import version

from .gefslim import GEF
from .utils import _get_h5_from_gef

__version__ = version("gefslim")
133 changes: 80 additions & 53 deletions src/gefslim/gefslim.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from pathlib import Path

import anndata
import h5py
import numpy as np
import pandas as pd

from gefslim.utils import _get_h5_from_gef


class GEF:
"""Class that interacts with the .gef file."""
Expand All @@ -16,29 +17,29 @@ def __init__(
self.result_dir = Path(result_dir)
self.data = {}

def get_counts_per_cell(self) -> pd.DataFrame:
def get_counts_per_cell(self, name: str) -> pd.DataFrame:
"""Returns the number of probes per cell."""
cellcut = self.read_cellcut().to_df()
cellcut = self.read_cellcut(name=name).to_df()

result = pd.DataFrame()
result["counts"] = cellcut[["geneCount"]]
result["cell_id"] = cellcut.index.tolist()

return result

def get_area_per_cell(self) -> pd.DataFrame:
def get_area_per_cell(self, name: str) -> pd.DataFrame:
"""Returns the size in pixel per cell."""
cellcut = self.read_cellcut().to_df()
cellcut = self.read_cellcut(name=name).to_df()

result = pd.DataFrame()
result["cell_id"] = cellcut.index.tolist()
result["area"] = cellcut["area"].values

return result

def get_cell_borders(self, transformed: bool = True) -> pd.DataFrame:
def get_cell_borders(self, name: str, transformed: bool = True) -> pd.DataFrame:
"""Returns the spatial information per cell."""
cellcut = self.read_cellcut().to_df()
cellcut = self.read_cellcut(name=name).to_df()

result = pd.DataFrame()
result["cell_id"] = cellcut.index.tolist()
Expand All @@ -57,65 +58,91 @@ def get_cell_borders(self, transformed: bool = True) -> pd.DataFrame:

return result

def get_genecounts_per_cell(self) -> anndata.AnnData:
def get_genecounts_per_cell(self, name: str) -> anndata.AnnData:
"""Returns the counts per gene per cell."""
folder = self.result_dir / "041.cellcut"
h5 = _get_h5_from_gef(
result_path=self.result_dir,
folder_name="041.cellcut",
file_name=name,
)

for file in folder.glob("*"):
if "cellcut" in str(file):
fname = file.name
self.data[fname] = h5py.File(file)
gene_data = h5["cellBin"]["gene"][:]
geneExp_data = h5["cellBin"]["geneExp"][:]

if len(self.data) > 1:
raise NotImplementedError("Only 1 cellcut is supported for now")
# Extract which probes were found in which cells
probe_df = pd.DataFrame()
gene_list = [
name.decode() for name, cell_count in gene_data[["geneName", "cellCount"]] for _ in range(cell_count)
]
probe_df["genes"] = gene_list
probe_df["cellID"] = geneExp_data["cellID"]
probe_df["counts"] = geneExp_data["count"]

else:
gene_data = self.data[next(iter(self.data))]["cellBin"]["gene"][:]
geneExp_data = self.data[next(iter(self.data))]["cellBin"]["geneExp"][:]
probe_df = probe_df.rename(columns={"genes": "gene", "cellID": "cell_id"})

# Extract which probes were found in which cells
probe_df = pd.DataFrame()
gene_list = [
name.decode() for name, cell_count in gene_data[["geneName", "cellCount"]] for _ in range(cell_count)
]
probe_df["genes"] = gene_list
probe_df["cellID"] = geneExp_data["cellID"]
probe_df["counts"] = geneExp_data["count"]
return probe_df.sort_values(["cell_id", "gene"]).reset_index(drop=True)

probe_df = probe_df.rename(columns={"genes": "gene", "cellID": "cell_id"})
def read_cellcut(self, name: str) -> anndata.AnnData:
"""Returns the cellcut file."""
h5 = _get_h5_from_gef(
result_path=self.result_dir,
folder_name="041.cellcut",
file_name=name,
)

return probe_df.sort_values(["cell_id", "gene"]).reset_index(drop=True)
cell_data = h5["cellBin"]["cell"][:]
cellBorder_data = h5["cellBin"]["cellBorder"][:]

def read_cellcut(self) -> None:
"""Returns the cellcut file."""
folder = self.result_dir / "041.cellcut"
tmp = {}
names = list(cell_data.dtype.names)
names.remove("id")
for name in names:
tmp[name] = cell_data[name]

for file in folder.glob("*"):
if "cellcut" in str(file):
fname = file.name
self.data[fname] = h5py.File(file)
cellcut_df = pd.DataFrame(tmp, columns=names)

if len(self.data) > 1:
raise NotImplementedError("Only 1 cellcut is supported for now")
# truncate border points
cellcut_df["border"] = [border[~np.all(border == [32767, 32767], axis=1)] for border in cellBorder_data]
names += ["border"]

else:
cell_data = self.data[next(iter(self.data))]["cellBin"]["cell"][:]
cellBorder_data = self.data[next(iter(self.data))]["cellBin"]["cellBorder"][:]
cellcut_adata = anndata.AnnData(X=cellcut_df.values)
cellcut_adata.obs_names = [str(i) for i in cell_data["id"]]
cellcut_adata.var_names = names

tmp = {}
names = list(cell_data.dtype.names)
names.remove("id")
for name in names:
tmp[name] = cell_data[name]
return cellcut_adata

cellcut_df = pd.DataFrame(tmp, columns=names)
def get_genecounts_per_spot(self, name: str, binsize: int = 100) -> pd.DataFrame:
"""Returns the counts per bin around (x/y)."""
h5 = _get_h5_from_gef(
result_path=self.result_dir,
folder_name="04.tissuecut",
file_name=name,
)

# truncate border points
cellcut_df["border"] = [border[~np.all(border == [32767, 32767], axis=1)] for border in cellBorder_data]
names += ["border"]
exp_data = h5["geneExp"][f"bin{binsize}"]["expression"][:]
gene_data = h5["geneExp"][f"bin{binsize}"]["gene"][:]

cellcut_adata = anndata.AnnData(X=cellcut_df.values)
cellcut_adata.obs_names = [str(i) for i in cell_data["id"]]
cellcut_adata.var_names = names
result = pd.DataFrame()
result["x"] = exp_data["x"]
result["y"] = exp_data["y"]
result["gene"] = [name.decode() for name, cell_count in gene_data[["gene", "count"]] for _ in range(cell_count)]
result["counts"] = exp_data["count"]

return result.sort_values(["x", "y", "gene"]).reset_index(drop=True)

def get_gene_stats(self, name: str) -> pd.DataFrame:
"""Returns the counts per bin around (x/y)."""
h5 = _get_h5_from_gef(
result_path=self.result_dir,
folder_name="04.tissuecut",
file_name=name,
)

stat_data = h5["stat"]["gene"][:]

result = pd.DataFrame()
result["gene"] = [gene.decode() for gene in stat_data["gene"]]
result["MIDcount"] = stat_data["MIDcount"]
result["E10"] = stat_data["E10"]

return cellcut_adata
return result.sort_values("MIDcount", ascending=False).reset_index(drop=True)
29 changes: 29 additions & 0 deletions src/gefslim/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os
import pathlib

import h5py


def _get_h5_from_gef(
result_path: pathlib.Path,
folder_name: str,
file_name: str,
) -> h5py.File:
"""Returns the connection to a h5py file."""
if not isinstance(result_path, pathlib.Path):
raise TypeError(f"Parameter 'result_path' must be a pathlib.Path, got: {type(result_path)}")

if not isinstance(folder_name, str):
raise TypeError(f"Parameter 'folder_name' must be a string, got: {type(folder_name)}")

if not isinstance(file_name, str):
raise TypeError(f"Parameter 'file_name' must be a string, got: {type(file_name)}")

if ".gef" not in file_name:
raise ValueError("Parameter 'file_name' must be a `.gef` file.")

path = result_path / folder_name / file_name
if not os.path.exists(path):
raise FileNotFoundError(f"File '{path}' does not exist.")

return h5py.File(path, "r")

0 comments on commit 708cb5b

Please sign in to comment.