Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachel Chen authored and Rachel Chen committed Feb 26, 2025
1 parent 186713e commit 8c87dab
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 19 deletions.
67 changes: 51 additions & 16 deletions snuba/web/rpc/proto_component.py → snuba/web/rpc/proto_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,36 @@

from abc import ABC, abstractmethod
from types import MethodType
from typing import Any, Callable
from typing import Any, Callable, Generic, TypeVar

from google.protobuf.message import Message as ProtobufMessage
from sentry_protos.snuba.v1.attribute_conditional_aggregation_pb2 import (
AttributeConditionalAggregation,
)
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
Expression as TimeSeriesExpression,
)
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import TimeSeriesRequest
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
AggregationComparisonFilter,
AggregationFilter,
Column,
TraceItemTableRequest,
)

Tin = TypeVar("Tin", bound=ProtobufMessage)

class ProtoWrapper(ABC):
def __init__(self, underlying_proto: Any):

class ProtoWrapper(Generic[Tin], ABC):
def __init__(self, underlying_proto: Tin):
self.underlying_proto = underlying_proto

@abstractmethod
def accept(self, visitor: ProtoVisitor) -> None:
pass


class ColumnWrapper(ProtoWrapper):
class ColumnWrapper(ProtoWrapper[Column]):
def accept(self, visitor: ProtoVisitor) -> None:
visitor.visit_ColumnWrapper(self)
column = self.underlying_proto
Expand All @@ -34,12 +40,12 @@ def accept(self, visitor: ProtoVisitor) -> None:
ColumnWrapper(column.formula.right).accept(visitor)


class AggregationComparisonFilterWrapper(ProtoWrapper):
class AggregationComparisonFilterWrapper(ProtoWrapper[AggregationComparisonFilter]):
def accept(self, visitor: ProtoVisitor) -> None:
visitor.visit_AggregationComparisonFilterWrapper(self)


class AggregationFilterWrapper(ProtoWrapper):
class AggregationFilterWrapper(ProtoWrapper[AggregationFilter]):
def accept(self, visitor: ProtoVisitor) -> None:
visitor.visit_AggregationFilterWrapper(self)
aggregation_filter = self.underlying_proto
Expand All @@ -57,7 +63,7 @@ def accept(self, visitor: ProtoVisitor) -> None:
AggregationFilterWrapper(agg_filter).accept(visitor)


class TimeSeriesExpressionWrapper(ProtoWrapper):
class TimeSeriesExpressionWrapper(ProtoWrapper[TimeSeriesExpression]):
def accept(self, visitor: ProtoVisitor) -> None:
visitor.visit_TimeSeriesExpressionWrapper(self)

Expand All @@ -71,7 +77,7 @@ def accept(self, visitor: ProtoVisitor) -> None:
)


class TraceItemTableRequestWrapper(ProtoWrapper):
class TraceItemTableRequestWrapper(ProtoWrapper[TraceItemTableRequest]):
def accept(self, visitor: ProtoVisitor) -> None:
visitor.visit_TraceItemTableRequestWrapper(self)
trace_item_table_request = self.underlying_proto
Expand All @@ -85,7 +91,7 @@ def accept(self, visitor: ProtoVisitor) -> None:
).accept(visitor)


class TimeSeriesRequestWrapper(ProtoWrapper):
class TimeSeriesRequestWrapper(ProtoWrapper[TimeSeriesRequest]):
def accept(self, visitor: ProtoVisitor) -> None:
visitor.visit_TimeSeriesRequestWrapper(self)
time_series_request = self.underlying_proto
Expand All @@ -94,6 +100,39 @@ def accept(self, visitor: ProtoVisitor) -> None:


class ProtoVisitor(ABC):
"""
Proto visitor design is split into two parts:
1. the visitor. Responsible for only executing work on the object it is visiting
2. the wrappers that wrap around the protobuf objects. Think of these as the "nodes" of an AST. Responsible for traversal
I put the responsibility of traversing the wrapper structure onto the wrapper itself. This is a valid approach according
to Design Patterns: Elements of Reusable Object-Oriented Software page 339: "We can
put responsibility for traversal in any of three places: in the object structure,
in the visitor, or in a separate iterator object (see Iterator (257)).
Often the object structure is responsible for iteration. A collection will simply
iterate over its elements, calling the Accept operation on each. A composite
will commonly traverse itself by having each Accept operation traverse the
element's children and call Accept on each of them recursively."
If the visitor is responsible for traversal, then we'd have to implement all the visit methods for each protobuf field,
along with the corresponding wrapper class. There are too many in our codebase. By delegating the traversal
responsibility to the wrapper, we only need to create the wrapper class and the corresponding visit method when we need it.
How to use:
1. Given the protobuf message, split its fields into the ones you care about visiting and doing work on, vs the ones
you do not care about
2. For the fields you want to do work on, implement the visit methods. ex) def visit_ColumnWrapper, def visit_AggregationComparisonFilterWrapper,
these visit methods will not do any traversal. It will simply execute work
3. Implement the corresponding wrapper classes ex) class ColumnWrapper, class AggregationComparisonFilterWrapper
4. The wrapper classes' accept methods will call the matching visit method and traverse itself
5. For fields that you do not want to do work on, calling vistor.visit_IrrelevantField will default to the do_nothing
method below
"""

"Proto visitor design is split into two parts"

def __getattr__(self, visit_method: str) -> Callable[..., Any]:
subclass_visit_method = getattr(self.__class__, visit_method, None)
if callable(subclass_visit_method):
Expand Down Expand Up @@ -123,13 +162,9 @@ def _convert_aggregation_to_conditional_aggregation(

class AggregationToConditionalAggregationVisitor(ProtoVisitor):
"""
We support aggregation, but now we want to support conditional aggregation for the insights team, which only aggregates
if the field satisfies the condition: https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators#-if.
For messages that don't have conditional aggregation, this function replaces the aggregation with a conditional aggregation,
where the filter is null, and every field is the same. This allows code elsewhere to set the default condition to always
be true.
The reason we do this "transformation" is to avoid code fragmentation down the line, where we constantly have to check
if the request contains `AttributeAggregation` or `AttributeConditionalAggregation`
Originally, the protobuf request did not support conditional aggregations. When they were added, the file structure did not allow adding a TraceItemFilter to the AttributeAggregation message without breaking backwards compatibility. A new AttributeConditionalAggregation message was added whose fields are a superset of of AttributeAggregation. AttributeAggregation was deprecated.
This code transforms every AttributeAggregation to an AttributeConditionalAggregation to be able to serve queries before the deprecation, and have the rest of the query pipeline only concerned with one kind of aggregation object.
"""

def visit_ColumnWrapper(self, column_wrapper: ColumnWrapper) -> None:
Expand Down
2 changes: 1 addition & 1 deletion snuba/web/rpc/v1/endpoint_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.proto_component import (
from snuba.web.rpc.proto_visitor import (
AggregationToConditionalAggregationVisitor,
TimeSeriesRequestWrapper,
)
Expand Down
2 changes: 1 addition & 1 deletion snuba/web/rpc/v1/endpoint_trace_item_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.proto_component import (
from snuba.web.rpc.proto_visitor import (
AggregationToConditionalAggregationVisitor,
TraceItemTableRequestWrapper,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/web/rpc/v1/test_conditional_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
Function,
)

from snuba.web.rpc.proto_component import (
from snuba.web.rpc.proto_visitor import (
AggregationToConditionalAggregationVisitor,
TimeSeriesRequestWrapper,
TraceItemTableRequestWrapper,
Expand Down

0 comments on commit 8c87dab

Please sign in to comment.