Skip to content

Commit

Permalink
Add support for composite aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
braedon committed Nov 18, 2020
1 parent 0080c1c commit 13f5a78
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 18 deletions.
49 changes: 31 additions & 18 deletions prometheus_es_exporter/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
from .metrics import format_metric_name, format_labels


def add_label(label_key, label_value, labels):
    """Return a copy of ``labels`` with ``label_value`` appended under ``label_key``.

    The mapping passed in is never modified: the top-level dict is copied and
    the list stored under ``label_key`` is rebuilt rather than extended in
    place, so callers can keep reusing their own ``labels`` for sibling buckets.

    Args:
        label_key: Label name to add a value for.
        label_value: Value to append to that label's list of values.
        labels: Mapping of label name to a list of values.

    Returns:
        A new mapping in which ``label_key`` maps to its previous values (if
        any) followed by ``label_value``.
    """
    updated = labels.copy()
    # Concatenation creates a fresh list, so the list object still referenced
    # by the caller's mapping is left untouched.
    updated[label_key] = updated.get(label_key, []) + [label_value]
    return updated


def parse_buckets(agg_key, buckets, metric=None, labels=None):
if metric is None:
metric = []
Expand All @@ -12,23 +23,26 @@ def parse_buckets(agg_key, buckets, metric=None, labels=None):
result = []

for index, bucket in enumerate(buckets):
labels_next = labels.copy()
labels_nest = labels.copy()

if 'key' in bucket.keys():
bucket_key = str(bucket['key'])
if agg_key in labels_next.keys():
labels_next[agg_key] = labels_next[agg_key] + [bucket_key]
# Keys for composite aggregation buckets are dicts with multiple key/value pairs.
if isinstance(bucket['key'], dict):
for comp_key, comp_value in bucket['key'].items():
label_key = '_'.join([agg_key, comp_key])
labels_nest = add_label(label_key, str(comp_value), labels_nest)

else:
labels_next[agg_key] = [bucket_key]
labels_nest = add_label(agg_key, str(bucket['key']), labels_nest)

# Delete the key so it isn't parsed for metrics.
del bucket['key']

else:
bucket_key = 'filter_' + str(index)
if agg_key in labels_next.keys():
labels_next[agg_key] = labels_next[agg_key] + [bucket_key]
else:
labels_next[agg_key] = [bucket_key]
labels_nest = add_label(agg_key, bucket_key, labels_nest)

result.extend(parse_agg(bucket_key, bucket, metric=metric, labels=labels_next))
result.extend(parse_agg(agg_key, bucket, metric=metric, labels=labels_nest))

return result

Expand All @@ -42,14 +56,9 @@ def parse_buckets_fixed(agg_key, buckets, metric=None, labels=None):
result = []

for bucket_key, bucket in buckets.items():
labels_next = labels.copy()

if agg_key in labels_next.keys():
labels_next[agg_key] = labels_next[agg_key] + [bucket_key]
else:
labels_next[agg_key] = [bucket_key]

result.extend(parse_agg(bucket_key, bucket, metric=metric, labels=labels_next))
labels_nest = labels.copy()
labels_next = add_label(agg_key, bucket_key, labels_nest)
result.extend(parse_agg(agg_key, bucket, metric=metric, labels=labels_next))

return result

Expand All @@ -67,6 +76,10 @@ def parse_agg(agg_key, agg, metric=None, labels=None):
result.extend(parse_buckets(agg_key, value, metric=metric, labels=labels))
elif key == 'buckets' and isinstance(value, dict):
result.extend(parse_buckets_fixed(agg_key, value, metric=metric, labels=labels))
elif key == 'after_key' and 'buckets' in agg:
# `after_key` is used for paging composite aggregations - don't parse for metrics.
# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html#_pagination
continue
elif isinstance(value, dict):
result.extend(parse_agg(key, value, metric=metric + [key], labels=labels))
# We only want numbers as metrics.
Expand Down
95 changes: 95 additions & 0 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,101 @@ def test_nested_terms(self):
result = convert_result(parse_response(response))
self.assertEqual(expected, result)

def test_composite(self):
    # Composite aggregation over two term sources with a nested sum.
    # The response below was produced by this query:
    # {
    #     "size": 0,
    #     "query": {
    #         "match_all": {}
    #     },
    #     "aggs": {
    #         "group_comp": {
    #             "composite": {
    #                 "sources": [
    #                     {"group1": {"terms": {"field": "group1.keyword"}}},
    #                     {"val": {"terms": {"field": "val"}}}
    #                 ]
    #             },
    #             "aggs": {
    #                 "val_sum": {
    #                     "sum": {"field": "val"}
    #                 }
    #             }
    #         }
    #     }
    # }

    def composite_bucket(group1, val, val_sum):
        # One bucket of the composite aggregation; its key is a dict with
        # one entry per source rather than a single scalar.
        return {
            "doc_count": 1,
            "key": {
                "group1": group1,
                "val": val,
            },
            "val_sum": {
                "value": val_sum,
            },
        }

    aggregations = {
        "group_comp": {
            # Paging cursor returned alongside the buckets.
            "after_key": {
                "group1": "b",
                "val": 3,
            },
            "buckets": [
                composite_bucket("a", 1, 1),
                composite_bucket("a", 2, 2),
                composite_bucket("b", 3, 3),
            ],
        },
    }

    response = {
        "_shards": {
            "total": 1,
            "successful": 1,
            "skipped": 0,
            "failed": 0,
        },
        "aggregations": aggregations,
        "hits": {
            "hits": [],
            "max_score": None,
            "total": {
                "relation": "eq",
                "value": 3,
            },
        },
        "timed_out": False,
        "took": 3,
    }

    # Every composite source shows up as its own label, named
    # <aggregation>_<source>; no metric is expected for `after_key`.
    expected = {
        'hits': 3,
        'took_milliseconds': 3,
        'group_comp_doc_count{group_comp_group1="a",group_comp_val="1"}': 1,
        'group_comp_val_sum_value{group_comp_group1="a",group_comp_val="1"}': 1,
        'group_comp_doc_count{group_comp_group1="a",group_comp_val="2"}': 1,
        'group_comp_val_sum_value{group_comp_group1="a",group_comp_val="2"}': 2,
        'group_comp_doc_count{group_comp_group1="b",group_comp_val="3"}': 1,
        'group_comp_val_sum_value{group_comp_group1="b",group_comp_val="3"}': 3,
    }

    self.assertEqual(expected, convert_result(parse_response(response)))

# Tests handling of disallowed characters in labels and metric names.
# The '-'s in the aggregation name aren't allowed in metric names or
# label keys, so they need to be substituted.
Expand Down

0 comments on commit 13f5a78

Please sign in to comment.