-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflatten-fasta.py
105 lines (89 loc) · 3.4 KB
/
flatten-fasta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import re
import sys
import argparse
import itertools
from pathlib import Path
# catch broken pipe errors to allow ex) python foo.py ... | head
# see: https://stackoverflow.com/a/30091579
from signal import signal, SIGPIPE, SIG_DFL
signal(SIGPIPE, SIG_DFL)
#===============================================================================
def fasta_reader(fasta):
"""
Read in a fasta file lazily and return a generator of the name and sequence
Input:
------
fasta :: FileType
opened file
Yields:
-------
generator :: (name, seq)
name :: str
Name of the read taken from the fasta file
read :: str
Sequence taken from the fasta file
Requires:
---------
itertools
Example:
--------
itertools.groupby takes a key function and groups all items into a list
until that key changes. We can key on lines beginning with >, then grab
every line until the next record in the fasta. This makes our method robust
to some fasta formats that have forced line breaks at given characters.
foo = '>ABC>DEF>GHI'
[(k, list(g)) for k,g in itertools.groupby(foo, lambda x: x == '>')]
--> [(True, ['>']), (False, ['A', 'B', 'C']), (True, ['>']), ... ]
Note:
-----
Adapted from: https://www.biostars.org/p/710/#1412
"""
# ditch the boolean (x[0]) and just keep the header/seq grouping
fa_iter = (x[1]
for x in itertools.groupby(fasta, lambda line: line[0] == ">"))
for header in fa_iter:
# drop the ">"
name = next(header)[1:].strip()
# join all sequence lines to one by iterating until the next group.
read = "".join(s.strip() for s in next(fa_iter))
yield name, read
#===============================================================================
if __name__ == '__main__':
description = "reorient all sequences in a fasta to start at a particular pattern"
parser = argparse.ArgumentParser(description=description)
parser.add_argument("fasta",
nargs="+",
type=argparse.FileType('r'),
help="fasta's to process")
parser.add_argument(
'-r',
'--records',
type=int,
dest='num_records',
metavar='N',
default='0',
help='number of records in the fasta to process (default = all)')
parser.add_argument('--no-flat',
dest='no_flat',
action='store_true',
help='just output the file')
args = parser.parse_args()
if args.num_records < 0:
raise ValueError('Number of records must be >0!!')
elif args.num_records == 0:
num_records = None
else:
num_records = args.num_records
#---------------------------------------------------------------------------
# re-orient the reference plasmids
for fasta in args.fasta:
basename = Path(fasta.name).stem
records = itertools.islice(fasta_reader(fasta), num_records)
# include the well in the record line
# also concatenate sequences together to enable mapping across the plasmid junction
for name, seq in records:
seq = seq.upper()
if args.no_flat:
print(f'>{basename + "_" + name}\n{seq}', file=sys.stdout)
else:
print(f'>{basename + "_" + name}\n{seq+seq}', file=sys.stdout)