Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: atac fragment processing suggestion #1284

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 90 additions & 44 deletions cellxgene_schema_cli/cellxgene_schema/atac_seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import h5py
import pandas as pd
import pysam
from cellxgene_ontology_guide.ontology_parser import OntologyParser
from dask import delayed
from dask.delayed import Delayed

from .ontology_parser import ONTOLOGY_PARSER
from .utils import is_ontological_descendant_of

logger = logging.getLogger(__name__)
logger = logging.getLogger("cellxgene-schema")

# TODO: these chromosome tables should be calculated from the fasta file?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: spoke to Trent about this in irl sync. Agreed that this issue should be tracked as a fast-follow for atac-seq validation, as this table should be aligned to the GENCODE version we're using for each pertinent species

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely agree here! It's just waiting to get out of sync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# location of fasta https://www.gencodegenes.org/human/release_44.html and file name GRCh38.primary_assembly.genome.fa
Expand Down Expand Up @@ -141,11 +141,10 @@
:param anndata_file: The anndata file to validate.
:return:
"""
onto_parser = OntologyParser()

def is_atac(x) -> str:
if is_ontological_descendant_of(onto_parser, x, "EFO:0010891"):
if is_ontological_descendant_of(onto_parser, x, "EFO:0008913"):
def is_atac(x: str) -> str:
if is_ontological_descendant_of(ONTOLOGY_PARSER, x, "EFO:0010891"):
if is_ontological_descendant_of(ONTOLOGY_PARSER, x, "EFO:0008913"):
return "p" # paired
else:
return "u" # unpaired
Expand Down Expand Up @@ -187,56 +186,91 @@

"""
with tempfile.TemporaryDirectory() as tempdir:
# unzip the fragment. Subprocess is used here because gzip on the cli uses less memory with comparable speed to
# the python gzip library.
unzipped_file = Path(tempdir) / Path(fragment_file).name.rsplit(".", 1)[0]
logger.info(f"Unzipping {fragment_file}")
with open(unzipped_file, "wb") as fp:
subprocess.run(["gunzip", "-c", fragment_file], stdout=fp, check=True)

_dask_cluster_config = dict(silence_logs=logging.ERROR, dashboard_address=None)

# configure the dask cluster
_dask_cluster_config = dict(dashboard_address=None)
logging.getLogger("distributed").setLevel(logging.ERROR)
if dask_cluster_config:
_dask_cluster_config.update(dask_cluster_config)

# start the dask cluster and client
with dd.LocalCluster(**_dask_cluster_config) as cluster, dd.Client(cluster):
# convert the fragment to a parquet file
logger.info(f"Converting {fragment_file} to parquet")
parquet_file_path = Path(tempdir) / Path(fragment_file).name.replace(".gz", ".parquet")
ddf.read_csv(unzipped_file, sep="\t", names=column_ordering, dtype=column_types).to_parquet(
parquet_file_path, partition_on=["chromosome"], compute=True
)

# remove the unzipped file
logger.debug(f"Removing {unzipped_file}")
unzipped_file.unlink()
parquet_file = str(parquet_file_path)
errors = validate(parquet_file, anndata_file)
if any(errors):
logger.error("Errors found in Fragment and/or Anndata file")
logger.error(errors)
# quick checks
errors = validate_anndata(anndata_file)
if errors:
return errors

Check warning on line 201 in cellxgene_schema_cli/cellxgene_schema/atac_seq.py

View check run for this annotation

Codecov / codecov/patch

cellxgene_schema_cli/cellxgene_schema/atac_seq.py#L201

Added line #L201 was not covered by tests

# convert the fragment to a parquet file for faster processing
try:
parquet_file = convert_to_parquet(fragment_file, tempdir)
except Exception:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to propagate the Exception up the to the error message?

It looks like for the int columns in the fragment file, I will get a pandas parse exception with different dtypes.

For the chromosome column (col 1), there's this exception if there's a value that's not part of the expected categories:
pyarrow.lib.ArrowInvalid: No non-null segments were available for field 'chromosome'; couldn't infer type

