feat(rpc): Support RPC aggregate filters (#83590)
This adds support for aggregate filters for the table RPC.

Requires getsentry/snuba#6776.
Zylphrex authored Jan 17, 2025
1 parent 75e2057 commit 73c03db
Showing 5 changed files with 536 additions and 76 deletions.
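
Note (not part of the commit itself): with this change, an aggregate condition such as count():>100 in the search string resolves into an AggregationFilter in addition to the usual TraceItemFilter. Below is a minimal sketch of the resulting proto, using only messages and fields that appear in the diff; the empty AttributeAggregation() is a placeholder for the aggregate that resolve_column() would actually produce.

from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
    AggregationComparisonFilter,
    AggregationFilter,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeAggregation

# Roughly what count():>100 becomes after resolve_aggregate_term (see spans.py below).
having = AggregationFilter(
    comparison_filter=AggregationComparisonFilter(
        aggregation=AttributeAggregation(),  # placeholder: the resolved count() aggregate
        op=AggregationComparisonFilter.OP_GREATER_THAN,  # AGGREGATION_OPERATOR_MAP[">"]
        val=100.0,
    )
)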
10 changes: 10 additions & 0 deletions src/sentry/search/eap/constants.py
@@ -1,5 +1,6 @@
from typing import Literal

from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import AggregationComparisonFilter
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter

@@ -17,6 +18,15 @@
}
IN_OPERATORS = ["IN", "NOT IN"]

AGGREGATION_OPERATOR_MAP = {
"=": AggregationComparisonFilter.OP_EQUALS,
"!=": AggregationComparisonFilter.OP_NOT_EQUALS,
">": AggregationComparisonFilter.OP_GREATER_THAN,
"<": AggregationComparisonFilter.OP_LESS_THAN,
">=": AggregationComparisonFilter.OP_GREATER_THAN_OR_EQUALS,
"<=": AggregationComparisonFilter.OP_LESS_THAN_OR_EQUALS,
}

SearchType = (
SizeUnit
| DurationUnit
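A hedged usage note: resolve_aggregate_term in spans.py (below) looks up the parsed operator in this map, for example:

op = AGGREGATION_OPERATOR_MAP[">="]  # AggregationComparisonFilter.OP_GREATER_THAN_OR_EQUALS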
171 changes: 124 additions & 47 deletions src/sentry/search/eap/spans.py
@@ -2,12 +2,19 @@
from dataclasses import dataclass, field
from datetime import datetime
from re import Match
from typing import cast
from typing import Literal, cast

import sentry_sdk
from parsimonious.exceptions import ParseError
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
AggregationAndFilter,
AggregationComparisonFilter,
AggregationFilter,
AggregationOrFilter,
)
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
AttributeAggregation,
AttributeKey,
AttributeValue,
FloatArray,
@@ -74,14 +81,14 @@ def resolve_meta(self, referrer: str) -> RequestMeta:
@sentry_sdk.trace
def resolve_query(
self, querystring: str | None
) -> tuple[TraceItemFilter | None, list[VirtualColumnContext | None]]:
) -> tuple[TraceItemFilter | None, AggregationFilter | None, list[VirtualColumnContext | None]]:
"""Given a query string in the public search syntax eg. `span.description:foo` construct the TraceItemFilter"""
environment_query = self.__resolve_environment_query()
query, contexts = self.__resolve_query(querystring)
where, having, contexts = self.__resolve_query(querystring)
span = sentry_sdk.get_current_span()
if span:
span.set_tag("SearchResolver.query_string", querystring)
span.set_tag("SearchResolver.resolved_query", query)
span.set_tag("SearchResolver.resolved_query", where)
span.set_tag("SearchResolver.environment_query", environment_query)

# The RPC request meta does not contain the environment.
@@ -92,20 +99,21 @@ def resolve_query(
# But if both are defined, we AND them together.

if not environment_query:
return query, contexts
return where, having, contexts

if not query:
return environment_query, []
if not where:
return environment_query, having, []

return (
TraceItemFilter(
and_filter=AndFilter(
filters=[
environment_query,
query,
where,
]
)
),
having,
contexts,
)

@@ -134,9 +142,9 @@ def __resolve_environment_query(self) -> TraceItemFilter | None:

def __resolve_query(
self, querystring: str | None
) -> tuple[TraceItemFilter | None, list[VirtualColumnContext | None]]:
) -> tuple[TraceItemFilter | None, AggregationFilter | None, list[VirtualColumnContext | None]]:
if querystring is None:
return None, []
return None, None, []
try:
parsed_terms = event_search.parse_search_query(
querystring,
@@ -161,14 +169,16 @@ def __resolve_query(

def _resolve_boolean_conditions(
self, terms: event_filter.ParsedTerms
) -> tuple[TraceItemFilter | None, list[VirtualColumnContext | None]]:
) -> tuple[TraceItemFilter | None, AggregationFilter | None, list[VirtualColumnContext | None]]:
if len(terms) == 0:
return None, []
return None, None, []
elif len(terms) == 1:
if isinstance(terms[0], event_search.ParenExpression):
return self._resolve_boolean_conditions(terms[0].children)
elif isinstance(terms[0], event_search.SearchFilter):
return self._resolve_terms([cast(event_search.SearchFilter, terms[0])])
elif isinstance(terms[0], event_search.AggregateFilter):
return self._resolve_terms([cast(event_search.AggregateFilter, terms[0])])
else:
raise NotImplementedError("Haven't handled all the search expressions yet")

@@ -208,39 +218,56 @@ def _resolve_boolean_conditions(
# the two sides. If there is no OR, split the first element out to AND
index = None
lhs, rhs = None, None
operator: type[OrFilter] | type[AndFilter] | None = None
try:
index = terms.index(event_search.SearchBoolean.BOOLEAN_OR)
lhs, rhs = terms[:index], terms[index + 1 :]
operator = OrFilter
operator: Literal["and", "or"] = "or"
except Exception:
lhs, rhs = terms[:1], terms[1:]
operator = AndFilter
operator = "and"

resolved_lhs, contexts_lhs = self._resolve_boolean_conditions(lhs)
resolved_rhs, contexts_rhs = self._resolve_boolean_conditions(rhs)
where_lhs, having_lhs, contexts_lhs = self._resolve_boolean_conditions(lhs)
where_rhs, having_rhs, contexts_rhs = self._resolve_boolean_conditions(rhs)
contexts = contexts_lhs + contexts_rhs

if resolved_lhs is not None and resolved_rhs is not None:
if operator == AndFilter:
return (
TraceItemFilter(and_filter=AndFilter(filters=[resolved_lhs, resolved_rhs])),
contexts,
where = None
having = None

if where_lhs is not None and where_rhs is not None:
if operator == "and":
where = TraceItemFilter(and_filter=AndFilter(filters=[where_lhs, where_rhs]))
else:
where = TraceItemFilter(or_filter=OrFilter(filters=[where_lhs, where_rhs]))
elif where_lhs is None and where_rhs is not None:
where = where_rhs
elif where_lhs is not None and where_rhs is None:
where = where_lhs

if having_lhs is not None and having_rhs is not None:
if operator == "and":
having = AggregationFilter(
and_filter=AggregationAndFilter(filters=[having_lhs, having_rhs])
)
else:
return (
TraceItemFilter(or_filter=OrFilter(filters=[resolved_lhs, resolved_rhs])),
contexts,
having = AggregationFilter(
or_filter=AggregationOrFilter(filters=[having_lhs, having_rhs])
)
elif resolved_lhs is None and resolved_rhs is not None:
return resolved_rhs, contexts
elif resolved_lhs is not None and resolved_rhs is None:
return resolved_lhs, contexts
else:
return None, contexts
elif having_lhs is None and having_rhs is not None:
having = having_rhs
elif having_lhs is not None and having_rhs is None:
having = having_lhs

return where, having, contexts

def _resolve_terms(
self, terms: event_filter.ParsedTerms
) -> tuple[TraceItemFilter | None, AggregationFilter | None, list[VirtualColumnContext | None]]:
where, where_contexts = self._resolve_where(terms)
having, having_contexts = self._resolve_having(terms)
return where, having, where_contexts + having_contexts

def _resolve_where(
self, terms: event_filter.ParsedTerms
) -> tuple[TraceItemFilter | None, list[VirtualColumnContext | None]]:
parsed_terms = []
resolved_contexts = []
@@ -251,21 +278,46 @@ def _resolve_terms(
)
parsed_terms.append(resolved_term)
resolved_contexts.append(resolved_context)
else:
if self.config.use_aggregate_conditions:
raise NotImplementedError("Can't filter on aggregates yet")

if len(parsed_terms) > 1:
return TraceItemFilter(and_filter=AndFilter(filters=parsed_terms)), resolved_contexts
elif len(parsed_terms) == 1:
return parsed_terms[0], resolved_contexts
else:
return None, []

def _resolve_having(
self, terms: event_filter.ParsedTerms
) -> tuple[AggregationFilter | None, list[VirtualColumnContext | None]]:
if not self.config.use_aggregate_conditions:
return None, []

parsed_terms = []
resolved_contexts = []
for item in terms:
if isinstance(item, event_search.AggregateFilter):
resolved_term, resolved_context = self.resolve_aggregate_term(
cast(event_search.AggregateFilter, item)
)
parsed_terms.append(resolved_term)
resolved_contexts.append(resolved_context)

if len(parsed_terms) > 1:
return (
AggregationFilter(and_filter=AggregationAndFilter(filters=parsed_terms)),
resolved_contexts,
)
elif len(parsed_terms) == 1:
return parsed_terms[0], resolved_contexts
return None, []

def resolve_term(
self, term: event_search.SearchFilter
) -> tuple[TraceItemFilter, VirtualColumnContext | None]:
resolved_column, context = self.resolve_column(term.key.name)

if not isinstance(resolved_column.proto_definition, AttributeKey):
raise ValueError(f"{term.key.name} is not valid search term")

raw_value = term.value.raw_value
if term.value.is_wildcard():
if term.operator == "=":
@@ -287,19 +339,44 @@ def resolve_term(
operator = constants.OPERATOR_MAP[term.operator]
else:
raise InvalidSearchQuery(f"Unknown operator: {term.operator}")
if isinstance(resolved_column.proto_definition, AttributeKey):
return (
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=resolved_column.proto_definition,
op=operator,
value=self._resolve_search_value(resolved_column, term.operator, raw_value),
)
),
context,
)

return (
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=resolved_column.proto_definition,
op=operator,
value=self._resolve_search_value(resolved_column, term.operator, raw_value),
)
),
context,
)

def resolve_aggregate_term(
self, term: event_search.AggregateFilter
) -> tuple[AggregationFilter, VirtualColumnContext | None]:
resolved_column, context = self.resolve_column(term.key.name)

if not isinstance(resolved_column.proto_definition, AttributeAggregation):
raise ValueError(f"{term.key.name} is not valid search term")

# TODO: Handle different units properly
value = term.value.value

if term.operator in constants.OPERATOR_MAP:
operator = constants.AGGREGATION_OPERATOR_MAP[term.operator]
else:
raise NotImplementedError("Can't filter on aggregates yet")
raise InvalidSearchQuery(f"Unknown operator: {term.operator}")

return (
AggregationFilter(
comparison_filter=AggregationComparisonFilter(
aggregation=resolved_column.proto_definition,
op=operator,
val=value,
),
),
context,
)

def _resolve_search_value(
self,
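For orientation, a hedged sketch of what _resolve_having and _resolve_boolean_conditions (above) produce for a query with two aggregate conditions, for example count():>10 avg(span.duration):<200. The AttributeAggregation placeholders stand in for the aggregates the resolver would build; only messages and fields shown in this diff are used.

from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
    AggregationAndFilter,
    AggregationComparisonFilter,
    AggregationFilter,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeAggregation

lhs = AggregationFilter(
    comparison_filter=AggregationComparisonFilter(
        aggregation=AttributeAggregation(),  # placeholder for count()
        op=AggregationComparisonFilter.OP_GREATER_THAN,
        val=10.0,
    )
)
rhs = AggregationFilter(
    comparison_filter=AggregationComparisonFilter(
        aggregation=AttributeAggregation(),  # placeholder for avg(span.duration)
        op=AggregationComparisonFilter.OP_LESS_THAN,
        val=200.0,
    )
)

# Multiple aggregate terms are ANDed together, mirroring _resolve_having;
# an explicit OR between terms would use AggregationOrFilter instead.
having = AggregationFilter(and_filter=AggregationAndFilter(filters=[lhs, rhs]))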
7 changes: 4 additions & 3 deletions src/sentry/snuba/spans_rpc.py
@@ -56,7 +56,7 @@ def run_table_query(
SearchResolver(params=params, config=config) if search_resolver is None else search_resolver
)
meta = resolver.resolve_meta(referrer=referrer)
query, query_contexts = resolver.resolve_query(query_string)
where, having, query_contexts = resolver.resolve_query(query_string)
columns, column_contexts = resolver.resolve_columns(selected_columns)
contexts = resolver.clean_contexts(query_contexts + column_contexts)
# We allow orderby function_aliases if they're a selected_column
@@ -89,7 +89,8 @@ def run_table_query(
"""Run the query"""
rpc_request = TraceItemTableRequest(
meta=meta,
filter=query,
filter=where,
aggregation_filter=having,
columns=labeled_columns,
group_by=(
[
@@ -169,7 +170,7 @@ def get_timeseries_query(
) -> TimeSeriesRequest:
resolver = SearchResolver(params=params, config=config)
meta = resolver.resolve_meta(referrer=referrer)
query, query_contexts = resolver.resolve_query(query_string)
query, _, query_contexts = resolver.resolve_query(query_string)
(aggregations, _) = resolver.resolve_aggregates(y_axes)
(groupbys, _) = resolver.resolve_columns(groupby)
if extra_conditions is not None:
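A hedged sketch of the request wiring in spans_rpc.py (above): the resolved pair is passed to the table RPC as separate pre- and post-aggregation filters. The placeholder values here stand in for what resolve_meta and resolve_query return.

from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
    AggregationFilter,
    TraceItemTableRequest,
)
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter

rpc_request = TraceItemTableRequest(
    meta=RequestMeta(),                      # placeholder: resolver.resolve_meta(referrer=...)
    filter=TraceItemFilter(),                # placeholder: the resolved `where` (pre-aggregation)
    aggregation_filter=AggregationFilter(),  # placeholder: the resolved `having` (post-aggregation)
)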
