-
Notifications
You must be signed in to change notification settings - Fork 0
/
postgres-gdb.py
333 lines (290 loc) · 11.5 KB
/
postgres-gdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import os.path
import re
import subprocess
import argparse
from autocvar import autocvar, AutoNumCVar
# Register "pg" as a prefix command (prefix=True) so that the multi-word
# commands created later in this file, e.g. "pg attach", can be reached under it.
gdb.Command('pg', gdb.COMMAND_DATA, prefix=True)
class BackendAttach(gdb.Command):
    """attach user postgres backend

    Usage: pg attach [-f] [-i] [-l]

    The postmaster PID is read from the first line of
    $datadir/postmaster.pid ($datadir is a gdb convenience variable) and its
    child processes are listed with ps.  Children whose title appears in
    IGNORE_TITLES are skipped; the first remaining one is attached unless
    another one is picked with -i.

      -f, --force        proceed even if gdb already has a process; that
                         process is detached before attaching
      -i, --interactive  when several backends exist, prompt for the one
                         to attach
      -l, --list         only print the candidate backends
    """

    # Titles of postmaster children that are never offered as backends.
    IGNORE_TITLES = (
        'postgres: checkpointer',
        'postgres: background writer',
        'postgres: walwriter',
        'postgres: autovacuum launcher',
        'postgres: logical replication launcher',
    )

    # One line of `ps -o pid=,cmd=` output: the pid, then the command line.
    _PS_LINE = re.compile(r'\s*(\d+)\s+')

    def __init__(self):
        super().__init__("pg attach", gdb.COMMAND_RUNNING)
        self.backends = ()
        self.pid = None

    def attach(self):
        """Attach gdb to the process selected in ``self.pid``."""
        gdb.execute(f"attach {self.pid}")

    def grab_backends(self, datadir):
        """Fill ``self.backends`` with ``(pid, title)`` for each backend.

        ``datadir`` is the value of the ``$datadir`` convenience variable
        (an object with a ``string()`` method) or None when it is unset.
        On any failure a message is written and ``self.backends`` is left
        empty.
        """
        # Start from a clean slate so a failed lookup never leaves the
        # result of an earlier invocation behind.
        self.backends = ()
        if datadir is None:
            gdb.write("Convenience variable $datadir is not set\n")
            return

        pidfile = os.path.expanduser(
            os.path.join(datadir.string(), 'postmaster.pid'))
        try:
            with open(pidfile) as f:
                master_pid = f.readline().strip()
        except OSError as err:
            gdb.write(f"Cannot read {pidfile}: {err}\n")
            return
        if not master_pid.isdigit():
            gdb.write(f"No postmaster pid found in {pidfile}\n")
            return

        # Argument list instead of a shell string: nothing read from the
        # pid file is ever interpreted by a shell.
        try:
            output = subprocess.run(
                ['ps', '-o', 'pid=,cmd=', f'--ppid={master_pid}'],
                stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                universal_newlines=True).stdout
        except OSError as err:
            gdb.write(f"Cannot run ps: {err}\n")
            return

        backends = []
        for line in output.splitlines():
            match = self._PS_LINE.match(line)
            if match is None:
                # Blank line (e.g. the postmaster has no children at all).
                continue
            title = line[match.end():]
            if title.strip() in self.IGNORE_TITLES:
                continue
            backends.append((int(match.group(1)), title))

        if not backends:
            gdb.write("No postgres backend found\n")
            return
        self.backends = backends

    def print_backends(self):
        """Print the known backends as a 1-based numbered list."""
        for i, (pid, title) in enumerate(self.backends):
            print(f"{i + 1}. {pid} {title}")

    def invoke(self, arg, from_tty):
        """Entry point called by gdb for ``pg attach``."""
        parser = argparse.ArgumentParser(prog="pg attach")
        parser.add_argument('-f', '--force', action='store_true')
        parser.add_argument('-i', '--interactive', action='store_true')
        parser.add_argument('-l', '--list', action='store_true')
        try:
            args = parser.parse_args(gdb.string_to_argv(arg))
        except SystemExit:
            # argparse has already printed its usage/help text; treat it as
            # a normal end of the command instead of letting the exit
            # request propagate.
            return

        cur = gdb.selected_inferior()
        if not args.force and cur.pid != 0:
            gdb.write(f"Process {cur.pid} has already attached.\n")
            return

        self.grab_backends(gdb.convenience_variable("datadir"))
        if not self.backends:
            return
        self.pid = self.backends[0][0]

        if args.list:
            self.print_backends()
            return

        if len(self.backends) > 1:
            gdb.write("There are more than 1 backends:\n")
            self.print_backends()
            if args.interactive:
                try:
                    index = int(input("Select backend:"))
                except (ValueError, EOFError):
                    gdb.write("Invalid process input\n")
                    return
                if not 1 <= index <= len(self.backends):
                    gdb.write("Invalid process input\n")
                    return
                self.pid = self.backends[index - 1][0]

        if cur.pid != 0:
            gdb.execute("detach")
        self.attach()


