From f234a1a049f04cc5cb52e8ae879302e931b20700 Mon Sep 17 00:00:00 2001 From: Bento007 Date: Tue, 4 Mar 2025 14:45:38 -0800 Subject: [PATCH 1/7] silence dask log messages - fix dask warning. - run fast anndata tests first. --- .../cellxgene_schema/atac_seq.py | 13 ++++++++++--- cellxgene_schema_cli/cellxgene_schema/cli.py | 2 +- cellxgene_schema_cli/tests/test_atac_seq.py | 16 +++++++++++++--- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py index d21f6987..fa852199 100644 --- a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py +++ b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py @@ -19,7 +19,7 @@ from .utils import is_ontological_descendant_of -logger = logging.getLogger(__name__) +logger = logging.getLogger("cellxgene-schema") # TODO: these chromosome tables should be calculated from the fasta file? # location of fasta https://www.gencodegenes.org/human/release_44.html and file name GRCh38.primary_assembly.genome.fa @@ -276,7 +276,13 @@ def validate_fragment_stop_coordinate_within_chromosome(parquet_file: str, annda return "Anndata.obs.organism_ontology_term_id must have a unique value." 
chromosome_length_table = organism_ontology_term_id_by_chromosome_length_table[organism_ontology_term_id[0]] df = ddf.read_parquet(parquet_file, columns=["chromosome", "stop coordinate"]) - df["chromosome_length"] = df["chromosome"].map(chromosome_length_table).astype(int) + # the chromosomes in the fragment must match the chromosomes for that organism + mismatched_chromosomes = set(df["chromosome"].unique().compute()) - chromosome_length_table.keys() + if mismatched_chromosomes: + return f"Chromosomes in the fragment do not match the organism({organism_ontology_term_id}).\n" + "\t\n".join( + mismatched_chromosomes + ) + df["chromosome_length"] = df["chromosome"].map(chromosome_length_table, meta=int).astype(int) df = df["stop coordinate"] <= df["chromosome_length"] if not df.all().compute(): return "Stop coordinate must be less than the chromosome length." @@ -307,7 +313,8 @@ def validate_anndata_organism_ontology_term_id(anndata_file: str) -> Optional[st def detect_chromosomes(parquet_file: str) -> list[str]: logger.info("detecting chromosomes") df = ddf.read_parquet(parquet_file, columns=["chromosome"]).drop_duplicates() - return df["chromosome"].values.compute() + df = df["chromosome"].compute() + return df.tolist() def get_output_file(fragment_file: str, output_file: Optional[str]) -> str: diff --git a/cellxgene_schema_cli/cellxgene_schema/cli.py b/cellxgene_schema_cli/cellxgene_schema/cli.py index 48f646d3..bf26849b 100644 --- a/cellxgene_schema_cli/cellxgene_schema/cli.py +++ b/cellxgene_schema_cli/cellxgene_schema/cli.py @@ -3,7 +3,7 @@ import click -logger = logging.getLogger("cellxgene_schema") +logger = logging.getLogger("cellxgene-schema") @click.group( diff --git a/cellxgene_schema_cli/tests/test_atac_seq.py b/cellxgene_schema_cli/tests/test_atac_seq.py index ca51b481..1163eb1c 100644 --- a/cellxgene_schema_cli/tests/test_atac_seq.py +++ b/cellxgene_schema_cli/tests/test_atac_seq.py @@ -227,7 +227,16 @@ def test_organism_ontology_id_not_unique(self, 
atac_fragment_dataframe, atac_ann to_parquet_file(atac_fragment_dataframe, tmpdir), tmpdir + "/small_atac_seq.h5ad" ) # Assert - assert result == "Anndata.obs.organism_ontology_term_id must have a unique value." + assert result.startswith("Anndata.obs.organism_ontology_term_id must have a unique value.") + + def test_mismatch_chromosome(self, atac_fragment_dataframe, atac_anndata_file, tmpdir): + # Arrange + atac_fragment_dataframe["chromosome"] = ["foo", "chr2", "chr1"] + fragment_file = to_parquet_file(atac_fragment_dataframe, tmpdir) + # Act + result = atac_seq.validate_fragment_stop_coordinate_within_chromosome(fragment_file, atac_anndata_file) + # Assert + assert result.startswith("Chromosomes in the fragment do not match the organism") class TestValidateFragmentReadSupport: @@ -235,9 +244,10 @@ def test_positive(self, atac_fragment_file): result = atac_seq.validate_fragment_read_support(atac_fragment_file) assert not result - def test_negative(self, atac_fragment_dataframe, tmpdir): + @pytest.mark.parametrize("read_support", [0, -1]) + def test_negative(self, atac_fragment_dataframe, tmpdir, read_support): # Arrange - atac_fragment_dataframe["read support"] = 0 + atac_fragment_dataframe["read support"] = read_support fragment_file = to_parquet_file(atac_fragment_dataframe, tmpdir) # Act result = atac_seq.validate_fragment_read_support(fragment_file) From 27e5593678ff063e21794405cf408a7a15d1a0ba Mon Sep 17 00:00:00 2001 From: Bento007 Date: Tue, 4 Mar 2025 14:45:53 -0800 Subject: [PATCH 2/7] suggested changes --- .../cellxgene_schema/atac_seq.py | 115 +++++++++++++----- 1 file changed, 83 insertions(+), 32 deletions(-) diff --git a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py index fa852199..5704306a 100644 --- a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py +++ b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py @@ -143,7 +143,7 @@ def check_anndata_requires_fragment(anndata_file: str) -> bool: """ 
onto_parser = OntologyParser() - def is_atac(x) -> str: + def is_atac(x: str) -> str: if is_ontological_descendant_of(onto_parser, x, "EFO:0010891"): if is_ontological_descendant_of(onto_parser, x, "EFO:0008913"): return "p" # paired @@ -187,37 +187,36 @@ def process_fragment( """ with tempfile.TemporaryDirectory() as tempdir: - # unzip the fragment. Subprocess is used here because gzip on the cli uses less memory with comparable speed to - # the python gzip library. - unzipped_file = Path(tempdir) / Path(fragment_file).name.rsplit(".", 1)[0] - logger.info(f"Unzipping {fragment_file}") - with open(unzipped_file, "wb") as fp: - subprocess.run(["gunzip", "-c", fragment_file], stdout=fp, check=True) - - _dask_cluster_config = dict(silence_logs=logging.ERROR, dashboard_address=None) + + # configure the dask cluster + _dask_cluster_config = dict(dashboard_address=None) + logging.getLogger("distributed").setLevel(logging.ERROR) if dask_cluster_config: _dask_cluster_config.update(dask_cluster_config) + # start the dask cluster and client with dd.LocalCluster(**_dask_cluster_config) as cluster, dd.Client(cluster): - # convert the fragment to a parquet file - logger.info(f"Converting {fragment_file} to parquet") - parquet_file_path = Path(tempdir) / Path(fragment_file).name.replace(".gz", ".parquet") - ddf.read_csv(unzipped_file, sep="\t", names=column_ordering, dtype=column_types).to_parquet( - parquet_file_path, partition_on=["chromosome"], compute=True - ) - - # remove the unzipped file - logger.debug(f"Removing {unzipped_file}") - unzipped_file.unlink() - parquet_file = str(parquet_file_path) - errors = validate(parquet_file, anndata_file) - if any(errors): - logger.error("Errors found in Fragment and/or Anndata file") - logger.error(errors) + # quick checks + errors = validate_anndata(anndata_file) + if errors: + return errors + + # convert the fragment to a parquet file for faster processing + try: + parquet_file = convert_to_parquet(fragment_file, tempdir) + except 
Exception: + msg = "Error converting fragment to parquet." + logger.exception(msg) + return [msg] + + # slow checks + errors = validate_anndata_with_fragment(parquet_file, anndata_file) + if errors: return errors else: logger.info("Fragment and Anndata file are valid") + # generate the index if generate_index: logger.info(f"Sorting fragment and generating index for {fragment_file}") index_fragment(fragment_file, parquet_file, tempdir, override_write_algorithm, output_file) @@ -225,18 +224,60 @@ def process_fragment( return [] -def validate(parquet_file: str, anndata_file: str) -> list[Optional[str]]: - jobs = [ +def convert_to_parquet(fragment_file: str, tempdir: str) -> str: + # unzip the fragment. Subprocess is used here because gzip on the cli uses less memory with comparable + # speed to the python gzip library. + unzipped_file = Path(tempdir) / Path(fragment_file).name.rsplit(".", 1)[0] + logger.info(f"Unzipping {fragment_file}") + with open(unzipped_file, "wb") as fp: + subprocess.run(["gunzip", "-c", fragment_file], stdout=fp, check=True) + + # convert the fragment to a parquet file + logger.info(f"Converting {fragment_file} to parquet") + parquet_file_path = Path(tempdir) / Path(fragment_file).name.replace(".gz", ".parquet") + try: + ddf.read_csv(unzipped_file, sep="\t", names=column_ordering, dtype=column_types).to_parquet( + parquet_file_path, partition_on=["chromosome"], compute=True + ) + except pd.errors.ParserError as e: + return [f"Error parsing fragment file: {e}"] + + # remove the unzipped file + logger.debug(f"Removing {unzipped_file}") + unzipped_file.unlink() + parquet_file = str(parquet_file_path) + return parquet_file + + +def _report_errors(header: str, errors: list[str]) -> list[str]: + if any(errors): + errors = [e for e in errors if e is not None] + errors = [header] + errors + logger.error("\n\t".join(errors)) + return errors + else: + return [] + + +def validate_anndata(anndata_file: str) -> list[str]: + try: + 
check_anndata_requires_fragment(anndata_file) + except ValueError as e: + return _report_errors("Errors found in Anndata file", str(e)) + errors = [validate_anndata_organism_ontology_term_id(anndata_file), validate_anndata_is_primary_data(anndata_file)] + return _report_errors("Errors found in Anndata file", errors) + + +def validate_anndata_with_fragment(parquet_file: str, anndata_file: str) -> list[str]: + errors = [ validate_fragment_start_coordinate_greater_than_0(parquet_file), validate_fragment_barcode_in_adata_index(parquet_file, anndata_file), validate_fragment_stop_coordinate_within_chromosome(parquet_file, anndata_file), validate_fragment_stop_greater_than_start_coordinate(parquet_file), validate_fragment_read_support(parquet_file), - validate_anndata_organism_ontology_term_id(anndata_file), - validate_anndata_is_primary_data(anndata_file), validate_fragment_no_duplicate_rows(parquet_file), ] - return jobs + return _report_errors("Errors found in Fragment and/or Anndata file", errors) def validate_fragment_no_duplicate_rows(parquet_file: str) -> Optional[str]: @@ -273,8 +314,18 @@ def validate_fragment_stop_coordinate_within_chromosome(parquet_file: str, annda with h5py.File(anndata_file) as f: organism_ontology_term_id = ad.io.read_elem(f["obs"])["organism_ontology_term_id"].unique().astype(str) if organism_ontology_term_id.size > 1: - return "Anndata.obs.organism_ontology_term_id must have a unique value." - chromosome_length_table = organism_ontology_term_id_by_chromosome_length_table[organism_ontology_term_id[0]] + error_message = ( + "Anndata.obs.organism_ontology_term_id must have a unique value. 
Found the following " "values:\n" + ) + "\n\t".join(organism_ontology_term_id) + return error_message + organism_ontology_term_id = organism_ontology_term_id[0] + try: + chromosome_length_table = organism_ontology_term_id_by_chromosome_length_table[organism_ontology_term_id] + except KeyError: + return ( + f"Anndata.obs.organism_ontology_term_id must be one of NCBITaxon:9606 or NCBITaxon" + f":10090. Received {organism_ontology_term_id}." + ) df = ddf.read_parquet(parquet_file, columns=["chromosome", "stop coordinate"]) # the chromosomes in the fragment must match the chromosomes for that organism mismatched_chromosomes = set(df["chromosome"].unique().compute()) - chromosome_length_table.keys() @@ -290,7 +341,7 @@ def validate_fragment_stop_coordinate_within_chromosome(parquet_file: str, annda def validate_fragment_read_support(parquet_file: str) -> Optional[str]: # check that the read support is greater than 0 - df = ddf.read_parquet(parquet_file, columns=["read support"], filters=[("read support", "==", 0)]) + df = ddf.read_parquet(parquet_file, columns=["read support"], filters=[("read support", "<=", 0)]) if len(df.compute()) != 0: return "Read support must be greater than 0." 
From 0699cb277ae7f349ee44df0827e7811c5ccaee81 Mon Sep 17 00:00:00 2001 From: Bento007 Date: Tue, 4 Mar 2025 16:09:01 -0800 Subject: [PATCH 3/7] Throw value error when parsing csv fails --- .../cellxgene_schema/atac_seq.py | 10 +++---- cellxgene_schema_cli/tests/test_atac_seq.py | 30 +++++++++++++++++++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py index 5704306a..433588ce 100644 --- a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py +++ b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py @@ -239,12 +239,10 @@ def convert_to_parquet(fragment_file: str, tempdir: str) -> str: ddf.read_csv(unzipped_file, sep="\t", names=column_ordering, dtype=column_types).to_parquet( parquet_file_path, partition_on=["chromosome"], compute=True ) - except pd.errors.ParserError as e: - return [f"Error parsing fragment file: {e}"] - - # remove the unzipped file - logger.debug(f"Removing {unzipped_file}") - unzipped_file.unlink() + finally: + # remove the unzipped file + logger.debug(f"Removing {unzipped_file}") + unzipped_file.unlink() parquet_file = str(parquet_file_path) return parquet_file diff --git a/cellxgene_schema_cli/tests/test_atac_seq.py b/cellxgene_schema_cli/tests/test_atac_seq.py index 1163eb1c..0cbe9761 100644 --- a/cellxgene_schema_cli/tests/test_atac_seq.py +++ b/cellxgene_schema_cli/tests/test_atac_seq.py @@ -131,6 +131,36 @@ def _test_process_fragment( assert chromosome in atac_seq.human_chromosome_by_length +class TestConvertToParquet: + def test_positive(self, atac_fragment_dataframe, tmpdir): + tsv_file = str( + tmpdir + "/fragment.tsv.gzip", + ) + atac_fragment_dataframe.to_csv( + tsv_file, sep="\t", index=False, compression="gzip", header=False, columns=atac_seq.column_ordering + ) + parquet_file = Path(atac_seq.convert_to_parquet(tsv_file, tmpdir)) + assert Path(parquet_file).is_dir() + + def test_missing_column(self, tmpdir, 
atac_fragment_dataframe): + atac_fragment_dataframe = atac_fragment_dataframe.drop(columns=["read support"]) + tsv_file = str(tmpdir + "/fragment.tsv") + atac_fragment_dataframe.to_csv( + tsv_file, sep="\t", index=False, compression="gzip", header=False, columns=atac_seq.column_ordering[:-1] + ) + with pytest.raises(ValueError): + atac_seq.convert_to_parquet(tsv_file, tmpdir) + + def test_invalid_column_dtype(self, tmpdir, atac_fragment_dataframe): + atac_fragment_dataframe["start coordinate"] = "foo" + tsv_file = str(tmpdir + "/fragment.tsv") + atac_fragment_dataframe.to_csv( + tsv_file, sep="\t", index=False, compression="gzip", header=False, columns=atac_seq.column_ordering + ) + with pytest.raises(ValueError): + atac_seq.convert_to_parquet(tsv_file, tmpdir) + + class TestValidateFragmentBarcodeInAdataIndex: def test_postive(self, atac_fragment_file, atac_anndata_file): result = atac_seq.validate_fragment_barcode_in_adata_index(atac_fragment_file, atac_anndata_file) From 335338161d1c90d637fed79d33676e71555508d2 Mon Sep 17 00:00:00 2001 From: Bento007 Date: Wed, 5 Mar 2025 09:50:34 -0800 Subject: [PATCH 4/7] fix error reporting --- .../cellxgene_schema/atac_seq.py | 12 +++------ cellxgene_schema_cli/cellxgene_schema/cli.py | 25 +++++++++++++------ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py index 433588ce..f57ade69 100644 --- a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py +++ b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py @@ -247,9 +247,9 @@ def convert_to_parquet(fragment_file: str, tempdir: str) -> str: return parquet_file -def _report_errors(header: str, errors: list[str]) -> list[str]: +def report_errors(header: str, errors: list[str]) -> list[str]: if any(errors): - errors = [e for e in errors if e is not None] + errors = [f"{i}: {e})" for i, e in enumerate(errors) if e is not None] errors = [header] + errors 
logger.error("\n\t".join(errors)) return errors @@ -258,12 +258,8 @@ def _report_errors(header: str, errors: list[str]) -> list[str]: def validate_anndata(anndata_file: str) -> list[str]: - try: - check_anndata_requires_fragment(anndata_file) - except ValueError as e: - return _report_errors("Errors found in Anndata file", str(e)) errors = [validate_anndata_organism_ontology_term_id(anndata_file), validate_anndata_is_primary_data(anndata_file)] - return _report_errors("Errors found in Anndata file", errors) + return report_errors("Errors found in Anndata file", errors) def validate_anndata_with_fragment(parquet_file: str, anndata_file: str) -> list[str]: @@ -275,7 +271,7 @@ def validate_anndata_with_fragment(parquet_file: str, anndata_file: str) -> list validate_fragment_read_support(parquet_file), validate_fragment_no_duplicate_rows(parquet_file), ] - return _report_errors("Errors found in Fragment and/or Anndata file", errors) + return report_errors("Errors found in Fragment and/or Anndata file", errors) def validate_fragment_no_duplicate_rows(parquet_file: str) -> Optional[str]: diff --git a/cellxgene_schema_cli/cellxgene_schema/cli.py b/cellxgene_schema_cli/cellxgene_schema/cli.py index bf26849b..050efd1b 100644 --- a/cellxgene_schema_cli/cellxgene_schema/cli.py +++ b/cellxgene_schema_cli/cellxgene_schema/cli.py @@ -77,6 +77,20 @@ def add_labels(input_file, output_file): sys.exit(1) +def _check_anndata_requires_fragment(h5ad_file): + from .atac_seq import check_anndata_requires_fragment, report_errors + + try: + fragment_required = check_anndata_requires_fragment(h5ad_file) + if fragment_required: + logger.info("Andata requires an ATAC fragment file.") + else: + logger.info("Andata does not require an ATAC fragment file.") + except Exception as e: + report_errors("Andata does not support ATAC fragment files for the follow reason", [str(e)]) + sys.exit(1) + + @schema_cli.command( name="process-fragment", short_help="Check that an ATAC SEQ fragment follows the 
cellxgene data integration schema.", @@ -92,6 +106,8 @@ def add_labels(input_file, output_file): def fragment_validate(h5ad_file, fragment_file, generate_index, output_file): from .atac_seq import process_fragment + _check_anndata_requires_fragment(h5ad_file) + if not process_fragment(fragment_file, h5ad_file, generate_index=generate_index, output_file=output_file): sys.exit(1) @@ -105,14 +121,7 @@ def fragment_validate(h5ad_file, fragment_file, generate_index, output_file): ) @click.argument("h5ad_file", nargs=1, type=click.Path(exists=True, dir_okay=False)) def check_anndata_requires_fragment(h5ad_file): - from .atac_seq import check_anndata_requires_fragment - - try: - assay_type = check_anndata_requires_fragment(h5ad_file) - print(assay_type) - except Exception as e: - logger.error(f"Andata does not support atac fragment files for the follow reason: {e}") - sys.exit(1) + _check_anndata_requires_fragment(h5ad_file) @schema_cli.command( From 094498a8ac8b1c520b286733c2ab815287d047ef Mon Sep 17 00:00:00 2001 From: Bento007 Date: Wed, 5 Mar 2025 11:02:26 -0800 Subject: [PATCH 5/7] move unique organism_ontology_term_id to validate_anndata_organism_ontology_term_id --- .../cellxgene_schema/atac_seq.py | 29 ++++++++----------- cellxgene_schema_cli/tests/test_atac_seq.py | 25 ++++++++-------- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py index f57ade69..51ae548d 100644 --- a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py +++ b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py @@ -306,22 +306,11 @@ def validate_fragment_stop_greater_than_start_coordinate(parquet_file: str) -> O def validate_fragment_stop_coordinate_within_chromosome(parquet_file: str, anndata_file: str) -> Optional[str]: # check that the stop coordinate is within the length of the chromosome with h5py.File(anndata_file) as f: - organism_ontology_term_id = 
ad.io.read_elem(f["obs"])["organism_ontology_term_id"].unique().astype(str) - if organism_ontology_term_id.size > 1: - error_message = ( - "Anndata.obs.organism_ontology_term_id must have a unique value. Found the following " "values:\n" - ) + "\n\t".join(organism_ontology_term_id) - return error_message - organism_ontology_term_id = organism_ontology_term_id[0] - try: - chromosome_length_table = organism_ontology_term_id_by_chromosome_length_table[organism_ontology_term_id] - except KeyError: - return ( - f"Anndata.obs.organism_ontology_term_id must be one of NCBITaxon:9606 or NCBITaxon" - f":10090. Received {organism_ontology_term_id}." - ) + organism_ontology_term_id = ad.io.read_elem(f["obs"])["organism_ontology_term_id"].unique().astype(str)[0] df = ddf.read_parquet(parquet_file, columns=["chromosome", "stop coordinate"]) + # the chromosomes in the fragment must match the chromosomes for that organism + chromosome_length_table = organism_ontology_term_id_by_chromosome_length_table[organism_ontology_term_id] mismatched_chromosomes = set(df["chromosome"].unique().compute()) - chromosome_length_table.keys() if mismatched_chromosomes: return f"Chromosomes in the fragment do not match the organism({organism_ontology_term_id}).\n" + "\t\n".join( @@ -349,10 +338,16 @@ def validate_anndata_is_primary_data(anndata_file: str) -> Optional[str]: def validate_anndata_organism_ontology_term_id(anndata_file: str) -> Optional[str]: with h5py.File(anndata_file) as f: - unique_organism_ontology_term_ids = ad.io.read_elem(f["obs"])["organism_ontology_term_id"].unique() + organism_ontology_term_ids = ad.io.read_elem(f["obs"])["organism_ontology_term_id"].unique().astype(str) + if organism_ontology_term_ids.size > 1: + error_message = ( + "Anndata.obs.organism_ontology_term_id must have a unique value. 
Found the following values:\n" + ) + "\n\t".join(organism_ontology_term_ids) + return error_message + organism_ontology_term_id = organism_ontology_term_ids[0] allowed_terms = [*organism_ontology_term_id_by_chromosome_length_table.keys()] - if unique_organism_ontology_term_ids[0] not in allowed_terms: - return f"Anndata.obs.organism_ontology_term_id must be one of {allowed_terms}." + if organism_ontology_term_id not in allowed_terms: + return f"Anndata.obs.organism_ontology_term_id must be one of {allowed_terms}. Got {organism_ontology_term_id}." def detect_chromosomes(parquet_file: str) -> list[str]: diff --git a/cellxgene_schema_cli/tests/test_atac_seq.py b/cellxgene_schema_cli/tests/test_atac_seq.py index 0cbe9761..dda605d9 100644 --- a/cellxgene_schema_cli/tests/test_atac_seq.py +++ b/cellxgene_schema_cli/tests/test_atac_seq.py @@ -247,18 +247,6 @@ def test_stop_less_than_chromosome_length(self, atac_fragment_dataframe, atac_an # Assert assert result == "Stop coordinate must be less than the chromosome length." 
- def test_organism_ontology_id_not_unique(self, atac_fragment_dataframe, atac_anndata_file, tmpdir): - # Arrange - atac_anndata = ad.read_h5ad(atac_anndata_file) - atac_anndata.obs["organism_ontology_term_id"] = ["NCBITaxon:10090", "NCBITaxon:9606", "NCBITaxon:9606"] - atac_anndata.write(tmpdir + "/small_atac_seq.h5ad") - # Act - result = atac_seq.validate_fragment_stop_coordinate_within_chromosome( - to_parquet_file(atac_fragment_dataframe, tmpdir), tmpdir + "/small_atac_seq.h5ad" - ) - # Assert - assert result.startswith("Anndata.obs.organism_ontology_term_id must have a unique value.") - def test_mismatch_chromosome(self, atac_fragment_dataframe, atac_anndata_file, tmpdir): # Arrange atac_fragment_dataframe["chromosome"] = ["foo", "chr2", "chr1"] @@ -355,7 +343,18 @@ def test_organism_ontology_term_id_not_allowed(self, atac_anndata, tmpdir): # Act result = atac_seq.validate_anndata_organism_ontology_term_id(atac_anndata_file) # Assert - assert result == "Anndata.obs.organism_ontology_term_id must be one of ['NCBITaxon:9606', 'NCBITaxon:10090']." + assert result.startswith( + "Anndata.obs.organism_ontology_term_id must be one of ['NCBITaxon:9606', 'NCBITaxon:10090']." 
+ ) + + def test_organism_ontology_id_not_unique(self, atac_anndata, tmpdir): + # Arrange + atac_anndata.obs["organism_ontology_term_id"] = ["NCBITaxon:10090", "NCBITaxon:9606", "NCBITaxon:9606"] + atac_anndata_file = to_anndata_file(atac_anndata, tmpdir) + # Act + result = atac_seq.validate_anndata_organism_ontology_term_id(atac_anndata_file) + # Assert + assert result.startswith("Anndata.obs.organism_ontology_term_id must have a unique value.") class TestValidateFragmentNoDuplicateRows: From 912e5c661ff10da39bb574cb1c6c5cbba6fb368a Mon Sep 17 00:00:00 2001 From: Bento007 Date: Wed, 5 Mar 2025 15:42:31 -0800 Subject: [PATCH 6/7] suggested comments --- cellxgene_schema_cli/cellxgene_schema/atac_seq.py | 13 ++++++------- cellxgene_schema_cli/cellxgene_schema/cli.py | 6 +++--- .../cellxgene_schema/ontology_parser.py | 3 +++ cellxgene_schema_cli/cellxgene_schema/validate.py | 4 +--- .../cellxgene_schema/write_labels.py | 2 +- cellxgene_schema_cli/tests/test_atac_seq.py | 2 +- 6 files changed, 15 insertions(+), 15 deletions(-) create mode 100644 cellxgene_schema_cli/cellxgene_schema/ontology_parser.py diff --git a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py index 51ae548d..d4c22e02 100644 --- a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py +++ b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py @@ -13,10 +13,10 @@ import h5py import pandas as pd import pysam -from cellxgene_ontology_guide.ontology_parser import OntologyParser from dask import delayed from dask.delayed import Delayed +from .ontology_parser import ONTOLOGY_PARSER from .utils import is_ontological_descendant_of logger = logging.getLogger("cellxgene-schema") @@ -141,11 +141,10 @@ def check_anndata_requires_fragment(anndata_file: str) -> bool: :param anndata_file: The anndata file to validate. 
:return: """ - onto_parser = OntologyParser() def is_atac(x: str) -> str: - if is_ontological_descendant_of(onto_parser, x, "EFO:0010891"): - if is_ontological_descendant_of(onto_parser, x, "EFO:0008913"): + if is_ontological_descendant_of(ONTOLOGY_PARSER, x, "EFO:0010891"): + if is_ontological_descendant_of(ONTOLOGY_PARSER, x, "EFO:0008913"): return "p" # paired else: return "u" # unpaired @@ -249,7 +248,7 @@ def convert_to_parquet(fragment_file: str, tempdir: str) -> str: def report_errors(header: str, errors: list[str]) -> list[str]: if any(errors): - errors = [f"{i}: {e})" for i, e in enumerate(errors) if e is not None] + errors = [e for e in errors if e is not None] errors = [header] + errors logger.error("\n\t".join(errors)) return errors @@ -259,7 +258,7 @@ def report_errors(header: str, errors: list[str]) -> list[str]: def validate_anndata(anndata_file: str) -> list[str]: errors = [validate_anndata_organism_ontology_term_id(anndata_file), validate_anndata_is_primary_data(anndata_file)] - return report_errors("Errors found in Anndata file", errors) + return report_errors("Errors found in Anndata file. Skipping fragment validation.", errors) def validate_anndata_with_fragment(parquet_file: str, anndata_file: str) -> list[str]: @@ -341,7 +340,7 @@ def validate_anndata_organism_ontology_term_id(anndata_file: str) -> Optional[st organism_ontology_term_ids = ad.io.read_elem(f["obs"])["organism_ontology_term_id"].unique().astype(str) if organism_ontology_term_ids.size > 1: error_message = ( - "Anndata.obs.organism_ontology_term_id must have a unique value. Found the following values:\n" + "Anndata.obs.organism_ontology_term_id must have exactly 1 unique value. 
Found the following " "values:\n" ) + "\n\t".join(organism_ontology_term_ids) return error_message organism_ontology_term_id = organism_ontology_term_ids[0] diff --git a/cellxgene_schema_cli/cellxgene_schema/cli.py b/cellxgene_schema_cli/cellxgene_schema/cli.py index 050efd1b..e1054c01 100644 --- a/cellxgene_schema_cli/cellxgene_schema/cli.py +++ b/cellxgene_schema_cli/cellxgene_schema/cli.py @@ -83,11 +83,11 @@ def _check_anndata_requires_fragment(h5ad_file): try: fragment_required = check_anndata_requires_fragment(h5ad_file) if fragment_required: - logger.info("Andata requires an ATAC fragment file.") + logger.info("Anndata requires an ATAC fragment file.") else: - logger.info("Andata does not require an ATAC fragment file.") + logger.info("Anndata does not require an ATAC fragment file.") except Exception as e: - report_errors("Andata does not support ATAC fragment files for the follow reason", [str(e)]) + report_errors("Anndata does not support ATAC fragment files for the following reasons:", [str(e)]) sys.exit(1) diff --git a/cellxgene_schema_cli/cellxgene_schema/ontology_parser.py b/cellxgene_schema_cli/cellxgene_schema/ontology_parser.py new file mode 100644 index 00000000..1e55f462 --- /dev/null +++ b/cellxgene_schema_cli/cellxgene_schema/ontology_parser.py @@ -0,0 +1,3 @@ +from cellxgene_ontology_guide.ontology_parser import OntologyParser + +ONTOLOGY_PARSER = OntologyParser(schema_version="v5.3.0") diff --git a/cellxgene_schema_cli/cellxgene_schema/validate.py b/cellxgene_schema_cli/cellxgene_schema/validate.py index d535811b..25fc59c7 100644 --- a/cellxgene_schema_cli/cellxgene_schema/validate.py +++ b/cellxgene_schema_cli/cellxgene_schema/validate.py @@ -12,12 +12,12 @@ import pandas as pd import scipy from anndata.compat import DaskArray -from cellxgene_ontology_guide.ontology_parser import OntologyParser from dask.array import map_blocks from scipy import sparse from .
import gencode, schema from .gencode import get_gene_checker +from .ontology_parser import ONTOLOGY_PARSER from .utils import ( SPARSE_MATRIX_TYPES, SUPPORTED_SPARSE_MATRIX_TYPES, @@ -30,8 +30,6 @@ logger = logging.getLogger(__name__) -ONTOLOGY_PARSER = OntologyParser(schema_version="v5.3.0") - ASSAY_VISIUM = "EFO:0010961" # generic term ASSAY_VISIUM_11M = "EFO:0022860" # specific visium assay ASSAY_SLIDE_SEQV2 = "EFO:0030062" diff --git a/cellxgene_schema_cli/cellxgene_schema/write_labels.py b/cellxgene_schema_cli/cellxgene_schema/write_labels.py index 499247ff..56780100 100644 --- a/cellxgene_schema_cli/cellxgene_schema/write_labels.py +++ b/cellxgene_schema_cli/cellxgene_schema/write_labels.py @@ -8,8 +8,8 @@ from . import gencode, schema from .env import SCHEMA_REFERENCE_BASE_URL, SCHEMA_REFERENCE_FILE_NAME from .gencode import get_gene_checker +from .ontology_parser import ONTOLOGY_PARSER from .utils import get_hash_digest_column, getattr_anndata -from .validate import ONTOLOGY_PARSER logger = logging.getLogger(__name__) diff --git a/cellxgene_schema_cli/tests/test_atac_seq.py b/cellxgene_schema_cli/tests/test_atac_seq.py index dda605d9..47cb8401 100644 --- a/cellxgene_schema_cli/tests/test_atac_seq.py +++ b/cellxgene_schema_cli/tests/test_atac_seq.py @@ -354,7 +354,7 @@ def test_organism_ontology_id_not_unique(self, atac_anndata, tmpdir): # Act result = atac_seq.validate_anndata_organism_ontology_term_id(atac_anndata_file) # Assert - assert result.startswith("Anndata.obs.organism_ontology_term_id must have a unique value.") + assert result.startswith("Anndata.obs.organism_ontology_term_id must have exactly 1 unique value.") class TestValidateFragmentNoDuplicateRows: From ec36b275029d7ab7834ba2b0e28333ee421ded8f Mon Sep 17 00:00:00 2001 From: Bento007 Date: Thu, 6 Mar 2025 10:27:42 -0800 Subject: [PATCH 7/7] undo logging change --- cellxgene_schema_cli/cellxgene_schema/atac_seq.py | 2 +- cellxgene_schema_cli/cellxgene_schema/cli.py | 2 +- 2 files changed, 2
insertions(+), 2 deletions(-) diff --git a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py index d4c22e02..2b9432df 100644 --- a/cellxgene_schema_cli/cellxgene_schema/atac_seq.py +++ b/cellxgene_schema_cli/cellxgene_schema/atac_seq.py @@ -19,7 +19,7 @@ from .ontology_parser import ONTOLOGY_PARSER from .utils import is_ontological_descendant_of -logger = logging.getLogger("cellxgene-schema") +logger = logging.getLogger(__name__) # TODO: these chromosome tables should be calculated from the fasta file? # location of fasta https://www.gencodegenes.org/human/release_44.html and file name GRCh38.primary_assembly.genome.fa diff --git a/cellxgene_schema_cli/cellxgene_schema/cli.py b/cellxgene_schema_cli/cellxgene_schema/cli.py index e1054c01..2f9839e4 100644 --- a/cellxgene_schema_cli/cellxgene_schema/cli.py +++ b/cellxgene_schema_cli/cellxgene_schema/cli.py @@ -3,7 +3,7 @@ import click -logger = logging.getLogger("cellxgene-schema") +logger = logging.getLogger("cellxgene_schema") @click.group(