-
Notifications
You must be signed in to change notification settings - Fork 2
/
parse-test-align.py
executable file
·235 lines (201 loc) · 7.62 KB
/
parse-test-align.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/usr/bin/env python3
import sys
import errno
import random
import logging
import argparse
assert sys.version_info.major >= 3, 'Python 3 required'
REVCOMP_TABLE = str.maketrans('acgtrymkbdhvACGTRYMKBDHV', 'tgcayrkmvhdbTGCAYRKMVHDB')
DESCRIPTION = """Generate test reads from a human-readable alignment.
This script allows you to create test inputs in a natural way, by writing reads in their aligned
position in a human-readable text file. See tests/parse-align.in.txt for an example."""
def make_argparser():
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument('alignment', type=argparse.FileType('r'), nargs='?', default=sys.stdin,
help='The input alignment file. Omit to read from stdin.')
parser.add_argument('-1', '--fq1', type=argparse.FileType('w'),
help='Write the first-mate reads to this fastq file. Warning: will overwrite any existing file.')
parser.add_argument('-2', '--fq2', type=argparse.FileType('w'),
help='Write the first-mate reads to this fastq file. Warning: will overwrite any existing file.')
parser.add_argument('-r', '--ref', type=argparse.FileType('w'),
help='Write the reference sequence to this file. Warning: will overwrite any existing file.')
parser.add_argument('-d', '--duplex', action='store_true',
help='Interpret these as duplex reads, and prepend with barcodes and constant sequences.')
parser.add_argument('-c', '--const', default='TACGT',
help='Default: %(default)s')
parser.add_argument('-q', '--default-qual', type=int, default=40,
help='Default: %(default)s')
parser.add_argument('-n', '--add-pair-num', action='store_true',
help='Append the read pair number to the read names (not compatible with --duplex).')
parser.add_argument('-B', '--print-barlen', action='store_true',
help='Print the detected lenght of the barcodes to stdout.')
parser.add_argument('-l', '--log', type=argparse.FileType('w'), default=sys.stderr,
help='Print log messages to this file instead of to stderr. Warning: Will overwrite the file.')
parser.add_argument('-Q', '--quiet', dest='volume', action='store_const', const=logging.CRITICAL,
default=logging.WARNING)
parser.add_argument('-v', '--verbose', dest='volume', action='store_const', const=logging.INFO)
parser.add_argument('-D', '--debug', dest='volume', action='store_const', const=logging.DEBUG)
return parser
def main(argv):
parser = make_argparser()
args = parser.parse_args(argv[1:])
logging.basicConfig(stream=args.log, level=args.volume, format='%(message)s')
tone_down_logger()
add_pair_num = args.add_pair_num
if args.duplex:
if args.add_pair_num:
fail('Error: --duplex and --add-pair-num not compatible.')
add_pair_num = True
qual_char = chr(args.default_qual+32)
if args.fq1 and args.fq2:
fq_files = [args.fq1, args.fq2]
else:
fq_files = []
barlen = None
if add_pair_num:
pair_num = 0
else:
pair_num = None
ref_seq = None
family_num = 0
first_mate = None
last_barcode = None
for line_raw in args.alignment:
if line_raw.startswith('#'):
continue
prefix = line_raw.split()[0]
line = line_raw[len(prefix):].rstrip()
if not line:
continue
if ref_seq is None and prefix.startswith('f'):
raw_ref_seq = line.lstrip()
if args.ref:
ref_seq = raw_ref_seq.replace('-', '')
args.ref.write('>ref\n')
args.ref.write(ref_seq+'\n')
elif prefix.startswith('r'):
assert raw_ref_seq is not None, line_raw
mate = int(prefix[1])
if first_mate is None:
first_mate = mate
if args.duplex:
barcode = prefix[2:]
if barcode != last_barcode:
family_num += 1
pair_num = 0
if barlen is not None and barlen != len(barcode):
fail('Error: Variable barcode lengths encountered. Barcode {!r} length != {}.'
.format(barcode, barlen))
barlen = len(barcode)
tags = (barcode[:barlen//2], barcode[barlen//2:])
else:
name = prefix[2:] or None
if mate == first_mate and add_pair_num:
pair_num += 1
raw_seq, pos, direction = get_raw_seq(line)
mutated_seq = substitute_ref_bases(raw_seq, pos, raw_ref_seq)
if direction == 'reverse':
final_seq = revcomp(mutated_seq)
else:
final_seq = mutated_seq
if fq_files:
if args.duplex:
formatter = format_duplex_read(
final_seq, direction, mate, family_num, pair_num, tags, args.const, qual_char
)
else:
formatter = format_read(final_seq, qual_char, name, pair_num)
write_read(fq_files, mate, formatter)
if args.duplex:
last_barcode = barcode
if args.print_barlen:
print(barlen)
def get_raw_seq(line):
direction = None
seq = line.lstrip(' ')
if seq.endswith('+'):
assert not (seq.startswith('+') or seq.startswith('-')), f'Seq has multiple +/- marks: {seq!r}'
direction = 'forward'
seq = seq[:-1]
pos = len(line) - len(seq)
elif seq.startswith('-'):
assert not seq.endswith('-'), f"Seq has '-' at both the start and end: {seq!r}"
direction = 'reverse'
seq = seq[1:]
pos = len(line) - len(seq) + 1
elif seq.startswith('+'):
fail(f"'+' should go at end of seq, but was at start: {seq!r}")
elif seq.endswith('-'):
fail(f"'-' should go at start of seq, but was at end: {seq!r}")
else:
fail(f'A +/- direction is required at the 3\' end of the read.\nFailed on {seq!r}')
return seq, pos, direction
def substitute_ref_bases(raw_seq, pos, ref_seq):
"""Replace dots in the raw sequence with reference bases."""
final_seq = ''
for i, raw_char in enumerate(raw_seq):
if raw_char == '.':
ref_char = ref_seq[pos+i-1]
if ref_char != '-':
final_seq += ref_char
elif raw_char != '-':
final_seq += raw_char
return final_seq
def write_read(fq_files, mate, formatter):
for line in formatter:
fq_files[mate-1].write(line+'\n')
def format_read(seq, qual_char, name=None, pair_num=None):
if name is not None:
if pair_num is None:
yield f'@{name}'
else:
yield f'@{name}.{pair_num}'
else:
yield f'@{random.randint(100000, 999999)}'
yield seq
yield '+'
yield qual_char * len(seq)
def format_duplex_read(seq, direction, mate, family_num, pair_num, tags, const, qual_char):
# Read name (line 1):
if (direction == 'forward' and mate == 1) or (direction == 'reverse' and mate == 2):
order = 'ab'
else:
order = 'ba'
yield '@fam{}.{}.pair{} mate{}'.format(family_num, order, pair_num, mate)
# Read sequence (line 2):
if order == 'ab':
tags_ordered = tags
if order == 'ba':
tags_ordered = list(reversed(tags))
tag = tags_ordered[mate-1]
yield tag + const + seq
# Plus (line 3):
yield '+'
# Quality scores (line 4):
read_len = len(tag + const + seq)
yield qual_char * read_len
def revcomp(seq):
return seq.translate(REVCOMP_TABLE)[::-1]
def rand_seq(length):
barcode = ''
for i in range(length):
barcode += random.choice('ACGT')
return barcode
def tone_down_logger():
"""Change the logging level names from all-caps to capitalized lowercase.
E.g. "WARNING" -> "Warning" (turn down the volume a bit in your log files)"""
for level in (logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG):
level_name = logging.getLevelName(level)
logging.addLevelName(level, level_name.capitalize())
def fail(message):
logging.critical(message)
if __name__ == '__main__':
sys.exit(1)
else:
raise Exception('Unrecoverable error')
if __name__ == '__main__':
try:
sys.exit(main(sys.argv))
except IOError as ioe:
if ioe.errno != errno.EPIPE:
raise