-
Notifications
You must be signed in to change notification settings - Fork 0
/
preprocessing_msa.py
191 lines (152 loc) · 5.9 KB
/
preprocessing_msa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import os
import sys
import itertools
import collections
#from Bio import SeqIO
import pdb
# make amino acids set
aa = "ARNDBCEQZGHILKMFPSTWYV"
amino_acids = set()
for c in aa:
amino_acids.add(c)
# I need the named tuple to count amino acids at each position
# and order them by the count easily
AA = collections.namedtuple('AA', 'name count')
def read_fasta(fasta_file_path):
"""
read fasta file and return a dict of all sequences
:param fasta_file_path: path to fasta file
:return: a dictionary of seq_name:seq
"""
sequences = dict()
if not os.path.exists(fasta_file_path):
logging.error("file {} does not exist".format(fasta_file_path))
sys.exit()
if fasta_file_path.endswith("gz"):
fasta_file = gzip.open(fasta_file_path, "rt")
else:
fasta_file = open(fasta_file_path, "r")
seqs = []
seq_name = ""
for line in fasta_file:
line = line.strip()
if not line: # empty line
continue
if line.startswith(">"):
if len(seqs) != 0: # there was a sequence before
sequences[seq_name] = "".join(seqs)
seq_name = line[1:]
seqs = []
else:
seq_name = line[1:]
else:
seqs.append(line)
if seqs:
sequences[seq_name] = "".join(seqs)
return sequences
def return_pos_counts(reads, percentage, dna=False):
'''
This function takes the aligned reads dictionary and returns a list of lists, each list for a position in the aligned proteins
and the distribution of the amino acids at that position in the form of AA namedtuples
:params reads: dictionary of sample IDs as keys and protein sequence as values
'''
position_counts_dict = []
position_counts_dict.append(dict())
len_reads = len(reads)
for i in range(len(reads[list(reads.keys())[1]])):
position_counts_dict.append(dict())
for read in reads.values():
if read[i] not in position_counts_dict[i]:
position_counts_dict[i][read[i]] = 1
else:
position_counts_dict[i][read[i]] += 1
position_counts = []
for pos, count_list in enumerate(position_counts_dict):
position_counts.append([]) # list of AA objects
for aa in count_list.keys():
if dna:
# skipping samples that have an ambiguos letter at that position
if aa in {"-", "*", 'B', 'D','H', 'K', 'M', 'N', 'R', 'S','W', 'Y'}:
continue
else:
# skipping samples that have a - or * at that position
if aa in {"-", "*"}:
continue
# discard letters with counts less than percentage provided
if count_list[aa] < percentage*len_reads:
continue
else:
position_counts[pos].append(AA(name=aa, count=count_list[aa]))
for count_list in position_counts:
count_list.sort(key = lambda x: x.count, reverse = True)
return position_counts
def generate_vcf_files(possible_pairs, position_counts, reads):
'''
Generates a fake VCF file to give to R to do the Hierarchical Clustering then Sankoff algorithm on the Dendrogram
:params possible_pairs: is a dictionary of position and the different amino acids at that position
:params position_counts: dictionary of all positions the amino acid distribution at that count
:params reads: dictionary of sample IDs and sequences
'''
header = ["Chromosome", "POS", "Indiv", "gt_PL", "gt_GT", "gt_GT_allele\n"]
output_vcf = open("output_vcf.vcf" , "w")
output_vcf.write("\t".join(header))
# len_read = len(reads[list(reads.keys())[0]])
for pos in possible_pairs.keys():
for sample_id, read in reads.items():
if read[pos] in possible_pairs[pos]:
gt_GT = str(possible_pairs[pos].index(read[pos]))
else:
gt_GT = "NA"
output_vcf.write("\t".join(["1", str(pos), str(sample_id), "0", gt_GT, ','.join(possible_pairs[pos] + ["\n"])]))
def generate_sequence_table(reads):
'''
outputs a TSV file for R to read with lines as samples and columns as positions in sequence
:params reads: dictionary of sample IDs and sequences
'''
output_table = open("input_protein_table.tsv", "w")
# creating header for the table
header = "\t".join([str(x) for x in range(len(reads[list(reads.keys())[1]]))])
header = "samples_name\t" + header + "\n"
output_table.write(header)
for sample_id, read in reads.items():
line = [str(sample_id)]
line += [char for char in read]
line[-1] = line[-1] + "\n"
output_table.write("\t".join(line))
if __name__ == "__main__":
if len(sys.argv) < 3:
print("You need to give the input reads file as first arguments and the percentage as second argument and 'dna' or 'aa' as third argument")
sys.exit()
if not os.path.exists(sys.argv[1]):
print("The file {} does not exist".format(sys.argv[1]))
sys.exit()
reads = read_fasta(sys.argv[1])
percentage = float(sys.argv[2])
# generating the tsv table with each character in a column
# needed for the R modules later
generate_sequence_table(reads)
if sys.argv[3] == "dna":
position_counts = return_pos_counts(reads, percentage, dna=True)
elif sys.argv[3] == "aa":
position_counts = return_pos_counts(reads, percentage, dna=False)
else:
print("You need to give third argument as either 'dna' or 'aa'")
sys.exit()
total_count = len(reads)
possible_pairs = dict()
counter = 0
for pos, aa_list in enumerate(position_counts):
if len(aa_list) > 1:
possible_pairs[pos] = [x.name for x in aa_list] # this should be sorted according to counts
# elif len(aa_list) == 1:
# counter += 1
# print("{} position had predominantly 1 protein from {} positions".format(counter, len(reads[list(reads.keys())[1]])))
pairs_file = open("variant_pairs.txt", "w")
pairs_file.write("first\tsecond\n")
positions = list(possible_pairs.keys())
# generating all possible pairs from position with variants
# ignored position that are predominantly one amino acid
for pair in itertools.combinations(positions, 2):
pairs_file.write("{}\t{}\n".format(pair[0], pair[1]))
pairs_file.close()
generate_vcf_files(possible_pairs, position_counts, reads)