diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e9643e3..db64b1c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -14,7 +14,7 @@ jobs: uses: actions/checkout@v4 - name: Set up Miniconda - uses: conda-incubator/setup-miniconda@v2 + uses: conda-incubator/setup-miniconda@v3 with: activate-environment: taranis_env environment-file: environment.yml diff --git a/environment.yml b/environment.yml index a2ef128..1d45d06 100644 --- a/environment.yml +++ b/environment.yml @@ -10,6 +10,7 @@ dependencies: - bioconda::blast>=2.9 - bioconda::mash>=2 - bioconda::prodigal=2.6.3 + - bioconda::mafft=7.525 - pip - pip : - -r requirements.txt diff --git a/taranis/__main__.py b/taranis/__main__.py index d3ae34f..cb17c20 100644 --- a/taranis/__main__.py +++ b/taranis/__main__.py @@ -3,7 +3,6 @@ import click import concurrent.futures import glob -import os import rich.console import rich.logging import rich.traceback @@ -15,7 +14,7 @@ import taranis.reference_alleles import taranis.allele_calling -from pathlib import Path +import taranis.inferred_alleles log = logging.getLogger() @@ -217,6 +216,7 @@ def analyze_schema( usegenus: str, cpus: int, ): + _ = taranis.utils.check_additional_programs_installed([["prokka", "--version"]]) schema_files = taranis.utils.get_files_in_folder(inputdir, "fasta") results = [] @@ -316,6 +316,12 @@ def analyze_schema( default=1, help="Number of cpus used for execution", ) +@click.option( + "--force/--no-force", + required=False, + default=False, + help="Overwrite the output folder if it exists", +) def reference_alleles( schema: str, output: str, @@ -325,7 +331,11 @@ def reference_alleles( cluster_resolution: float, seed: int, cpus: int, + force: bool, ): + _ = taranis.utils.check_additional_programs_installed( + [["mash", "--version"], ["makeblastdb", "-version"], ["blastn", "-version"]] + ) start = time.perf_counter() max_cpus = taranis.utils.cpus_available() if cpus > max_cpus: @@ -335,23 +345,8 @@ def reference_alleles( schema_files = taranis.utils.get_files_in_folder(schema, "fasta") # Check if output folder exists - if taranis.utils.folder_exists(output): - q_question = ( - "Folder " - + output - + " already exists. Files will be overwritten. Do you want to continue?" - ) - if "no" in taranis.utils.query_user_yes_no(q_question, "no"): - log.info("Aborting code by user request") - stderr.print("[red] Exiting code. ") - sys.exit(1) - else: - try: - os.makedirs(output) - except OSError as e: - log.info("Unable to create folder at %s with error %s", output, e) - stderr.print("[red] ERROR. Unable to create folder " + output) - sys.exit(1) + if not force: + _ = taranis.utils.prompt_user_if_folder_exists(output) """Create the reference alleles from the schema """ results = [] with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor: @@ -396,6 +391,32 @@ def reference_alleles( type=click.Path(exists=True), help="Directory where the schema reference allele files are located. ", ) +@click.option( + "-a", + "--annotation", + required=True, + multiple=False, + type=click.Path(exists=True), + help="Annotation file. ", +) +@click.option( + "-t", + "--threshold", + required=False, + nargs=1, + default=0.8, + type=float, + help="Threshold value to consider in blast. Values from 0 to 1. default 0.8", +) +@click.option( + "-p", + "--perc-identity", + required=False, + nargs=1, + default=90, + type=int, + help="Percentage of identity to consider in blast. 
default 90", +) @click.option( "-o", "--output", @@ -404,6 +425,12 @@ def reference_alleles( type=click.Path(), help="Output folder to save reference alleles", ) +@click.option( + "--force/--no-force", + required=False, + default=False, + help="Overwrite the output folder if it exists", +) @click.argument( "assemblies", callback=expand_wildcards, @@ -411,12 +438,62 @@ def reference_alleles( required=True, type=click.Path(exists=True), ) +@click.option( + "--snp/--no-snp", + required=False, + default=False, + help="Create SNP file for alleles in assembly in relation with reference allele", +) +@click.option( + "--alignment/--no-alignment", + required=False, + default=False, + help="Create alignment files", +) +@click.option( + "-q", + "--proteine-threshold", + required=False, + nargs=1, + default=80, + type=int, + help="Threshold of protein coverage to consider as TPR. default 90", +) +@click.option( + "-i", + "--increase-sequence", + required=False, + nargs=1, + default=20, + type=int, + help="Increase the number of triplet sequences to find the stop codon. default 20", +) +@click.option( + "--cpus", + required=False, + multiple=False, + type=int, + default=1, + help="Number of cpus used for execution", +) def allele_calling( - schema, - reference, - assemblies, - output, + schema: str, + reference: str, + annotation: str, + assemblies: list, + threshold: float, + perc_identity: int, + output: str, + force: bool, + snp: bool, + alignment: bool, + proteine_threshold: int, + increase_sequence: int, + cpus: int, ): + _ = taranis.utils.check_additional_programs_installed( + [["blastn", "-version"], ["makeblastdb", "-version"], ["mafft", "--version"]] + ) schema_ref_files = taranis.utils.get_files_in_folder(reference, "fasta") if len(schema_ref_files) == 0: log.error("Referenc allele folder %s does not have any fasta file", schema) @@ -424,45 +501,78 @@ def allele_calling( sys.exit(1) # Check if output folder exists - if taranis.utils.folder_exists(output): - q_question = ( - "Folder " - + output - + " already exists. Files will be overwritten. Do you want to continue?" - ) - if "no" in taranis.utils.query_user_yes_no(q_question, "no"): - log.info("Aborting code by user request") - stderr.print("[red] Exiting code. ") - sys.exit(1) - else: - try: - os.makedirs(output) - except OSError as e: - log.info("Unable to create folder at %s with error %s", output, e) - stderr.print("[red] ERROR. 
Unable to create {output} folder") - sys.exit(1) + if not force: + _ = taranis.utils.prompt_user_if_folder_exists(output) # Filter fasta files from reference folder # ref_alleles = glob.glob(os.path.join(reference, "*.fasta")) - # Create predictions - - """ - pred_out = os.path.join(output, "prediction") - pred_sample = taranis.prediction.Prediction(genome, sample, pred_out) - pred_sample.training() - pred_sample.prediction() + max_cpus = taranis.utils.cpus_available() + if cpus > max_cpus: + stderr.print("[red] Number of CPUs bigger than the CPUs available") + stderr.print("Running code with ", max_cpus) + cpus = max_cpus + # Read the annotation file + stderr.print("[green] Reading annotation file") + log.info("Reading annotation file") + map_pred = [["gene", 7], ["product", 8], ["allele_quality", 9]] + prediction_data = taranis.utils.read_compressed_file( + annotation, separator=",", index_key=1, mapping=map_pred + ) + # Create the instanace for inference alleles + inf_allele_obj = taranis.inferred_alleles.InferredAllele() + """Analyze the sample file against schema to identify alleles """ - """Analyze the sample file against schema to identify outbreakers - """ + start = time.perf_counter() results = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor: + futures = [ + executor.submit( + taranis.allele_calling.parallel_execution, + assembly_file, + schema, + prediction_data, + schema_ref_files, + threshold, + perc_identity, + output, + inf_allele_obj, + snp, + alignment, + proteine_threshold, + increase_sequence, + ) + for assembly_file in assemblies + ] + for future in concurrent.futures.as_completed(futures): + try: + results.append(future.result()) + except Exception as e: + print(e) + continue + """ for assembly_file in assemblies: - assembly_name = Path(assembly_file).stem results.append( - { - assembly_name: taranis.allele_calling.parallel_execution( - assembly_file, schema, schema_ref_files, output - ) - } + taranis.allele_calling.parallel_execution( + assembly_file, + schema, + prediction_data, + schema_ref_files, + threshold, + perc_identity, + output, + inf_allele_obj, + snp, + alignment, + proteine_threshold, + increase_sequence, + ) ) - + """ + _ = taranis.allele_calling.collect_data( + results, output, snp, alignment, schema_ref_files, cpus + ) + finish = time.perf_counter() + print(f"Allele calling finish in {round((finish-start)/60, 2)} minutes") + log.info("Allele calling finish in %s minutes", round((finish - start) / 60, 2)) # sample_allele_obj.analyze_sample() diff --git a/taranis/allele_calling.py b/taranis/allele_calling.py index d7baf37..6909a96 100644 --- a/taranis/allele_calling.py +++ b/taranis/allele_calling.py @@ -1,17 +1,17 @@ import io +import concurrent.futures import logging import os import rich.console - import taranis.utils import taranis.blast -# import numpy from collections import OrderedDict from pathlib import Path +from Bio.Seq import Seq from Bio import SeqIO - +from io import StringIO log = logging.getLogger(__name__) stderr = rich.console.Console( @@ -24,130 +24,876 @@ class AlleleCalling: def __init__( - self, sample_file: str, schema: str, reference_alleles: list, out_folder: str + self, + sample_file: str, + schema: str, + annotation: dict, + reference_alleles: list, + threshold: float, + perc_identity: int, + out_folder: str, + inf_alle_obj: object, + snp_request: bool = False, + aligment_request: bool = False, + tpr_limit: int = 80, + increase_sequence: int = 20, ): - # self.prediction = prediction + """Allele 
diff --git a/taranis/allele_calling.py b/taranis/allele_calling.py
index d7baf37..6909a96 100644
--- a/taranis/allele_calling.py
+++ b/taranis/allele_calling.py
@@ -1,17 +1,17 @@
 import io
+import concurrent.futures
 import logging
 import os
 import rich.console
-
 import taranis.utils
 import taranis.blast
-# import numpy
 from collections import OrderedDict
 from pathlib import Path
+from Bio.Seq import Seq
 from Bio import SeqIO
-
+from io import StringIO
 log = logging.getLogger(__name__)
 stderr = rich.console.Console(
@@ -24,130 +24,876 @@ class AlleleCalling:
     def __init__(
-        self, sample_file: str, schema: str, reference_alleles: list, out_folder: str
+        self,
+        sample_file: str,
+        schema: str,
+        annotation: dict,
+        reference_alleles: list,
+        threshold: float,
+        perc_identity: int,
+        out_folder: str,
+        inf_alle_obj: object,
+        snp_request: bool = False,
+        aligment_request: bool = False,
+        tpr_limit: int = 80,
+        increase_sequence: int = 20,
    ):
-        # self.prediction = prediction
+        """Allele calling initial creation object
+
+        Args:
+            sample_file (str): assembly file
+            schema (str): folder with alleles schema
+            annotation (dict): annotation of locus according to prokka
+            reference_alleles (list): folder with reference alleles
+            threshold (float): threshold to consider a match in blast
+            perc_identity (int): percentage of identity to consider a match in blast
+            out_folder (str): output folder
+            inf_alle_obj (object): object to infer alleles
+            snp_request (bool, optional): save SNP data to file. Defaults to False.
+            aligment_request (bool, optional): save alignment to file. Defaults to False.
+            tpr_limit (int, optional): lower threshold to consider a truncated protein. Defaults to 80.
+            increase_sequence (int, optional): number of triplets used to extend the analysed sequence. Defaults to 20.
+        """
+        self.prediction_data = annotation  # store prediction annotation
        self.sample_file = sample_file
+        self.sample_records = taranis.utils.read_fasta_file(
+            self.sample_file, convert_to_dict=True
+        )
        self.schema = schema
        self.ref_alleles = reference_alleles
+        self.threshold = threshold
+        self.perc_identity = perc_identity
        self.out_folder = out_folder
        self.s_name = Path(sample_file).stem
        self.blast_dir = os.path.join(out_folder, "blastdb")
        # create blast for sample file
        self.blast_obj = taranis.blast.Blast("nucl")
        _ = self.blast_obj.create_blastdb(sample_file, self.blast_dir)
+        # store inferred allele object
+        self.inf_alle_obj = inf_alle_obj
+        self.snp_request = snp_request
+        self.aligment_request = aligment_request
+        self.tpr_limit = tpr_limit / 100
+        self.increase_sequence = increase_sequence * 3

    def assign_allele_type(
-        self, blast_result: list, allele_file: str, allele_name: str
+        self,
+        valid_blast_results: list,
+        allele_file: str,
+        allele_name: str,
+        ref_allele_seq: str,
    ) -> list:
-        def get_blast_details(blast_result: list, allele_name: str) -> list:
+        """Assign allele type to the allele
+
+        Args:
+            valid_blast_results (list): information collected by running blast
+            allele_file (str): file name with allele sequence
+            allele_name (str): allele name
+            ref_allele_seq (str): reference allele sequence
+
+        Returns:
+            list: containing allele classification, allele match id, and allele
+            details
+        """
+
+        def _check_if_plot(column_blast_res: list) -> bool:
+            """Check if the matched allele is partial because it falls at a
+            contig boundary (PLOT)
+
+            Args:
+                column_blast_res (list): blast result
+
+            Returns:
+                bool: True if allele is partial length
+            """
+            if (
+                column_blast_res[8] == "1"  # match starts at the contig start
+                # match ends at the contig end
+                or column_blast_res[9] == column_blast_res[7]
+                or column_blast_res[9] == "1"  # reverse match ends at the contig start
+                # reverse match starts at the contig end
+                or column_blast_res[8] == column_blast_res[7]
+            ):
+                return True
+            return False
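        # Illustrative example (reviewer sketch, not part of the patch): with
        # the detail-list layout used below (index 7 = contig length,
        # 8 = match start, 9 = match end), a match touching a contig boundary
        # is classified as PLOT:
        #
        #   details = [""] * 10
        #   details[7], details[8], details[9] = "1200", "1", "850"
        #   _check_if_plot(details)   # True: the match starts at base 1
        #
        #   details[8], details[9] = "351", "1200"
        #   _check_if_plot(details)   # True: the match ends at the contig end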
+        def _extend_sequence_for_finding_start_stop_codon(
+            split_blast_result: list,
+            prot_error_result: str,
+            predicted_prot_seq: str,
+            search_codon: str = "stop",
+        ) -> list:
+            """Extend the matched sequence, according to (increase_sequence),
+            to try to find the stop or start codon. When the parameter is set
+            to "stop", additional nucleotides are appended to increase the
+            chance of finding the stop codon. If the parameter is set to
+            "start", an additional nucleotide is checked at the start position
+            to verify a valid start codon; if one is found, additional
+            nucleotides are then added to find the stop codon.
+
+            Args:
+                split_blast_result (list): list with the information collected
+                    by running blast
+                prot_error_result (str): protein conversion result
+                predicted_prot_seq (str): predicted protein sequence
+                search_codon (str, optional): codon to be searched for. Allowed
+                    values are "start" and "stop". Defaults to "stop".
+
+            Returns:
+                list: updated information if the stop or start codon is found,
+                plus the updated protein sequence and protein conversion result
+                if they changed
+            """
+            # collect data for checking PLOT
+            data_for_plot = [""] * 10
+            # copy the contig length
+            data_for_plot[7] = split_blast_result[15]
+            # copy start position
+            data_for_plot[8] = split_blast_result[9]
+            # copy end position
+            data_for_plot[9] = split_blast_result[10]
+            # check if PLOT
+            if not _check_if_plot(data_for_plot):
+                # remove the "-" characters from the contig sequence in case
+                # the start/stop codon cannot be found and the function
+                # returns the original blast result
+                split_blast_result[13] = split_blast_result[13].replace("-", "")
+                # fetch the sequence until the last triplet is a stop codon
+                contig_seq = self.sample_records[split_blast_result[1]]
+                start_seq = int(split_blast_result[9])
+                stop_seq = int(split_blast_result[10])
+                if stop_seq > start_seq:
+                    # sequence direction is forward
+                    direction = "forward"
+                    if search_codon == "start":
+                        # add nucleotides according to the first match in the
+                        # reference allele
+                        start_ref_allele = int(split_blast_result[11]) // 3 * 3
+                        # try extending 1 nucleotide to find the start codon
+                        start_seq_found = False
+                        # subtract 1 because the index starts at 0
+                        new_start_seq = start_seq - start_ref_allele - 1
+                        for _ in range(1, 3):
+                            new_start_seq -= 1
+                            if (
+                                contig_seq[new_start_seq : new_start_seq + 3]
+                                in taranis.utils.START_CODON_FORWARD
+                            ):
+                                # add 1 because we subtracted 1 when searching
+                                # for the stop codon
+                                start_seq = new_start_seq + 1
+                                start_seq_found = True
+                                break
+                        # continue to find the stop codon with the new start
+                        if not start_seq_found:
+                            # start codon not found. Return the original blast result
+                            return (
+                                split_blast_result,
+                                prot_error_result,
+                                predicted_prot_seq,
+                            )
+
+                    # adjust the sequence to be a triplet
+                    interval = (stop_seq - start_seq) // 3 * 3
+                    new_stop_seq = start_seq + interval + self.increase_sequence
+                    start_seq -= 1
+                    # if the increased length is higher than the contig length,
+                    # adjust the stop position to the maximum contig length
+                    # that is a multiple of 3
+                    if stop_seq > len(contig_seq):
+                        stop_seq = len(contig_seq) // 3 * 3
+                    else:
+                        stop_seq = new_stop_seq - 1
+                    c_sequence = contig_seq[start_seq:stop_seq]
+                else:
+                    # sequence direction is reverse
+                    direction = "reverse"
+                    if search_codon == "start":
+                        if (
+                            contig_seq[start_seq - 2 : start_seq + 1]
+                            in taranis.utils.START_CODON_REVERSE
+                        ):
+                            start_seq += 1
+                        # continue to find the stop codon with the new start
+                        else:
+                            # start codon not found. Return the original blast result
+                            return (
+                                split_blast_result,
+                                prot_error_result,
+                                predicted_prot_seq,
+                            )
+                    # adjust the sequence to be a triplet
+                    interval = (start_seq - stop_seq) // 3 * 3
+                    new_stop_seq = start_seq - interval - self.increase_sequence
+                    # if the increased length is lower than 0 (contig start),
+                    # adjust the start position to the minimum contig length
+                    # that is a multiple of 3
+                    if new_stop_seq < 0:
+                        # get the minimum contig length that is a multiple of 3
+                        stop_seq = stop_seq % 3 - 1
+                    else:
+                        stop_seq = new_stop_seq
+                    # get the sequence in reverse
+                    c_sequence = str(
+                        Seq(contig_seq[stop_seq:start_seq]).reverse_complement()
+                    )
+                new_prot_conv_result = taranis.utils.convert_to_protein(
+                    c_sequence, force_coding=False, delete_incompleted_triplet=False
+                )
+                # check if a stop codon is found in the protein sequence
+                if (
+                    "protein" in new_prot_conv_result
+                    and "*" in new_prot_conv_result["protein"]
+                ):
+                    # add 3 nucleotides because the index starts at 0
+                    new_seq_length = new_prot_conv_result["protein"].index("*") * 3 + 3
+                    match_sequence = c_sequence[:new_seq_length]
+                    split_blast_result[4] = str(new_seq_length)
+                    split_blast_result[13] = match_sequence
+                    prot_error_result = "-"
+                    predicted_prot_seq = new_prot_conv_result["protein"][
+                        0 : new_seq_length // 3
+                    ]
+                    # update the start and stop position
+                    if direction == "forward":
+                        split_blast_result[10] = str(
+                            int(split_blast_result[9]) + new_seq_length
+                        )
+                    else:
+                        split_blast_result[10] = str(
+                            int(split_blast_result[9]) - new_seq_length
+                        )
+                # ignore the previous process if the stop codon is not found
+
+            return split_blast_result, prot_error_result, predicted_prot_seq
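        # Illustrative example (reviewer sketch, not part of the patch): the
        # triplet arithmetic above trims coordinates to codon boundaries before
        # extending. For a forward match with start_seq = 100, stop_seq = 250
        # and increase_sequence = 20 (stored as 60 nucleotides in __init__):
        #
        #   interval = (250 - 100) // 3 * 3    # 150, already a multiple of 3
        #   new_stop_seq = 100 + 150 + 60      # scan 60 nt past the match
        #
        # so up to 20 extra codons are inspected for a stop codon.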
+        def _get_blast_details(
+            blast_result: str, allele_name: str, ref_allele_seq
+        ) -> list:
+            """Collect blast details and modify the order of the columns
+
+            Args:
+                blast_result (str): information collected by running blast
+                allele_name (str): allele name
+                ref_allele_seq (str): reference allele sequence
+
+            Returns:
+                list: containing allele details in the correct order to be saved
+                    blast_details[0] = sample name
+                    blast_details[1] = contig name
+                    blast_details[2] = core gene name
+                    blast_details[3] = allele gene
+                    blast_details[4] = coding allele type
+                    blast_details[5] = reference allele length
+                    blast_details[6] = match alignment length
+                    blast_details[7] = contig length
+                    blast_details[8] = match contig position start
+                    blast_details[9] = match contig position end
+                    blast_details[10] = direction
+                    blast_details[11] = gene annotation
+                    blast_details[12] = product annotation
+                    blast_details[13] = allele quality
+                    blast_details[14] = protein conversion result
+                    blast_details[15] = match sequence in contig
+                    blast_details[16] = reference allele sequence
+                    blast_details[17] = predicted protein sequence
+            """
+            split_blast_result = blast_result.split("\t")
+            match_allele_name = split_blast_result[0]
+            try:
+                gene_annotation = self.prediction_data[match_allele_name]["gene"]
+                product_annotation = self.prediction_data[match_allele_name]["product"]
+                allele_quality = self.prediction_data[match_allele_name][
+                    "allele_quality"
+                ]
+            except KeyError:
+                gene_annotation = "Not found"
+                product_annotation = "Not found"
+                allele_quality = "Not found"
+            if int(split_blast_result[10]) > int(split_blast_result[9]):
+                direction = "+"
+            else:
+                direction = "-"
+            # remove the gaps in sequences
+            match_sequence = split_blast_result[13].replace("-", "")
+            # check if the sequence is coding
+            prot_conv_result = taranis.utils.convert_to_protein(
+                match_sequence, force_coding=False,
delete_incompleted_triplet=True + ) + prot_error_result = ( + prot_conv_result["error"] if "error" in prot_conv_result else "-" + ) + predicted_prot_seq = ( + prot_conv_result["protein"] if "protein" in prot_conv_result else "-" + ) + # remove if extra nucleotides are added at the end of the last + # completed triplet before the stop codon + if "extra nucleotides after stop codon" in prot_error_result: + new_seq_len = len(match_sequence) // 3 * 3 + match_sequence = match_sequence[:new_seq_len] + split_blast_result[4] = str(new_seq_len) + # reset the error message + prot_error_result = "-" + + # extend the sequence to find the stop codon + elif "Last triplet sequence is not a stop codon" in prot_error_result: + ( + split_blast_result, + prot_error_result, + predicted_prot_seq, + ) = _extend_sequence_for_finding_start_stop_codon( + split_blast_result, + prot_error_result, + predicted_prot_seq, + search_codon="stop", + ) + # update the match sequence + match_sequence = split_blast_result[13] + # extend the sequence to find the start codon + elif "Sequence does not have a start codon" in prot_error_result: + ( + split_blast_result, + prot_error_result, + predicted_prot_seq, + ) = _extend_sequence_for_finding_start_stop_codon( + split_blast_result, + prot_error_result, + predicted_prot_seq, + search_codon="start", + ) + # update the match sequence + match_sequence = split_blast_result[13] # get blast details blast_details = [ - blast_result[0].split("_")[0], # Core gene - self.s_name, - "gene annotation", - "product annotation", - allele_name, - "allele quality", - blast_result[1], # contig - blast_result[3], # query length - blast_result[9], # contig start - blast_result[10], # contig end - blast_result[13], # contig sequence + self.s_name, # sample name + split_blast_result[1], # contig name + allele_name, # core gene name + split_blast_result[0], # allele gene + "coding", # coding allele type. 
To be filled later idx = 4 + split_blast_result[3], # reference allele length + split_blast_result[4], # match alignment length + split_blast_result[15], # contig length + split_blast_result[9], # match contig position start + split_blast_result[10], # match contig position end + direction, + gene_annotation, + product_annotation, + allele_quality, + prot_error_result, # protein conversion result + match_sequence, # match sequence in contig + ref_allele_seq, # reference allele sequence + predicted_prot_seq, # predicted protein sequence ] return blast_details - if len(blast_result) > 1: - # allele is named as NIPHEM - - # Hacer un blast con la query esta secuencia y la database del alelo - # Create blast db with sample file - pass + def _find_match_allele_schema(allele_file: str, match_sequence: str) -> str: + """Find the allele name in the schema that match the sequence - elif len(blast_result) == 1: - column_blast_res = blast_result[0].split("\t") - column_blast_res[13] = column_blast_res[13].replace("-", "") - allele_details = get_blast_details(column_blast_res, allele_name) + Args: + allele_file (str): file with allele sequences + match_sequence (str): sequence to be matched + Returns: + str: allele name in the schema that match the sequence + """ grep_result = taranis.utils.grep_execution( - allele_file, column_blast_res[13], "-b1" + allele_file, match_sequence, "-xb1" ) - # check if sequence match alleles in schema if len(grep_result) > 0: - allele_name = grep_result[0].split(">")[1] + return grep_result[0].split("_")[1] + return "" - # allele is labled as EXACT - return ["EXC", allele_name, allele_details] - # check if contig is shorter than allele - if int(column_blast_res[3]) > int(column_blast_res[4]): - # check if sequence is shorter because it starts or ends at the contig - if ( - column_blast_res[9] == 1 # check at contig start - or column_blast_res[14] - == column_blast_res[10] # check at contig end - or column_blast_res[10] == 1 # check reverse at contig end - or column_blast_res[9] - == column_blast_res[15] # check reverse at contig start - ): - # allele is labled as PLOT - return ["PLOT", allele_name, allele_details] - # allele is labled as ASM - return ["ASM", allele_name, allele_details] - # check if contig is longer than allele - if int(column_blast_res[3]) < int(column_blast_res[4]): - # allele is labled as ALM - return ["ALM", allele_name, allele_details] - if int(column_blast_res[3]) == int(column_blast_res[4]): - # allele is labled as INF - return ["INF", allele_name, allele_details] + # valid_blast_results = _discard_low_threshold_results(blast_results) + match_allele_schema = "" + # if len(valid_blast_results) == 0: + # no match results labelled as LNF. 
# details data filled with empty data
+        #     return ["LNF", "LNF", ["-"] * 18]
+        if len(valid_blast_results) > 1:
+            # could be NIPHEM or NIPH
+            b_split_data = []
+            match_allele_seq = []
+            for valid_blast_result in valid_blast_results:
+                multi_allele_data = _get_blast_details(
+                    valid_blast_result, allele_name, ref_allele_seq
+                )
+                # get match allele sequence
+                match_allele_seq.append(multi_allele_data[15])
+                b_split_data.append(multi_allele_data)
+                # check if match allele is in schema
+                if match_allele_schema == "":
+                    # find the allele in schema with the match sequence in the contig
+                    match_allele_schema = _find_match_allele_schema(
+                        allele_file, multi_allele_data[15]
+                    )
+            if len(set(match_allele_seq)) == 1:
+                # all sequences are equal, labelled as NIPHEM
+                classification = "NIPHEM"
+            else:
+                # some of the sequences are different, labelled as NIPH
+                classification = "NIPH"
+            # update coding allele type
+            for idx in range(len(b_split_data)):
+                b_split_data[idx][4] = classification + "_" + match_allele_schema
+        else:
+            b_split_data = _get_blast_details(
+                valid_blast_results[0], allele_name, ref_allele_seq
+            )
+            # find the allele in schema with the match sequence in the contig
+            match_allele_schema = _find_match_allele_schema(
+                allele_file, b_split_data[15]
+            )
+
+            # PLOT, TPR, ASM, ALM, INF, EXC are possible classifications
+            if match_allele_schema != "":
+                # exact match found, labelled as EXC
+                classification = "EXC"
+            elif _check_if_plot(b_split_data):
+                # match allele is partial length, labelled as PLOT
+                classification = "PLOT"
+            # check if the protein length divided by the length of the matched
+            # triplet sequence is lower than the TPR limit
+            elif (
+                b_split_data[14] == "Multiple stop codons"
+                and b_split_data[17].index("*") / (int(b_split_data[6]) / 3)
+                < self.tpr_limit
+            ):
+                # labelled as TPR
+                classification = "TPR"
+            # check if match allele is shorter than reference allele
+            elif int(b_split_data[6]) < int(b_split_data[5]):
+                classification = "ASM"
+            # check if match allele is longer than reference allele
+            elif (
+                int(b_split_data[6]) > int(b_split_data[5])
+                or b_split_data[14] == "Last triplet sequence is not a stop codon"
+            ):
+                classification = "ALM"
+            else:
+                # if the sequence was not found after running grep, labelled as INF
+                classification = "INF"
+
+            # assign an identification value to the new allele
+            if match_allele_schema == "":
+                match_allele_schema = str(
+                    self.inf_alle_obj.get_inferred_allele(b_split_data[15], allele_name)
+                )
+            b_split_data[4] = classification + "_" + match_allele_schema
+        return [
+            classification,
+            classification + "_" + match_allele_schema,
+            b_split_data,
+        ]

+    def discard_low_threshold_results(self, blast_results: list) -> list:
+        """Discard blast results below the coverage threshold
+
+        Args:
+            blast_results (list): blast results
+
+        Returns:
+            list: blast results whose alignment coverage meets the threshold
+        """
+        valid_blast_result = []
+        for b_result in blast_results:
+            blast_split = b_result.split("\t")
+            # check if the division of the match alignment length by the
+            # reference allele length meets the threshold
+            if (int(blast_split[4]) / int(blast_split[3])) >= self.threshold:
+                valid_blast_result.append(b_result)
+        return valid_blast_result
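    # Illustrative example (reviewer sketch, not part of the patch): with the
    # default threshold of 0.8, a hit whose alignment length (blast column 4,
    # "length") covers at least 80% of the reference allele length (column 3,
    # "qlen") is kept:
    #
    #   columns = ["allele_1", "contig_3", "99.2", "500", "450"]  # truncated row
    #   450 / 500 = 0.9 >= 0.8  ->  the hit is retained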
    def search_match_allele(self):
        # Create blast db with sample file
-        result = {"allele_type": {}, "allele_match": {}, "allele_details": {}}
+        result = {
+            "allele_type": {},
+            "allele_match": {},
+            "allele_details": {},
+            "snp_data": {},
+            "alignment_data": {},
+        }
+        count = 0
        for ref_allele in self.ref_alleles:
-            # schema_alleles = os.path.join(self.schema, ref_allele)
-            # parallel in all CPUs in cluster node
-            alleles = OrderedDict()
-            match_found = False
-            with open(ref_allele, "r") as fh:
-                for record in SeqIO.parse(fh, "fasta"):
-                    alleles[record.id] = str(record.seq)
+            count += 1
+            log.debug(
+                "Processing allele %s, %d of %d",
+                ref_allele,
+                count,
+                len(self.ref_alleles),
+            )
+            alleles = taranis.utils.read_fasta_file(ref_allele, convert_to_dict=True)
+            match_found = False
+            count_2 = 0
            for r_id, r_seq in alleles.items():
+                count_2 += 1
+                log.debug("Running blast for %d of %d", count_2, len(alleles))
                # create file in memory to increase speed
                query_file = io.StringIO()
                query_file.write(">" + r_id + "\n" + r_seq)
                query_file.seek(0)
                blast_result = self.blast_obj.run_blast(
-                    query_file.read(), perc_identity=90, query_type="stdin"
+                    query_file.read(),
+                    perc_identity=self.perc_identity,
+                    num_threads=1,
+                    query_type="stdin",
                )
                if len(blast_result) > 0:
-                    match_found = True
-                    break
+                    valid_blast_results = self.discard_low_threshold_results(
+                        blast_result
+                    )
+                    if len(valid_blast_results) > 0:
+                        match_found = True
+                        break
            # Close object and discard memory buffer
            query_file.close()
+            allele_file = os.path.join(self.schema, os.path.basename(ref_allele))
+            allele_name = Path(allele_file).stem
            if match_found:
-                allele_file = os.path.join(self.schema, os.path.basename(ref_allele))
-                # blast_result = self.blast_obj.run_blast(q_file,perc_identity=100)
-                allele_name = Path(allele_file).stem
                (
                    result["allele_type"][allele_name],
                    result["allele_match"][allele_name],
                    result["allele_details"][allele_name],
-                ) = self.assign_allele_type(blast_result, allele_file, allele_name)
+                ) = self.assign_allele_type(
+                    valid_blast_results, allele_file, allele_name, r_seq
+                )
            else:
                # Sample does not have a reference allele to be matched
                # Keep LNF info
-                # ver el codigo de espe
-                # lnf_tpr_tag()
-                pass
+                result["allele_type"][allele_name] = "LNF"
+                result["allele_match"][allele_name] = allele_name
+                details = ["-"] * 18
+                details[0] = self.s_name
+                details[2] = allele_name
+                details[4] = "LNF"
+                result["allele_details"][allele_name] = details
+            # prepare the data for snp and alignment analysis
+            try:
+                ref_allele_seq = result["allele_details"][allele_name][16]
+            except KeyError as e:
+                log.error("Error in allele details")
+                log.error(e)
+                stderr.print(f"Error in allele details {e}")
+                continue
+            allele_seq = result["allele_details"][allele_name][15]
+            ref_allele_name = result["allele_details"][allele_name][3]
+            if self.snp_request and result["allele_type"][allele_name] != "LNF":
+                # run snp analysis
+                result["snp_data"][allele_name] = taranis.utils.get_snp_information(
+                    ref_allele_seq, allele_seq, ref_allele_name
+                )
+            if self.aligment_request and result["allele_type"][allele_name] != "LNF":
+                # run alignment analysis
+                result["alignment_data"][allele_name] = (
+                    taranis.utils.get_alignment_data(
+                        ref_allele_seq, allele_seq, ref_allele_name
+                    )
+                )
+        # delete blast folder
+        _ = taranis.utils.delete_folder(os.path.join(self.blast_dir, self.s_name))
        return result
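# Reviewer sketch (not part of the patch): the dict returned by
# search_match_allele() keeps parallel key spaces per locus; the locus names
# and values below are hypothetical.
#
#   {
#       "allele_type":    {"lmo0002": "EXC", "lmo0003": "INF"},
#       "allele_match":   {"lmo0002": "EXC_12", "lmo0003": "INF_1"},
#       "allele_details": {"lmo0002": [...]},  # 18-column rows described above
#       "snp_data":       {"lmo0002": {...}},  # only when --snp is set
#       "alignment_data": {"lmo0002": {...}},  # only when --alignment is set
#   }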
def parallel_execution(
-    sample_file: str, schema: str, reference_alleles: list, out_folder: str
+    sample_file: str,
+    schema: str,
+    prediction_data: dict,
+    reference_alleles: list,
+    threshold: float,
+    perc_identity: int,
+    out_folder: str,
+    inf_alle_obj: object,
+    snp_request: bool = False,
+    aligment_request: bool = False,
+    tpr_limit: int = 80,
+    increase_sequence: int = 20,
):
-    allele_obj = AlleleCalling(sample_file, schema, reference_alleles, out_folder)
-    return allele_obj.search_match_allele()
+    allele_obj = AlleleCalling(
+        sample_file,
+        schema,
+        prediction_data,
+        reference_alleles,
+        threshold,
+        perc_identity,
+        out_folder,
+        inf_alle_obj,
+        snp_request,
+        aligment_request,
+        tpr_limit,
+        increase_sequence,
+    )
+    sample_name = Path(sample_file).stem
+    stderr.print(f"[green] Analyzing sample {sample_name}")
+    log.info("Analyzing sample %s", sample_name)
+    return {sample_name: allele_obj.search_match_allele()}


+def create_multiple_alignment(
+    ref_alleles_seq: dict, results: list, a_list: str, alignment_folder: str, mafft_cpus: int
+) -> None:
+    """Create the multiple alignment file for a locus, joining the reference
+    allele sequences with the matched sequences found in each sample
+
+    Args:
+        ref_alleles_seq (dict): reference allele sequences grouped by locus
+        results (list): allele calling results for each sample
+        a_list (str): locus name
+        alignment_folder (str): output folder for alignment files
+        mafft_cpus (int): number of cpus to be used by mafft
+    """
+    allele_multiple_align = []
+    for ref_id, ref_seq in ref_alleles_seq[a_list].items():
+        input_buffer = StringIO()
+        # get the reference allele sequence
+        input_buffer.write(">Ref_" + ref_id + "\n")
+        input_buffer.write(ref_seq + "\n")
+        # get the sequences of each sample for the same locus
+        for result in results:
+            for sample, values in result.items():
+                # discard the allele if it is LNF
+                if values["allele_type"][a_list] == "LNF":
+                    continue
+                # get the allele name in sample
+                input_buffer.write(
+                    ">"
+                    + sample
+                    + "_"
+                    + a_list
+                    + "_"
+                    + values["allele_details"][a_list][4]
+                    + "\n"
+                )
+                # get the sequence of the allele in sample
+                input_buffer.write(values["allele_details"][a_list][15] + "\n")
+        input_buffer.seek(0)
+
+        allele_multiple_align.append(
+            taranis.utils.get_multiple_alignment(input_buffer, mafft_cpus)
+        )
+        # release memory
+        input_buffer.close()
+    # save multiple alignment to file
+    with open(
+        os.path.join(alignment_folder, a_list + "_multiple_alignment.aln"), "w"
+    ) as fo:
+        for alignment in allele_multiple_align:
+            for align in alignment:
+                fo.write(align)


+def collect_data(
+    results: list,
+    output: str,
+    snp_request: bool,
+    aligment_request: bool,
+    ref_alleles: list,
+    cpus: int,
+) -> None:
+    """Collect the allele calling data of each sample and create the summary
+    file and graphics, plus the snp and alignment files if requested
+
+    Args:
+        results (list): list of allele calling data results for each sample
+        output (str): output folder
+        snp_request (bool): request to save snp data to file
+        aligment_request (bool): request to save alignment and multiple
+            alignment files
+        ref_alleles (list): reference alleles
+        cpus (int): number of cpus to be used if alignment is requested
+    """
+
+    def stats_graphics(stats_folder: str, summary_result: dict) -> None:
+        stderr.print("Creating graphics")
+        log.info("Creating graphics")
+        allele_types = [
+            "NIPHEM",
+            "NIPH",
+            "EXC",
+            "PLOT",
+            "ASM",
+            "ALM",
+            "INF",
+            "LNF",
+            "TPR",
+        ]
+        # initialize classification data
+        classif_data = {}
+        for allele_type in allele_types:
+            classif_data[allele_type] = []
+        graphic_folder = os.path.join(stats_folder, "graphics")
+
+        _ = taranis.utils.create_new_folder(graphic_folder)
+        s_list = []
+        # collect data to create graphics
+        for sample, classif_counts in summary_result.items():
+            s_list.append(sample)  # create list of samples
+            for classif, count in classif_counts.items():
+                classif_data[classif].append(int(count))
+        # create a graphic for each classification type
+        for allele_type, counts in classif_data.items():
+            _ = taranis.utils.create_graphic(
+                graphic_folder,
+                str(allele_type + "_graphic.png"),
+                "bar",
+                s_list,
+                counts,
+                ["Samples", "number"],
+                str("Number of " + allele_type + " in samples"),
+            )
+        return
+
+    def read_reference_alleles(ref_alleles: list) -> dict[str, dict]:
+        # read reference alleles
+        ref_alleles_data = {}
+        for 
ref_allele in ref_alleles: + alleles = {} + with open(ref_allele, "r") as fh: + for record in SeqIO.parse(fh, "fasta"): + alleles[record.id] = str(record.seq) + ref_alleles_data[Path(ref_allele).stem] = alleles + return ref_alleles_data + + summary_result_file = os.path.join(output, "allele_calling_summary.csv") + sample_allele_match_file = os.path.join(output, "allele_calling_match.csv") + sample_allele_detail_file = os.path.join(output, "matching_contig.csv") + allele_types = ["NIPHEM", "NIPH", "EXC", "PLOT", "ASM", "ALM", "INF", "LNF", "TPR"] + detail_heading = [ + "sample", + "contig", + "core gene", + "reference allele name", + "codification", + "query length", + "match length", + "contig length", + "contig start", + "contig stop", + "direction", + "gene notation", + "product notation", + "reference allele quality", + "protein conversion result", + "match sequence", + "reference allele sequence", + "predicted protein sequence", + ] + + summary_result = {} # used for summary file and allele classification graphics + sample_allele_match = {} # used for allele match file + + # get allele list + first_sample = list(results[0].keys())[0] + allele_list = sorted(results[0][first_sample]["allele_type"].keys()) + for result in results: + for sample, values in result.items(): + sum_allele_type = OrderedDict() # used for summary file + allele_match = {} + for allele_type in allele_types: + sum_allele_type[allele_type] = 0 + for allele, type_of_allele in values["allele_type"].items(): + # increase allele type count + sum_allele_type[type_of_allele] += 1 + # add allele name match to sample + allele_match[allele] = ( + # type_of_allele + "_" + values["allele_match"][allele] + values["allele_match"][allele] + ) + summary_result[sample] = sum_allele_type + sample_allele_match[sample] = allele_match + + # save summary results to file + with open(summary_result_file, "w") as fo: + fo.write("Sample," + ",".join(allele_types) + "\n") + for sample, counts in summary_result.items(): + fo.write(f"{sample},") + for _, count in counts.items(): + fo.write(f"{count},") + fo.write("\n") + # save allele match to file + with open(sample_allele_match_file, "w") as fo: + fo.write("Sample," + ",".join(allele_list) + "\n") + for sample, allele_cod in sample_allele_match.items(): + fo.write(f"{sample}") + for allele in allele_list: + fo.write(f",{allele_cod[allele]}") + fo.write("\n") + + with open(sample_allele_detail_file, "w") as fo: + fo.write(",".join(detail_heading) + "\n") + for result in results: + for sample, values in result.items(): + for allele, detail_value in values["allele_details"].items(): + if type(detail_value[0]) is list: + for detail in detail_value: + fo.write(",".join(detail) + "\n") + else: + fo.write(",".join(detail_value) + "\n") + # save snp to file if requested + if snp_request: + for result in results: + for sample, values in result.items(): + snp_file = os.path.join(output, sample + "_snp_data.csv") + with open(snp_file, "w") as fo: + fo.write( + "Sample name,Locus name,Reference allele,Position,Ref,Alt,Codon Ref,Codon Alt,Amino Ref,Amino Alt,Category Ref,Category Alt\n" + ) + for allele, snp_data in values["snp_data"].items(): + for ref_allele, snp_info_list in snp_data.items(): + for snp_info in snp_info_list: + fo.write( + sample + + "," + + allele + + "," + + ref_allele + + "," + + ",".join(snp_info) + + "\n" + ) + # create alignment files + if aligment_request: + alignment_folder = os.path.join(output, "alignments") + _ = taranis.utils.create_new_folder(alignment_folder) + 
align_collection = {} + for result in results: + for sample, values in result.items(): + for allele, alignment_data in values["alignment_data"].items(): + if allele not in align_collection: + align_collection[allele] = OrderedDict() + + # align_collection[allele][sample] = [] + for _, value in alignment_data.items(): + align_collection[allele][sample] = value + # save alignment to file + for allele, samples in align_collection.items(): + with open(os.path.join(alignment_folder, allele + ".txt"), "w") as fo: + for sample, alignment_data in samples.items(): + fo.write(allele + "_sample_" + sample + "\n") + fo.write("\n".join(alignment_data) + "\n") + + # create multiple alignment files + stderr.print("Processing multiple alignment information") + log.info("Processing multiple alignment information") + ref_alleles_seq = read_reference_alleles(ref_alleles) + # assign cpus to be used in multiple alignment + mul_align_cpus = 1 if cpus // 3 == 0 else cpus // 3 + mafft_cpus = 1 if mul_align_cpus == 1 else 3 + m_align = [] + with concurrent.futures.ThreadPoolExecutor( + max_workers=mul_align_cpus + ) as executor: + futures = [ + executor.submit( + create_multiple_alignment, + ref_alleles_seq, + results, + a_list, + alignment_folder, + mafft_cpus, + ) + for a_list in allele_list + ] + for future in concurrent.futures.as_completed(futures): + try: + m_align.append(future.result()) + except Exception as e: + print(e) + continue + + # for a_list in allele_list: + # _ = create_multiple_alignment(ref_alleles_seq, results, a_list, alignment_folder) + + # Create graphics + stats_graphics(output, summary_result) + return diff --git a/taranis/analyze_schema.py b/taranis/analyze_schema.py index 25ae5bf..15c1f71 100644 --- a/taranis/analyze_schema.py +++ b/taranis/analyze_schema.py @@ -86,9 +86,11 @@ def check_allele_quality(self, prokka_annotation: dict) -> OrderedDict: with open(self.schema_allele) as fh: for record in SeqIO.parse(fh, "fasta"): try: - prokka_ann = prokka_annotation[record.id] + prokka_ann = prokka_annotation[record.id]["gene"] + product_annotation = prokka_annotation[record.id]["product"] except Exception: prokka_ann = "Not found in prokka" + product_annotation = "Not found" a_quality[record.id] = { "allele_name": self.allele_name, "quality": "Good quality", @@ -97,6 +99,7 @@ def check_allele_quality(self, prokka_annotation: dict) -> OrderedDict: "start_codon_alt": "standard", "protein_seq": "", "cds_coding": prokka_ann, + "product_annotation": product_annotation, } allele_seq[record.id] = str(record.seq) a_quality[record.id]["length"] = str(len(str(record.seq))) @@ -370,6 +373,7 @@ def stats_graphics(stats_folder: str) -> None: "nucleotide sequence length", "star codon", "CDS coding", + "product annotation", "allele quality", "bad quality reason", ] @@ -380,6 +384,7 @@ def stats_graphics(stats_folder: str) -> None: "length", "start_codon_alt", "cds_coding", + "product_annotation", "quality", "reason", ] diff --git a/taranis/blast.py b/taranis/blast.py index 7458752..74ad732 100644 --- a/taranis/blast.py +++ b/taranis/blast.py @@ -96,7 +96,7 @@ def run_blast( if query_type == "stdin": stdin_query = query query = "-" - blast_parameters = '"6 , qseqid , sseqid , pident , qlen , length , mismatch , gapopen , evalue , bitscore , sstart , send , qstart , qend , sseq , slen"' + blast_parameters = '"6 , qseqid , sseqid , pident , qlen , length , mismatch , gapopen , evalue , bitscore , sstart , send , qstart , qend , sseq , qseq , slen"' cline = NcbiblastnCommandline( task="blastn", 
db=self.out_blast_dir,
diff --git a/taranis/distance.py b/taranis/distance.py
index 028dfbb..496fac0 100644
--- a/taranis/distance.py
+++ b/taranis/distance.py
@@ -64,10 +64,10 @@ def create_matrix(self) -> pd.DataFrame:
                )
                stderr.print(f"{e}")
                sys.exit(1)
-
-        # Close the file handles
-        mash_distance_result.stdout.close()
-        mash_distance_result.stderr.close()
+        finally:
+            # Close the file handles
+            mash_distance_result.stdout.close()
+            mash_distance_result.stderr.close()
        out_data = out.decode("UTF-8").split("\n")
        allele_names = [item.split("\t")[0] for item in out_data[1:-1]]
diff --git a/taranis/inferred_alleles.py b/taranis/inferred_alleles.py
new file mode 100644
index 0000000..0c60ba1
--- /dev/null
+++ b/taranis/inferred_alleles.py
@@ -0,0 +1,35 @@
+import threading
+
+
+class InferredAllele:
+    def __init__(self):
+        self.inferred_seq = {}
+        self.last_allele_index = {}
+        self.lock = threading.Lock()  # Create a lock object
+
+    def get_inferred_allele(self, sequence: str, allele: str) -> int:
+        """Infer the allele identifier for the sequence, assigning a new one
+        if the sequence has not been seen before
+
+        Args:
+            sequence (str): sequence to infer the allele
+            allele (str): allele (locus) name
+
+        Returns:
+            int: inferred allele identifier
+        """
+        with self.lock:  # Acquire the lock
+            if sequence not in self.inferred_seq:
+                return self.set_inferred_allele(sequence, allele)
+            return self.inferred_seq[sequence]
+
+    def set_inferred_allele(self, sequence: str, allele: str) -> int:
+        """Set the inferred allele identifier for the sequence
+
+        Args:
+            sequence (str): sequence to infer the allele
+            allele (str): allele (locus) name
+
+        Returns:
+            int: newly assigned allele identifier
+        """
+        if allele not in self.last_allele_index:
+            self.last_allele_index[allele] = 0
+        self.last_allele_index[allele] += 1
+        self.inferred_seq[sequence] = self.last_allele_index[allele]
+        return self.inferred_seq[sequence]
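# Illustrative usage (reviewer sketch, not part of the patch): the lock makes
# allele numbering safe when several samples run in the ThreadPoolExecutor, so
# the same new sequence seen by two threads gets a single identifier. Sequences
# and locus name below are hypothetical.
#
#   inf = InferredAllele()
#   inf.get_inferred_allele("ATGAAATGA", "lmo0002")   # 1 (new sequence)
#   inf.get_inferred_allele("ATGAAATGA", "lmo0002")   # 1 (already known)
#   inf.get_inferred_allele("ATGCCCTGA", "lmo0002")   # 2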
diff --git a/taranis/utils.py b/taranis/utils.py
index 57c881c..d83228d 100644
--- a/taranis/utils.py
+++ b/taranis/utils.py
@@ -1,12 +1,14 @@
 #!/usr/bin/env python
+import Bio.Data.CodonTable
 import glob
+import gzip
 import io
 import logging
 import multiprocessing
-import numpy as np
 import os
 import pandas as pd
 import plotly.graph_objects as go
+import plotly.io as pio
 import questionary
 import shutil
@@ -20,6 +22,11 @@
 from pathlib import Path
 from Bio import SeqIO
+from Bio.Seq import Seq
+from collections import OrderedDict
+
+import warnings
+from Bio import BiopythonWarning

 log = logging.getLogger(__name__)
@@ -61,6 +68,11 @@ def rich_force_colors():
 def cpus_available() -> int:
+    """Get the number of cpus available in the system
+
+    Returns:
+        int: number of cpus
+    """
     return multiprocessing.cpu_count()
@@ -79,6 +91,79 @@ def get_seq_direction(allele_sequence):
     return "Error"

+def check_additional_programs_installed(software_list: list) -> None:
+    """Check if the input list of programs are installed in the system
+
+    Args:
+        software_list (list): list of programs to be checked
+    """
+    for program, command in software_list:
+        try:
+            _ = subprocess.run(
+                [program, command],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                check=True,
+            )
+        except Exception as e:
+            log.error(
+                "Program %s is not installed in the system. Error message: %s ",
+                program,
+                e,
+            )
+            stderr.print("[red] Program " + program + " is not installed in the system")
+            sys.exit(1)
+    return
+
+
+def convert_to_protein(
+    sequence: str, force_coding: bool = False, delete_incompleted_triplet: bool = False
+) -> dict:
+    """Check if the input sequence is a coding protein.
+
+    Args:
+        sequence (str): sequence to be checked
+        force_coding (bool, optional): force the check that the sequence is coding.
+            Defaults to False.
+        delete_incompleted_triplet (bool, optional): if the length is not a multiple
+            of 3, remove the trailing nucleotides and check whether they were added
+            after the stop codon. Defaults to False.
+
+    Returns:
+        dict: protein sequence and/or error message
+    """
+    conv_result = {"error": "-"}
+    # check if a start codon exists
+    if sequence[0:3] not in START_CODON_FORWARD:
+        return {"error": "Sequence does not have a start codon"}
+    if len(sequence) % 3 != 0:
+        if not delete_incompleted_triplet:
+            return {"error": "Sequence is not a multiple of three"}
+        # Remove the last or second to last bases to check if there is a stop codon
+        new_seq_len = len(sequence) // 3 * 3
+        sequence = sequence[:new_seq_len]
+        # this error will be overwritten if another error is found
+        conv_result["error"] = "extra nucleotides after stop codon"
+
+    seq_sequence = Seq(sequence)
+    try:
+        seq_prot = seq_sequence.translate(table=1, cds=force_coding)
+    except Bio.Data.CodonTable.TranslationError as e:
+        log.info("Unable to translate sequence. Info message: %s ", e)
+        return {"error": e}
+    # get the latest stop codon
+    last_stop = seq_prot.rfind("*")
+    # if force_coding is False, check if there are multiple stop codons
+    if not force_coding:
+        first_stop = seq_prot.find("*")
+        if first_stop != last_stop:
+            conv_result["error"] = "Multiple stop codons"
+        if last_stop != len(seq_prot) - 1:
+            conv_result["error"] = "Last triplet sequence is not a stop codon"
+    conv_result["protein"] = str(seq_prot)
+    return conv_result
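# Illustrative example (reviewer sketch, not part of the patch): expected
# outputs of the helper above, assuming ATG is in START_CODON_FORWARD and
# GCC is not.
#
#   convert_to_protein("ATGGCCTGA")
#   # {"error": "-", "protein": "MA*"}
#
#   convert_to_protein("GCCGCCTGA")
#   # {"error": "Sequence does not have a start codon"}
#
#   convert_to_protein("ATGGCCTGAAC", delete_incompleted_triplet=True)
#   # {"error": "extra nucleotides after stop codon", "protein": "MA*"}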
def create_annotation_files(
    fasta_file: str,
    annotation_dir: str,
@@ -169,19 +254,30 @@ def create_graphic(
        title (str): title of the figure
    """
    fig = go.Figure()
-    if mode == "lines":
-        fig.add_trace(go.Scatter(x=x_data, y=y_data, mode=mode, name=title))
-        fig.update_layout(xaxis_title=labels[0], yaxis_title=labels[1])
-    elif mode == "pie":
-        fig.add_trace(go.Pie(labels=labels, values=x_data))
-    elif mode == "bar":
-        fig.add_trace(go.Bar(x=x_data, y=y_data))
-        fig.update_layout(xaxis_title=labels[0], yaxis_title=labels[1])
-    elif mode == "box":
-        fig.add_trace(go.Box(y=y_data))
-
-    fig.update_layout(title_text=title)
-    fig.write_image(os.path.join(out_folder, f_name))
+    layout_update = {}
+    plot_options = {
+        "lines": (go.Scatter, {"x": x_data, "y": y_data, "mode": mode}),
+        "pie": (go.Pie, {"labels": labels, "values": x_data}),
+        "bar": (go.Bar, {"x": x_data, "y": y_data}),
+        "box": (go.Box, {"y": y_data}),
+    }
+
+    if mode in plot_options:
+        trace_class, trace_kwargs = plot_options[mode]
+        fig.add_trace(trace_class(**trace_kwargs))
+        if mode == "bar":
+            layout_update = {
+                "xaxis_title": labels[0],
+                "yaxis_title": labels[1],
+                "xaxis_tickangle": 45,
+            }
+    else:
+        raise ValueError(f"Unsupported mode: {mode}")
+
+    layout_update["title_text"] = title
+    fig.update_layout(**layout_update)
+
+    pio.write_image(fig, os.path.join(out_folder, f_name))
    return
@@ -214,11 +310,14 @@
    return False

+"""
def find_nearest_numpy_value(array, value):
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return array[idx]
+"""
+
def folder_exists(folder_to_check):
    """Checks if input folder exists
    return False

+def get_alignment_data(ref_sequence: str, allele_sequence: str, ref_allele) -> dict:
+    """Get the alignment data between the reference allele and the match allele
+    sequence. It returns three lines: the reference allele sequence, the
+    alignment characters, and the match allele sequence
+
+    Args:
+        ref_sequence (str): reference allele sequence
+        allele_sequence (str): match allele sequence found in the sample
+        ref_allele: reference allele name, used as the dictionary key
+
+    Returns:
+        dict: key: ref_allele, value: alignment data
+    """
+    alignment_data = {}
+    alignment = ""
+    for ref, alt in zip(ref_sequence, allele_sequence):
+        if ref == alt:
+            alignment += "|"
+        else:
+            alignment += " "
+    alignment_data[ref_allele] = [ref_sequence, alignment, allele_sequence]
+    return alignment_data


def get_files_in_folder(folder: str, extension: str = None) -> list[str]:
    """get the list of files, filtered by extension in the input folder.
    If extension is not set, then all files in folder are returned
@@ -255,16 +377,121 @@
    return glob.glob(folder_files)

-def grep_execution(input_file: str, pattern: str, parameters: str) -> list:
-    """_summary_
+def get_multiple_alignment(input_buffer: io.StringIO, mafft_cpus: int) -> list[str]:
+    """Run MAFFT with input from the string buffer and capture output to another string buffer
+
+    Args:
+        input_buffer (io.StringIO): fasta sequences to be aligned
+        mafft_cpus (int): number of cpus to be used in mafft
+
+    Returns:
+        list[str]: list of aligned sequences
+    """
+    # Run MAFFT
+    mafft_command = (
+        "mafft --auto --quiet --thread " + str(mafft_cpus) + " -"
+    )  # "-" tells MAFFT to read from stdin
+    process = subprocess.Popen(
+        mafft_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE
+    )
+    stdout, _ = process.communicate(input_buffer.getvalue().encode())
+
+    # Convert the stdout bytes to a string buffer
+    output_buffer = io.StringIO(stdout.decode())
+    output_buffer.seek(0)
+    # convert the string buffer to a list of lines
+    multi_result = []
+    for line in output_buffer:
+        multi_result.append(line)
+
+    # Close the file objects and process
+    output_buffer.close()
+    process.stdout.close()
+
+    return multi_result
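# Illustrative usage (reviewer sketch, not part of the patch): the function
# expects a fasta payload already collected in a StringIO, as done in
# allele_calling.create_multiple_alignment; the sequences are hypothetical.
#
#   buf = io.StringIO(">Ref_allele1\nATGAAATGA\n>sample1_allele1\nATGAAGTGA\n")
#   aligned_lines = get_multiple_alignment(buf, mafft_cpus=1)
#   # aligned_lines holds the raw MAFFT fasta output, one list item per line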
Removing last nucleotides", + ref_allele_name, + ) + # remove the last nucleotides to be able to translate to protein + alt_sequence = alt_sequence[: len(alt_sequence) // 3 * 3] + + alt_protein = str(Seq(alt_sequence).translate()) + snp_line = [] + # get the shortest sequence for the loop + length_for_snp = min(len(ref_sequence), len(alt_sequence)) + for idx in range(length_for_snp): + if ref_sequence[idx] != alt_sequence[idx]: + # calculate the triplet index + triplet_idx = idx // 3 + # get triplet code + ref_triplet = ref_sequence[triplet_idx * 3 : triplet_idx * 3 + 3] + alt_triplet = alt_sequence[triplet_idx * 3 : triplet_idx * 3 + 3] + # get amino acid change + ref_aa = ref_protein[triplet_idx] + try: + alt_aa = alt_protein[triplet_idx] + except IndexError as e: + log.debug( + "Unable to get amino acid for %s and %s with error %s", + ref_allele_name, + alt_sequence, + e, + ) + alt_aa = "-" + # get amino acid category + ref_category = map_amino_acid_to_annotation(ref_sequence[triplet_idx]) + alt_category = map_amino_acid_to_annotation(alt_sequence[triplet_idx]) + snp_line.append( + [ + str(idx), + ref_sequence[idx], + alt_sequence[idx], + ref_triplet, + alt_triplet, + ref_aa, + alt_aa, + ref_category, + alt_category, + ] + ) + snp_info[ref_allele_name] = snp_line + return snp_info + + +def grep_execution(input_file: str, pattern: str, parameters: str) -> list[str]: + """run grep command and return the output Args: - input_file (str): _description_ - pattern (str): _description_ - parmeters (str): _description_ + input_file (str): input file path + pattern (str): pattern to be searched + parmeters (str): parameters to be used in grep Returns: - list: _description_ + list[str]: list of lines which match the pattern """ try: result = subprocess.run( @@ -274,16 +501,75 @@ def grep_execution(input_file: str, pattern: str, parameters: str) -> list: text=True, ) except subprocess.CalledProcessError as e: - log.error("Unable to run grep. Error message: %s ", e) + log.debug("Unable to run grep. Error message: %s ", e) return [] return result.stdout.split("\n") +def map_amino_acid_to_annotation(amino_acid): + # Dictionary mapping amino acids to their categories + amino_acid_categories = { + "A": "Nonpolar", + "C": "Polar", + "D": "Acidic", + "E": "Acidic", + "F": "Nonpolar", + "G": "Nonpolar", + "H": "Basic", + "I": "Nonpolar", + "K": "Basic", + "L": "Nonpolar", + "M": "Nonpolar", + "N": "Polar", + "P": "Nonpolar", + "Q": "Polar", + "R": "Basic", + "S": "Polar", + "T": "Polar", + "V": "Nonpolar", + "W": "Nonpolar", + "Y": "Polar", + } + + # Return the category of the given amino acid + return amino_acid_categories.get(amino_acid, "Unknown") + + def prompt_text(msg): source = questionary.text(msg).unsafe_ask() return source +def prompt_user_if_folder_exists(folder: str) -> bool: + """Prompt the user to continue if the folder exists + + Args: + folder (str): folder path + + Returns: + bool: True if user wants to continue + """ + if folder_exists(folder): + q_question = ( + "Folder " + + folder + + " already exists. Files will be overwritten. Do you want to continue?" + ) + if "no" in query_user_yes_no(q_question, "no"): + log.info("Aborting code by user request") + stderr.print("[red] Exiting code. ") + sys.exit(1) + else: + try: + os.makedirs(folder) + except OSError as e: + log.info("Unable to create folder at %s with error %s", folder, e) + stderr.print("[red] ERROR. 
Unable to create folder " + folder) + sys.exit(1) + + return True + + def query_user_yes_no(question, default): """Query the user to choose yes or no for the query question @@ -342,9 +628,12 @@ def read_annotation_file(ann_file: str) -> dict: for line in lines: if "Prodigal" in line: - gene_match = re.search(r"(.*)[\t]Prodigal.*gene=(\w+)_.*", line) + gene_match = re.search(r"(.*)[\t]Prodigal.*gene=(\w+)_.*product=(.*)", line) if gene_match: - ann_data[gene_match.group(1)] = gene_match.group(2) + ann_data[gene_match.group(1)] = { + "gene": gene_match.group(2), + "product": gene_match.group(3).strip(), + } else: pred_match = re.search(r"(.*)[\t]Prodigal.*product=(\w+)_.*", line) if pred_match: @@ -354,7 +643,56 @@ def read_annotation_file(ann_file: str) -> dict: return ann_data -def read_fasta_file(fasta_file): +def read_compressed_file( + file_name: str, separator: str = ",", index_key: int = None, mapping: list = [] +) -> dict | str: + """Read the compressed file and return a dictionary using as key value + the mapping data if the index_key is an integer, else return the uncompressed + file + + Args: + file_name (str): file to be uncompressed + separator (str, optional): split line according separator. Defaults to ",". + index_key (int, optional): index value . Defaults to None. + mapping (list, optional): defined the key value for dictionary. Defaults to []. + + Returns: + dict|str: uncompresed information file + """ + out_data = {} + with gzip.open(file_name, "rb") as fh: + lines = fh.readlines() + if not index_key: + return lines[:-2] + for line in lines[1:]: + line = line.decode("utf-8") + s_line = line.split(separator) + # ignore empty lines + if len(s_line) == 1: + continue + key_data = s_line[index_key] + out_data[key_data] = {} + for item in mapping: + out_data[key_data][item[0]] = s_line[item[1]] + return out_data + + +def read_fasta_file(fasta_file: str, convert_to_dict=False) -> dict | str: + """Read the fasta file and return the data as a dictionary if convert_to_dict + + Args: + fasta_file (str): _description_ + convert_to_dict (bool, optional): _description_. Defaults to False. + + Returns: + dict: fasta id as key and sequence as value in str format + """ + conv_fasta = OrderedDict() + if convert_to_dict: + with open(fasta_file, "r") as fh: + for record in SeqIO.parse(fh, "fasta"): + conv_fasta[record.id] = str(record.seq) + return conv_fasta return SeqIO.parse(fasta_file, "fasta")