diff --git a/bin/ivar_variants_to_vcf.py b/bin/ivar_variants_to_vcf.py
index d00b3988..ed273207 100755
--- a/bin/ivar_variants_to_vcf.py
+++ b/bin/ivar_variants_to_vcf.py
@@ -3,15 +3,15 @@ from email.charset import QP

 import os
 import sys
-import re
 import errno
 import argparse
-from collections import OrderedDict
-from collections import deque
+import warnings
+from itertools import product

 import numpy as np
 from Bio import SeqIO
 from scipy.stats import fisher_exact
+import pandas as pd


 def parse_args(args=None):
@@ -24,8 +24,8 @@ def parse_args(args=None):
     parser.add_argument(
         "-po",
         "--pass_only",
-        help="Only output variants that PASS filters.",
         action="store_true",
+        help="Only output variants that PASS filters.",
     )
     parser.add_argument(
         "-af",
@@ -34,18 +34,31 @@ def parse_args(args=None):
         default=0,
         help="Only output variants where allele frequency is greater than this number (default: 0).",
     )
+    parser.add_argument(
+        "-bq",
+        "--bad_quality_threshold",
+        type=float,
+        default=20,
+        help="Only output variants where ALT_QUAL is greater than this number (default: 20).",
+    )
+    parser.add_argument(
+        "-mt",
+        "--merge_af_threshold",
+        type=float,
+        default=0.25,
+        help="Only merge consecutive variants whose allele frequencies are within this distance of each other (default: 0.25). Useful to distinguish haplotypes.",
+    )
     parser.add_argument(
         "-is",
         "--ignore_strand_bias",
-        default=False,
-        help="Does not take strand bias into account, use this option when not using amplicon sequencing.",
         action="store_true",
+        help="Does not take strand bias into account; use this option when using amplicon sequencing.",
     )
     parser.add_argument(
         "-ic",
         "--ignore_merge_codons",
-        help="Output variants without taking into account if consecutive positions belong to the same codon.",
         action="store_true",
+        help="Output variants without taking into account whether consecutive positions belong to the same codon.",
     )
     parser.add_argument(
         "-f",
@@ -57,11 +70,460 @@ def parse_args(args=None):
     return parser.parse_args(args)


+class IvarVariants:
+    def __init__(
+        self,
+        file_in=None,
+        file_out=None,
+        pass_only=None,
+        freq_threshold=None,
+        bad_qual_threshold=None,
+        merge_af_threshold=None,
+        ignore_stbias=None,
+        ignore_merge=None,
+        fasta=None,
+    ):
+        self.file_in = file_in
+        self.file_out = file_out
+        # The sample column name is derived from the input file, not the output
+        if os.path.exists(str(self.file_in)):
+            self.filename = str(os.path.splitext(self.file_in)[0])
+        else:
+            self.filename = str(self.file_in)
+        self.pass_only = pass_only
+        try:
+            self.freq_threshold = float(freq_threshold)
+        except Exception:
+            print("Invalid allele frequency threshold. Setting it to 0")
+            self.freq_threshold = 0
+        try:
+            self.bad_qual_threshold = float(bad_qual_threshold)
+        except Exception:
+            print("Invalid bad quality threshold. Setting it to 20")
+            self.bad_qual_threshold = 20
+        try:
+            self.merge_af_threshold = float(merge_af_threshold)
+        except Exception:
+            print("Invalid merge af threshold. Setting it to 0.25")
+            self.merge_af_threshold = 0.25
+        self.ignore_stbias = ignore_stbias
+        self.ignore_merge = ignore_merge
+
+        if not self.file_out:
+            exit("Output file not provided. Aborting...")
+        if fasta:
+            self.ref_fasta = (
+                fasta if os.path.exists(str(fasta)) else exit("Invalid fasta path")
+            )
+        else:
+            self.ref_fasta = None
+
+        if self.file_in:
+            try:
+                self.raw_ivar_df = pd.read_csv(self.file_in, sep="\t")
+            except Exception:
+                exit(f"Could not read input file: {file_in}")
+        else:
+            exit("Input file not provided. Aborting...")
+        if self.raw_ivar_df.empty:
+            exit("Input tsv was empty")
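+
+    # Illustrative usage of this class (editor's sketch; the paths and threshold
+    # values below are hypothetical, not defaults mandated by the script):
+    #
+    #   converter = IvarVariants(
+    #       file_in="sample.tsv", file_out="sample.vcf", pass_only=False,
+    #       freq_threshold=0.75, bad_qual_threshold=20, merge_af_threshold=0.25,
+    #       ignore_stbias=False, ignore_merge=False, fasta="reference.fasta",
+    #   )
+    #   converter.write_vcf()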
Aborting...") + if self.raw_ivar_df.empty: + exit(f"Input tsv was empty") + + def strand_bias_filter(self, row): + """Calculate strand-bias fisher test. + + Args: + row - a row from ivar.tsv dataframe as list + Returns: + Whether it passes the filter ("") or not. ("sb") + """ + data_matrix = np.array( + [ + [row["REF_DP"] - row["REF_RV"], row["REF_RV"]], + [row["ALT_DP"] - row["ALT_RV"], row["ALT_RV"]], + ] + ) + _, pvalue = fisher_exact(data_matrix, alternative="greater") + if pvalue < 0.05: + return "sb" + else: + return "" + + def apply_filters(self, row): + """Apply all the filters to a row and return its output merged + + Args: + row (list): A row from from ivar.tsv dataframe as list + + Returns: + vcf_filter (str): Results from each filter joined by ';' + """ + ivar_filter = "" if row["PASS"] == True else "ft" + try: + float(row["ALT_QUAL"]) + except ValueError: + bad_quality_filter = "bq" + bad_quality_filter = "" if row["ALT_QUAL"] >= self.bad_qual_threshold else "bq" + if not self.ignore_stbias: + stb_filter = self.strand_bias_filter(row) + else: + stb_filter = "" + all_filters = [ivar_filter, stb_filter, bad_quality_filter] + vcf_filter = ";".join([x for x in all_filters if x]) + if not vcf_filter: + vcf_filter = "PASS" + return vcf_filter + + def initiate_vcf_df(self): + """Read the input ivar.tsv file, process the data depending on the parameters + selected when running the script and create a pandas dataframe with it + + Returns: + vcf_df: Pandas dataframe after processing the file + """ + ivar_df = self.raw_ivar_df.copy() + ivar_df = ivar_df.dropna(thresh=2) + if self.pass_only: + ivar_df = ivar_df[ivar_df["PASS"] == True] + ivar_df = ivar_df[ivar_df["ALT_FREQ"] >= self.freq_threshold] + vcf_dict = {} + vcf_dict["REGION"] = ivar_df["REGION"] + vcf_dict["POS"] = ivar_df["POS"] + vcf_dict["ID"] = ["."] * len(ivar_df) + # Dealing with insertions and deletions + vcf_dict["REF"] = np.where( + ivar_df["ALT"].str[0] == "-", + ivar_df["REF"] + ivar_df["ALT"].str[1:], + ivar_df["REF"], + ) + vcf_dict["ALT"] = np.where( + ivar_df["ALT"].str[0] == "+", + ivar_df["REF"] + ivar_df["ALT"].str[1:], + np.where(ivar_df["ALT"].str[0] == "-", ivar_df["REF"], ivar_df["ALT"]), + ) + vcf_dict["QUAL"] = ["."] * len(ivar_df) + vcf_dict["FILTER"] = ivar_df.apply(self.apply_filters, axis=1) + vcf_dict["INFO"] = np.select( + [ivar_df["ALT"].str[0] == "+", ivar_df["ALT"].str[0] == "-"], + ["TYPE=INS", "TYPE=DEL"], + default="TYPE=SNP", + ) + format_cols = [ + "GT", + "DP", + "REF_DP", + "REF_RV", + "REF_QUAL", + "ALT_DP", + "ALT_RV", + "ALT_QUAL", + "ALT_FREQ", + ] + vcf_dict["FORMAT"] = ":".join(format_cols) + # simple workaround by setting the genotype to a constant 1 + ivar_df["DP"] = ivar_df["TOTAL_DP"] + ivar_df["GT"] = 1 + vcf_dict["FILENAME"] = ivar_df[format_cols].astype(str).apply(":".join, axis=1) + if not self.ignore_merge: + # These columns are needed to merge codons. 
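+
+    # Worked example of the REF/ALT re-encoding above (illustrative values only):
+    # iVar reports indels with a leading "+" or "-" on ALT, while VCF anchors them
+    # on the reference base, which is what the two np.where() calls implement:
+    #   ivar REF="T", ALT="C"    ->  VCF REF="T",   ALT="C"   (TYPE=SNP)
+    #   ivar REF="T", ALT="+AG"  ->  VCF REF="T",   ALT="TAG" (TYPE=INS)
+    #   ivar REF="T", ALT="-AG"  ->  VCF REF="TAG", ALT="T"   (TYPE=DEL)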
+
+    def find_consecutive(self, vcf_df):
+        """Find and extract the consecutive variants in the vcf dataframe
+
+        Args:
+            vcf_df (pd.DataFrame): Pandas df from initiate_vcf_df()
+
+        Returns:
+            consecutive_df (pd.DataFrame): dataframe only with variants in
+            consecutive positions, including those with duplicates
+        """
+        consecutive_mask = vcf_df["POS"].diff() <= 1
+        consecutive_mask = consecutive_mask | consecutive_mask.shift(-1)
+        consecutive_df = vcf_df[consecutive_mask]
+        return consecutive_df
+
+    def split_non_consecutive(self, consecutive_df):
+        """Split rows that are not consecutive to form groups of consecutive variants
+
+        Args:
+            consecutive_df (pd.DataFrame): Consecutive variants from find_consecutive()
+
+        Returns:
+            split_rows_list (list): List of dataframes with consecutive rows
+        """
+        # Numpy raises a warning from a pandas function that is wrongly set as deprecated
+        with warnings.catch_warnings():
+            warnings.simplefilter(action="ignore", category=FutureWarning)
+            split_rows_list = np.split(
+                consecutive_df, np.where(np.diff(consecutive_df["POS"]) > 1)[0] + 1
+            )
+        return split_rows_list
+
+    def find_duplicates(self, row_set):
+        """Check whether there are duplicated variants in a group of consecutive rows
+
+        Args:
+            row_set (pd.DataFrame): Rows from split_non_consecutive() & get_same_codon()
+
+        Returns:
+            bool: True if there are duplicates in the dataframe, False if not
+        """
+        consdups_mask = row_set["POS"].diff() == 0
+        consdups_mask = consdups_mask | consdups_mask.shift(-1)
+        if row_set[consdups_mask].empty:
+            return False
+        else:
+            return True
+
+    def get_same_codon(self, splitted_groups):
+        """Receive a list of dataframes and keep only the groups of consecutive
+        rows that share their codon sequence, excluding groups purely formed by
+        duplicates.
+
+        Args:
+            splitted_groups (list(pd.DataFrame)): Groups from split_non_consecutive()
+
+        Returns:
+            same_codon_consecutive (list(pd.DataFrame)): list with separated dataframes
+        """
+        same_codon_dfs = []
+        for consec_df in splitted_groups:
+            clean_codon_rows = [
+                self.find_consecutive(group)
+                for _, group in consec_df.groupby("REF_CODON")
+                if not self.find_consecutive(group).empty
+                and not self.find_consecutive(group)["POS"].nunique() == 1
+            ]
+            same_codon_dfs.append(clean_codon_rows)
+        same_codon_consecutive = [df for group in same_codon_dfs for df in group]
+        return same_codon_consecutive
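+
+    # Illustrative grouping performed by the three methods above (hypothetical
+    # positions, not taken from real data):
+    #   POS 100, 101, 102 -> one consecutive group, candidate for codon merging
+    #   POS 100, 101, 101 -> consecutive group with a duplicate (two ALT alleles at 101)
+    #   POS 100, 200      -> not consecutive, rows are kept as independent variants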
+ """ + split_rows_dict = {} + for rowset in same_codon_rows: + last_pos = 0 + rows_groups = [] + first_index = None + for row in rowset.itertuples(): + alt_pos = [x for x in range(3) if row.REF_CODON[x] != row.ALT_CODON[x]] + if len(alt_pos) > 1: + print("Found 2 conflicting variants in position ", row.POS) + continue + alt_pos = alt_pos[0] + first_index = row.Index if first_index == None else first_index + if alt_pos < last_pos: + split_rows_dict[first_index] = pd.DataFrame(rows_groups) + rows_groups = [] + first_index = row.Index + rows_groups.append(row) + last_pos = alt_pos + split_rows_dict[first_index] = pd.DataFrame(rows_groups) + return split_rows_dict + + def exclude_af_outliers(self, consec_rows, af_threshold): + """Remove rows containing variants that differ more than the AF threshold + from the rest of the variants taking the median as mid position. + + Args: + consec_rows (pd.DataFrame): Consecutive rows aimed to be merged + af_threshold (float): Allele Frequency threshold used to exclude outliers + + Returns: clean_consec_rows (pd.DataFrame): Consecutive rows without AF outliers + """ + consec_rows["AF"] = consec_rows["FILENAME"].str.split(":").str[8] + all_afs = consec_rows["AF"].astype(float) + af_median = all_afs.median() + + consec_rows["AF"] = np.where( + abs(all_afs - af_median) <= af_threshold, True, False + ) + consec_rows = consec_rows[consec_rows["AF"] == True] + + clean_consec_rows = consec_rows.drop("AF", axis=1) + return clean_consec_rows + + def merge_rows(self, consec_rows): + """Merge certain columns from a set of rows into the position of the first one + + Args: + consec_rows (pd.DataFrame): Clean dataframe only with rows to be merged + + Returns: + row_to_merge (list): List with the values for each cell in the merged row + """ + merged_index = consec_rows.head(1).index[0] + consec_rows.at[merged_index, "REF"] = "".join(consec_rows["REF"]) + consec_rows.at[merged_index, "ALT"] = "".join(consec_rows["ALT"]) + freqs_to_merge = ",".join(consec_rows["FILENAME"].str.split(":").str[8]) + depths_to_merge = ",".join(consec_rows["FILENAME"].str.split(":").str[5]) + stats_to_merge = ":".join([freqs_to_merge, depths_to_merge]) + consec_rows.at[merged_index, "FILENAME"] += f":{stats_to_merge}" + consec_rows.at[merged_index, "FORMAT"] += ":MERGED_AF:MERGED_DP" + merged_row = consec_rows.loc[merged_index].values.tolist() + return merged_row + + def handle_dup_rows(self, row_set): + """Split dataframe with multiple variants in the same position and create a list + of rows for each possible resulting codon in the position of the first variant + + Args: + row_set (pd.DataFrame): Consecutive variants df from split_rows_dict() + + Returns: + merged_rowlist(list(list)): List of merged rows from merge_rows() + """ + # Numpy raises a warning from a pandas function that is wrongly set as deprecated + # https://github.com/numpy/numpy/issues/24889 + # https://github.com/apache/arrow/issues/36412 + with warnings.catch_warnings(): + warnings.simplefilter(action="ignore", category=FutureWarning) + split_dups = np.split( + row_set, np.where(np.diff(row_set["POS"]) >= 1)[0] + 1 + ) + split_indexes = [rowlist.index.to_list() for rowlist in split_dups] + index_product_list = [indexlist for indexlist in product(*split_indexes)] + merged_rowlist = [] + for indexlist in index_product_list: + consec_rows = row_set.loc[indexlist, :] + clean_rows = self.exclude_af_outliers(consec_rows, self.merge_af_threshold) + if self.find_consecutive(clean_rows).empty: + continue + merged_row = 
+
+    def process_vcf_df(self, vcf_df):
+        """Merge rows with consecutive SNPs that passed all filters and have no NAs
+
+        Args:
+            vcf_df - dataframe from self.initiate_vcf_df()
+        Returns:
+            processed_vcf_df: dataframe with consecutive variants merged
+        """
+        clean_vcf_df = vcf_df[vcf_df["INFO"] == "TYPE=SNP"]
+        clean_vcf_df = clean_vcf_df[clean_vcf_df["FILTER"] == "PASS"].dropna()
+        consecutive_df = self.find_consecutive(clean_vcf_df)
+        if consecutive_df.empty:
+            return vcf_df
+        splitted_groups = self.split_non_consecutive(consecutive_df)
+        same_codon_consecutive = self.get_same_codon(splitted_groups)
+        split_rows_dict = self.split_by_codon(same_codon_consecutive)
+        for first_index, row_set in split_rows_dict.items():
+            vcf_df = vcf_df.drop(row_set.Index)
+            # Redundant "Index" column is generated by itertuples() in split_by_codon()
+            row_set = row_set.drop(["Index"], axis=1)
+            if self.find_duplicates(row_set):
+                rows_to_merge = self.handle_dup_rows(row_set)
+                indexes_to_merge = [
+                    x for x in range(first_index, first_index + len(rows_to_merge))
+                ]
+                for index, row in zip(indexes_to_merge, rows_to_merge):
+                    vcf_df.loc[index] = row
+            else:
+                clean_rows = self.exclude_af_outliers(row_set, self.merge_af_threshold)
+                if self.find_consecutive(clean_rows).empty:
+                    continue
+                rows_to_merge = self.merge_rows(clean_rows)
+                vcf_df.loc[first_index] = rows_to_merge
+        vcf_df = vcf_df.sort_index().reset_index(drop=True)
+        processed_vcf_df = vcf_df.drop(["REF_CODON", "ALT_CODON"], axis=1)
+        return processed_vcf_df
+
+    def get_vcf_header(self):
+        """Create the vcf header for VCFv4.2
+
+        Returns:
+            header: List containing all the vcf header lines.
+        """
+        ## Define VCF header
+        header_source = ["##fileformat=VCFv4.2", "##source=iVar"]
+        if self.ref_fasta:
+            header_contig = []
+            for record in SeqIO.parse(self.ref_fasta, "fasta"):
+                header_contig += [
+                    "##contig=<ID=" + record.id + ",length=" + str(len(record.seq)) + ">"
+                ]
+            header_source += header_contig
+
+        header_info = [
+            '##INFO=<ID=TYPE,Number=1,Type=String,Description="Either SNP (Single Nucleotide Polymorphism), DEL (deletion) or INS (insertion)">',
+        ]
+        header_filter = [
+            '##FILTER=<ID=PASS,Description="All filters passed">',
+            '##FILTER=<ID=ft,Description="Fisher exact test of variant frequency compared to mean error rate, p-value > 0.05">',
+            '##FILTER=<ID=bq,Description="Bad quality variant: ALT_QUAL lower than the bad quality threshold">',
+        ]
+        if not self.ignore_stbias:
+            header_filter.append(
+                '##FILTER=<ID=sb,Description="Strand bias filter, fisher exact test, p-value < 0.05">'
+            )
+        header_format = [
+            '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">',
+            '##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Total read depth at the locus">',
+            '##FORMAT=<ID=REF_DP,Number=1,Type=Integer,Description="Depth of reference base">',
+            '##FORMAT=<ID=REF_RV,Number=1,Type=Integer,Description="Depth of reference base on reverse reads">',
+            '##FORMAT=<ID=REF_QUAL,Number=1,Type=Integer,Description="Mean quality of reference base">',
+            '##FORMAT=<ID=ALT_DP,Number=1,Type=Integer,Description="Depth of alternate base">',
+            '##FORMAT=<ID=ALT_RV,Number=1,Type=Integer,Description="Depth of alternate base on reverse reads">',
+            '##FORMAT=<ID=ALT_QUAL,Number=1,Type=Integer,Description="Mean quality of alternate base">',
+            '##FORMAT=<ID=ALT_FREQ,Number=1,Type=Float,Description="Frequency of alternate base">',
+        ]
+        if not self.ignore_merge:
+            header_format.append(
+                '##FORMAT=<ID=MERGED_AF,Number=.,Type=Float,Description="Allele frequency of each merged variant, comma separated">'
+            )
+            header_format.append(
+                '##FORMAT=<ID=MERGED_DP,Number=.,Type=Integer,Description="Alternate depth of each merged variant, comma separated">'
+            )
+        header = header_source + header_info + header_filter + header_format
+        return header
+
+    def write_vcf(self):
+        """Merge the vcf header and table and write them into a file
+
+        Returns:
+            None
+        """
+        vcf_header = "\n".join(self.get_vcf_header())
+        vcf_table = self.initiate_vcf_df()
+        if not self.ignore_merge:
+            vcf_table = self.process_vcf_df(vcf_table)
+        # Workaround because itertuples cannot handle special characters in column names
+        vcf_table = vcf_table.rename(
+            columns={"REGION": "#CHROM", "FILENAME": self.filename}
+        )
+        with open(self.file_out, "w") as file_out:
+            file_out.write(vcf_header + "\n")
+        vcf_table.to_csv(self.file_out, sep="\t", index=False, header=True, mode="a")
+        return
+
+
 def make_dir(path):
     """
     Description:
         Create directory if it doesn't exist.
- Input: + Args: path - path where the directory will be created. Returns: None @@ -74,520 +536,21 @@ def make_dir(path): raise -def parse_ivar_line(line): - """ - Description: - Parse ivar line to get needed variables for vcf format. - input: - line - ivar tsv line - return: - CHROM, POS, ID, REF, ALT, QUAL, INFO, FORMAT, REF_CODON, ALT_CODON, pass_test, var_type - """ - - line = line.strip("\n").split("\t") - ## Assign intial fields to variables - CHROM = line[0] - POS = line[1] - ID = "." - REF = line[2] - ALT = line[3] - - ## REF/ALF depths and quals - try: - REF_DP = int(line[4]) - except ValueError: - print(line) - print(line[4]) - exit(-1) - REF_RV = int(line[5]) - REF_FW = REF_DP - REF_RV - REF_QUAL = int(line[6]) - ALT_RV = int(line[8]) - ALT_DP = int(line[7]) - ALT_FW = ALT_DP - ALT_RV - ALT_QUAL = int(line[9]) - ALT_FREQ = float(line[10]) - FORMAT = [REF_DP, REF_RV, REF_QUAL, ALT_DP, ALT_RV, ALT_QUAL, ALT_FREQ] - - ## Codon annotation - REF_CODON = line[15] - ALT_CODON = line[17] - - ## Determine variant type - var_type = "SNP" - if ALT[0] == "+": - ALT = REF + ALT[1:] - var_type = "INS" - elif ALT[0] == "-": - REF += ALT[1:] - ALT = line[2] - var_type = "DEL" - - QUAL = "." - - ## Determine FILTER field - INFO = f"DP={int(float(line[11]))}" - pass_test = line[13] - - return ( - CHROM, - POS, - ID, - REF, - ALT, - QUAL, - INFO, - FORMAT, - REF_CODON, - ALT_CODON, - pass_test, - var_type, - ) - - -###################### -## FILTER FUNCTIONS ## -###################### - - -def ivar_filter(pass_test): - """ - Description: - process ivar filter into vcf filter format. - input: - pass_test - ivar fisher exact test [ True, False ] - return: - Whether it passes the filter or not. [False, "ft"] - """ - if pass_test == "TRUE": - return False - else: - return "ft" - - -def strand_bias_filter(format): - """ - Description: - Calculate strand-bias fisher test. - input: - format - format variables - return: - Whether it passes the filter or not. [False, "sb"] - """ - # format=[REF_DP, REF_RV, REF_QUAL, ALT_DP, ALT_RV, ALT_QUAL, ALT_FREQ] - # table: - ## REF_FW REF_RV - ## ALT_FW ALT_RV - table = np.array([[format[0] - format[1], format[1]], [format[3] - format[4], format[4]]]) - oddsr, pvalue = fisher_exact(table, alternative="greater") - - # h0: both strands are equally represented. - # If test is significant h0 is refused so there is an strand bias. - if pvalue < 0.05: - return "sb" - else: - return False - - -def write_vcf_header(ref, ignore_strand_bias, file_out, filename): - """ - Description: - Write vcf header for VCFv4.2 - input: - ref - (optional), ref in fasta format - ignore_strand_bias - if no strand-bias is calculated [True, False] - file_out - output file_in - filename - name of the output file - return: - Nothing. 
- """ - ## Define VCF header - header_source = ["##fileformat=VCFv4.2", "##source=iVar"] - if ref: - header_contig = [] - for record in SeqIO.parse(ref, "fasta"): - header_contig += ["##contig="] - - header_source += header_contig - - header_info = ['##INFO='] - header_filter = [ - '##FILTER=', - '##FILTER= 0.05">', - ] - header_format = [ - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - ] - header_cols = [f"#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\t{filename}"] - if not ignore_strand_bias: - header_filter += ['##FILTER='] - - header = header_source + header_info + header_filter + header_format + header_cols - fout = open(file_out, "w") - fout.write("\n".join(header) + "\n") - fout.close() - - -def write_vcf_line(chrom, pos, id, ref, alt, filter, qual, info, format, file_out): - """ - Description: - Format variables into vcf line format and write line to file. - input: - chrom, pos, id, ref, alt, filter, qual, info, format - vcf variables - file_out - file output - return: - Nothing. - """ - sample = f'1:{":".join(str(x) for x in format)}' - format = "GT:REF_DP:REF_RV:REF_QUAL:ALT_DP:ALT_RV:ALT_QUAL:ALT_FREQ" - - oline = ( - chrom - + "\t" - + pos - + "\t" - + id - + "\t" - + ref - + "\t" - + alt - + "\t" - + qual - + "\t" - + filter - + "\t" - + info - + "\t" - + format - + "\t" - + sample - + "\n" - ) - fout = open(file_out, "a") - fout.write(oline) - fout.close() - - -############################ -## MERGE CODONS FUNCTIONS ## -############################ - - -def check_consecutive(mylist): - """ - Description: - This function checks a list of numbers and returns how many items are consecutive. - input: - my_list - A list of integers - return: - Number of items consecutive in the list - [False, 2, 3,..] - """ - # getting first index of tuple for consecutive checking - my_list = list(map(int, [i[0] for i in mylist])) - ## Check if the list contains consecutive numbers - if len(my_list) == 1: - return False - elif sorted(my_list) == list(range(min(my_list), max(my_list) + 1)): - return len(my_list) - else: - ## If not, and the list is > 1, remove the last item and reevaluate. - if len(my_list) > 2: - my_list.pop() - if sorted(my_list) == list(range(min(my_list), max(my_list) + 1)): - return len(my_list) - else: - return False - return False - - -def get_diff_position(seq1, seq2): - """ - Description: - Function to compare two codon nucleotide sequences (size 3) and retuns the position where it differs. - Input: - seq1 - string size 3 [A,T,C,G]. Ex. "ATC" - seq2 - string size 3 [A,T,C,G]. Ex. "ACC" - Returns: - Returns position where seq1 != seq2 - """ - # If codon is NA treat as not same codon - if seq1 == "NA": - return 2 - - ind_diff = [i for i in range(len(seq1)) if seq1[i] != seq2[i]] - if len(ind_diff) > 1: - print("There has been an issue, more than one difference between the seqs.") - return False - else: - return ind_diff[0] - - -def check_merge_codons(q_pos, fe_codon_ref, fe_codon_alt): - """ - Description: - Logic for determine if variant lines need to be collapsed into one determining - if they are consecutive and belong to the same codon. - Input: - qpos - list of positions. Ex. [4441, 4442, 4443] - fe_codon_ref - first position codon annotation for ref. Ex. "ATG" - fe_codon_alt - first position codon annotation for alt. Ex. "AGG" - Returns: - Returns num_collapse. Number of lines that need to be collapsed into one. - """ - # Are two positions in the queue consecutive? 
- # q_pos = [4441, 4442, 5067] - num_collapse = 0 - if check_consecutive(list(q_pos)) == 2: - ## If the first position is not on the third position of the codon they are in the same codon. - if get_diff_position(fe_codon_ref, fe_codon_alt) != 2: - num_collapse = 2 - else: - num_collapse = 1 - # Are the three positions in the queue consecutive? - # q_pos = [4441, 4442, 4443] - elif check_consecutive(list(q_pos)) == 3: - ## we check the first position in which codon position is to process it acordingly. - # If first position is in the first codon position all three positions belong to the same codon. - if get_diff_position(fe_codon_ref, fe_codon_alt) == 0: - num_collapse = 3 - # If first position is in the second codon position, we have the two first positions belonging to the same codon and the last one independent. - elif get_diff_position(fe_codon_ref, fe_codon_alt) == 1: - num_collapse = 2 - ## Finally if we have the first position in the last codon position, we write first position and left the remaining two to be evaluated in the next iteration. - elif get_diff_position(fe_codon_ref, fe_codon_alt) == 2: - num_collapse = 1 - # If no consecutive process only one line. - elif check_consecutive(list(q_pos)) == False: - num_collapse = 1 - - return num_collapse - - -def process_variants(variants, num_collapse): - """ - Description: - The function set the variables acordingly to the lines to collapse do to consecutive variants. - Input: - variants - Dict with var lines. - num_collapse - number of lines to collapse [2,3] - Returns:: - Vars fixed: chrom, pos, id, ref, alt, qual, filter, info, format - """ - # Collapsed variant parameters equal to first variant - key_list = ["chrom", "pos", "id", "qual", "filter", "info", "format"] - chrom, pos, id, qual, filter, info, format = [variants[next(iter(variants))][key] for key in key_list] - - # If no consecutive, process one variant line - # If two consecutive, process two variant lines into one - # If three consecutive process three variant lines and write one - ref = "" - alt = "" - iter_variants = iter(variants) - for _ in range(num_collapse): # fixed notation - var = next(iter_variants) - ref += variants[var]["ref"] - alt += variants[var]["alt"] - - return chrom, pos, id, ref, alt, qual, filter, info, format - - def main(args=None): - # Process args args = parse_args(args) - - # Initialize vars - filename = os.path.splitext(args.file_in)[0] - out_dir = os.path.dirname(args.file_out) - var_list = [] # store variants - var_count_dict = {"SNP": 0, "INS": 0, "DEL": 0} # variant counts - variants = OrderedDict() # variant dict (merge codon) - q_pos = deque([], maxlen=3) # pos fifo queue (merge codon) - last_pos = "" - - # Create output directory - make_dir(out_dir) - - ############################## - ## Write vcf header to file ## - ############################## - write_vcf_header(args.fasta, args.ignore_strand_bias, args.file_out, filename) - - ################################# - ## Read and process input file ## - ################################# - with open(args.file_in, "r") as fin: - for line in fin: - if "REGION" not in line: - ################ - ## Parse line ## - ################ - ## format= - # [REF_DP, REF_RV, REF_QUAL, ALT_DP, ALT_RV, ALT_QUAL, ALT_FREQ] - write_line = True - ( - chrom, - pos, - id, - ref, - alt, - qual, - info, - format, - ref_codon, - alt_codon, - pass_test, - var_type, - ) = parse_ivar_line(line) - - ## If pos is duplicated due to annotation skip lines - if pos == last_pos: - continue - - last_pos = pos - 
-                #####################
-                ## Process filters ##
-                #####################
-                ## ivar fisher test
-                filter = ""
-                if ivar_filter(pass_test):
-                    filter = ivar_filter(pass_test)
-                ## strand-bias fisher test
-                if not args.ignore_strand_bias:
-                    if strand_bias_filter(format):
-                        if filter:
-                            filter += ";" + strand_bias_filter(format)
-                        else:
-                            filter = strand_bias_filter(format)
-
-                if not filter:
-                    filter = "PASS"
-
-                #####################
-                ## Filter variants ##
-                #####################
-                if args.pass_only and filter != "PASS":
-                    write_line = False
-                ### AF filtering. ALT_DP/(ALT_DP+REF_DP)
-                if float(format[3] / (format[0] + format[3])) < args.allele_freq_threshold:
-                    write_line = False
-                ### Duplication filter
-                if (chrom, pos, ref, alt) in var_list:
-                    write_line = False
-                else:
-                    var_list.append((chrom, pos, ref, alt))
-
-                ############################################################
-                ## MERGE_CODONS                                           ##
-                ## Merge consecutive variants belonging to the same codon ##
-                ############################################################
-                if not args.ignore_merge_codons and var_type == "SNP":
-                    ## re-fill queue and dict accordingly
-                    q_pos.append((pos, var_type))  # adding type information
-                    variants[(chrom, pos, ref, alt)] = {
-                        "chrom": chrom,
-                        "pos": pos,
-                        "id": id,
-                        "ref": ref,
-                        "alt": alt,
-                        "qual": qual,
-                        "filter": filter,
-                        "info": info,
-                        "format": format,
-                        "ref_codon": ref_codon,
-                        "alt_codon": alt_codon,
-                    }
-
-                    if len(q_pos) == q_pos.maxlen:
-                        fe_codon_ref = variants[next(iter(variants))]["ref_codon"]
-                        fe_codon_alt = variants[next(iter(variants))]["alt_codon"]
-                        num_collapse = check_merge_codons(q_pos, fe_codon_ref, fe_codon_alt)
-                        (
-                            chrom,
-                            pos,
-                            id,
-                            ref,
-                            alt,
-                            qual,
-                            filter,
-                            info,
-                            format,
-                        ) = process_variants(variants, num_collapse)
-
-                        ## Empty variants dict and queue accordingly
-                        for _ in range(num_collapse):
-                            variants.popitem(last=False)
-                            q_pos.popleft()
-                    else:
-                        write_line = False
-
-                ##############################
-                ## Write output to vcf file ##
-                ##############################
-                if write_line:
-                    var_count_dict[var_type] += 1
-                    write_vcf_line(
-                        chrom,
-                        pos,
-                        id,
-                        ref,
-                        alt,
-                        filter,
-                        qual,
-                        info,
-                        format,
-                        args.file_out,
-                    )
-
-    if not args.ignore_merge_codons:
-        #######################
-        ## handle last lines ##
-        #######################
-        while len(q_pos) > 0:
-            try:
-                fe_codon_ref = variants[next(iter(variants))]["ref_codon"]
-                fe_codon_alt = variants[next(iter(variants))]["alt_codon"]
-            except StopIteration:
-                break
-            else:
-                num_collapse = check_merge_codons(q_pos, fe_codon_ref, fe_codon_alt)
-                (chrom, pos, id, ref, alt, qual, filter, info, format) = process_variants(variants, num_collapse)
-
-                var_count_dict[q_pos[0][1]] += 1
-                write_vcf_line(chrom, pos, id, ref, alt, filter, qual, info, format, args.file_out)
-                ## Empty variants dict and queue accordingly
-                for _ in range(num_collapse):
-                    variants.popitem(last=False)
-                    q_pos.popleft()
-
-    #############################################
-    ## variant counts to pass to MultiQC       ##
-    #############################################
-    var_count_list = [(k, str(v)) for k, v in sorted(var_count_dict.items())]
-
-    # format output table a little more cleanly
-    # row_spacing = len(filename)
-
-    row = create_f_string(30, "<")  # an arbitraily long value to fit most sample names
-    row += create_f_string(10) * len(var_count_list)  # A spacing of ten looks pretty
-
-    headers = ["sample"]
-    headers.extend([x[0] for x in var_count_list])
-    data = [filename]
-    data.extend([x[1] for x in var_count_list])
-
-    print(row.format(*headers))
-    print(row.format(*data))
-
-
-def create_f_string(str_size, placement="^"):
-    row_size = "{: " + placement + str(str_size) + "}"
-    return row_size
+    ivar_to_vcf = IvarVariants(
+        file_in=args.file_in,
+        file_out=args.file_out,
+        pass_only=args.pass_only,
+        freq_threshold=args.allele_freq_threshold,
+        bad_qual_threshold=args.bad_quality_threshold,
+        merge_af_threshold=args.merge_af_threshold,
+        ignore_stbias=args.ignore_strand_bias,
+        ignore_merge=args.ignore_merge_codons,
+        fasta=args.fasta,
+    )
+    ivar_to_vcf.write_vcf()
+    return


 if __name__ == "__main__":