Skip to content

Commit

Permalink
track each subquery field proc separately
Browse files Browse the repository at this point in the history
  • Loading branch information
m.kindritskiy committed Feb 20, 2023
1 parent 8b5598c commit 059557a
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions hiku/telemetry/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ def wrapper(*args):
return wrapper


def _subquery_field_names(func):
def wrapper(fields, *args):
return func([f.name for _, f in fields], fields, *args)
return wrapper


class GraphMetricsBase(GraphTransformer):
root_name = 'Root'

Expand Down Expand Up @@ -102,7 +96,6 @@ def _wrap_link(self, node_name, link_name, func):
def _wrap_subquery(self, node_name, subquery):
observe = self._observe_fields(node_name)
wrapper = self.subquery_wrapper(observe, subquery)
wrapper = _subquery_field_names(wrapper)
wrapper.__subquery__ = lambda: wrapper
return wrapper

Expand Down Expand Up @@ -153,15 +146,23 @@ def visit_link(self, obj):
class _SubqueryMixin:

def subquery_wrapper(self, observe, subquery):
def wrapper(field_names, *args):
def wrap_proc(start_time, field_name, proc):
def _proc_wrapper(*args):
result = proc(*args)
observe(start_time, [field_name])
return result

return _proc_wrapper

def wrapper(fields, *args):
start_time = time.perf_counter()
result_proc = subquery(*args)
wrapped_fields = []
for gf, qf in fields:
gf.func.proc = wrap_proc(start_time, gf.name, gf.func.proc)
wrapped_fields.append((gf, qf))

return subquery(wrapped_fields, *args)

def proc_wrapper():
result = result_proc()
observe(start_time, field_names)
return result
return proc_wrapper
return wrapper


Expand Down Expand Up @@ -200,4 +201,4 @@ async def wrapper(link_name, *args):
result = await func(*args)
observe(start_time, [link_name])
return result
return wrapper
return wrapper

0 comments on commit 059557a

Please sign in to comment.