Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eap): use eap_items in timeseries endpoint #6934

Merged
merged 7 commits into from
Mar 6, 2025

Conversation

davidtsuk
Copy link
Contributor

@davidtsuk davidtsuk commented Mar 5, 2025

This PR updates the timeseries endpoint for the eap_spans resolver to use the new eap_items table when the following conditions are met:

  • the use_eap_items_table runtime config is set to 1
  • the start time of the query is greater than use_eap_items_table_start_timestamp_seconds (measured in UTC seconds)

The tests have also been updated to been run once on the eap_spans table and once on the eap_items table.

@davidtsuk davidtsuk requested review from a team as code owners March 5, 2025 00:01
@davidtsuk davidtsuk force-pushed the david/feat/use-eap-items branch from b409c28 to 955fb24 Compare March 5, 2025 00:08
Copy link

codecov bot commented Mar 5, 2025

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
919 1 918 8
View the full list of 1 ❄️ flaky tests
tests.datasets.test_errors_replacer.TestReplacer::test_process_offset_twice

Flake rate in main: 5.08% (Passed 56 times, Failed 3 times)

Stack Traces | 0.348s run time
Traceback (most recent call last):
  File ".../tests/datasets/test_errors_replacer.py", line 370, in test_process_offset_twice
    assert self.replacer.process_message(message) is None
