-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathFastaReader.py
157 lines (125 loc) · 3.38 KB
/
FastaReader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from __future__ import absolute_import
import gzip
from os.path import abspath, expanduser
def split_header(name):
    """
    Split a FASTA header into its id and description.

    The id is the text up to the first run of whitespace; the description is
    whatever follows it (an empty string when the header holds only an id).

    :param name: header text, i.e. the header line without the leading ">"
    :return: two-item list ``[id, description]``
    """
    parts = name.split(None, 1)
    # An empty or whitespace-only header splits into zero fields and a header
    # without a description into one; pad so callers can always unpack two.
    while len(parts) < 2:
        parts.append("")
    return parts
class FastaRecord(object):
    """
    A single FASTA record: a header line plus its sequence.

    The header (``name``) is split by ``split_header`` into an ``id`` (the
    text before the first whitespace) and a ``description`` (the remainder).
    """

    DELIMITER = ">"

    def __init__(self, name, seq):
        """
        Build a record from a header and an unwrapped sequence.

        :param name: header text without the leading delimiter
        :param seq: sequence as a single string with no line breaks
        :raises ValueError: if ``name`` or ``seq`` contains a newline, or
            ``seq`` contains the delimiter
        """
        # Explicit checks instead of ``assert``: assertions are stripped when
        # Python runs with -O, which would let malformed records through.
        if "\n" in name or "\n" in seq or self.DELIMITER in seq:
            raise ValueError("Invalid FASTA record data")
        self._name = name
        self._seq = seq
        self._id, self._description = split_header(name)

    @property
    def name(self):
        """
        The full header of the record, i.e. the text after ">".
        """
        return self._name

    @property
    def id(self):
        """
        The id of the record: the FASTA header up to the first whitespace.
        """
        return self._id

    @property
    def description(self):
        """
        The description of the record: the contents of the FASTA header
        following the first whitespace.
        """
        return self._description

    @property
    def seq(self):
        """
        The sequence of the record.
        """
        return self._seq

    @property
    def length(self):
        """
        The length of the sequence.
        """
        return len(self._seq)

    @classmethod
    def from_string(cls, string):
        """
        Interpret a string as a FASTA record.

        No assumption is made about how the sequence is wrapped: every line
        after the header is concatenated into one sequence string.

        :param string: text of one record (header line plus sequence lines)
        :return: a new record of type ``cls``
        :raises ValueError: if the text has fewer than two lines, does not
            start with the delimiter, or holds invalid record data
        """
        lines = string.strip().splitlines()
        # A record needs a header line and at least one sequence line.
        if len(lines) < 2 or not lines[0].startswith(cls.DELIMITER):
            raise ValueError("String not recognized as a valid FASTA record")
        name = lines[0][len(cls.DELIMITER):]
        seq = "".join(lines[1:])
        # Build through ``cls`` so subclasses get instances of their own type.
        return cls(name, seq)

    def __str__(self):
        """
        Render the record as FASTA text: the header line followed by the
        sequence on a single line.

        :return: the FASTA-formatted string
        """
        return "%s%s\n%s" % (self.DELIMITER, self.name, self.seq)
def check_format(filename):
    """
    Check that a file name carries a supported FASTA extension.

    :param filename: file name or path to check
    :return: 0 when the extension is supported
    :raises ValueError: when the name ends with none of the supported
        extensions
    """
    allowed_format = [".fa", ".fasta", ".fa.gz", ".fasta.gz"]
    # str.endswith accepts a tuple of suffixes, so one call covers them all.
    if filename.endswith(tuple(allowed_format)):
        return 0
    # ValueError subclasses Exception, so callers with a broad
    # ``except Exception`` keep working.
    raise ValueError("file format is not in %s" % allowed_format)
def yield_fasta_records(stream):
    """
    Yield FASTA records parsed from a stream of text lines.

    Blank lines are skipped. A line starting with the FASTA delimiter opens a
    new record; the lines that follow, up to the next such line, are its
    sequence.

    :param stream: an iterable of text lines, e.g. an open file
    :return: a generator of FastaRecord objects
    :raises ValueError: propagated from FastaRecord.from_string when a chunk
        of lines is not a valid record
    """
    # Collect the lines of the current record and join them once per record
    # instead of growing a string with repeated concatenation.
    lines = []
    for line in stream:
        line = line.strip()
        if not line:
            continue
        if lines and line.startswith(FastaRecord.DELIMITER):
            # A new header closes the record gathered so far.
            yield FastaRecord.from_string("\n".join(lines))
            lines = []
        lines.append(line)
    # Flush the final record, which has no following header to close it.
    if lines:
        yield FastaRecord.from_string("\n".join(lines))
def open_fasta(filename):
    """
    Open a FASTA file (plain or gzip-compressed) and return its records.

    The extension check and the opening of the file happen when this function
    is called; the records themselves are parsed lazily as the returned
    generator is consumed.

    :param filename: path to a .fa/.fasta file, optionally with a .gz suffix
    :return: a generator of FastaRecord objects
    :raises ValueError: from check_format when the extension is unsupported
    """
    check_format(filename)
    filename = abspath(expanduser(filename))
    if filename.endswith(".gz"):
        # Text mode is required: on Python 3 a plain 'r' opens gzip files in
        # binary mode and yields bytes, which breaks the str-based parsing.
        stream = gzip.open(filename, "rt")
    else:
        stream = open(filename, "r")
    return _records_then_close(stream)


def _records_then_close(stream):
    """Yield every record from ``stream`` and close it once iteration ends."""
    try:
        for record in yield_fasta_records(stream):
            yield record
    finally:
        # Runs on exhaustion, on error, and when the generator is closed.
        stream.close()