From 69402530be81da5ceda58496cdfa0f7ba19ceb1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Arnoux?= Date: Fri, 18 Oct 2024 12:12:34 +0200 Subject: [PATCH] Update docstring and typing --- ppanggolin/annotate/annotate.py | 176 +++++++++++++++----------------- tests/annotate/test_annotate.py | 132 ++++++++++++------------ 2 files changed, 152 insertions(+), 156 deletions(-) diff --git a/ppanggolin/annotate/annotate.py b/ppanggolin/annotate/annotate.py index f3f79756..69420cff 100644 --- a/ppanggolin/annotate/annotate.py +++ b/ppanggolin/annotate/annotate.py @@ -4,13 +4,13 @@ import argparse import logging from concurrent.futures import ProcessPoolExecutor -from math import e + from multiprocessing import get_context import os from pathlib import Path import tempfile import time -from typing import List, Set, Tuple, Iterable, Dict, Generator +from typing import List, Set, Tuple, Iterable, Dict, Generator, Union import re from collections import defaultdict import warnings @@ -54,8 +54,8 @@ def check_annotate_args(args: argparse.Namespace): def create_gene(org: Organism, contig: Contig, gene_counter: int, rna_counter: int, gene_id: str, dbxrefs: Set[str], - coordinates: List[Tuple[int]], strand: str, gene_type: str, position: int = None, gene_name: str = "", - product: str = "", genetic_code: int = 11, protein_id: str = "") -> Gene: + coordinates: List[Tuple[int, int]], strand: str, gene_type: str, position: int = None, + gene_name: str = "", product: str = "", genetic_code: int = 11, protein_id: str = "") -> Gene: """ Create a Gene object and associate to contig and Organism @@ -64,7 +64,7 @@ def create_gene(org: Organism, contig: Contig, gene_counter: int, rna_counter: i :param gene_counter: Gene counter to name gene :param rna_counter: RNA counter to name RNA :param gene_id: local identifier - :param dbxref: cross-reference to external DB + :param dbxrefs: cross-reference to external DB :param coordinates: Gene start and stop positions :param strand: gene strand association :param gene_type: gene type @@ -112,7 +112,7 @@ def create_gene(org: Organism, contig: Contig, gene_counter: int, rna_counter: i return new_gene -def extract_positions(string: str) -> Tuple[List[Tuple[int, int]], bool, bool]: +def extract_positions(string: str) -> Tuple[List[Tuple[int, int]], bool, bool, bool]: """ Extracts start and stop positions from a string and determines whether it is complement and pseudogene. @@ -158,7 +158,6 @@ def extract_positions(string: str) -> Tuple[List[Tuple[int, int]], bool, bool]: else: positions = string.rstrip() - # Check if '>' or '<' exists in the positions to identify partial genes if '>' in positions or '<' in positions: @@ -167,12 +166,12 @@ def extract_positions(string: str) -> Tuple[List[Tuple[int, int]], bool, bool]: if ">" in positions.split(',')[-1]: has_partial_end = True - + inner_positions = ','.join(positions.split(',')[1:-1]) - if '>' in inner_positions or '<' in inner_positions or (not has_partial_end and not has_partial_start): + if '>' in inner_positions or '<' in inner_positions or (not has_partial_end and not has_partial_start): raise ValueError(f"Error parsing positions '{positions}' extracted from GBFF string '{string}'. " - f"Chevrons are found in the inner position. This case is unexpected and not handle.") + f"Chevrons are found in the inner position. This case is unexpected and not handle.") for position in positions.split(','): @@ -194,7 +193,9 @@ def extract_positions(string: str) -> Tuple[List[Tuple[int, int]], bool, bool]: return coordinates, complement, has_partial_start, has_partial_end -def parse_gbff_by_contig(gbff_file_path: Path) -> Generator[Tuple[List[str], List[str], List[str]], None, None]: +def parse_gbff_by_contig(gbff_file_path: Path + ) -> Generator[Tuple[Dict[str, str], Generator[Dict[str, Union[str, Set[str]]], None, None], str], + None, None]: """ Parse a GBFF file by contig and yield tuples containing header, feature, and sequence info for each contig. @@ -234,8 +235,8 @@ def parse_gbff_by_contig(gbff_file_path: Path) -> Generator[Tuple[List[str], Lis f"{len(header_lines)} feature lines, " f"and {len(sequence_lines)} sequence lines." ) - yield parse_contig_header_lines(header_lines), parse_feature_lines(feature_lines), parse_dna_seq_lines( - sequence_lines) + yield (parse_contig_header_lines(header_lines), parse_feature_lines(feature_lines), + parse_dna_seq_lines(sequence_lines)) header_lines = [] feature_lines = [] @@ -257,8 +258,8 @@ def parse_gbff_by_contig(gbff_file_path: Path) -> Generator[Tuple[List[str], Lis # In case the last // is missing, return the last contig if header_lines or feature_lines or sequence_lines: - return parse_contig_header_lines(header_lines), parse_feature_lines(feature_lines), parse_dna_seq_lines( - sequence_lines) + yield (parse_contig_header_lines(header_lines), parse_feature_lines(feature_lines), + parse_dna_seq_lines(sequence_lines)) def parse_contig_header_lines(header_lines: List[str]) -> Dict[str, str]: @@ -282,7 +283,8 @@ def parse_contig_header_lines(header_lines: List[str]) -> Dict[str, str]: return {field: '\n'.join(value) for field, value in field_to_value.items()} -def parse_feature_lines(feature_lines: List[str]) -> Generator[Dict[str, str], None, None]: + +def parse_feature_lines(feature_lines: List[str]) -> Generator[Dict[str, Union[str, Set[str]]], None, None]: """ Parse feature lines from a GBFF file and yield dictionaries representing each feature. @@ -290,19 +292,19 @@ def parse_feature_lines(feature_lines: List[str]) -> Generator[Dict[str, str], N :return: A generator that yields dictionaries, each representing a feature with its type, location, and qualifiers. """ - def stringify_feature_values(feature:Dict[str,List[str]]) -> Dict[str, str]: + def stringify_feature_values(feature: Dict[str, List[str]]) -> Dict[str, Union[str, Set[str]]]: """ All value of the returned dict are str except for db_xref that is a list. When multiple values exist for the same tag only the first one is kept. """ stringify_feature = {} - for tag, value in feature.items(): + for tag, val in feature.items(): if tag == "db_xref": - stringify_feature[tag] = value - elif isinstance(value, list): - stringify_feature[tag] = value[0] + stringify_feature[tag] = set(val) + elif isinstance(val, list): + stringify_feature[tag] = val[0] else: - stringify_feature[tag] = value + stringify_feature[tag] = val return defaultdict(str, stringify_feature) current_feature = {} @@ -316,13 +318,13 @@ def stringify_feature_values(feature:Dict[str,List[str]]) -> Dict[str, str]: yield stringify_feature_values(current_feature) current_feature = { - "feature_type" : line[:21].strip(), - "location" : [line[21:].strip()], + "feature_type": line[:21].strip(), + "location": [line[21:].strip()], } current_qualifier = "location" elif line.strip().startswith('/'): - qualifier_line = line.strip()[1:] # [1:] used to remove / + qualifier_line = line.strip()[1:] # [1:] used to remove / if "=" in qualifier_line: current_qualifier, value = qualifier_line.split('=', 1) @@ -339,7 +341,7 @@ def stringify_feature_values(feature:Dict[str,List[str]]) -> Dict[str, str]: current_feature[current_qualifier] = [value] else: - # the line does not start a qualifier so its the continuation of the last qualifier value. + # the line does not start a qualifier so it's the continuation of the last qualifier value. value = line.strip() value = value[:-1] if value.endswith('"') else value current_feature[current_qualifier][-1] += f" {value}" @@ -349,7 +351,7 @@ def stringify_feature_values(feature:Dict[str,List[str]]) -> Dict[str, str]: yield stringify_feature_values(current_feature) -def parse_dna_seq_lines(sequence_lines: List[str]) -> Generator[Dict[str, str], None, None]: +def parse_dna_seq_lines(sequence_lines: List[str]) -> str: """ Parse sequence_lines from a GBFF file and return dna sequence @@ -362,8 +364,8 @@ def parse_dna_seq_lines(sequence_lines: List[str]) -> Generator[Dict[str, str], return sequence -def combine_contigs_metadata(contig_to_metadata: Dict[str, Dict[str, str]]) -> Tuple[ - Dict[str, str], Dict[str, Dict[str, str]]]: +def combine_contigs_metadata(contig_to_metadata: Dict[Contig, Dict[str, str]] + ) -> Tuple[Dict[str, str], Dict[Contig, Dict[str, str]]]: """ Combine contig metadata to identify shared and unique metadata tags and values. @@ -469,7 +471,8 @@ def shift_start_coordinates(coordinates: List[Tuple[int, int]], shift: int) -> L raise ValueError(f'Shifting the start resulted in a gene with null or negative size. ' f'Coordinates: {coordinates}, Shift: {shift}') else: - logging.getLogger("PPanGGOLiN").warning(f'Coordinate part {new_coordinates[0]} resulted in a non-positive length after shift. Removing it.') + logging.getLogger("PPanGGOLiN").warning( + f'Coordinate part {new_coordinates[0]} resulted in a non-positive length after shift. Removing it.') new_coordinates = new_coordinates[1:] # If length is negative, propagate the shift to the next coordinate @@ -480,24 +483,22 @@ def shift_start_coordinates(coordinates: List[Tuple[int, int]], shift: int) -> L return new_coordinates - - def fix_partial_gene_coordinates( - coordinates: List[Tuple[int, int]], - is_complement: bool, - start_shift: int, - ensure_codon_multiple:bool = True + coordinates: List[Tuple[int, int]], + is_complement: bool, + start_shift: int, + ensure_codon_multiple: bool = True ) -> List[Tuple[int, int]]: """ Adjusts gene coordinates if they have partial starts or ends, ensuring the gene length is a multiple of 3. If the gene is on the complement strand, the adjustments will be reversed (i.e., applied to the opposite ends). - - :param has_partial_start: Flag indicating if the gene has a partial start. - :param has_partial_end: Flag indicating if the gene has a partial end. + :param coordinates: List of coordinate tuples (start, stop) for the gene. :param is_complement: Flag indicating if the gene is on the complement strand. :param start_shift: The value by which the start coordinate should be shifted. + :param ensure_codon_multiple: Flag to check that gene length is a multiple of 3. + :return: A new list of adjusted coordinate tuples. """ @@ -532,7 +533,8 @@ def fix_partial_gene_coordinates( gene_length = sum([(stop - start + 1) for start, stop in coordinates]) if gene_length % 3 != 0: - logging.getLogger("PPanGGOLiN").warning(f'Gene with coordinates: {coordinates} has a length that is not a multiple of 3 after adjusting for partiality with new cordinates ({coordinates}): {gene_length}') + logging.getLogger("PPanGGOLiN").warning( + f'Gene with coordinates: {coordinates} has a length that is not a multiple of 3 after adjusting for partiality with new cordinates ({coordinates}): {gene_length}') return coordinates @@ -545,7 +547,7 @@ def read_org_gbff(organism_name: str, gbff_file_path: Path, circular_contigs: Li :param organism_name: Organism name :param gbff_file_path: Path to corresponding GBFF file :param circular_contigs: list of contigs - :param pseudo: Allow to read pseudogenes + :param use_pseudogenes: Allow to read pseudogenes :param translation_table: Translation table (genetic code) to use when /transl_table is missing from CDS tags. @@ -562,10 +564,6 @@ def read_org_gbff(organism_name: str, gbff_file_path: Path, circular_contigs: Li contig_to_metadata = {} for header, features, sequence in parse_gbff_by_contig(gbff_file_path): - - contig_id = None - contig_len = None - if "LOCUS" not in header: raise ValueError('Missing LOCUS line in GBFF header.') @@ -620,26 +618,26 @@ def read_org_gbff(organism_name: str, gbff_file_path: Path, circular_contigs: Li f"db_xref values does not have the expected format. Expect 'db_xref=:' " f"but got {feature['db_xref']} in file {gbff_file_path}. " "db_xref tags is therefore not retrieved in contig/genomes metadata.") - - contig_to_metadata[contig].update(db_xref_for_metadata) + else: + contig_to_metadata[contig].update(db_xref_for_metadata) genetic_code = '' if feature['feature_type'] not in ['CDS', 'rRNA', 'tRNA']: continue - coordinates, is_complement, has_partial_start, has_partial_end = extract_positions(''.join(feature['location'])) + coordinates, is_complement, has_partial_start, has_partial_end = extract_positions( + ''.join(feature['location'])) - if "pseudo" in feature and not use_pseudogenes: continue - + elif "transl_except" in feature and not use_pseudogenes: - # that's probably a 'stop' codon into selenocystein. + # that's probably a 'stop' codon into selenocystein. logging.getLogger("PPanGGOLiN").info( f"CDS '{feature['locus_tag']}' contains a 'transl_except' annotation ({feature['transl_except']}) " f"in contig '{contig}' in file '{gbff_file_path}'. " f"PPanGGOLiN does not handle 'transl_except' annotations. This gene's protein sequence " "will likely contain an internal stop codon when translated with PPanGGOLiN." ) - + if feature['feature_type'] == 'CDS': if feature['transl_table'] == "": used_transl_table_arg += 1 @@ -647,12 +645,12 @@ def read_org_gbff(organism_name: str, gbff_file_path: Path, circular_contigs: Li else: genetic_code = int(feature["transl_table"]) - if has_partial_start or has_partial_end: + start_shift = 0 if 'codon_start' not in feature else int( + feature['codon_start']) - 1 # -1 is to be in zero based index. - start_shift = 0 if 'codon_start' not in feature else int(feature['codon_start']) -1 # -1 is to be in zero based index. - - coordinates = fix_partial_gene_coordinates(coordinates, is_complement=is_complement, start_shift=start_shift) + coordinates = fix_partial_gene_coordinates(coordinates, is_complement=is_complement, + start_shift=start_shift) strand = "-" if is_complement else "+" @@ -773,7 +771,6 @@ def get_id_attribute(attributes_dict: dict) -> str: f"Not the case for file: {gff_file_path}") return element_id - def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, bool]: """ Checks for the presence of chevrons ('<' or '>') in the start and stop strings, removes them if present, @@ -795,7 +792,6 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b return start, stop, chevrons_present - contig = None # initialize contig has_fasta = False fasta_string = "" @@ -811,7 +807,6 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b for line in gff_file: if has_fasta: fasta_string += line - continue elif line.startswith('##', 0, 2): if line.startswith('FASTA', 2, 7): @@ -843,7 +838,8 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b pseudogene = False - start, stop, has_chevron = check_chevrons_in_start_and_stop(start=fields_gff[gff_start], stop=fields_gff[gff_end]) + gene_start, gene_stop, has_chevron = check_chevrons_in_start_and_stop(start=fields_gff[gff_start], + stop=fields_gff[gff_end]) if fields_gff[gff_type] == 'region': # keep region attributes to add them as metadata of genome and contigs @@ -862,7 +858,7 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b if fields_gff[gff_seqname] in circular_contigs or ('IS_CIRCULAR' in attributes and attributes['IS_CIRCULAR'] == "true"): # WARNING: In case we have prodigal gff with is_circular attributes. - # This would fail as contig is not defined. However is_circular should not be found in prodigal gff + # This would fail as contig is not defined. However, is_circular should not be found in prodigal gff logging.getLogger("PPanGGOLiN").debug(f"Contig {contig.name} is circular.") contig.is_circular = True assert contig.name == fields_gff[gff_seqname] @@ -872,7 +868,7 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b id_attribute = get_id_attribute(attributes) locus_tag = attributes.get("LOCUS_TAG") protein_id = attributes.get("PROTEIN_ID") - + if locus_tag is not None: gene_id = locus_tag @@ -922,15 +918,15 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b if gene_id in id_attr_to_gene_id: # the ID has already been seen at least once in this genome existing_gene = id_attr_to_gene_id[gene_id] - new_gene_info = {"strand":fields_gff[gff_strand], - "type":fields_gff[gff_type], - "name":name, - "position":contig.number_of_genes, - "product":product, - "local_identifier":gene_id, - "start": start, - "stop": stop, - "frame":gene_frame} + new_gene_info = {"strand": fields_gff[gff_strand], + "type": fields_gff[gff_type], + "name": name, + "position": contig.number_of_genes, + "product": product, + "local_identifier": gene_id, + "start": gene_start, + "stop": gene_stop, + "frame": gene_frame} check_and_add_extra_gene_part(existing_gene, new_gene_info) @@ -941,12 +937,11 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b id_attr_to_gene_id[gene_id] = gene # here contig is filled in order, so position is the number of genes already stored in the contig. - gene.fill_annotations(start=start, stop=stop, - strand=fields_gff[gff_strand], gene_type=fields_gff[gff_type], name=name, - position=contig.number_of_genes, product=product, - local_identifier=gene_id, + gene.fill_annotations(start=gene_start, stop=gene_stop, strand=fields_gff[gff_strand], + gene_type=fields_gff[gff_type], name=name, product=product, + position=contig.number_of_genes, local_identifier=gene_id, genetic_code=genetic_code, is_partial=is_partial, frame=gene_frame) - + gene.fill_parents(org, contig) gene_counter += 1 contig.add(gene) @@ -956,9 +951,9 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b rna_type = fields_gff[gff_type] rna = RNA(org.name + f"_{rna_type}_" + str(rna_counter).zfill(4)) - rna.fill_annotations(start=start, stop=stop, - strand=fields_gff[gff_strand], gene_type=fields_gff[gff_type], name=name, - product=product, local_identifier=gene_id) + rna.fill_annotations(start=gene_start, stop=gene_stop, strand=fields_gff[gff_strand], + gene_type=fields_gff[gff_type], name=name, product=product, + local_identifier=gene_id) rna.fill_parents(org, contig) rna_counter += 1 contig.add_rna(rna) @@ -971,7 +966,8 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b for gene in contig.genes: if gene.is_partial: is_complement = gene.strand == '-' - gene.coordinates = fix_partial_gene_coordinates(gene.coordinates, is_complement=is_complement, start_shift=gene.frame ) + gene.coordinates = fix_partial_gene_coordinates(gene.coordinates, is_complement=is_complement, + start_shift=gene.frame) # GET THE FASTA SEQUENCES OF THE GENES if fasta_string == "": @@ -1000,7 +996,8 @@ def check_chevrons_in_start_and_stop(start: str, stop: str) -> Tuple[int, int, b return org, has_fasta -def add_metadata_from_gff_file(contig_name_to_region_info: Dict[str, str], org: Organism, gff_file_path: Path): +def add_metadata_from_gff_file(contig_name_to_region_info: Dict[str, Dict[str, str]], + org: Organism, gff_file_path: Path): """ Add metadata to the organism object from a GFF file. @@ -1011,7 +1008,8 @@ def add_metadata_from_gff_file(contig_name_to_region_info: Dict[str, str], org: # Check if the number of contigs matches the expected number in the organism if len(contig_name_to_region_info) == org.number_of_contigs: - genome_metadata, contig_to_uniq_metadata = combine_contigs_metadata(contig_name_to_region_info) + contig_to_region_info = {org.get(name): region_info for name, region_info in contig_name_to_region_info.items()} + genome_metadata, contig_to_uniq_metadata = combine_contigs_metadata(contig_to_region_info) if genome_metadata: org.add_metadata(Metadata(source='annotation_file', **genome_metadata)) else: @@ -1019,13 +1017,9 @@ def add_metadata_from_gff_file(contig_name_to_region_info: Dict[str, str], org: f"Inconsistent data in GFF file {gff_file_path}: " f"expected {org.number_of_contigs} contigs but found {len(contig_name_to_region_info)} regions." ) + contig_to_uniq_metadata = {} - for contig_name, metadata_dict in contig_to_uniq_metadata.items(): - try: - contig = org.get(contig_name) - except KeyError: - raise ValueError(f"Contig '{contig_name}' does not exist in the genome object created from GFF file {gff_file_path}.") - + for contig, metadata_dict in contig_to_uniq_metadata.items(): contig.add_metadata(Metadata(source='annotation_file', **metadata_dict)) @@ -1056,7 +1050,8 @@ def check_and_add_extra_gene_part(gene: Gene, new_gene_info: Dict, max_separatio # The new gene info seems concordant with the gene object. We can try to merge them assert new_gene_info['start'] <= new_gene_info['stop'], "Start is greater than stop. Incorrect coordinates." - new_gene_is_before = (gene.strand == "+" and new_gene_info['start'] < gene.start) or (gene.strand == "-" and new_gene_info['start'] > gene.start) + new_gene_is_before = (gene.strand == "+" and new_gene_info['start'] < gene.start) or ( + gene.strand == "-" and new_gene_info['start'] > gene.start) if new_gene_is_before: # new gene start if before the current gene # so its frame if used @@ -1069,7 +1064,7 @@ def check_and_add_extra_gene_part(gene: Gene, new_gene_info: Dict, max_separatio first_stop = gene.coordinates[0][1] for start, _ in gene.coordinates[1:]: if abs(start - first_stop) > max_separation: - # This is maybe to restrictive but lets go with that first. + # This is maybe to restrictive but let's go with that first. raise ValueError( f"The coordinates of genes are too far apart ({abs(start - first_stop)}nt). This is unexpected. " f"Gene coordinates : {gene.coordinates}") @@ -1091,7 +1086,6 @@ def check_and_add_extra_gene_part(gene: Gene, new_gene_info: Dict, max_separatio "same local identifier": f"{gene.local_identifier} == {new_gene_info['local_identifier']}: {gene.local_identifier == new_gene_info['local_identifier']}", } - raise ValueError( f"Two genes have the same id attributes but different info in some key attribute. {detailed_comparison}") diff --git a/tests/annotate/test_annotate.py b/tests/annotate/test_annotate.py index 08682d91..e52868c2 100644 --- a/tests/annotate/test_annotate.py +++ b/tests/annotate/test_annotate.py @@ -1,28 +1,35 @@ import pytest from pathlib import Path + +from ppanggolin.genome import Contig from ppanggolin.annotate.annotate import extract_positions, read_anno_file, parse_contig_header_lines, \ - parse_gbff_by_contig, parse_feature_lines, parse_dna_seq_lines, read_org_gbff, combine_contigs_metadata, fix_partial_gene_coordinates, shift_start_coordinates, shift_end_coordinates - - -@pytest.mark.parametrize("input_string, expected_positions, expected_complement, expected_partialgene_start, expected_partialgene_end", [ - ("join(190..7695,7695..12071)", [(190, 7695), (7695, 12071)], False, False, False), - ("order(190..7695,7995..12071)", [(190, 7695), (7995, 12071)], False, False, False), - ("complement(join(4359800..4360707,4360707..4360962,1..100))", [(4359800, 4360707), (4360707, 4360962), (1, 100)], - True, False, False), - ("complement(order(4359800..4360707,4360707..4360962,1..100))", [(4359800, 4360707), (4360707, 4360962), (1, 100)], - True, False, False), - ("join(6835405..6835731,1..1218)", [(6835405, 6835731), (1, 1218)], False, False, False), - ("join(1375484..1375555,1375557..1376579)", [(1375484, 1375555), (1375557, 1376579)], False, False, False), - ("complement(6815492..6816265)", [(6815492, 6816265)], True, False, False), - ("6811501..6812109", [(6811501, 6812109)], False, False, False), - ("complement(6792573..>6795461)", [(6792573, 6795461)], True, False, True), - ("complement(<6792573..6795461)", [(6792573, 6795461)], True, True, False), - ("complement(<6792573..>6795461)", [(6792573, 6795461)], True, True, True), - ("join(1038313,1..1016)", [(1038313, 1038313), (1, 1016)], False, False, False), - ("1038313", [(1038313, 1038313)], False, False, False) -]) -def test_extract_positions(input_string, expected_positions, expected_complement, expected_partialgene_start, expected_partialgene_end): - positions, is_complement, has_partial_start, has_partial_end = extract_positions(input_string) + parse_gbff_by_contig, parse_feature_lines, parse_dna_seq_lines, read_org_gbff, combine_contigs_metadata, \ + fix_partial_gene_coordinates, shift_start_coordinates, shift_end_coordinates + + +@pytest.mark.parametrize( + "input_string, expected_positions, expected_complement, expected_partialgene_start, expected_partialgene_end", [ + ("join(190..7695,7695..12071)", [(190, 7695), (7695, 12071)], False, False, False), + ("order(190..7695,7995..12071)", [(190, 7695), (7995, 12071)], False, False, False), + ("complement(join(4359800..4360707,4360707..4360962,1..100))", + [(4359800, 4360707), (4360707, 4360962), (1, 100)], + True, False, False), + ("complement(order(4359800..4360707,4360707..4360962,1..100))", + [(4359800, 4360707), (4360707, 4360962), (1, 100)], + True, False, False), + ("join(6835405..6835731,1..1218)", [(6835405, 6835731), (1, 1218)], False, False, False), + ("join(1375484..1375555,1375557..1376579)", [(1375484, 1375555), (1375557, 1376579)], False, False, False), + ("complement(6815492..6816265)", [(6815492, 6816265)], True, False, False), + ("6811501..6812109", [(6811501, 6812109)], False, False, False), + ("complement(6792573..>6795461)", [(6792573, 6795461)], True, False, True), + ("complement(<6792573..6795461)", [(6792573, 6795461)], True, True, False), + ("complement(<6792573..>6795461)", [(6792573, 6795461)], True, True, True), + ("join(1038313,1..1016)", [(1038313, 1038313), (1, 1016)], False, False, False), + ("1038313", [(1038313, 1038313)], False, False, False) + ]) +def test_extract_positions(input_string, expected_positions, expected_complement, expected_partialgene_start, + expected_partialgene_end): + positions, is_complement, has_partial_start, has_partial_end = extract_positions(input_string) assert positions == expected_positions assert is_complement == expected_complement assert has_partial_start == expected_partialgene_start @@ -43,6 +50,7 @@ def test_extract_positions_with_strange_chevrons(): with pytest.raises(ValueError): extract_positions("complement(join(4359800..4360707,1..<100))") # start chevron in ending position + def test_extract_positions_with_wrong_positions_format2(): with pytest.raises(ValueError): extract_positions("start..stop") # start and stop are not integer @@ -238,8 +246,6 @@ def test_parse_gbff_by_contig(sample_gbff_path): # Add more test cases as needed ]) def test_parse_feature_lines(input_lines, expected_output): - print(expected_output) - a = list(parse_feature_lines(input_lines)) assert list(parse_feature_lines(input_lines)) == expected_output @@ -252,23 +258,24 @@ def test_parse_dna_seq_lines(): def test_combine_contigs_metadata(): + contig1, contig2, contig3 = Contig(1, "contig1"), Contig(2, "contig2"), Contig(3, "contig3") contig_to_metadata = { - "contig1": {"sp": "spA", "strain": "123", "contig_feat": "ABC"}, - "contig2": {"sp": "spA", "strain": "123", "contig_feat": "XYZ"}, - "contig3": {"sp": "spA", "strain": "123"} + contig1: {"sp": "spA", "strain": "123", "contig_feat": "ABC"}, + contig2: {"sp": "spA", "strain": "123", "contig_feat": "XYZ"}, + contig3: {"sp": "spA", "strain": "123"} } genome_metadata, contig_metadata = combine_contigs_metadata(contig_to_metadata) assert genome_metadata == {"sp": "spA", "strain": "123"} - assert contig_metadata == {"contig1": {"contig_feat": "ABC"}, "contig2": {"contig_feat": "XYZ"}, } + assert contig_metadata == {contig1: {"contig_feat": "ABC"}, contig2: {"contig_feat": "XYZ"}, } @pytest.mark.parametrize("coordinates, is_complement, start_shift, expected", [ # Case 1: No partial start or end, expect no change in non-complement strand # Coordinates are already correct, no need to modify anything. ([(11, 40)], False, 0, [(11, 40)]), - + # Case 2: Partial start, no partial end (Non-complement) # A shift of 1 is added to the start coordinate. ([(10, 40)], False, 1, [(11, 40)]), # start_shift of 1 added to start @@ -276,7 +283,7 @@ def test_combine_contigs_metadata(): # Case 2: Partial start, no partial end (Non-complement) # A shift of 2 is added to the start coordinate. ([(2, 33)], False, 2, [(4, 33)]), # start_shift of 2 added to start - + # Case 3: No partial start, partial end (Non-complement) # Adjust last coordinate to make gene length a multiple of 3. ([(11, 41)], False, 0, [(11, 40)]), # last end adjusted to be a multiple of 3 @@ -284,23 +291,23 @@ def test_combine_contigs_metadata(): # Case 3: No partial start, partial end (Non-complement) # Gene length is already a multiple of 3, so no changes needed. ([(11, 40)], False, 0, [(11, 40)]), # gene length already a multiple of 3 so no change is made - + # Case 4: Partial start and end (Non-complement) # Both start and end need adjustment: add shift to start and adjust end to make gene length a multiple of 3. ([(10, 41)], False, 1, [(11, 40)]), # start_shift added to start, and length adjusted - + # Case 5: Partial start and no partial end on complement strand # Adjust start since we are on the complement strand. - ([(9, 40)], True, 0, [(11, 40)]), # length adjusted - + ([(9, 40)], True, 0, [(11, 40)]), # length adjusted + # Case 5: No partial start but partial end on complement strand # Shift removed from the last end on the complement strand. - ( [(9, 40)], True, 2, [(9, 38)]), # start_shift removed + ([(9, 40)], True, 2, [(9, 38)]), # start_shift removed # Case 5: Partial start and end on complement strand # Adjust both start and end since we are on the complement strand, ensuring gene length is a multiple of 3. - ( [(8, 40)], True, 2, [(9, 38)]), # start_shift removed and length adjusted - + ([(8, 40)], True, 2, [(9, 38)]), # start_shift removed and length adjusted + # Case 5: Joined coordinates without partial start or end # Nothing to adjust as the gene is properly framed. ([(1, 9), (7, 12)], False, 0, [(1, 9), (7, 12)]), # nothing to do @@ -308,7 +315,7 @@ def test_combine_contigs_metadata(): # Case 5: Joined coordinates with partial start # Adjust the first start coordinate by the shift. ([(3, 9), (7, 12)], False, 1, [(4, 9), (7, 12)]), # adjust start - + # Case 5: Joined coordinates with partial end # Adjust the last end to ensure the gene length is a multiple of 3. ([(3, 9), (7, 12)], False, 0, [(3, 9), (7, 11)]), # adjust end @@ -322,48 +329,50 @@ def test_combine_contigs_metadata(): ([(4, 9), (7, 12)], True, 2, [(5, 9), (7, 10)]), # adjust start and end in complement mode # Real tricky case from GCF_000623275.1 - ([(4681814, 4682911), (1, 1)], False, 0, [(4681814, 4682911)]), # ajust the end by removing one nt. In this case that remove the second part of the coordinates + ([(4681814, 4682911), (1, 1)], False, 0, [(4681814, 4682911)]), + # ajust the end by removing one nt. In this case that remove the second part of the coordinates # Tricky case inspired by last case - ([(30, 60), (1, 1)], False, 0, [(30, 59)]), # ajust the end by removing two nt. In this case that remove the second part of the coordinates and one nt in the first part - + ([(30, 60), (1, 1)], False, 0, [(30, 59)]), + # ajust the end by removing two nt. In this case that remove the second part of the coordinates and one nt in the first part # Tricky case inspired by last case - ([(60, 60), (1, 9)], False, 1, [(1, 9)]), # ajust the end by removing one nt. In this case that remove the second part of the coordinates + ([(60, 60), (1, 9)], False, 1, [(1, 9)]), + # ajust the end by removing one nt. In this case that remove the second part of the coordinates # Tricky case inspired by last case - ([(60, 60), (1, 10)], False, 2, [(2, 10)]), # ajust the end by removing one nt. In this case that remove the second part of the coordinates + ([(60, 60), (1, 10)], False, 2, [(2, 10)]), + # ajust the end by removing one nt. In this case that remove the second part of the coordinates # Very tricky case inspired by last case - ([(59, 60), (60, 60), (1, 9)], False, 3, [(1, 9)]), # ajust the end by removing one nt. In this case that remove the second part of the coordinates - + ([(59, 60), (60, 60), (1, 9)], False, 3, [(1, 9)]), + # ajust the end by removing one nt. In this case that remove the second part of the coordinates + # Very tricky case inspired by last case ([(60, 61), (1, 8)], True, 3, [(61, 61), (1, 5)]), # - - ]) - def test_fix_partial_gene_coordinates(coordinates, is_complement, start_shift, expected): result = fix_partial_gene_coordinates(coordinates, is_complement, start_shift) assert result == expected - def test_fix_partial_gene_coordinates_with_wrong_coordinates(): - with pytest.raises(ValueError): - fix_partial_gene_coordinates(coordinates=[(1, 1)], is_complement=False, start_shift=0) # gene is too small, the length adjustement at the end lead to no gene + fix_partial_gene_coordinates(coordinates=[(1, 1)], is_complement=False, + start_shift=0) # gene is too small, the length adjustement at the end lead to no gene with pytest.raises(ValueError): - fix_partial_gene_coordinates([(1, 1)], False, 1) # gene is too small, the length adjustement at the start lead to no gene - + fix_partial_gene_coordinates([(1, 1)], False, + 1) # gene is too small, the length adjustement at the start lead to no gene + with pytest.raises(ValueError): - fix_partial_gene_coordinates([(60,60), (1, 1)], False, 1) # gene is too small, the length adjustement at the start and at the end lead to no gene + fix_partial_gene_coordinates([(60, 60), (1, 1)], False, + 1) # gene is too small, the length adjustement at the start and at the end lead to no gene with pytest.raises(ValueError): - fix_partial_gene_coordinates([], False, 1) # chevron in inner position - + fix_partial_gene_coordinates([], False, 1) # chevron in inner position + @pytest.mark.parametrize("coordinates, shift, expected", [ @@ -377,10 +386,7 @@ def test_fix_partial_gene_coordinates_with_wrong_coordinates(): ([(1, 1), (1, 2), (1, 4)], 2, [(2, 2), (1, 4)]), - - ]) - - +]) def test_shift_start_coordinates(coordinates, shift, expected): result = shift_start_coordinates(coordinates, shift) assert result == expected @@ -398,11 +404,7 @@ def test_shift_start_coordinates(coordinates, shift, expected): ([(18, 18), (1, 4)], 4, [(18, 18)]), - - ]) - - +]) def test_shift_end_coordinates(coordinates, shift, expected): result = shift_end_coordinates(coordinates, shift) assert result == expected -