AssertionError: assert (ReplacementMessageMetadata(partition_index=1, offset=42, consumer_group='consumer_group'), UnmergeGroupsReplacement(state_name=<ReplacerState.ERRORS: 'errors'>, timestamp=datetime.datetime(2025, 3, 6, 0, 14, 7, 329383), hashes=['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], all_columns=[FlattenedColumn(None, 'project_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'event_id', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'platform', schemas.String(modifiers=None)), FlattenedColumn(None, 'environment', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'release', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'dist', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v4', schemas.IPv4(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v6', schemas.IPv6(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user', schemas.String(modifiers=None)), FlattenedColumn(None, 'user_id', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_email', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_method', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_referer', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn('tags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('tags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('flags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('flags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'transaction_name', schemas.String(modifiers=None)), FlattenedColumn(None, 'span_id', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'trace_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'partition', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'offset', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'message_timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'retention_days', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'deleted', schemas.UInt(8, modifiers=None)), FlattenedColumn(None, 'group_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'primary_hash', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'received', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'message', schemas.String(modifiers=None)), FlattenedColumn(None, 'title', schemas.String(modifiers=None)), FlattenedColumn(None, 'culprit', schemas.String(modifiers=None)), FlattenedColumn(None, 'level', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'location', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'type', schemas.String(modifiers=None)), FlattenedColumn('exception_stacks', 'type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'value', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_handled', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'abs_path', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'colno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'filename', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'function', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'lineno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'in_app', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'package', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'module', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'stack_level', schemas.Array(schemas.UInt(16, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn(None, 'exception_main_thread', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_integrations', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'name', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'version', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'trace_sampled', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'num_processing_errors', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'replay_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False)))], project_id=1, previous_group_id=1, new_group_id=2)) is None
 +  where (ReplacementMessageMetadata(partition_index=1, offset=42, consumer_group='consumer_group'), UnmergeGroupsReplacement(state_name=<ReplacerState.ERRORS: 'errors'>, timestamp=datetime.datetime(2025, 3, 6, 0, 14, 7, 329383), hashes=['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], all_columns=[FlattenedColumn(None, 'project_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'event_id', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'platform', schemas.String(modifiers=None)), FlattenedColumn(None, 'environment', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'release', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'dist', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v4', schemas.IPv4(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v6', schemas.IPv6(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user', schemas.String(modifiers=None)), FlattenedColumn(None, 'user_id', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_email', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_method', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_referer', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn('tags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('tags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('flags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('flags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'transaction_name', schemas.String(modifiers=None)), FlattenedColumn(None, 'span_id', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'trace_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'partition', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'offset', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'message_timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'retention_days', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'deleted', schemas.UInt(8, modifiers=None)), FlattenedColumn(None, 'group_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'primary_hash', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'received', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'message', schemas.String(modifiers=None)), FlattenedColumn(None, 'title', schemas.String(modifiers=None)), FlattenedColumn(None, 'culprit', schemas.String(modifiers=None)), FlattenedColumn(None, 'level', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'location', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'type', schemas.String(modifiers=None)), FlattenedColumn('exception_stacks', 'type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'value', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_handled', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'abs_path', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'colno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'filename', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'function', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'lineno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'in_app', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'package', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'module', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'stack_level', schemas.Array(schemas.UInt(16, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn(None, 'exception_main_thread', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_integrations', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'name', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'version', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'trace_sampled', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'num_processing_errors', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'replay_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False)))], project_id=1, previous_group_id=1, new_group_id=2)) = <bound method ReplacerWorker.process_message of <snuba.replacer.ReplacerWorker object at 0x7f92393a0e10>>(Message({Partition(topic=Topic(name='replacements'), index=1): 43}))
 +    where <bound method ReplacerWorker.process_message of <snuba.replacer.ReplacerWorker object at 0x7f92393a0e10>> = <snuba.replacer.ReplacerWorker object at 0x7f92393a0e10>.process_message
 +      where <snuba.replacer.ReplacerWorker object at 0x7f92393a0e10> = <tests.datasets.test_errors_replacer.TestReplacer object at 0x7f9239b9dd90>.replacer

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@davidtsuk davidtsuk marked this pull request as draft March 5, 2025 00:56
Copy link
Member

@kylemumma kylemumma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a PR description that describes what the PR is doing and the major changes.

@davidtsuk davidtsuk marked this pull request as ready for review March 5, 2025 19:18
@davidtsuk davidtsuk requested a review from kylemumma March 5, 2025 19:18
@@ -38,6 +40,160 @@
"sentry.end_timestamp",
}

COLLUMN_PREFIX: str = "sentry."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mispelled

Copy link
Member

@volokluev volokluev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job!

Only real change I'm requesting is in PROTO_TYPE_TO_ATTRIBUTE_COLUMN and a question about the virtual columns.

everything else looks good and pretty clean

Comment on lines 48 to 59
"sentry.organization_id": [AttributeKey.Type.TYPE_INT],
"sentry.project_id": [AttributeKey.Type.TYPE_INT],
"sentry.timestamp": [
AttributeKey.Type.TYPE_FLOAT,
AttributeKey.Type.TYPE_DOUBLE,
AttributeKey.Type.TYPE_INT,
AttributeKey.Type.TYPE_STRING,
],
"sentry.trace_id": [
AttributeKey.Type.TYPE_STRING
], # this gets converted from a uuid to a string in a storage processor
"sentry.item_id": [AttributeKey.Type.TYPE_STRING],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why define a prefix if you won't use it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's being used in the attribute_key_to_expression_eap_items function

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that it's used here too but not explicitly i.e.

/s/"sentry.organization_id"/f"{PREFIX}organization_id"

just a nit

}

PROTO_TYPE_TO_ATTRIBUTE_COLUMN: Final[Mapping[AttributeKey.Type.ValueType, str]] = {
AttributeKey.Type.TYPE_INT: "attributes_int",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use floats for TYPE_INT and TYPE_BOOLEAN (cast them in the query)

the attributes_int, attributes_bool are there to reconstruct the original event but we double write them to the float column for aggregations since those are bucketed

Comment on lines +171 to +172
if expression.column.column_name != "attributes_string":
return expression
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

@@ -240,25 +267,37 @@ def _proto_expression_to_ast_expression(expr: ProtoExpression) -> Expression:

def _build_query(request: TimeSeriesRequest) -> Query:
# TODO: This is hardcoded still
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can remove this comment I think

@@ -303,13 +304,14 @@ def _get_count_column_alias(

def get_count_column(
aggregation: AttributeAggregation | AttributeConditionalAggregation,
attribute_key_to_expression: Callable[[Any], Expression],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is the typing for this right? It's not really [Any] -> Expression is it?

Comment on lines 185 to +190
spans_storage = get_storage(StorageKey("eap_spans"))
items_storage = get_storage(StorageKey("eap_items"))
start = BASE_TIME
messages = [gen_message(start - timedelta(minutes=i)) for i in range(120)]
write_raw_unprocessed_events(spans_storage, messages) # type: ignore
write_raw_unprocessed_events(items_storage, messages) # type: ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would extract the double writing to its own function and change that across the board rather than writing the double write everywhere

)


def apply_virtual_columns_eap_items(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does this function get called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't get called yet but when I update the trace item table endpoint, it will get used

@volokluev
Copy link
Member

Oh one more thing so we can compare performance:

Can you add something that lets us force using the eap_spans table from the client side so we can compare performance?

Maybe a special prefix in the referrer or something

@@ -77,6 +77,9 @@


def use_eap_items_table(request_meta: RequestMeta) -> bool:
if request_meta.referrer.startswith("force_use_eap_items_table"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to force using the OLD table

think about this: I run a timeseries query for some ingested timeframe, if use_eap_items_table is turned on, there's no way to for me to compare that query against the old table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I thought you wanted to test it before turning it on

@davidtsuk davidtsuk merged commit f3546d1 into master Mar 6, 2025
32 checks passed
@davidtsuk davidtsuk deleted the david/feat/use-eap-items branch March 6, 2025 02:18
jan-auer added a commit that referenced this pull request Mar 6, 2025
* master:
  feat(eap): use eap_items in timeseries endpoint (#6934)
  chore(tests): Clean up which sentry tests are run on snuba PRs (#6938)
  Revert "bug: cant use group by with formulas (#6933)"
  bug: cant use group by with formulas (#6933)
  fix(test): run more relevant sentry CI tests (#6937)
  fix(ourlogs): Logs trace item table should support offset (#6936)
  Revert "fix(eap): Remove column processor preventing the use of an index (#6932)"
  fix(eap): Remove column processor preventing the use of an index (#6932)
  feat(eap): Sampled storage materialized views (#6930)
  feat(capman): Instead of flatly rejecting heavy projects, limit their bytes scanned on the query level (#6929)
  feat(ourlogs): Add a trace item details endpoint (#6921)
  Run rustup toolchain install explicitly (#6928)
  fix(ci): Auth protoc to avoid rate limits (#6927)
  fix(eap): Set a max execution time for queries (#6925)
  feat(smart-autocomplete): add migration for smart autocomplete view on eap_items (#6912)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants