-
Notifications
You must be signed in to change notification settings - Fork 0
/
docx_handling.py
164 lines (146 loc) · 5.02 KB
/
docx_handling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# Default zip-member path of the relationships part for the main document of a
# .docx package; get_embedding_rels falls back to this when no relpath is given.
default_relpath = 'word/_rels/document.xml.rels'
def read_zip_files(filename, paths, *, errors='ignore'):
    """Yield the raw bytes of each requested member of a zip archive.

    Exactly one item is yielded per entry of ``paths``, in the same order,
    so callers can align the results with their input.

    Args:
        filename: Path or file-like object accepted by ``zipfile.ZipFile``.
        paths: Iterable of member names to read.
        errors: When ``'ignore'`` (the default) a member that cannot be read
            produces ``None``; any other value re-raises the original
            exception.

    Yields:
        ``bytes`` with the member content, or ``None`` for a member that
        failed to read while ``errors == 'ignore'``.

    Raises:
        Exception: Whatever ``ZipFile.open``/``read`` raised, when
            ``errors != 'ignore'``. Failures opening the archive itself
            always propagate.
    """
    from zipfile import ZipFile

    with ZipFile(filename) as archive:
        for member in paths:
            try:
                with archive.open(member) as stream:
                    data = stream.read()
            except Exception:
                if errors != 'ignore':
                    raise
                data = None
            # Yield outside the ``try`` so that an exception thrown into the
            # generator by its consumer is never mistaken for a read failure
            # (which would emit a spurious extra ``None`` and break ordering).
            yield data
def get_embedding_rels(filename, relpath=None):
    """Read a relationships part from a package and resolve its targets.

    Every ``Relationship`` element found in the part is returned as a dict of
    its XML attributes. When a ``Target`` attribute is present it is rewritten
    from a path relative to the owning part into a zip member name relative
    to the package root.

    Args:
        filename: Path or file-like object of the zip package.
        relpath: Member name of the ``.rels`` part to read; defaults to
            ``default_relpath``.

    Returns:
        A list of attribute dicts, one per ``Relationship`` element.

    Raises:
        Exception: Propagated from reading the member (it is read with
            ``errors='raise'``) or from parsing its XML.
    """
    from io import BytesIO
    from pathlib import PurePosixPath

    from lxml.etree import parse

    if relpath is None:
        relpath = default_relpath

    # Unpacking reads the single requested member and also exhausts the
    # generator, so the archive is closed before parsing starts.
    content, = read_zip_files(filename, [relpath], errors='raise')

    # Zip member names always use '/', so resolve with POSIX semantics on every
    # platform (a platform-native path would emit backslashes on Windows).
    # ``<dir>/_rels/<name>.rels`` describes parts relative to ``<dir>``.
    base = PurePosixPath(relpath).parent.parent

    def resolve(target):
        joined = base.joinpath(target)
        # An absolute target is relative to the package root; member names
        # carry no leading slash, so drop the anchor.
        parts = joined.parts[1:] if joined.anchor else joined.parts
        resolved = []
        for part in parts:
            if part == '..':
                # Clamp at the package root instead of failing on an
                # over-long chain of '..' segments.
                if resolved:
                    resolved.pop()
            else:
                resolved.append(part)
        return str(PurePosixPath(*resolved))

    def retarget(attrs):
        if 'Target' not in attrs:
            return attrs
        return {**attrs, 'Target': resolve(attrs['Target'])}

    tree = parse(BytesIO(content))
    relationships = tree.findall('Relationship', tree.getroot().nsmap)
    return [retarget(dict(rel.items())) for rel in relationships]
def rels_to_paths(rels, filter=None):
    """Select relationships and return them as ``(Id, Target)`` pairs.

    Args:
        rels: Iterable of relationship attribute dicts.
        filter: Optional predicate taking one relationship dict. When omitted,
            a relationship is kept if its ``Type`` value (empty string when
            absent) ends with ``'oleObject'``.

    Returns:
        A list of ``(rel['Id'], rel['Target'])`` tuples for the accepted
        relationships, in input order.

    Raises:
        KeyError: If an accepted relationship lacks ``Id`` or ``Target``.
    """
    def _is_ole_object(rel):
        return rel.get('Type', '').endswith('oleObject')

    accept = _is_ole_object if filter is None else filter

    selected = []
    for rel in rels:
        if accept(rel):
            selected.append((rel['Id'], rel['Target']))
    return selected
def get_embedding_format(compobj):
    """Extract up to three length-prefixed byte strings from a CompObj blob.

    Parsing starts at byte offset 28. Each record is a 4-byte little-endian
    unsigned length followed by that many bytes; the final byte of every
    record is dropped from the returned value. Parsing stops quietly at the
    first record that is truncated or otherwise cannot be unpacked.

    Args:
        compobj: Bytes-like content of the stream.

    Returns:
        A list of zero to three ``bytes`` values, in stream order.
    """
    from struct import error as StructError, unpack

    blob = bytes(compobj)
    offset = 28
    strings = []
    for _ in range(3):
        try:
            length, = unpack('<I', blob[offset:offset + 4])
            raw, = unpack(f'{length}s', blob[offset + 4:offset + 4 + length])
        except StructError:
            # Short or malformed record: keep what was parsed so far.
            break
        offset += 4 + length
        strings.append(raw[:-1])
    return strings