BackendAttach()
class TreeWalker(object):
    """A base class for tree traverse

    Subclasses supply handlers named after the gdb type name of a node
    (with one pointer level stripped):

      show_<Type>(value)  text appended to the node's output line; returning
                          None suppresses the line
      walk_<Type>(value)  sequence of child values to descend into

    Handlers named exactly ``show_`` / ``walk_`` are the fallback for types
    that have no dedicated handler.
    """

    SHOW_FUNC_PREFIX = 'show_'
    WALK_FUNC_PREFIX = 'walk_'

    def __init__(self):
        # One connector glyph per depth: '|' while more siblings follow,
        # '`' for the last child, ' ' once that last child has been drawn.
        self.level_graph = []
        self.autoncvar = None
        self.current_level = 0

    def reset(self):
        """Forget the drawing state and start a fresh AutoNumCVar."""
        self.level_graph = []
        self.autoncvar = AutoNumCVar()

    def walk(self, expr):
        """Print the tree rooted at ``expr``."""
        self.reset()
        self.do_walk(expr, 0)

    def do_walk(self, expr, level):
        """Print ``expr`` at depth ``level`` and recurse into its children."""
        expr_typed = expr.dynamic_type
        expr_casted = expr.cast(expr_typed)
        self.current_level = level

        # Take the prefix first, then retire every '`' so a finished branch
        # is drawn as blank space on the lines that follow.
        level_graph = ' '.join(self.level_graph[:level])
        for i, c in enumerate(self.level_graph):
            if c == '`':
                self.level_graph[i] = ' '

        cname = self.autoncvar.set_var(expr_casted)
        left_margin = "{}{}".format('' if level == 0 else '--', cname)

        element_show_info = ''
        show_func = self.get_action_func(expr_typed, self.SHOW_FUNC_PREFIX)
        if show_func is not None:
            element_show_info = show_func(expr_casted)
        if element_show_info is not None:
            print("{}{} ({}) {} {}".format(
                level_graph, left_margin, expr_typed, expr, element_show_info))

        walk_func = self.get_action_func(expr_typed, self.WALK_FUNC_PREFIX)
        if walk_func is None:
            return
        children = walk_func(expr_casted)
        if not children:
            return

        if len(self.level_graph) < level + 1:
            self.level_graph.append('|')
        else:
            self.level_graph[level] = '|'
        last = len(children) - 1
        for i, child in enumerate(children):
            if i == last:
                self.level_graph[level] = '`'
            self.do_walk(child, level + 1)

    def get_action_func(self, element_type, action_prefix):
        """Return the handler for ``element_type`` or None.

        Lookup order: ``<prefix><TypeName>`` for the type itself, then the
        same for each of its base classes (depth first, every base is
        tried), and finally the generic ``<prefix>`` handler.
        """
        handler = self._find_typed_action(element_type, action_prefix)
        if handler is not None:
            return handler
        # Fall through to common action function
        return self._callable_attr(action_prefix)

    def _callable_attr(self, name):
        """Return attribute ``name`` of self if it is callable, else None."""
        candidate = getattr(self, name, None)
        return candidate if callable(candidate) else None

    @staticmethod
    def _type_name(typ):
        """Name of ``typ`` with one pointer level stripped."""
        if typ.code == gdb.TYPE_CODE_PTR:
            typ = typ.target()
        return typ.name if hasattr(typ, 'name') and typ.name is not None else str(typ)

    def _find_typed_action(self, element_type, action_prefix):
        """Search ``element_type`` and all of its bases for a typed handler."""
        handler = self._callable_attr(
            action_prefix + self._type_name(element_type))
        if handler is not None:
            return handler
        for field in element_type.fields():
            if not field.is_base_class:
                continue
            handler = self._find_typed_action(field.type, action_prefix)
            if handler is not None:
                return handler
        return None
