diff --git a/numalogic/connectors/prometheus.py b/numalogic/connectors/prometheus.py index 6ad3b433..08401bf0 100644 --- a/numalogic/connectors/prometheus.py +++ b/numalogic/connectors/prometheus.py @@ -58,9 +58,9 @@ def build_query(metric: str, filters: dict[str, str]) -> str: def fetch( self, - metric_name: str, start: datetime, end: Optional[datetime] = None, + metric_name: str = "", filters: Optional[dict[str, str]] = None, return_labels: Optional[list[str]] = None, aggregate: bool = True, @@ -70,9 +70,9 @@ def fetch( Args: ------- - metric_name: Prometheus metric name start: Start time end: End time + metric_name: Prometheus metric name (default="") filters: Prometheus label filters return_labels: Prometheus label names as columns to return aggregate: Whether to aggregate the data @@ -96,22 +96,32 @@ def fetch( results = self.query_range(query, start_ts, end_ts) df = pd.json_normalize(results) - return_labels = [f"metric.{label}" for label in return_labels or []] if df.empty: LOGGER.warning("Query returned no results") return df - df = self._consolidate_df(df, metric_name, return_labels) - if aggregate and return_labels: - df = self._agg_df(df, [metric_name]) + extra_labels = [f"metric.{label}" for label in return_labels or []] + if metric_name: + metric_names = [metric_name] + else: + metric_names = self._extract_metric_names(df) - try: - df.set_index("timestamp", inplace=True) - except KeyError: - pass - df.sort_values(by="timestamp", inplace=True) + df.set_index(_METRIC_KEY, inplace=True) - return df + dfs = [] + for metric_name in metric_names: + _df = self._consolidate_df(df.loc[[metric_name]], metric_name, extra_labels) + dfs.append(_df.set_index(["timestamp", *extra_labels])) + + df = dfs[0].join(dfs[1:]).reset_index().set_index("timestamp") + + if return_labels: + df.rename(columns=dict(zip(extra_labels, return_labels)), inplace=True) + + if aggregate: + df = self._agg_df(df, metric_names) + + return df.sort_values(by=["timestamp"]) def raw_fetch( self, diff --git a/tests/connectors/test_prometheus.py b/tests/connectors/test_prometheus.py index 46e7c714..63955746 100644 --- a/tests/connectors/test_prometheus.py +++ b/tests/connectors/test_prometheus.py @@ -245,6 +245,42 @@ def test_fetch_return_labels(self): self.assertEqual(df.index.name, "timestamp") self.assertListEqual([10.5, 12.5], df[metric].to_list()) + @patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=_mock_mv())) + def test_fetch_mv_01(self): + df = self.fetcher.fetch( + filters={"namespace": "odl-odlgraphql-usw2-e2e", "numalogic": "true"}, + start=datetime.now() - timedelta(minutes=3), + aggregate=True, + ) + self.assertEqual(df.shape, (7, 3)) + self.assertListEqual( + df.columns.to_list(), + [ + "namespace_app_rollouts_cpu_utilization", + "namespace_app_rollouts_http_request_error_rate", + "namespace_app_rollouts_memory_utilization", + ], + ) + self.assertEqual(df.index.name, "timestamp") + + @patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=_mock_mv())) + def test_fetch_mv_02(self): + df = self.fetcher.fetch( + filters={"namespace": "odl-odlgraphql-usw2-e2e", "numalogic": "true"}, + start=datetime.now() - timedelta(minutes=3), + aggregate=False, + ) + self.assertEqual(df.shape, (10, 3)) + self.assertListEqual( + df.columns.to_list(), + [ + "namespace_app_rollouts_cpu_utilization", + "namespace_app_rollouts_http_request_error_rate", + "namespace_app_rollouts_memory_utilization", + ], + ) + self.assertEqual(df.index.name, "timestamp") + @patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=[])) def test_fetch_no_data(self): df = self.fetcher.fetch(