def read_ole_contents(oledata, paths=None):
    """Open an OLE container and return its format tag and payload stream.

    The format tag is the third string returned by ``get_embedding_format``
    for the ``'\\x01CompObj'`` stream, or ``None`` when that stream is missing
    or yields fewer than three strings. The payload is the content of the
    first stream in ``paths`` that can be read.

    Args:
        oledata: Value handed directly to ``olefile.OleFileIO``.
        paths: Candidate stream paths, tried in order. Defaults to
            ``[['CONTENTS'], ['\\x03PRINT']]``.

    Returns:
        ``(form, data)``. ``data`` is ``None`` when no candidate stream could
        be read. ``(None, None)`` is returned, and a message is written to
        stderr, when the container itself cannot be processed.
    """
    import sys

    import olefile

    if paths is None:
        paths = [['CONTENTS'], ['\x03PRINT']]

    def read(container, stream_path):
        return container.openstream(stream_path).read()

    ole = None
    try:
        ole = olefile.OleFileIO(oledata)

        form = None
        try:
            form = get_embedding_format(read(ole, ['\x01CompObj']))[2]
        except Exception:
            # Best effort: without a usable CompObj stream the format is
            # simply reported as unknown.
            pass

        data = None
        for stream_path in paths:
            try:
                data = read(ole, stream_path)
            except Exception:
                # This candidate is absent or unreadable; try the next one.
                data = None
            if data is not None:
                break
        return (form, data)
    except Exception as e:
        print('failed to unpack ole:', e, file=sys.stderr)
    finally:
        if ole is not None:
            ole.close()
    return (None, None)
def read_objs_from_doc(filename, *, paths=None, mapper=None, relpath=None, rIds=None):
    """Yield the embedded OLE objects of a document, one result per object.

    Exactly one pair is yielded per selected embedding, in order, so results
    stay aligned with ``rIds`` when it is given. A ``(None, None)``
    placeholder is yielded whenever an embedding cannot be located, read,
    unpacked or mapped.

    Args:
        filename: Path or file-like object of the zip package.
        paths: Candidate OLE stream paths, forwarded to ``read_ole_contents``.
        mapper: Optional callable ``(form, obj) -> (form, obj)`` applied to
            each extracted object. If it raises, the error is printed to
            stderr and a placeholder is yielded; if it returns ``None`` as the
            object, a placeholder is yielded. Defaults to the identity.
        relpath: Relationships part to read, forwarded to
            ``get_embedding_rels``.
        rIds: Optional iterable of relationship ids selecting which embeddings
            to read and in which order. When omitted, every relationship
            whose ``Type`` ends with ``'oleObject'`` is read.

    Yields:
        ``(form, obj)`` tuples, or ``(None, None)`` placeholders.
    """
    import sys

    def _identity(form, obj):
        return (form, obj)

    if mapper is None:
        mapper = _identity

    rels = get_embedding_rels(filename, relpath)
    if rIds is None:
        ole_paths = [r.get('Target') for r in rels if r.get('Type', '').endswith('oleObject')]
    else:
        pathmap = dict(rels_to_paths(rels))
        # Keep unknown (or None) ids as None entries instead of raising
        # KeyError: the failed zip lookup below turns them into placeholders,
        # which preserves the one-to-one ordering with ``rIds``.
        ole_paths = [pathmap.get(r) for r in rIds]

    for ole in read_zip_files(filename, ole_paths, errors='ignore'):
        if ole is None:
            yield (None, None)
            continue

        form, obj = read_ole_contents(ole, paths=paths)
        if obj is None:
            yield (None, None)
            continue

        try:
            mapped_form, mapped_obj = mapper(form, obj)
        except Exception as e:
            print(filename, e, file=sys.stderr)
            yield (None, None)
            continue

        # Yield outside the ``try`` so exceptions raised by the consumer are
        # not misreported as mapper failures.
        if mapped_obj is not None:
            yield (mapped_form, mapped_obj)
        else:
            yield (None, None)
def get_docx_table_embeddings(filename):
    """Yield the text grid and embedded-object ids of every table in a document.

    Extraction is best effort: a row whose cells cannot be read contributes
    whatever was collected before the failure rather than aborting the table.

    Args:
        filename: Document path or file-like object passed to
            ``docx.Document``.

    Yields:
        One ``(rows, row_rids)`` tuple per table. ``rows`` is a list with one
        entry per table row, each a list of cell texts (a cell's paragraphs
        joined with newlines). ``row_rids`` is a parallel list holding, for
        each row, the ``r:id`` attribute values of the ``OLEObject`` elements
        found in the runs of that row's cells (``None`` where the attribute is
        absent).
    """
    from docx import Document
    from docx.oxml.shared import qn

    def get_table_row_rIds(row):
        found = []
        try:
            # Each level is materialised before it is iterated, so a failure
            # while collecting a level skips that level as a whole.
            for paragraphs in [cell.paragraphs for cell in row.cells]:
                for runs in [p.runs for p in paragraphs if p]:
                    for ole_objects in [r.element.findall('{*}object/{*}OLEObject') for r in runs if r]:
                        for ole_object in ole_objects:
                            try:
                                found.append(ole_object.get(qn("r:id")))
                            except Exception:
                                # Skip just this element.
                                pass
        except Exception:
            # Malformed row: return the ids gathered so far.
            pass
        return found

    doc = Document(filename)
    for tbl in doc.tables:
        rows = []
        row_rids = []
        for row in tbl.rows:
            vals = []
            try:
                for cell in row.cells:
                    text = '\n'.join(
                        p.text for p in cell.paragraphs
                        if p is not None and p.text is not None
                    )
                    vals.append(text)
            except Exception:
                # Keep the cells read so far for this row.
                pass
            rows.append(vals)
            row_rids.append(get_table_row_rIds(row))
        yield (rows, row_rids)