def cast_Node(val):
    """Cast a node pointer to a pointer to its concrete node struct.

    ``val`` is reinterpreted as ``Node *`` and its ``type`` field is read.
    The tag is printed as ``T_<Struct>``; the ``T_`` prefix is dropped and
    the remainder is looked up as the struct type to cast to.

    Going through ``Node *`` unconditionally (rather than only for
    ``void *``) also covers pointers to structs that have no direct field
    called ``type`` because they embed their header in a nested member.
    This assumes the tag sits at the very start of every such struct.

    Raises gdb.GdbError when the tag is not a symbolic ``T_...`` value.
    """
    node = val.cast(gdb.lookup_type("Node").pointer())
    node_type = str(node['type'])
    if not node_type.startswith('T_'):
        raise gdb.GdbError(f"cast_Node: unrecognized node tag {node_type}")
    typ = gdb.lookup_type(node_type[2:])
    return node.cast(typ.pointer())
class ListCell:
    """One element of a List, decoded according to the list's type tag."""

    # Type tags mapped to the cell member that holds the payload.
    # NOTE(review): the numeric values look tied to one particular server
    # version's node tag numbering -- confirm against the target build.
    ptr = 1
    Int = 451
    Oid = 452
    Xid = 453
    type_values = {ptr : 'ptr_value', Int : 'int_value',
                   Oid : 'oid_value', Xid : 'xid_value'}

    def __init__(self, typ, val, val_type):
        """Decode cell ``val`` of a list whose tag is ``typ``.

        For pointer lists the payload is cast to ``val_type *``; when
        ``val_type`` is the ``Node`` type the concrete node type is resolved
        with cast_Node.  ``val_type`` may be None, in which case the raw
        pointer payload is kept as is.
        """
        self.value_type = typ
        self.value = val[self.type_values[typ]]
        if typ != self.ptr or val_type is None:
            return
        if int(self.value) == 0:
            # NULL element: there is no tag to read, keep it typed but NULL.
            self.value = self.value.cast(val_type.pointer())
        elif val_type.name == 'Node':
            self.value = cast_Node(self.value)
        else:
            self.value = self.value.cast(val_type.pointer())

    def to_string(self, cvar):
        """Return the display form; pointer cells are prefixed with ``cvar``."""
        if self.value_type != self.ptr:
            return self.value
        type_name = self.value.dereference().type.name
        return f'{cvar} ({type_name} *) ' + str(self.value)
