-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
190 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import logging | ||
import sys | ||
from pathlib import Path | ||
from typing import Optional, Union | ||
|
||
import click | ||
import h5py | ||
import numpy as np | ||
import pandas as pd | ||
from anngeno import AnnGeno | ||
from tqdm import trange | ||
|
||
PathLike = Union[str, Path] | ||
|
||
logging.basicConfig( | ||
format="[%(asctime)s] %(levelname)s:%(name)s: %(message)s", | ||
stream=sys.stdout, | ||
) | ||
logger = logging.getLogger(__name__) | ||
logger.setLevel(logging.INFO) | ||
|
||
|
||
def _convert_genotypes_h5( | ||
variant_file: PathLike, | ||
phenotype_file: PathLike, | ||
genotype_file: PathLike, | ||
out_file: PathLike, | ||
batch_size: int = 100, | ||
max_samples: Optional[int] = None, | ||
): | ||
logger.info("Reading sample IDs") | ||
samples = pd.read_parquet(phenotype_file).index.astype(str).to_numpy() | ||
if max_samples is not None: | ||
samples = samples[:max_samples] | ||
n_samples = len(samples) | ||
logger.info("Reading variant metadata") | ||
variant_metadata = pd.read_parquet( | ||
variant_file, columns=["id", "chrom", "pos", "ref", "alt"] | ||
) | ||
n_variants = len(variant_metadata) | ||
with h5py.File(genotype_file, "r") as g: | ||
gt_matrix = g["genotype_matrix"] | ||
variant_matrix = g["variant_matrix"] | ||
assert ( | ||
n_samples == gt_matrix.shape[0] | ||
or max_samples is not None | ||
and max_samples < gt_matrix.shape[0] | ||
) | ||
|
||
# TODO: Rewrite below here to reflect changes in AnnGeno | ||
logger.info("Initializing AnnGeno object") | ||
ag = AnnGeno( | ||
out_file, filemode="w", samples=samples, variant_metadata=variant_metadata | ||
) | ||
|
||
logger.info("Transforming genotype file") | ||
for start_idx in trange(0, n_samples, batch_size, desc="Chunks"): | ||
end_idx = min(start_idx + batch_size, n_samples) | ||
sample_slice = slice(start_idx, end_idx) | ||
this_genotypes = [] | ||
for i in range(start_idx, end_idx): | ||
ids = variant_matrix[i, variant_matrix[i] != -1] | ||
gts = gt_matrix[i, gt_matrix[i] != -1] | ||
|
||
genotypes_dense = np.zeros(n_variants, dtype=np.uint8) | ||
genotypes_dense[ag.variant_col_by_id[ids]] = gts | ||
|
||
this_genotypes.append(genotypes_dense) | ||
|
||
ag.set_samples(sample_slice, np.stack(this_genotypes, axis=0)) | ||
|
||
|
||
@click.command() | ||
def convert_genotypes_h5( | ||
variant_file: PathLike, | ||
phenotype_file: PathLike, | ||
genotype_file: PathLike, | ||
out_file: PathLike, | ||
batch_size: int = 100, | ||
max_samples: Optional[int] = None, | ||
): | ||
_convert_genotypes_h5( | ||
variant_file=variant_file, | ||
phenotype_file=phenotype_file, | ||
genotype_file=genotype_file, | ||
out_file=out_file, | ||
batch_size=batch_size, | ||
max_samples=max_samples, | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
convert_genotypes_h5() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from deeprvat.data.anngeno_dl import AnnGenoDataset | ||
import hypothesis.strategies as st | ||
from pathlib import Path | ||
import tempfile | ||
from typing import Any, Dict | ||
from anngeno import AnnGeno | ||
from anngeno.test_utils import anngeno_args_and_genotypes | ||
from hypothesis import Phase, given, settings | ||
|
||
|
||
# TODO: Implement | ||
# Check that entries with 1 in AnnGenoDataset.variant_gene_mask correspond | ||
# to variants in the gene in question | ||
@settings(phases=[Phase.explicit, Phase.reuse, Phase.generate, Phase.target]) | ||
def test_variant_gene_mask(): | ||
# get_region for each training region | ||
pass | ||
|
||
|
||
# TODO: Implement | ||
# Check that entries with 1 in AnnGenoDataset.gene_phenotype_mask correspond to genes | ||
# associated with the phenotype in question according to the training_regions argument | ||
@settings(phases=[Phase.explicit, Phase.reuse, Phase.generate, Phase.target]) | ||
def test_gene_covariatephenotype_mask(): | ||
pass | ||
|
||
|
||
# TODO: Implement | ||
# Check output of __getitem__ | ||
# Sometimes use sample_set | ||
# Sometimes use cache_regions | ||
@given( | ||
anngeno_args_and_genotypes=anngeno_args_and_genotypes( | ||
min_phenotypes=1, min_annotations=1, region_set=True | ||
), | ||
batch_proportion=st.floats(min_value=0, max_value=1, exclude_min=True), | ||
) | ||
@settings(phases=[Phase.explicit, Phase.reuse, Phase.generate, Phase.target]) | ||
def test_getitem_training(anngeno_args_and_genotypes: Dict[str, Any]): | ||
anngeno_args = anngeno_args_and_genotypes["anngeno_args"] | ||
genotypes = anngeno_args_and_genotypes["genotypes"] | ||
|
||
variant_ids = anngeno_args["variant_metadata"]["id"] | ||
with tempfile.TemporaryDirectory() as tmpdirname: | ||
filename = Path(tmpdirname) / anngeno_args["filename"] | ||
anngeno_args["filename"] = filename | ||
ag = AnnGeno(**anngeno_args) | ||
|
||
ag.set_samples( | ||
slice(None), | ||
genotypes, | ||
variant_ids=variant_ids, | ||
) | ||
|
||
# Can only use ag.subset_samples in read-only mode | ||
del ag | ||
ag = AnnGeno(filename=anngeno_args["filename"]) | ||
|
||
if sample_subset := anngeno_args_and_genotypes.get("sample_subset", None): | ||
ag.subset_samples(sample_subset) | ||
|
||
ag.subset_annotations( | ||
annotation_columns=anngeno_args_and_genotypes.get( | ||
"annotation_columns", None | ||
), | ||
variant_set=anngeno_args_and_genotypes.get("variant_set", None), | ||
) | ||
|
||
# TODO: construct dataaset and iterate through it | ||
batch_size = math.ceil(batch_proportion * n_samples) | ||
agd = AnnGenoDataset( | ||
filename=filename, | ||
sample_batch_size=batch_size, | ||
mask_type="sum", # TODO: Test max | ||
training_mode=True, | ||
training_regions=anngeno_args_and_genotypes["training_regions"], | ||
|
||
) | ||
|
||
# TODO: reconstruct each region using variant_gene_mask | ||
|
||
# TODO: compare to results from using AnnGeno.get_region(), AnnGeno.phenotypes, AnnGeno.annotations | ||
|
||
|
||
# TODO: Implement | ||
# Check output of __getitem__ | ||
# Sometimes use sample_set | ||
# Sometimes use cache_regions | ||
@given( | ||
anngeno_args_and_genotypes=anngeno_args_and_genotypes(), | ||
batch_proportion=st.floats(min_value=0, max_value=1, exclude_min=True), | ||
) | ||
@settings(phases=[Phase.explicit, Phase.reuse, Phase.generate, Phase.target]) | ||
def test_getitem_testing(): | ||
# use __getitem__ | ||
# compare to results from using AnnGeno.get_region(), AnnGeno.phenotypes, AnnGeno.annotations | ||
pass |