Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/ecmwf/polytope-server into …
Browse files Browse the repository at this point in the history
…feature/add_wave_alternative_grid
  • Loading branch information
mathleur committed Jan 24, 2025
2 parents 1dc193e + 48756a6 commit 4b8f569
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 40 deletions.
6 changes: 5 additions & 1 deletion polytope_server/common/metric_store/dynamodb_metric_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,14 @@ def get_metric(self, uuid):
if "Item" in response:
return _load(response["Item"])

def get_metrics(self, ascending=None, descending=None, limit=None, request_id=None, **kwargs):
def get_metrics(self, ascending=None, descending=None, limit=None, request_id=None, exclude_fields=None, **kwargs):
if ascending is not None and descending is not None:
raise ValueError("Cannot sort by ascending and descending at the same time.")

if exclude_fields:
logger.warning(f"exclude_fields parameter is provided but not implemented: {exclude_fields}")
raise NotImplementedError("The 'exclude_fields' feature is not implemented yet.")

if request_id is not None:
fn = self.table.query
params = {
Expand Down
2 changes: 1 addition & 1 deletion polytope_server/common/metric_store/metric_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get_metric(self, uuid: str) -> Metric:
"""Fetch metric from the metric store"""

@abstractmethod
def get_metrics(self, ascending=None, descending=None, limit=None, **kwargs) -> List[Metric]:
def get_metrics(self, ascending=None, descending=None, limit=None, exclude_fields=None, **kwargs) -> List[Metric]:
"""Returns [limit] metrics which match kwargs, ordered by
ascending/descenging keys (e.g. ascending = 'timestamp')"""

Expand Down
78 changes: 44 additions & 34 deletions polytope_server/common/metric_store/mongodb_metric_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,25 @@ def get_metric(self, uuid):
else:
return None

def get_metrics(self, ascending=None, descending=None, limit=None, **kwargs):
all_slots = []
def get_metrics(self, ascending=None, descending=None, limit=None, exclude_fields=None, **kwargs):
"""
Fetch metrics from the store with optional sorting, limiting, and field exclusion.
Args:
ascending (str): Field to sort by ascending order.
descending (str): Field to sort by descending order.
limit (int): Limit the number of results.
exclude_fields (dict): Fields to exclude in the result (default is {"_id": False}).
**kwargs: Filters to apply to the query.
Returns:
List of metrics matching the query.
"""
# Default exclude_fields to {"_id": False} if not provided
if exclude_fields is None:
exclude_fields = {"_id": False}

all_slots = []
found_type = None
for k, v in self.metric_type_class_map.items():
class_slots = list(set().union(Metric.__slots__, v.__slots__))
Expand All @@ -117,46 +133,40 @@ def get_metrics(self, ascending=None, descending=None, limit=None, **kwargs):
all_slots = list(set().union(all_slots, class_slots))

if not found_type:
raise KeyError(
"The provided keys must be a subset of slots of any of the ",
"available metric types.",
)
raise KeyError("The provided keys must be a subset of slots of any of the available metric types.")

if ascending:
if ascending not in class_slots:
raise KeyError("The identified metric type does not have the key {}".format(ascending))
if ascending and ascending not in class_slots:
raise KeyError(f"The identified metric type does not have the key {ascending}")

if descending:
if descending not in class_slots:
raise KeyError("The identified metric type does not have the key {}".format(descending))
if descending and descending not in class_slots:
raise KeyError(f"The identified metric type does not have the key {descending}")

kwargs_to_pop = []
for k, v in kwargs.items():
if v is None:
kwargs_to_pop.append(k)
continue
kwargs[k] = self.metric_type_class_map[found_type].serialize_slot(k, v)
for k in kwargs_to_pop:
kwargs.pop(k)
if ascending and descending:
raise ValueError("Cannot sort by ascending and descending at the same time.")

cursor = self.store.find(kwargs, {"_id": False})
# Serialize and clean kwargs
kwargs = {
k: self.metric_type_class_map[found_type].serialize_slot(k, v) for k, v in kwargs.items() if v is not None
}

if ascending is not None and descending is not None:
raise ValueError("Cannot sort by ascending and descending at the same time.")
if ascending is not None:
cursor.sort(ascending, pymongo.ASCENDING)
elif descending is not None:
cursor.sort(descending, pymongo.DESCENDING)
if limit is not None:
cursor.limit(limit)
# Query the database with filters and exclude_fields
cursor = self.store.find(kwargs, exclude_fields)

# Apply sorting
if ascending:
cursor = cursor.sort(ascending, pymongo.ASCENDING)
elif descending:
cursor = cursor.sort(descending, pymongo.DESCENDING)

# Apply limit
if limit:
cursor = cursor.limit(limit)

# Process results
cursor_list = list(cursor)
if cursor_list:
res = []
for i in cursor_list:
metric = self.metric_type_class_map[MetricType(i.get("type"))](from_dict=i)
res.append(metric)
return res
return [self.metric_type_class_map[MetricType(i.get("type"))](from_dict=i) for i in cursor_list]

return []

def update_metric(self, metric):
Expand Down
12 changes: 8 additions & 4 deletions polytope_server/telemetry/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
generate_latest,
)

from ..common.metric import MetricType
from .config import config
from .exceptions import (
MetricCalculationError,
Expand Down Expand Up @@ -130,11 +131,14 @@ async def get_cached_usage_metrics(
return usage_metrics_cache["data"]

user_requests = []
metrics = metric_store.get_metrics()
metrics = metric_store.get_metrics(
type=MetricType.REQUEST_STATUS_CHANGE,
status="processed",
exclude_fields={"_id": False, "host": False, "request_id": False, "uuid": False},
)
user_requests = []
for u_r in metrics:
serialized_u_r = u_r.serialize()
if serialized_u_r["type"] == "request_status_change" and serialized_u_r["status"] == "processed":
user_requests.append(serialized_u_r)
user_requests.append(u_r.serialize())

if not isinstance(user_requests, list):
raise TelemetryDataError("Fetched data is not in the expected list format")
Expand Down

0 comments on commit 4b8f569

Please sign in to comment.