-
-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(eap-api): support conditional aggregations in SELECT #6870
Changes from all commits
d061eee
b03e3e8
abc85b2
cea3456
b527547
dca9e59
4d57b83
ac24c55
24b3a30
462a2a7
f70521a
ce2571e
3e6a855
b0a04c5
e01beb2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,12 @@ | ||
import uuid | ||
from typing import Type | ||
|
||
from sentry_protos.snuba.v1.attribute_conditional_aggregation_pb2 import ( | ||
AttributeConditionalAggregation, | ||
) | ||
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( | ||
AggregationComparisonFilter, | ||
AggregationFilter, | ||
Column, | ||
TraceItemTableRequest, | ||
TraceItemTableResponse, | ||
|
@@ -26,8 +31,8 @@ def _apply_label_to_column(column: Column) -> None: | |
if column.HasField("key"): | ||
column.label = column.key.name | ||
|
||
elif column.HasField("aggregation"): | ||
column.label = column.aggregation.label | ||
elif column.HasField("conditional_aggregation"): | ||
column.label = column.conditional_aggregation.label | ||
|
||
for column in in_msg.columns: | ||
_apply_label_to_column(column) | ||
|
@@ -43,7 +48,9 @@ def _validate_select_and_groupby(in_msg: TraceItemTableRequest) -> None: | |
[c.key.name for c in in_msg.columns if c.HasField("key")] | ||
) | ||
grouped_by_columns = set([c.name for c in in_msg.group_by]) | ||
aggregation_present = any([c for c in in_msg.columns if c.HasField("aggregation")]) | ||
aggregation_present = any( | ||
[c for c in in_msg.columns if c.HasField("conditional_aggregation")] | ||
) | ||
if non_aggregted_columns != grouped_by_columns and aggregation_present: | ||
raise BadSnubaRPCRequestException( | ||
f"Non aggregated columns should be in group_by. non_aggregated_columns: {non_aggregted_columns}, grouped_by_columns: {grouped_by_columns}" | ||
|
@@ -80,6 +87,61 @@ def _transform_request(request: TraceItemTableRequest) -> TraceItemTableRequest: | |
return SparseAggregateAttributeTransformer(request).transform() | ||
|
||
|
||
def convert_to_conditional_aggregation(in_msg: TraceItemTableRequest) -> None:
    """
    Rewrite, in place, every plain aggregation in the request as a conditional aggregation.

    Conditional aggregation only aggregates when the field satisfies a condition:
    https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators#-if

    Requests that still use the plain `aggregation` field get it swapped for a
    `conditional_aggregation` carrying the same aggregate, key, label and
    extrapolation mode, with the filter left unset. Code elsewhere can then treat
    the unset filter as "condition is always true".

    Doing this once, up front, avoids code fragmentation down the line, where every
    consumer would otherwise have to check whether it was handed an
    `AttributeAggregation` or an `AttributeConditionalAggregation`.

    The request's columns (including both sides of formulas), the columns referenced
    by order_by, and the aggregation_filter tree are all visited.
    """

    # Review note carried over from the PR discussion: consider doing the replacement
    # in all cases so there is no split behavior between Column and
    # AggregationComparisonFilter.
    def _swap_in_conditional(
        holder: Column | AggregationComparisonFilter,
    ) -> None:
        # Build the replacement from the existing aggregation first, then drop the
        # old field and install the new one.
        plain = holder.aggregation
        replacement = AttributeConditionalAggregation(
            aggregate=plain.aggregate,
            key=plain.key,
            label=plain.label,
            extrapolation_mode=plain.extrapolation_mode,
        )
        holder.ClearField("aggregation")
        holder.conditional_aggregation.CopyFrom(replacement)

    def _walk(node: Column | AggregationFilter) -> None:
        if isinstance(node, Column):
            if node.HasField("aggregation"):
                _swap_in_conditional(node)

            if node.HasField("formula"):
                # Formula operands are themselves columns; recurse into both sides.
                _walk(node.formula.left)
                _walk(node.formula.right)

        elif isinstance(node, AggregationFilter):
            # and_filter / or_filter both hold a list of nested AggregationFilters.
            for composite in ("and_filter", "or_filter"):
                if node.HasField(composite):
                    for child in getattr(node, composite).filters:
                        _walk(child)

            if node.HasField("comparison_filter"):
                comparison = node.comparison_filter
                if comparison.HasField("aggregation"):
                    _swap_in_conditional(comparison)

    for selected in in_msg.columns:
        _walk(selected)

    for ordering in in_msg.order_by:
        _walk(ordering.column)

    if in_msg.HasField("aggregation_filter"):
        _walk(in_msg.aggregation_filter)
|
||
|
||
class EndpointTraceItemTable( | ||
RPCEndpoint[TraceItemTableRequest, TraceItemTableResponse] | ||
): | ||
|
@@ -103,6 +165,7 @@ def response_class(cls) -> Type[TraceItemTableResponse]: | |
return TraceItemTableResponse | ||
|
||
def _execute(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse: | ||
convert_to_conditional_aggregation(in_msg) | ||
in_msg = _apply_labels_to_columns(in_msg) | ||
_validate_select_and_groupby(in_msg) | ||
_validate_order_by(in_msg) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How different would this function be for the TimeSeries endpoint?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.