From 13f5a786510d335d321af4c372c9f5d5a557c069 Mon Sep 17 00:00:00 2001 From: Braedon Vickers Date: Wed, 18 Nov 2020 15:43:56 +0800 Subject: [PATCH] Add support for composite aggregations --- prometheus_es_exporter/parser.py | 49 ++++++++++------ tests/test_parser.py | 95 ++++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 18 deletions(-) diff --git a/prometheus_es_exporter/parser.py b/prometheus_es_exporter/parser.py index 7251980..d758ebf 100644 --- a/prometheus_es_exporter/parser.py +++ b/prometheus_es_exporter/parser.py @@ -3,6 +3,17 @@ from .metrics import format_metric_name, format_labels +def add_label(label_key, label_value, labels): + labels = labels.copy() + + if label_key in labels.keys(): + labels[label_key] = labels[label_key] + [label_value] + else: + labels[label_key] = [label_value] + + return labels + + def parse_buckets(agg_key, buckets, metric=None, labels=None): if metric is None: metric = [] @@ -12,23 +23,26 @@ def parse_buckets(agg_key, buckets, metric=None, labels=None): result = [] for index, bucket in enumerate(buckets): - labels_next = labels.copy() + labels_nest = labels.copy() if 'key' in bucket.keys(): - bucket_key = str(bucket['key']) - if agg_key in labels_next.keys(): - labels_next[agg_key] = labels_next[agg_key] + [bucket_key] + # Keys for composite aggregation buckets are dicts with multiple key/value pairs. + if isinstance(bucket['key'], dict): + for comp_key, comp_value in bucket['key'].items(): + label_key = '_'.join([agg_key, comp_key]) + labels_nest = add_label(label_key, str(comp_value), labels_nest) + else: - labels_next[agg_key] = [bucket_key] + labels_nest = add_label(agg_key, str(bucket['key']), labels_nest) + + # Delete the key so it isn't parsed for metrics. del bucket['key'] + else: bucket_key = 'filter_' + str(index) - if agg_key in labels_next.keys(): - labels_next[agg_key] = labels_next[agg_key] + [bucket_key] - else: - labels_next[agg_key] = [bucket_key] + labels_nest = add_label(agg_key, bucket_key, labels_nest) - result.extend(parse_agg(bucket_key, bucket, metric=metric, labels=labels_next)) + result.extend(parse_agg(agg_key, bucket, metric=metric, labels=labels_nest)) return result @@ -42,14 +56,9 @@ def parse_buckets_fixed(agg_key, buckets, metric=None, labels=None): result = [] for bucket_key, bucket in buckets.items(): - labels_next = labels.copy() - - if agg_key in labels_next.keys(): - labels_next[agg_key] = labels_next[agg_key] + [bucket_key] - else: - labels_next[agg_key] = [bucket_key] - - result.extend(parse_agg(bucket_key, bucket, metric=metric, labels=labels_next)) + labels_nest = labels.copy() + labels_next = add_label(agg_key, bucket_key, labels_nest) + result.extend(parse_agg(agg_key, bucket, metric=metric, labels=labels_next)) return result @@ -67,6 +76,10 @@ def parse_agg(agg_key, agg, metric=None, labels=None): result.extend(parse_buckets(agg_key, value, metric=metric, labels=labels)) elif key == 'buckets' and isinstance(value, dict): result.extend(parse_buckets_fixed(agg_key, value, metric=metric, labels=labels)) + elif key == 'after_key' and 'buckets' in agg: + # `after_key` is used for paging composite aggregations - don't parse for metrics. + # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html#_pagination + continue elif isinstance(value, dict): result.extend(parse_agg(key, value, metric=metric + [key], labels=labels)) # We only want numbers as metrics. diff --git a/tests/test_parser.py b/tests/test_parser.py index 69ea66a..be43b6a 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -841,6 +841,101 @@ def test_nested_terms(self): result = convert_result(parse_response(response)) self.assertEqual(expected, result) + def test_composite(self): + # Query: + # { + # "size": 0, + # "query": { + # "match_all": {} + # }, + # "aggs": { + # "group_comp": { + # "composite": { + # "sources": [ + # {"group1": {"terms": {"field": "group1.keyword"}}}, + # {"val": {"terms": {"field": "val"}}} + # ] + # }, + # "aggs": { + # "val_sum": { + # "sum": {"field": "val"} + # } + # } + # } + # } + # } + response = { + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "aggregations": { + "group_comp": { + "after_key": { + "group1": "b", + "val": 3 + }, + "buckets": [ + { + "doc_count": 1, + "key": { + "group1": "a", + "val": 1 + }, + "val_sum": { + "value": 1 + } + }, + { + "doc_count": 1, + "key": { + "group1": "a", + "val": 2 + }, + "val_sum": { + "value": 2 + } + }, + { + "doc_count": 1, + "key": { + "group1": "b", + "val": 3 + }, + "val_sum": { + "value": 3 + } + } + ] + } + }, + "hits": { + "hits": [], + "max_score": None, + "total": { + "relation": "eq", + "value": 3 + } + }, + "timed_out": False, + "took": 3 + } + + expected = { + 'hits': 3, + 'took_milliseconds': 3, + 'group_comp_doc_count{group_comp_group1="a",group_comp_val="1"}': 1, + 'group_comp_val_sum_value{group_comp_group1="a",group_comp_val="1"}': 1, + 'group_comp_doc_count{group_comp_group1="a",group_comp_val="2"}': 1, + 'group_comp_val_sum_value{group_comp_group1="a",group_comp_val="2"}': 2, + 'group_comp_doc_count{group_comp_group1="b",group_comp_val="3"}': 1, + 'group_comp_val_sum_value{group_comp_group1="b",group_comp_val="3"}': 3, + } + result = convert_result(parse_response(response)) + self.assertEqual(expected, result) + # Tests handling of disallowed characters in labels and metric names # The '-'s in the aggregation name aren't allowed in metric names or # label keys, so need to be substituted.