class List:
    """Iterable view over a List value (struct or pointer to it).

    ``ptr_type_name`` names the type that pointer cells are cast to; see
    ListCell.  A NULL list pointer is treated as an empty list.
    """

    def __init__(self, val, ptr_type_name = None):
        self._index = 0
        self._ptr_type = gdb.lookup_type(ptr_type_name) if ptr_type_name else None
        if self._is_null_pointer(val):
            # Nothing can be read through a NULL pointer; present it as an
            # empty pointer list instead of faulting on val['type'].
            self._type = ListCell.ptr
            self.type_name = ListCell.type_values[self._type]
            self.length = 0
            self._elements = None
            return
        self._type = int(val['type'])
        if self._type not in ListCell.type_values:
            raise KeyError(f"unsupported List type tag {self._type}")
        self.type_name = ListCell.type_values[self._type]
        self.length = int(val["length"])
        self._elements = val["elements"]

    @staticmethod
    def _is_null_pointer(val):
        """True when ``val`` is a pointer whose value is zero."""
        return (val.type.strip_typedefs().code == gdb.TYPE_CODE_PTR
                and int(val) == 0)

    def __len__(self):
        return self.length

    def __iter__(self):
        # A fresh generator per call, so the list can be iterated any
        # number of times (the __next__ cursor below is single-use).
        for index in range(self.length):
            yield ListCell(self._type, self._elements[index], self._ptr_type)

    def __next__(self):
        """Cursor-style access kept for callers that use next() directly."""
        if self._index >= self.length:
            raise StopIteration
        cell = ListCell(self._type, self._elements[self._index], self._ptr_type)
        self._index += 1
        return cell
class ExprTraverser(gdb.Command, TreeWalker):
    """Print an expression tree.

    Usage: pg expr EXPR

    EXPR is evaluated by gdb and walked; children are the elements of each
    node's ``args`` list, when the node has such a field.
    """

    # Display properties for each struct, can define as single string if
    # only 1 property needs to display. If a property name includes ':',
    # a property value can be appended after ':'. The property is hidden
    # if its actual value does not equal that value.
    DISPLAY_FIELDS = {
        'Const' : ('constisnull:true', 'consttype', 'constvalue'),
        'Var' : ('varno', 'varattno', 'vartype'),
        'BoolExpr' : 'boolop',
        'OpExpr' : 'opno',
        'ScalarArrayOpExpr' : 'opno',
        'FuncExpr' : ('funcid', 'funcresulttype')
    }

    def __init__(self):
        # Initialise both bases explicitly: the gdb command registration and
        # the TreeWalker drawing state.
        gdb.Command.__init__(self, "pg expr", gdb.COMMAND_DATA)
        TreeWalker.__init__(self)

    def walk_List(self, val):
        """Children of a List node: the values of its cells."""
        if val.type.strip_typedefs().code == gdb.TYPE_CODE_PTR and int(val) == 0:
            # NULL list pointer: no cells to read.
            return []
        return [cell.value for cell in List(val, "Node")]

    def _walk_to_args(self, val):
        """Children of a node with an ``args`` field, else no children."""
        # gdb.types is a submodule and must be imported before use.
        from gdb.types import has_field
        if not has_field(val.type.target(), 'args'):
            return []
        return self.walk_List(val['args'])

    def walk_(self, val):
        return self._walk_to_args(val)

    @staticmethod
    def _prop_str(val, typname, prop):
        """Format one DISPLAY_FIELDS entry, or '' when it is filtered out."""
        if ':' in prop:
            prop, disp_val = prop.split(':')
            if disp_val != str(val[prop]):
                return ''
        # Drop the struct-name prefix from the label when the field has one.
        name = prop[len(typname):] if prop.startswith(typname.lower()) else prop
        return f'{name} = {val[prop]}'

    def show_(self, val):
        """Return the 'name = value' summary configured for the node type."""
        typname = val.dereference().type.name
        if typname not in self.DISPLAY_FIELDS:
            return ''
        props = self.DISPLAY_FIELDS[typname]
        if isinstance(props, str):
            return self._prop_str(val, typname, props)
        # Format each property once and keep only the non-empty results.
        rendered = (self._prop_str(val, typname, p) for p in props)
        return ', '.join(text for text in rendered if text)

    def invoke(self, arg, from_tty):
        """Entry point called by gdb for ``pg expr``."""
        if not arg:
            print("usage: pg expr [expr]")
            return
        expr = gdb.parse_and_eval(arg)
        self.walk(expr)


