feat(smart-autocomplete): add migration for smart autocomplete view on eap_items (#6912)

Consolidating the learnings from the previous smart autocomplete
experiments into one view:

* only indexing the keys for greater compaction
* storing a hash array for numerical and string attributes
* bloom filter index on hash columns


The following was tested with locally inserted data:

1. De-duplication of rows with the same attribute keys
2. Searching by attribute hash
3. Bloom filter index is used when searching on the same hash column (a sketch of such queries follows below)
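
For reference, a minimal sketch of the kind of lookup described in points 2 and 3, written as a query string in the style of the migration below. The attribute name 'browser.name' and the literal org/project ids are placeholders, and it is an assumption (not confirmed by this commit) that get_array_vals_hash hashes each key with cityHash64:

# hypothetical sketch, not part of this commit
CO_OCCURRING_KEYS_QUERY = """
SELECT DISTINCT arrayJoin(attributes_string) AS co_occurring_key
FROM eap_item_co_occurring_attrs_1_dist
WHERE organization_id = 1
  AND project_id = 1
  AND date >= toMonday(today() - 30)
  AND has(attribute_keys_hash, cityHash64('browser.name'))
"""

Because ReplacingMergeTree only collapses duplicate rows at merge time, the de-duplication check in point 1 would typically read the table with FINAL (or aggregate by key_hash) to see the merged result.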
volokluev authored Mar 3, 2025
1 parent 799e027 commit 61f1dfe
Showing 1 changed file with 149 additions and 0 deletions.
@@ -0,0 +1,149 @@
from typing import List, Sequence

from snuba.clusters.storage_sets import StorageSetKey
from snuba.datasets.storages.tags_hash_map import get_array_vals_hash
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.utils.schemas import Array, Column, Date, String, UInt

num_attr_buckets = 40
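# (eap_items_1_local stores attributes in num_attr_buckets bucketed Map columns,
# attributes_string_0..39 and attributes_float_0..39; the materialized view below
# concatenates the keys of all buckets into flat arrays)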

columns: List[Column[Modifiers]] = [
    Column("organization_id", UInt(64)),
    Column("project_id", UInt(64)),
    Column("item_type", UInt(8)),
    Column("date", Date(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))),
    Column(
        "retention_days",
        UInt(16),
    ),
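    # materialized hash array over all attribute keys (via get_array_vals_hash);
    # this is the column the bloom_filter index added in forwards_ops targets
    # for "search by attribute hash"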
    Column(
        "attribute_keys_hash",
        Array(
            UInt(64),
            Modifiers(
                materialized=get_array_vals_hash(
                    "arrayConcat(attributes_string, attributes_float)"
                )
            ),
        ),
    ),
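    # both arrays hold attribute *keys* (names) only, never values, hence
    # Array(String()) for the float attributes as well ("only indexing the keys")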
    Column("attributes_string", Array(String())),
    Column("attributes_float", Array(String())),
    # a hash of all the attribute keys of the item in sorted order
    # this lets us deduplicate rows with merges
    Column(
        "key_hash",
        UInt(
            64,
            Modifiers(
                materialized="cityHash64(arraySort(arrayConcat(attributes_string, attributes_float)))"
            ),
        ),
    ),
]


_attr_num_names = ", ".join(
    [f"mapKeys(attributes_float_{i})" for i in range(num_attr_buckets)]
)
_attr_str_names = ", ".join(
    [f"mapKeys(attributes_string_{i})" for i in range(num_attr_buckets)]
)


MV_QUERY = f"""
SELECT
    organization_id AS organization_id,
    project_id AS project_id,
    item_type as item_type,
    toMonday(timestamp) AS date,
    retention_days as retention_days,
    arrayConcat({_attr_str_names}) AS attributes_string,
    arrayConcat({_attr_num_names}) AS attributes_float
FROM eap_items_1_local
"""


class Migration(migration.ClickhouseNodeMigration):

    blocking = False
    storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
    granularity = "8192"

    local_table_name = "eap_item_co_occurring_attrs_1_local"
    dist_table_name = "eap_item_co_occurring_attrs_1_dist"
    mv_name = "eap_item_co_occurring_attrs_1_mv"

    def forwards_ops(self) -> Sequence[SqlOperation]:
        create_table_ops = [
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
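                # ReplacingMergeTree: rows sharing the full sorting key (which
                # includes key_hash) are collapsed during merges, giving the
                # de-duplication described in the commit message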
                engine=table_engines.ReplacingMergeTree(
                    storage_set=self.storage_set_key,
                    primary_key="(organization_id, project_id, date, item_type, key_hash)",
                    order_by="(organization_id, project_id, date, item_type, key_hash, retention_days)",
                    partition_by="(retention_days, toMonday(date))",
                    ttl="date + toIntervalDay(retention_days)",
                ),
                columns=columns,
                target=OperationTarget.LOCAL,
            ),
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.dist_table_name,
                engine=table_engines.Distributed(
                    local_table_name=self.local_table_name,
                    sharding_key=None,
                ),
                columns=columns,
                target=OperationTarget.DISTRIBUTED,
            ),
        ]

        index_ops = [
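            # bloom_filter skip index on the materialized hash array; ClickHouse
            # can use it to skip granules when filtering, e.g. with
            # has(attribute_keys_hash, <hash>)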
            operations.AddIndex(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                index_name="bf_attribute_keys_hash",
                index_expression="attribute_keys_hash",
                index_type="bloom_filter",
                granularity=1,
                target=operations.OperationTarget.LOCAL,
            ),
        ]
        materialized_view_ops: list[SqlOperation] = []

        materialized_view_ops.append(
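            # the materialized view runs MV_QUERY over each new insert into
            # eap_items_1_local and writes the resulting key arrays into the
            # local destination table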
            operations.CreateMaterializedView(
                storage_set=self.storage_set_key,
                view_name=self.mv_name,
                columns=columns,
                destination_table_name=self.local_table_name,
                target=OperationTarget.LOCAL,
                query=MV_QUERY,
            ),
        )

        return create_table_ops + index_ops + materialized_view_ops

    def backwards_ops(self) -> Sequence[SqlOperation]:
        return [
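            # drop the view first so nothing is still writing into the tables
            # while they are removed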
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.mv_name,
                target=OperationTarget.LOCAL,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                target=OperationTarget.LOCAL,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.dist_table_name,
                target=OperationTarget.DISTRIBUTED,
            ),
        ]
