diff --git a/polytope_server/common/metric_store/dynamodb_metric_store.py b/polytope_server/common/metric_store/dynamodb_metric_store.py index c9ae66d..35de647 100644 --- a/polytope_server/common/metric_store/dynamodb_metric_store.py +++ b/polytope_server/common/metric_store/dynamodb_metric_store.py @@ -171,10 +171,14 @@ def get_metric(self, uuid): if "Item" in response: return _load(response["Item"]) - def get_metrics(self, ascending=None, descending=None, limit=None, request_id=None, **kwargs): + def get_metrics(self, ascending=None, descending=None, limit=None, request_id=None, exclude_fields=None, **kwargs): if ascending is not None and descending is not None: raise ValueError("Cannot sort by ascending and descending at the same time.") + if exclude_fields: + logger.warning(f"exclude_fields parameter is provided but not implemented: {exclude_fields}") + raise NotImplementedError("The 'exclude_fields' feature is not implemented yet.") + if request_id is not None: fn = self.table.query params = { diff --git a/polytope_server/common/metric_store/metric_store.py b/polytope_server/common/metric_store/metric_store.py index f1d14bf..521b26d 100644 --- a/polytope_server/common/metric_store/metric_store.py +++ b/polytope_server/common/metric_store/metric_store.py @@ -41,7 +41,7 @@ def get_metric(self, uuid: str) -> Metric: """Fetch metric from the metric store""" @abstractmethod - def get_metrics(self, ascending=None, descending=None, limit=None, **kwargs) -> List[Metric]: + def get_metrics(self, ascending=None, descending=None, limit=None, exclude_fields=None, **kwargs) -> List[Metric]: """Returns [limit] metrics which match kwargs, ordered by ascending/descenging keys (e.g. ascending = 'timestamp')""" diff --git a/polytope_server/common/metric_store/mongodb_metric_store.py b/polytope_server/common/metric_store/mongodb_metric_store.py index 5d7b0e2..1b4d982 100644 --- a/polytope_server/common/metric_store/mongodb_metric_store.py +++ b/polytope_server/common/metric_store/mongodb_metric_store.py @@ -106,9 +106,25 @@ def get_metric(self, uuid): else: return None - def get_metrics(self, ascending=None, descending=None, limit=None, **kwargs): - all_slots = [] + def get_metrics(self, ascending=None, descending=None, limit=None, exclude_fields=None, **kwargs): + """ + Fetch metrics from the store with optional sorting, limiting, and field exclusion. + Args: + ascending (str): Field to sort by ascending order. + descending (str): Field to sort by descending order. + limit (int): Limit the number of results. + exclude_fields (dict): Fields to exclude in the result (default is {"_id": False}). + **kwargs: Filters to apply to the query. + + Returns: + List of metrics matching the query. + """ + # Default exclude_fields to {"_id": False} if not provided + if exclude_fields is None: + exclude_fields = {"_id": False} + + all_slots = [] found_type = None for k, v in self.metric_type_class_map.items(): class_slots = list(set().union(Metric.__slots__, v.__slots__)) @@ -117,46 +133,40 @@ def get_metrics(self, ascending=None, descending=None, limit=None, **kwargs): all_slots = list(set().union(all_slots, class_slots)) if not found_type: - raise KeyError( - "The provided keys must be a subset of slots of any of the ", - "available metric types.", - ) + raise KeyError("The provided keys must be a subset of slots of any of the available metric types.") - if ascending: - if ascending not in class_slots: - raise KeyError("The identified metric type does not have the key {}".format(ascending)) + if ascending and ascending not in class_slots: + raise KeyError(f"The identified metric type does not have the key {ascending}") - if descending: - if descending not in class_slots: - raise KeyError("The identified metric type does not have the key {}".format(descending)) + if descending and descending not in class_slots: + raise KeyError(f"The identified metric type does not have the key {descending}") - kwargs_to_pop = [] - for k, v in kwargs.items(): - if v is None: - kwargs_to_pop.append(k) - continue - kwargs[k] = self.metric_type_class_map[found_type].serialize_slot(k, v) - for k in kwargs_to_pop: - kwargs.pop(k) + if ascending and descending: + raise ValueError("Cannot sort by ascending and descending at the same time.") - cursor = self.store.find(kwargs, {"_id": False}) + # Serialize and clean kwargs + kwargs = { + k: self.metric_type_class_map[found_type].serialize_slot(k, v) for k, v in kwargs.items() if v is not None + } - if ascending is not None and descending is not None: - raise ValueError("Cannot sort by ascending and descending at the same time.") - if ascending is not None: - cursor.sort(ascending, pymongo.ASCENDING) - elif descending is not None: - cursor.sort(descending, pymongo.DESCENDING) - if limit is not None: - cursor.limit(limit) + # Query the database with filters and exclude_fields + cursor = self.store.find(kwargs, exclude_fields) + # Apply sorting + if ascending: + cursor = cursor.sort(ascending, pymongo.ASCENDING) + elif descending: + cursor = cursor.sort(descending, pymongo.DESCENDING) + + # Apply limit + if limit: + cursor = cursor.limit(limit) + + # Process results cursor_list = list(cursor) if cursor_list: - res = [] - for i in cursor_list: - metric = self.metric_type_class_map[MetricType(i.get("type"))](from_dict=i) - res.append(metric) - return res + return [self.metric_type_class_map[MetricType(i.get("type"))](from_dict=i) for i in cursor_list] + return [] def update_metric(self, metric): diff --git a/polytope_server/telemetry/helpers.py b/polytope_server/telemetry/helpers.py index 6364fe9..5032a9a 100644 --- a/polytope_server/telemetry/helpers.py +++ b/polytope_server/telemetry/helpers.py @@ -33,6 +33,7 @@ generate_latest, ) +from ..common.metric import MetricType from .config import config from .exceptions import ( MetricCalculationError, @@ -130,11 +131,14 @@ async def get_cached_usage_metrics( return usage_metrics_cache["data"] user_requests = [] - metrics = metric_store.get_metrics() + metrics = metric_store.get_metrics( + type=MetricType.REQUEST_STATUS_CHANGE, + status="processed", + exclude_fields={"_id": False, "host": False, "request_id": False, "uuid": False}, + ) + user_requests = [] for u_r in metrics: - serialized_u_r = u_r.serialize() - if serialized_u_r["type"] == "request_status_change" and serialized_u_r["status"] == "processed": - user_requests.append(serialized_u_r) + user_requests.append(u_r.serialize()) if not isinstance(user_requests, list): raise TelemetryDataError("Fetched data is not in the expected list format")