And for the barcode column, I think any given value is coerced to a string, so then I get the validation error 'Barcodes don't match anndata.obs.index'

If it's too much to have more specific error messages for why the conversion failed, then maybe a message like : "Error converting fragment to parquet, check that dtypes for fragment file columns are consistent/match the schema"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those errors should appear that way now

msg = "Error converting fragment to parquet."
logger.exception(msg)
return [msg]

Check warning on line 209 in cellxgene_schema_cli/cellxgene_schema/atac_seq.py

View check run for this annotation

Codecov / codecov/patch

cellxgene_schema_cli/cellxgene_schema/atac_seq.py#L206-L209

Added lines #L206 - L209 were not covered by tests

# slow checks
errors = validate_anndata_with_fragment(parquet_file, anndata_file)
if errors:
return errors
else:
logger.info("Fragment and Anndata file are valid")

# generate the index
if generate_index:
logger.info(f"Sorting fragment and generating index for {fragment_file}")
index_fragment(fragment_file, parquet_file, tempdir, override_write_algorithm, output_file)
logger.debug("cleaning up")
return []


def validate(parquet_file: str, anndata_file: str) -> list[Optional[str]]:
jobs = [
def convert_to_parquet(fragment_file: str, tempdir: str) -> str:
    """Decompress a gzipped fragment TSV and rewrite it as a chromosome-partitioned parquet dataset.

    :param fragment_file: path to the gzipped (.gz) fragment TSV.
    :param tempdir: scratch directory that receives both the unzipped TSV and the parquet output.
    :return: path (as a string) to the parquet dataset.
    """
    source_name = Path(fragment_file).name
    # Decompress with the gunzip CLI rather than the python gzip library: the external
    # process uses less memory with comparable speed.
    tsv_path = Path(tempdir) / source_name.rsplit(".", 1)[0]
    logger.info(f"Unzipping {fragment_file}")
    with open(tsv_path, "wb") as out_handle:
        subprocess.run(["gunzip", "-c", fragment_file], stdout=out_handle, check=True)

    # Convert to parquet for faster downstream processing.
    logger.info(f"Converting {fragment_file} to parquet")
    parquet_path = Path(tempdir) / source_name.replace(".gz", ".parquet")
    try:
        frame = ddf.read_csv(tsv_path, sep="\t", names=column_ordering, dtype=column_types)
        frame.to_parquet(parquet_path, partition_on=["chromosome"], compute=True)
    finally:
        # The intermediate TSV is no longer needed once conversion ends (successfully
        # or not); reclaim the disk space immediately.
        logger.debug(f"Removing {tsv_path}")
        tsv_path.unlink()
    return str(parquet_path)


def report_errors(header: str, errors: list[str]) -> list[str]:
    """Collapse validator results into a logged, header-prefixed error list.

    :param header: summary line describing where the errors came from.
    :param errors: validator results; entries may be None for checks that passed.
    :return: ``[header, *non-null errors]`` when any check failed, otherwise ``[]``.
    """
    if not any(errors):
        return []
    collected = [header]
    collected.extend(e for e in errors if e is not None)
    # Emit a single log record with tab-indented continuation lines so the
    # full report stays together in the output.
    logger.error("\n\t".join(collected))
    return collected


def validate_anndata(anndata_file: str) -> list[str]:
    """Run the fast, fragment-independent checks against the anndata file.

    :param anndata_file: path to the .h5ad file under validation.
    :return: formatted error list from report_errors, or [] when both checks pass.
    """
    quick_checks = [
        validate_anndata_organism_ontology_term_id(anndata_file),
        validate_anndata_is_primary_data(anndata_file),
    ]
    return report_errors("Errors found in Anndata file. Skipping fragment validation.", quick_checks)


def validate_anndata_with_fragment(parquet_file: str, anndata_file: str) -> list[str]:
    """Run the slow validators that need the fragment parquet alongside the anndata file.

    :param parquet_file: path to the fragment converted to a parquet dataset.
    :param anndata_file: path to the .h5ad file under validation.
    :return: formatted error list from report_errors, or [] when every check passes.
    """
    errors = [
        validate_fragment_start_coordinate_greater_than_0(parquet_file),
        validate_fragment_barcode_in_adata_index(parquet_file, anndata_file),
        validate_fragment_stop_coordinate_within_chromosome(parquet_file, anndata_file),
        validate_fragment_stop_greater_than_start_coordinate(parquet_file),
        validate_fragment_read_support(parquet_file),
        validate_anndata_organism_ontology_term_id(anndata_file),
        validate_anndata_is_primary_data(anndata_file),
        validate_fragment_no_duplicate_rows(parquet_file),
    ]
    # Fixed: a stale pre-refactor `return jobs` (undefined name) preceded this
    # return, short-circuiting the function before report_errors could run.
    return report_errors("Errors found in Fragment and/or Anndata file", errors)


def validate_fragment_no_duplicate_rows(parquet_file: str) -> Optional[str]:
Expand Down Expand Up @@ -271,20 +305,25 @@
def validate_fragment_stop_coordinate_within_chromosome(parquet_file: str, anndata_file: str) -> Optional[str]:
    """Check every fragment stop coordinate fits inside its chromosome.

    :param parquet_file: path to the fragment parquet dataset.
    :param anndata_file: path to the .h5ad file under validation.
    :return: an error string describing the failure, or None when the check passes.
    """
    # Organism uniqueness is enforced by validate_anndata_organism_ontology_term_id,
    # which runs before this slow check; only the first unique value is needed here.
    with h5py.File(anndata_file) as f:
        organism_ontology_term_id = ad.io.read_elem(f["obs"])["organism_ontology_term_id"].unique().astype(str)[0]
    df = ddf.read_parquet(parquet_file, columns=["chromosome", "stop coordinate"])

    # The chromosomes in the fragment must match the chromosomes for that organism;
    # checking first avoids a KeyError/NaN when mapping lengths below.
    chromosome_length_table = organism_ontology_term_id_by_chromosome_length_table[organism_ontology_term_id]
    mismatched_chromosomes = set(df["chromosome"].unique().compute()) - chromosome_length_table.keys()
    if mismatched_chromosomes:
        return f"Chromosomes in the fragment do not match the organism({organism_ontology_term_id}).\n" + "\t\n".join(
            mismatched_chromosomes
        )
    # Fixed: stale pre-refactor lines (old uniqueness check and an unguarded
    # `.map` without `meta=`) were interleaved with this code in the diff.
    df["chromosome_length"] = df["chromosome"].map(chromosome_length_table, meta=int).astype(int)
    df = df["stop coordinate"] <= df["chromosome_length"]
    if not df.all().compute():
        return "Stop coordinate must be less than the chromosome length."


def validate_fragment_read_support(parquet_file: str) -> Optional[str]:
    """Check that every fragment row has strictly positive read support.

    :param parquet_file: path to the fragment parquet dataset.
    :return: an error string when any row has read support <= 0, else None.
    """
    # Fixed: a stale `== 0` filter line duplicated this read and was immediately
    # overwritten; `<= 0` is the correct predicate since it also catches negatives.
    # Pushing the filter into the parquet read materializes only offending rows.
    offending = ddf.read_parquet(parquet_file, columns=["read support"], filters=[("read support", "<=", 0)])
    if len(offending.compute()) != 0:
        return "Read support must be greater than 0."

Expand All @@ -298,16 +337,23 @@

def validate_anndata_organism_ontology_term_id(anndata_file: str) -> Optional[str]:
    """Check obs.organism_ontology_term_id has exactly one value from a supported organism.

    :param anndata_file: path to the .h5ad file under validation.
    :return: an error string describing the failure, or None when the check passes.
    """
    with h5py.File(anndata_file) as f:
        organism_ontology_term_ids = ad.io.read_elem(f["obs"])["organism_ontology_term_id"].unique().astype(str)
    # The fragment validators assume one organism per dataset, so exactly one
    # unique term is allowed.
    if organism_ontology_term_ids.size > 1:
        error_message = (
            "Anndata.obs.organism_ontology_term_id must have exactly 1 unique value. Found the following values:\n"
        ) + "\n\t".join(organism_ontology_term_ids)
        return error_message
    organism_ontology_term_id = organism_ontology_term_ids[0]
    # Supported organisms are exactly those with a chromosome-length table.
    # Fixed: stale pre-refactor lines (a duplicate `unique_organism_ontology_term_ids`
    # assignment and a superseded membership check) were interleaved here.
    allowed_terms = [*organism_ontology_term_id_by_chromosome_length_table.keys()]
    if organism_ontology_term_id not in allowed_terms:
        return f"Anndata.obs.organism_ontology_term_id must be one of {allowed_terms}. Got {organism_ontology_term_id}."


def detect_chromosomes(parquet_file: str) -> list[str]:
    """Return the distinct chromosome names present in the fragment parquet.

    :param parquet_file: path to the fragment parquet dataset.
    :return: list of unique chromosome names as plain Python strings.
    """
    logger.info("detecting chromosomes")
    df = ddf.read_parquet(parquet_file, columns=["chromosome"]).drop_duplicates()
    # Fixed: a stale early `return df["chromosome"].values.compute()` made the
    # refactored lines unreachable. `.tolist()` yields plain strings rather than
    # a numpy array for downstream consumers.
    return df["chromosome"].compute().tolist()


def get_output_file(fragment_file: str, output_file: Optional[str]) -> str:
Expand Down
27 changes: 18 additions & 9 deletions cellxgene_schema_cli/cellxgene_schema/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import click

logger = logging.getLogger("cellxgene_schema")
logger = logging.getLogger("cellxgene-schema")

Check warning on line 6 in cellxgene_schema_cli/cellxgene_schema/cli.py

View check run for this annotation

Codecov / codecov/patch

cellxgene_schema_cli/cellxgene_schema/cli.py#L6

Added line #L6 was not covered by tests


@click.group(
Expand Down Expand Up @@ -77,6 +77,20 @@
sys.exit(1)


def _check_anndata_requires_fragment(h5ad_file):
    """Log whether the anndata file requires an ATAC fragment file; exit(1) on unsupported assays.

    :param h5ad_file: path to the .h5ad file to inspect.
    """
    # Deferred import keeps CLI startup fast; atac_seq pulls in heavy dependencies.
    from .atac_seq import check_anndata_requires_fragment, report_errors

    try:
        fragment_required = check_anndata_requires_fragment(h5ad_file)
        if fragment_required:
            logger.info("Anndata requires an ATAC fragment file.")
        else:
            logger.info("Anndata does not require an ATAC fragment file.")
    except Exception as e:
        # Fixed grammar in the user-facing message ("followings reasons").
        report_errors("Anndata does not support ATAC fragment files for the following reasons:", [str(e)])
        sys.exit(1)

Check warning on line 91 in cellxgene_schema_cli/cellxgene_schema/cli.py

View check run for this annotation

Codecov / codecov/patch

cellxgene_schema_cli/cellxgene_schema/cli.py#L88-L91

Added lines #L88 - L91 were not covered by tests


@schema_cli.command(
name="process-fragment",
short_help="Check that an ATAC SEQ fragment follows the cellxgene data integration schema.",
Expand All @@ -92,6 +106,8 @@
def fragment_validate(h5ad_file, fragment_file, generate_index, output_file):
from .atac_seq import process_fragment

_check_anndata_requires_fragment(h5ad_file)

Check warning on line 109 in cellxgene_schema_cli/cellxgene_schema/cli.py

View check run for this annotation

Codecov / codecov/patch

cellxgene_schema_cli/cellxgene_schema/cli.py#L109

Added line #L109 was not covered by tests

if not process_fragment(fragment_file, h5ad_file, generate_index=generate_index, output_file=output_file):
sys.exit(1)

Expand All @@ -105,14 +121,7 @@
)
@click.argument("h5ad_file", nargs=1, type=click.Path(exists=True, dir_okay=False))
def check_anndata_requires_fragment(h5ad_file):
from .atac_seq import check_anndata_requires_fragment

try:
assay_type = check_anndata_requires_fragment(h5ad_file)
print(assay_type)
except Exception as e:
logger.error(f"Andata does not support atac fragment files for the follow reason: {e}")
sys.exit(1)
_check_anndata_requires_fragment(h5ad_file)

Check warning on line 124 in cellxgene_schema_cli/cellxgene_schema/cli.py

View check run for this annotation

Codecov / codecov/patch

cellxgene_schema_cli/cellxgene_schema/cli.py#L124

Added line #L124 was not covered by tests


@schema_cli.command(
Expand Down
3 changes: 3 additions & 0 deletions cellxgene_schema_cli/cellxgene_schema/ontology_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from cellxgene_ontology_guide.ontology_parser import OntologyParser

ONTOLOGY_PARSER = OntologyParser(schema_version="v5.3.0")
4 changes: 1 addition & 3 deletions cellxgene_schema_cli/cellxgene_schema/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import pandas as pd
import scipy
from anndata.compat import DaskArray
from cellxgene_ontology_guide.ontology_parser import OntologyParser
from dask.array import map_blocks
from scipy import sparse

from . import gencode, schema
from .gencode import get_gene_checker
from .ontology_parser import ONTOLOGY_PARSER
from .utils import (
SPARSE_MATRIX_TYPES,
SUPPORTED_SPARSE_MATRIX_TYPES,
Expand All @@ -30,8 +30,6 @@

logger = logging.getLogger(__name__)

ONTOLOGY_PARSER = OntologyParser(schema_version="v5.3.0")

ASSAY_VISIUM = "EFO:0010961" # generic term
ASSAY_VISIUM_11M = "EFO:0022860" # specific visium assay
ASSAY_SLIDE_SEQV2 = "EFO:0030062"
Expand Down
2 changes: 1 addition & 1 deletion cellxgene_schema_cli/cellxgene_schema/write_labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from . import gencode, schema
from .env import SCHEMA_REFERENCE_BASE_URL, SCHEMA_REFERENCE_FILE_NAME
from .gencode import get_gene_checker
from .ontology_parser import ONTOLOGY_PARSER
from .utils import get_hash_digest_column, getattr_anndata
from .validate import ONTOLOGY_PARSER

logger = logging.getLogger(__name__)

Expand Down
61 changes: 50 additions & 11 deletions cellxgene_schema_cli/tests/test_atac_seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,36 @@ def _test_process_fragment(
assert chromosome in atac_seq.human_chromosome_by_length


class TestConvertToParquet:
    # Tests for atac_seq.convert_to_parquet.
    # NOTE(review): `atac_fragment_dataframe` is a fixture defined elsewhere in this
    # test module — presumably a small, schema-valid fragment table; confirm there.

    def test_positive(self, atac_fragment_dataframe, tmpdir):
        # A well-formed gzipped fragment TSV converts to a parquet dataset
        # (written as a directory because it is partitioned on chromosome).
        tsv_file = str(
            tmpdir + "/fragment.tsv.gzip",
        )
        atac_fragment_dataframe.to_csv(
            tsv_file, sep="\t", index=False, compression="gzip", header=False, columns=atac_seq.column_ordering
        )
        parquet_file = Path(atac_seq.convert_to_parquet(tsv_file, tmpdir))
        assert Path(parquet_file).is_dir()

    def test_missing_column(self, tmpdir, atac_fragment_dataframe):
        # Dropping the final column ("read support") from the TSV must fail conversion:
        # the fixed column schema no longer matches the file contents.
        atac_fragment_dataframe = atac_fragment_dataframe.drop(columns=["read support"])
        tsv_file = str(tmpdir + "/fragment.tsv")
        atac_fragment_dataframe.to_csv(
            tsv_file, sep="\t", index=False, compression="gzip", header=False, columns=atac_seq.column_ordering[:-1]
        )
        with pytest.raises(ValueError):
            atac_seq.convert_to_parquet(tsv_file, tmpdir)

    def test_invalid_column_dtype(self, tmpdir, atac_fragment_dataframe):
        # A non-numeric value in an int-typed column ("start coordinate") must fail
        # conversion with a dtype parse error.
        atac_fragment_dataframe["start coordinate"] = "foo"
        tsv_file = str(tmpdir + "/fragment.tsv")
        atac_fragment_dataframe.to_csv(
            tsv_file, sep="\t", index=False, compression="gzip", header=False, columns=atac_seq.column_ordering
        )
        with pytest.raises(ValueError):
            atac_seq.convert_to_parquet(tsv_file, tmpdir)


class TestValidateFragmentBarcodeInAdataIndex:
def test_postive(self, atac_fragment_file, atac_anndata_file):
result = atac_seq.validate_fragment_barcode_in_adata_index(atac_fragment_file, atac_anndata_file)
Expand Down Expand Up @@ -217,27 +247,25 @@ def test_stop_less_than_chromosome_length(self, atac_fragment_dataframe, atac_an
# Assert
assert result == "Stop coordinate must be less than the chromosome length."

def test_organism_ontology_id_not_unique(self, atac_fragment_dataframe, atac_anndata_file, tmpdir):
def test_mismatch_chromosome(self, atac_fragment_dataframe, atac_anndata_file, tmpdir):
# Arrange
atac_anndata = ad.read_h5ad(atac_anndata_file)
atac_anndata.obs["organism_ontology_term_id"] = ["NCBITaxon:10090", "NCBITaxon:9606", "NCBITaxon:9606"]
atac_anndata.write(tmpdir + "/small_atac_seq.h5ad")
atac_fragment_dataframe["chromosome"] = ["foo", "chr2", "chr1"]
fragment_file = to_parquet_file(atac_fragment_dataframe, tmpdir)
# Act
result = atac_seq.validate_fragment_stop_coordinate_within_chromosome(
to_parquet_file(atac_fragment_dataframe, tmpdir), tmpdir + "/small_atac_seq.h5ad"
)
result = atac_seq.validate_fragment_stop_coordinate_within_chromosome(fragment_file, atac_anndata_file)
# Assert
assert result == "Anndata.obs.organism_ontology_term_id must have a unique value."
assert result.startswith("Chromosomes in the fragment do not match the organism")


class TestValidateFragmentReadSupport:
def test_positive(self, atac_fragment_file):
result = atac_seq.validate_fragment_read_support(atac_fragment_file)
assert not result

def test_negative(self, atac_fragment_dataframe, tmpdir):
@pytest.mark.parametrize("read_support", [0, -1])
def test_negative(self, atac_fragment_dataframe, tmpdir, read_support):
# Arrange
atac_fragment_dataframe["read support"] = 0
atac_fragment_dataframe["read support"] = read_support
fragment_file = to_parquet_file(atac_fragment_dataframe, tmpdir)
# Act
result = atac_seq.validate_fragment_read_support(fragment_file)
Expand Down Expand Up @@ -315,7 +343,18 @@ def test_organism_ontology_term_id_not_allowed(self, atac_anndata, tmpdir):
# Act
result = atac_seq.validate_anndata_organism_ontology_term_id(atac_anndata_file)
# Assert
assert result == "Anndata.obs.organism_ontology_term_id must be one of ['NCBITaxon:9606', 'NCBITaxon:10090']."
assert result.startswith(
"Anndata.obs.organism_ontology_term_id must be one of ['NCBITaxon:9606', 'NCBITaxon:10090']."
)

def test_organism_ontology_id_not_unique(self, atac_anndata, tmpdir):
# Arrange
atac_anndata.obs["organism_ontology_term_id"] = ["NCBITaxon:10090", "NCBITaxon:9606", "NCBITaxon:9606"]
atac_anndata_file = to_anndata_file(atac_anndata, tmpdir)
# Act
result = atac_seq.validate_anndata_organism_ontology_term_id(atac_anndata_file)
# Assert
assert result.startswith("Anndata.obs.organism_ontology_term_id must have exactly 1 unique value.")


class TestValidateFragmentNoDuplicateRows:
Expand Down
Loading