-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfilewriter.py
executable file
·240 lines (197 loc) · 8.34 KB
/
filewriter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""filewriter contains the functions required for writing skeleton files
to disk.
"""
import configparser as ConfigParser
import os
import sys
from io import BytesIO
import signature2bytegenerator
class FileWriter:
    """Write skeleton files to disk as BOF, variable and EOF byte sequences.

    A file is started with ``write_file``; ``write_header``, ``write_var``
    and ``write_footer`` then add byte sequences produced by
    ``signature2bytegenerator.Sig2ByteGenerator.map_signature``. The
    ``*_written`` flags, ``boflen`` and ``var_pos`` track what has already
    been written to the current file so later sequences can be positioned
    relative to earlier ones.
    """

    # Enum-esque markers handed to ``detect_write_issues`` to say which kind
    # of sequence is about to be written.
    bof = 1
    var = 2
    eof = 3

    def __init__(self, puid_type):
        """Read ``skeletonsuite.cfg`` and prepare the output directory.

        The output directory is ``<cwd>//<locations.output>//<puid_type>//``
        and is created if it does not exist. ``runtime.fillbyte`` is read as
        an integer; if its value is not a valid integer the error is printed
        to stderr and 0 is used instead.

        :param puid_type: name of the sub-directory appended to the
            configured output location.
        """
        self.eof_written = None
        self.var_written = None
        self.bof_written = None
        self.boflen = None
        self.tmpbio = None
        self.puid_str = None
        self.nt_file = None
        self.nt_string = None
        self.sig2map = None
        self.var_pos = None
        self.puid_no = None

        config = ConfigParser.RawConfigParser()
        config.read("skeletonsuite.cfg")
        output_dir = config.get("locations", "output")
        self.newtriplesdir = f"{os.getcwd()}//{output_dir}//{puid_type}//"
        try:
            # exist_ok avoids the race between checking for the directory
            # and creating it.
            os.makedirs(self.newtriplesdir, exist_ok=True)
        except OSError as err:
            print(err, file=sys.stderr)

        try:
            self.fillbyte = config.getint("runtime", "fillbyte")
        except ValueError as err:
            print(err, file=sys.stderr)
            self.fillbyte = 0

    @staticmethod
    def int_list_from_sequence(bytes_):
        """Convert a string of hexadecimal digits to a list of integers.

        :param bytes_: hexadecimal string, e.g. ``"0a0b"``.
        :return: list of byte values, e.g. ``[10, 11]``.
        :raises ValueError: if ``bytes_`` is not valid hexadecimal.
        :raises TypeError: if ``bytes_`` is not a string.
        """
        return list(bytes.fromhex(bytes_))

    def write_header(self, min_, seq):
        """Write a BOF sequence to the current file.

        When no BOF has been written yet the sequence is written from the
        start of the file. When a BOF already exists the new sequence is
        positioned relative to it, depending on ``min_``; if ``min_`` is
        non-zero and smaller than the existing BOF length, nothing is
        written. If an EOF has already been written, the data following the
        BOF is buffered first and re-appended after the new sequence.

        :param min_: minimum offset of the sequence (anything ``int()``
            accepts).
        :param seq: the signature sequence passed to ``map_signature``.
        """
        self.sig2map = signature2bytegenerator.Sig2ByteGenerator()
        self.detect_write_issues(self.bof)
        bof_sequence = ""
        if self.bof_written:
            if int(min_) > int(self.boflen):
                # The sequences are aligned okay: continue after the existing
                # BOF and only pad the remaining distance.
                self.nt_file.seek(self.boflen)
                remaining = int(min_) - int(self.boflen)
                bof_sequence = self.sig2map.map_signature(
                    remaining, seq, 0, self.fillbyte
                )
            elif int(min_) == 0:
                # If the second sequence is at zero there may be an error in
                # PRONOM, so write after the BOF to not overwrite anything.
                self.nt_file.seek(self.boflen)
                bof_sequence = self.sig2map.map_signature(
                    min_, seq, 0, self.fillbyte
                )
            elif int(min_) == int(self.boflen):
                # We might not need this... may fit under min >(=) boflen.
                # The file position is left wherever it currently is.
                bof_sequence = self.sig2map.map_signature(
                    min_, seq, 0, self.fillbyte
                )
        else:
            self.nt_file.seek(0)
            bof_sequence = self.sig2map.map_signature(min_, seq, 0, self.fillbyte)

        tmpread = False
        if self.eof_written:  # read eof into tmp and re-write
            tmpread = self.write_seq_with_eof()

        for sequence in bof_sequence:
            try:
                bof = self.int_list_from_sequence(sequence)
                self.nt_file.write(bytes(bof))
            except (TypeError, ValueError) as err:
                # bytes.fromhex raises ValueError for malformed hex; report
                # it like the EOF and VAR writers do instead of aborting.
                error = f"{err} BOF Signature not mapped: {seq}"
                print(error, file=sys.stderr)

        # Record the BOF length before any buffered trailing data is
        # appended again.
        self.boflen = self.nt_file.tell()
        self.bof_written = True
        if tmpread:
            self.nt_file.write(self.tmpbio.getvalue())

    def write_footer(self, min_off, seq):
        """Write an EOF sequence at the end of the current file.

        :param min_off: offset value, passed to ``map_signature`` as its
            third argument.
        :param seq: the signature sequence passed to ``map_signature``.
        """
        self.sig2map = (
            signature2bytegenerator.Sig2ByteGenerator()
        )  # TODO: New instance or not?
        self.detect_write_issues(self.eof)
        self.nt_file.seek(0, 2)  # seek to end of file
        eof_sequence = self.sig2map.map_signature(0, seq, min_off, self.fillbyte)
        for sequence in eof_sequence:
            try:
                eof = self.int_list_from_sequence(sequence)
                self.nt_file.write(bytes(eof))
            except (TypeError, ValueError) as err:
                print(f"EOF Signature not mapped: {seq} ({err})\n", file=sys.stderr)
        self.eof_written = True

    def write_var(self, min_, max_, seq):
        """Write a variable sequence to the current file.

        The first variable sequence starts at the end of the BOF
        (``boflen``); later ones start at ``var_pos``, the file position
        recorded at the end of the previous call. If an EOF has already been
        written, the data following the BOF is buffered first and
        re-appended after the new sequence.

        :param min_: passed to ``map_signature`` as its first argument.
        :param max_: passed to ``map_signature`` as its third argument.
        :param seq: the signature sequence passed to ``map_signature``.
        """
        self.sig2map = (
            signature2bytegenerator.Sig2ByteGenerator()
        )  # TODO: New instance or not?
        self.detect_write_issues(self.var)
        if self.var_written is False:
            self.var_pos = self.boflen
            self.var_written = True
        self.nt_file.seek(self.var_pos)
        var_sequence = self.sig2map.map_signature(
            min_, seq, max_, self.fillbyte
        )  # padding sequence

        tmpread = False
        if self.eof_written:  # read eof into tmp and re-write
            tmpread = self.write_seq_with_eof()

        for sequence in var_sequence:
            try:
                var = self.int_list_from_sequence(sequence)
                self.nt_file.write(bytes(var))
            except (TypeError, ValueError) as err:
                print(f"VAR Signature not mapped: {seq} ({err})\n", file=sys.stderr)

        if tmpread:
            self.nt_file.write(self.tmpbio.getvalue())
        self.var_pos = self.nt_file.tell()
        self.var_written = True

    def write_file(self, puid, puid_no, signature_id, ext):
        """Create a new skeleton file and reset the write-tracking state.

        The file is named
        ``<puid>-<puid_no>-signature-id-<signature_id>.<ext>`` inside the
        output directory and is opened for binary writing, truncating any
        existing file of that name. A progress line is written to stdout
        only when the file did not exist beforehand.

        :return: the identifier ``"<puid>/<puid_no>"`` used in diagnostics.
        """
        self.nt_string = (
            f"{self.newtriplesdir}{puid}-{puid_no}-signature-id-{signature_id}.{ext}"
        )
        self.puid_no = puid_no
        if not os.path.exists(self.nt_string):
            # to standard out so as not to clutter error log...
            sys.stdout.write("Writing " + str(os.path.basename(self.nt_string)) + "\n")

        # Release the handle of the previously generated file, if any, so
        # descriptors do not accumulate. Closing a closed file is a no-op.
        if self.nt_file is not None:
            self.nt_file.close()
        self.nt_file = open(self.nt_string, "wb")
        self.puid_str = puid + "/" + str(self.puid_no)

        # Vars to ensure we know what sequences have been written
        self.bof_written = False
        self.var_written = False
        self.eof_written = False
        self.boflen = 0  # init or here, no problem
        return self.puid_str

    def write_seq_with_eof(self):
        """Buffer the data after the BOF so a sequence can be inserted.

        We can attempt to write a VAR or BOF sequence with EOF already
        written by creating a tmp location for the trailing data while the
        new sequence is written out. The file is closed and reopened in
        ``r+b`` mode, everything from ``boflen`` to the end of the file is
        read into ``tmpbio``, and the file position is returned to
        ``boflen``. The caller is responsible for writing ``tmpbio`` back.

        :return: always ``True``.
        """
        self.nt_file.close()
        self.nt_file = open(self.nt_string, "r+b")  # consider default mode?
        self.nt_file.seek(self.boflen)  # if no bof written this is simply zero
        self.tmpbio = BytesIO(self.nt_file.read())
        self.nt_file.seek(self.boflen)
        return True

    def detect_write_issues(self, pos):
        """Warn on stderr about sequence orderings that may cause problems.

        The ordering of sequences in the PRONOM database may prevent the
        successful generation of a skeleton file. Detect these issues and
        provide feedback to users as a warning.

        :param pos: one of ``self.bof``, ``self.var`` or ``self.eof``,
            naming the kind of sequence about to be written.
        """
        # Formatting via an f-string keeps diagnostics working even when no
        # file has been started yet and puid_str is still None.
        error_str = f"({self.puid_str}): ".ljust(13, " ")
        info_str = "INFO:".ljust(9, " ")
        warn_str = "WARNING:".ljust(9, " ")

        if pos == self.bof:
            if self.bof_written is True:
                print(
                    f"{warn_str}{error_str}Attempting to write BOF with BOF written.",
                    file=sys.stderr,
                )
            if self.eof_written is True:
                print(
                    f"{info_str}{error_str}Attempting to write BOF with EOF written.",
                    file=sys.stderr,
                )
        elif pos == self.eof:
            if self.eof_written is True:
                print(
                    f"{warn_str}{error_str}Attempting to write EOF with EOF written.",
                    file=sys.stderr,
                )
        elif pos == self.var:
            if self.var_written is True:
                print(
                    f"{warn_str}{error_str}Attempting to write VAR with VAR written.",
                    file=sys.stderr,
                )
            if self.eof_written is True:
                print(
                    f"{info_str}{error_str}Attempting to write VAR with EOF written.",
                    file=sys.stderr,
                )