feat: str_attr and num_attr materialized views for new eap_items table #6907

Merged 8 commits on Mar 6, 2025
@@ -0,0 +1,50 @@
version: v1
kind: readable_storage
name: items_attrs

storage:
  key: items_attrs
  set_key: events_analytics_platform

readiness_state: complete

schema:
  columns:
    [
      { name: organization_id, type: UInt, args: { size: 64 } },
      { name: project_id, type: UInt, args: { size: 64 } },
      { name: item_type, type: UInt, args: { size: 8 } },
      { name: attr_key, type: String },
      # attr_type can either be 'string' or 'float'
      { name: attr_type, type: String, args: { schema_modifiers: [low_cardinality] } },
      { name: timestamp, type: DateTime },
      { name: retention_days, type: UInt, args: { size: 16 } },
      # attr_value is always an empty string for 'float' attr_type
      { name: attr_value, type: String },
    ]
  local_table_name: items_attrs_1_local
  dist_table_name: items_attrs_1_dist
allocation_policies:
  - name: ConcurrentRateLimitAllocationPolicy
    args:
      required_tenant_types:
        - organization_id
        - project_id
  - name: ReferrerGuardRailPolicy
    args:
      required_tenant_types:
        - referrer
  - name: BytesScannedRejectingPolicy
    args:
      required_tenant_types:
        - organization_id
        - referrer

query_processors:
Member Author commented:

There used to be a processor

- processor: UUIDColumnProcessor
  args:
    columns: [trace_id]

but since there's no trace_id column I thought it was safe to remove.

  - processor: UniqInSelectAndHavingProcessor
  - processor: TupleUnaliaser

mandatory_condition_checkers:
  - condition: OrgIdEnforcer
    args:
      field_name: organization_id
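
The storage defined above ends up with one row per (organization, project, item type, attribute key/type/value) seen in a time window, which points at attribute discovery, e.g. autocompleting attribute keys for a project, as the likely use. A minimal sketch of a query of that shape, written directly against the distributed table; the filter values are hypothetical, and the organization_id condition is the one OrgIdEnforcer makes mandatory:

# Illustrative sketch only: a hand-written ClickHouse query of the shape the
# items_attrs storage serves. Table and column names come from the YAML above;
# the concrete filter values are made up.
EXAMPLE_ATTR_KEY_QUERY = """
SELECT DISTINCT attr_key, attr_type
FROM items_attrs_1_dist
WHERE organization_id = 1      -- required by OrgIdEnforcer
  AND project_id = 1
  AND item_type = 1
  AND timestamp >= toDateTime('2025-03-03')
  AND timestamp < toDateTime('2025-03-10')
ORDER BY attr_key
"""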
@@ -0,0 +1,112 @@
from __future__ import annotations

from typing import Sequence

from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.utils.constants import ITEM_ATTRIBUTE_BUCKETS
from snuba.utils.schemas import Column, DateTime, String, UInt


class Migration(migration.ClickhouseNodeMigration):
"""
This migration creates a table meant to store just the attributes seen in a particular org.

* attr_type can either be "string" or "float"
* attr_value is always an empty string for float attributes
"""

blocking = False
storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
granularity = "8192"

mv = "items_attrs_1_mv"
local_table = "items_attrs_1_local"
dist_table = "items_attrs_1_dist"
columns: Sequence[Column[Modifiers]] = [
Column("organization_id", UInt(64)),
Column("project_id", UInt(64)),
Column("item_type", UInt(8)),
Column("attr_key", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))),
Column("attr_type", String(Modifiers(low_cardinality=True))),
Column(
"timestamp",
DateTime(modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
),
Column("retention_days", UInt(16)),
Column("attr_value", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))),
]

    def forwards_ops(self) -> Sequence[SqlOperation]:
        return [
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.local_table,
                engine=table_engines.ReplacingMergeTree(
                    storage_set=self.storage_set_key,
                    primary_key="(organization_id, project_id, timestamp, item_type, attr_key)",
                    order_by="(organization_id, project_id, timestamp, item_type, attr_key, attr_type, attr_value, retention_days)",
                    partition_by="(retention_days, toMonday(timestamp))",
                    settings={
                        "index_granularity": self.granularity,
                    },
                    ttl="timestamp + toIntervalDay(retention_days)",
                ),
                columns=self.columns,
                target=OperationTarget.LOCAL,
            ),
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.dist_table,
                engine=table_engines.Distributed(
                    local_table_name=self.local_table, sharding_key=None
                ),
                columns=self.columns,
                target=OperationTarget.DISTRIBUTED,
            ),
            operations.CreateMaterializedView(
                storage_set=self.storage_set_key,
                view_name=self.mv,
                columns=self.columns,
                destination_table_name=self.local_table,
                target=OperationTarget.LOCAL,
                query=f"""
                SELECT
                    organization_id,
                    project_id,
                    item_type,
                    attrs.1 as attr_key,
                    attrs.2 as attr_value,
                    attrs.3 as attr_type,
                    toStartOfWeek(timestamp) AS timestamp,
                    retention_days
                FROM eap_items_1_local
                LEFT ARRAY JOIN
                    arrayConcat(
                        {", ".join(f"arrayMap(x -> tuple(x.1, x.2, 'string'), CAST(attributes_string_{n}, 'Array(Tuple(String, String))'))" for n in range(ITEM_ATTRIBUTE_BUCKETS))},
                        {",".join(f"arrayMap(x -> tuple(x, '', 'float'), mapKeys(attributes_float_{n}))" for n in range(ITEM_ATTRIBUTE_BUCKETS))}
                    ) AS attrs
                """,
            ),
        ]

    def backwards_ops(self) -> Sequence[SqlOperation]:
        return [
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.mv,
                target=OperationTarget.LOCAL,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.local_table,
                target=OperationTarget.LOCAL,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.dist_table,
                target=OperationTarget.DISTRIBUTED,
            ),
        ]
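
The materialized view query above is assembled with f-strings over ITEM_ATTRIBUTE_BUCKETS, which makes the generated SQL hard to picture. A small sketch of the expansion, using a hypothetical bucket count of 2 so the output stays readable:

# Sketch: reproduce the arrayConcat expression from the MV query with a
# hypothetical bucket count of 2 (the migration uses ITEM_ATTRIBUTE_BUCKETS = 40).
buckets = 2

string_parts = ", ".join(
    f"arrayMap(x -> tuple(x.1, x.2, 'string'), "
    f"CAST(attributes_string_{n}, 'Array(Tuple(String, String))'))"
    for n in range(buckets)
)
float_parts = ",".join(
    f"arrayMap(x -> tuple(x, '', 'float'), mapKeys(attributes_float_{n}))"
    for n in range(buckets)
)

# Each string bucket yields (key, value, 'string') tuples; each float bucket
# yields (key, '', 'float'). LEFT ARRAY JOIN over the concatenated array fans
# one eap_items_1_local row out into one row per attribute.
print(f"arrayConcat({string_parts}, {float_parts}) AS attrs")

Because the view truncates timestamp with toStartOfWeek and the target table is a ReplacingMergeTree ordered by the full attribute tuple, repeated sightings of the same attribute within a week should collapse to a single row during background merges.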
3 changes: 3 additions & 0 deletions snuba/utils/constants.py
@@ -12,6 +12,9 @@
# This will affect migrations and querying.
ATTRIBUTE_BUCKETS = 20

# Number of buckets in the eap_items_1_local table
ITEM_ATTRIBUTE_BUCKETS = 40

# Maximum number of attempts to fetch profile events
PROFILE_EVENTS_MAX_ATTEMPTS = (
    4  # Will result in ~23 seconds total wait time with exponential backoff
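
ITEM_ATTRIBUTE_BUCKETS is the number of bucketed attribute columns on eap_items_1_local that the new materialized view reads; the migration iterates range(ITEM_ATTRIBUTE_BUCKETS) to name them. A tiny sketch of the column names this implies (the naming mirrors the f-strings in the migration; the value is the one added above):

# The MV query references one string map and one float map per bucket.
ITEM_ATTRIBUTE_BUCKETS = 40

string_columns = [f"attributes_string_{n}" for n in range(ITEM_ATTRIBUTE_BUCKETS)]
float_columns = [f"attributes_float_{n}" for n in range(ITEM_ATTRIBUTE_BUCKETS)]

print(string_columns[0], string_columns[-1])  # attributes_string_0 attributes_string_39
print(float_columns[0], float_columns[-1])    # attributes_float_0 attributes_float_39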