-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpairwise.py
271 lines (244 loc) · 8.24 KB
/
pairwise.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import random
import time
from typing import List
from typing import Tuple
from collections import defaultdict, deque
def main():
test_seq_assembly()
def test_and_print_message(seq, seq_truth, k, message):
if seq == seq_truth:
print(f"Passed {message} (assembled original sequence). Congratulations!")
elif compare_composition(seq, seq_truth, k):
print(f"Passed {message} (assembled a sequence of the same composition with the original sequence). Congratulations!")
else:
print("FAILED test 1!")
def test_1(method):
print(f"Testing k-assembler by {method}")
seqs_truth = ["aaaaaaaaaaa","agcagctcagc","agcagctcagg",random_DNA_sequence(10000, 20000)]
ks = [5, 3, 3, 20]
for i in range(len(seqs_truth)):
print(f"\nExample {i}:")
curr = seqs_truth[i]
#print(curr)
k = ks[i]
kmers = get_kmers(curr, k, True)
begin = time.time()
if method == "k-mer pairwise comparison":
create_deBruijn_graph_by_string_comp(kmers)
elif method == "k-mer hashing":
g = debrujin_graph_from_kmers(kmers)
elif method == "k-mer hashing without deque":
g = debrujin_graph_from_kmers_nondeque(kmers)
else:
print("ERROR: unknown methods!")
end = time.time()
elapsed_secs = end - begin
print(f"Elapsed time for building de Bruijn graph: {elapsed_secs}")
# Euler Path was not implemented for the data structure given by the pairwise method
if(method != "k-mer pairwise comparison"):
balanced_count = balanceCount(g)
#print("BALANCED COUNT")
#print(balanced_count)
path = deque()
if has_Eulerian_path(balanced_count):
print("Passed test for existence of Eulerian path. Congratulations!")
else:
print("Failed test for existence of Eulerian path!")
continue
try:
path = eulPath(g,balanced_count)
seq = genomePath(path)
message = f"Test 1 Example {i}"
test_and_print_message(seq, curr, k, message)
except Exception as e:
print(f"ERROR: {e}")
def test_2(method):
seq_truth = random_DNA_sequence()
k = 10
kmers = get_kmers(seq_truth, k)
# Euler Path was not implemented for the data structure given by the pairwise method
if(method != "k-mer pairwise comparison"):
try:
seq = assemble_kmers(kmers, method)
test_and_print_message(seq, seq_truth, k, "Test 2")
except Exception as e:
print(e)
def test_3(method):
seq_truth = random_DNA_sequence(15, 15)
k = 4
print(f"Sequence: {seq_truth}")
kmers = get_kmers(seq_truth, k)
print("kmers:")
for kmer in kmers:
print(kmer)
try:
seq = assemble_kmers(kmers, method)
test_and_print_message(seq, seq_truth, k, "Test 3")
except Exception as e:
print(e)
def test_seq_assembly():
methods = [
"k-mer pairwise comparison",
"k-mer hashing",
"k-mer hashing without deque"
]
for method in methods:
print("-----------")
test_1(method)
print()
test_2(method)
print()
print("-----------")
test_3("k-mer hashing")
def assemble_kmers(kmers, method):
seq = ""
if method == "k-mer pairwise comparison":
create_deBruijn_graph_by_string_comp(kmers)
elif method == "k-mer hashing":
g = debrujin_graph_from_kmers(kmers)
elif method == "k-mer hashing without deque":
g = debrujin_graph_from_kmers_nondeque(kmers)
else:
raise Exception("ERROR: unknown methods!")
balanced_count = balanceCount(g)
#path = deque()
if not has_Eulerian_path(balanced_count):
raise Exception("ERROR: Eulerian path does not exist!")
else:
path = eulPath(g,balanced_count)
seq = genomePath(path)
return seq
def random_DNA_sequence(min_length=10, max_length=10000):
DNA = ""
length = random.randint(min_length, max_length)
nucleotides = "atgc"
for n in range(length):
DNA += nucleotides[random.randint(0, 3)]
return DNA
def get_kmers(seq, k, randomized=True):
kmers = [seq[i:i+k] for i in range(len(seq) - k + 1)]
if randomized:
nkmers = len(kmers)
for i in range(nkmers-1):
j = random.randint(i, len(kmers)-1)
kmers[i], kmers[j] = kmers[j], kmers[i]
return kmers
def compare_composition(s1, s2, k):
same_composition = True
if len(s1) != len(s2):
same_composition = False
elif s1 == s2:
same_composition = True
else:
composition1 = get_kmers(s1, k)
composition1.sort()
composition2 = get_kmers(s2, k)
composition2.sort()
same_composition = composition1 == composition2
return same_composition
def genomePath(kmers, apppend_last=True):
genome = ''
for kmer in kmers:
genome += kmer[0]
if apppend_last:
genome += kmer[1:]
return genome
def debrujin_graph_from_kmers(patterns):
kmers = set()
for pattern in patterns:
kmers = set(suffix_composition(len(pattern), pattern))
graph = defaultdict(deque)
for kmer in patterns:
graph[prefix(kmer)].append(suffix(kmer))
return graph
def debrujin_graph_from_kmers_nondeque(patterns):
kmers = []
for pattern in patterns:
kmers = kmers + suffix_composition(len(pattern), pattern)
kmers = set(kmers)
dict = {}
for kmer1 in kmers:
dict[kmer1] = deque()
for kmer in patterns:
dict[prefix(kmer)].append(suffix(kmer))
return dict
class Node:
def __init__(self, label: str):
self.m_label = label
self.m_num_of_incoming = 0
self.m_outgoing = []
def create_deBruijn_graph_by_string_comp(kmers):
nodes = []
nodesFound = []
i = 0
for kmerNode in kmers:
nodesFound.append(kmerNode)
nodes.append(Node(kmerNode))
for kmerCompare in kmers:
if len(kmerNode) > 1 and len(kmerCompare) > 1:
if kmerNode[:-1] == kmerCompare[1:]:
nodes[i].m_num_of_incoming += 1
if kmerNode[1:] == kmerCompare[:-1]:
nodes[i].m_outgoing.append(kmerCompare)
i += 1
return nodes
def suffix_composition(k, text):
kmers = []
for i in range(len(text) + 1 - k):
kmers.append(text[i:i+k-1])
return sorted(kmers)
def suffix(string):
return string[1:]
def prefix(string):
return string[:-1]
def balanceCount(adjacentList):
# create a set of all nodes in the graph
all_nodes = set(adjacentList.keys())
#print(all_nodes)
#print(adjacentList)
for nodes in adjacentList.values():
all_nodes.update(nodes)
# create a dictionary to hold the balanced counts
balanced_count = dict.fromkeys(all_nodes, 0)
# iterate over each node in the adjacency list
for node in adjacentList.keys():
# subtract the out-degree of the node from its balanced count
balanced_count[node] -= len(adjacentList[node])
# iterate over each outgoing edge from the node
for out in adjacentList[node]:
# add 1 to the balanced count of the node at the other end of the edge
try:
balanced_count[out] += 1
except KeyError:
balanced_count[out] = 1
return balanced_count
def build_sequence(path):
seq1 = ''.join(path)
seq2 = ''
for i in range(0, len(seq1)):
if (i % 2) == 0:
seq2 += seq1[i]
seq2 += seq1[len(seq1)-1]
return seq2
def eulPath(graph, balanced_count):
dictionary = deque()
#print("BALANCED COUNT ITEMS")
#print(balanced_count.items())
dictionary.appendleft([k for k, v in balanced_count.items() if v == -1][0])
path = deque()
while dictionary:
u_v = dictionary[0]
try:
w = graph[u_v][0]
dictionary.appendleft(w)
graph[u_v].popleft()
except:
path.appendleft(dictionary.popleft())
return path
def has_Eulerian_path(balanced_count):
for k, v in balanced_count.items():
if v == -1:
return True
return False
if __name__ == "__main__":
main()