ExprTraverser()
class PlanTraverser(gdb.Command, TreeWalker):
    """Print a plan tree.

    Usage: pg plan PLAN

    PLAN is evaluated by gdb, cast to its concrete node type and walked
    through the ``lefttree`` / ``righttree`` links.
    """

    def __init__(self):
        # Initialise both bases explicitly: the gdb command registration and
        # the TreeWalker drawing state.
        gdb.Command.__init__(self, "pg plan", gdb.COMMAND_DATA)
        TreeWalker.__init__(self)

    def walk_(self, val):
        """Children of any plan node: its non-NULL left and right subtrees."""
        plan = val.cast(gdb.lookup_type('Plan').pointer())
        children = []
        for field in ('lefttree', 'righttree'):
            child = plan[field]
            if child:
                children.append(cast_Node(child))
        return children

    def show_scan(self, val):
        """Summary for nodes that carry a ``scan`` member."""
        relid = val['scan']['scanrelid']
        return f'scanrelid={relid}'

    # SeqScan nodes use the scan summary.
    show_SeqScan = show_scan

    def invoke(self, arg, from_tty):
        """Entry point called by gdb for ``pg plan``."""
        if not arg:
            print("usage: pg plan [plan]")
            return
        plan = gdb.parse_and_eval(arg)
        self.walk(cast_Node(plan))


PlanTraverser()
class NodeCastPrinter(gdb.Command):
    """Print a node pointer cast to its concrete node type.

    Usage: pg node NODE
    """

    def __init__(self):
        super().__init__("pg node", gdb.COMMAND_DATA)

    def invoke(self, arg, from_tty):
        """Entry point called by gdb for ``pg node``."""
        if not arg:
            # The usage text names this command, not "pg plan".
            print("usage: pg node [node]")
            return
        node = gdb.parse_and_eval(arg)
        val = cast_Node(node)
        cname = AutoNumCVar().set_var(val)
        print(f'{cname} ({val.type})', val)


NodeCastPrinter()
class ListPrinter:
    """Pretty-printer for List."""

    def __init__(self, val):
        # Cells holding pointers are decoded as Node pointers.
        self.val = List(val, "Node")
        # NOTE(review): this instance is not used by children(), which goes
        # through the module-level `autocvar` instead -- confirm intent.
        self.autoncvar = AutoNumCVar()

    def display_hint(self):
        """Ask gdb to lay the children out like an array."""
        return "array"

    def to_string(self):
        """One-line summary: element count and the cell member in use."""
        wrapped = self.val
        return f"List with {wrapped.length} {wrapped.type_name} elements"

    def children(self):
        """Yield ``(index, text)`` pairs, one per list cell, in list order."""
        position = 0
        for cell in self.val:
            label = autocvar.set_var(cell.value)
            yield str(position), cell.to_string(label)
            position += 1
def register_pretty_printer(objfile):
    """Append this file's printer lookup routine to OBJFILE's
    ``pretty_printers`` list."""
    printers = objfile.pretty_printers
    printers.append(type_lookup_function)
def type_lookup_function(val):
    """A routine that returns the correct pretty printer for VAL
    if appropriate.  Returns None otherwise.

    Only values whose type name is exactly "List" get a printer.
    """
    if val.type.name == "List":
        return ListPrinter(val)
    return None
if __name__ == "__main__":
    current_objfile = gdb.current_objfile()
    if current_objfile is not None:
        # This is the case where this script is being "auto-loaded"
        # for a given objfile.  Register the pretty-printer for that
        # objfile.
        register_pretty_printer(current_objfile)
    else:
        # Loaded by hand: register on every already-loaded objfile whose
        # file is named "postgres".
        for objfile in gdb.objfiles():
            # filename can be None; skip such objfiles instead of passing
            # None to os.path.basename.
            if objfile.filename and os.path.basename(objfile.filename) == "postgres":
                register_pretty_printer(objfile)