Skip to content

Commit

Permalink
convert in beginning
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachel Chen authored and Rachel Chen committed Feb 12, 2025
1 parent abc85b2 commit cea3456
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 44 deletions.
2 changes: 0 additions & 2 deletions snuba/web/db_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ def execute_query(
# Apply clickhouse query setting overrides
clickhouse_query_settings.update(query_settings.get_clickhouse_settings())

print("formatted_queryyyy", formatted_query)

result = reader.execute(
formatted_query,
clickhouse_query_settings,
Expand Down
36 changes: 28 additions & 8 deletions snuba/web/rpc/v1/endpoint_trace_item_table.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import uuid
from typing import Type

from sentry_protos.snuba.v1.attribute_conditional_aggregation_pb2 import (
AttributeConditionalAggregation,
)
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Column,
TraceItemTableRequest,
TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter

from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
Expand All @@ -29,9 +34,6 @@ def _apply_label_to_column(column: Column) -> None:
elif column.HasField("conditional_aggregation"):
column.label = column.conditional_aggregation.label

elif column.HasField("aggregation"):
column.label = column.aggregation.label

for column in in_msg.columns:
_apply_label_to_column(column)

Expand All @@ -47,11 +49,7 @@ def _validate_select_and_groupby(in_msg: TraceItemTableRequest) -> None:
)
grouped_by_columns = set([c.name for c in in_msg.group_by])
aggregation_present = any(
[
c
for c in in_msg.columns
if (c.HasField("aggregation") or c.HasField("conditional_aggregation"))
]
[c for c in in_msg.columns if c.HasField("conditional_aggregation")]
)
if non_aggregted_columns != grouped_by_columns and aggregation_present:
raise BadSnubaRPCRequestException(
Expand Down Expand Up @@ -89,6 +87,27 @@ def _transform_request(request: TraceItemTableRequest) -> TraceItemTableRequest:
return SparseAggregateAttributeTransformer(request).transform()


def convert_to_conditional_aggregation(in_msg: TraceItemTableRequest) -> None:
for column in in_msg.columns:
if column.HasField("aggregation"):
aggregation = column.aggregation
column.conditional_aggregation.CopyFrom(
AttributeConditionalAggregation(
aggregate=aggregation.aggregate,
key=column.key,
label=column.label,
extrapolation_mode=aggregation.extrapolation_mode,
filter=TraceItemFilter( # I needed a filter that will always evaluate to literal(true)
exists_filter=ExistsFilter(
key=AttributeKey(
type=AttributeKey.Type.TYPE_INT, name="organization_id"
)
)
),
)
)


class EndpointTraceItemTable(
RPCEndpoint[TraceItemTableRequest, TraceItemTableResponse]
):
Expand Down Expand Up @@ -125,6 +144,7 @@ def _execute(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
)

in_msg = _transform_request(in_msg)
convert_to_conditional_aggregation(in_msg)

resolver = self.get_resolver(in_msg.meta.trace_item_type)
return resolver.resolve(in_msg)
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,7 @@ def _get_reliability_context_columns(column: Column) -> list[SelectedExpression]
extrapolated aggregates need to request extra columns to calculate the reliability of the result.
this function returns the list of columns that need to be requested.
"""
if not (
column.HasField("aggregation") or column.HasField("conditional_aggregation")
):
if not (column.HasField("conditional_aggregation")):
return []

if (
Expand Down Expand Up @@ -198,11 +196,6 @@ def _column_to_expression(column: Column) -> Expression:
# aggregation label may not be set and the column label takes priority anyways.
function_expr = replace(function_expr, alias=column.label)
return function_expr
elif column.HasField("aggregation"):
function_expr = aggregation_to_expression(column.aggregation)
# aggregation label may not be set and the column label takes priority anyways.
function_expr = replace(function_expr, alias=column.label)
return function_expr
elif column.HasField("formula"):
formula_expr = _formula_to_expression(column.formula)
formula_expr = replace(formula_expr, alias=column.label)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,6 @@ def _convert_order_by(
),
)
)
elif x.column.HasField("aggregation"):
res.append(
OrderBy(
direction=direction,
expression=aggregation_to_expression(
x.column.aggregation,
attribute_key_to_expression(x.column.aggregation.key),
),
)
)
return res


Expand Down Expand Up @@ -167,16 +157,6 @@ def _build_query(request: TraceItemTableRequest) -> Query:
selected_columns.append(
SelectedExpression(name=column.label, expression=function_expr)
)
elif column.HasField("aggregation"):
function_expr = aggregation_to_expression(
column.aggregation,
attribute_key_to_expression(column.aggregation.key),
)
# aggregation label may not be set and the column label takes priority anyways.
function_expr = replace(function_expr, alias=column.label)
selected_columns.append(
SelectedExpression(name=column.label, expression=function_expr)
)
elif column.HasField("formula"):
raise BadSnubaRPCRequestException(
"formulas are not supported for uptime checks"
Expand Down
4 changes: 1 addition & 3 deletions snuba/web/rpc/v1/resolvers/common/trace_item_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ def convert_results(
converters[column.label] = lambda x: AttributeValue(val_float=float(x))
elif column.key.type == AttributeKey.TYPE_DOUBLE:
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
elif column.HasField("aggregation") or column.HasField(
"conditional_aggregation"
):
elif column.HasField("conditional_aggregation"):
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
elif column.HasField("formula"):
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ def transform(self) -> TraceItemTableRequest:
# get all the keys that are used in aggregates
agg_keys = []
for column in self.req.columns:
if column.WhichOneof("column") == "aggregation":
agg_keys.append(column.aggregation.key)
if column.WhichOneof("column") == "conditional_aggregation":
agg_keys.append(column.conditional_aggregation.key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,6 @@ def test_aggregation_filter_basic(self, setup_teardown: Any) -> None:
],
),
]
assert False

def test_conditional_aggregation_in_select(self, setup_teardown: Any) -> None:
"""
Expand Down

0 comments on commit cea3456

Please sign in to comment.