diff --git a/Brewfile b/Brewfile
index c35e195944a..f396bebe584 100644
--- a/Brewfile
+++ b/Brewfile
@@ -1,2 +1,2 @@
-# necessary for rust-snuba
-brew 'cmake'
+brew 'cmake' # for rust-snuba
+brew 'protobuf' # for rust-snuba > sentry_protos
diff --git a/docs/source/contributing/environment.rst b/docs/source/contributing/environment.rst
index 0735425e5e5..84a3e3d57b1 100644
--- a/docs/source/contributing/environment.rst
+++ b/docs/source/contributing/environment.rst
@@ -82,7 +82,8 @@ and, in another terminal::
     cd ../sentry
     git checkout master
     git pull
-    sentry devservices up --exclude=snuba
+    devservices up
+    docker stop snuba-snuba-1 snuba-clickhouse-1
 This will get the most recent version of Sentry on master, and bring up all snuba's dependencies.
diff --git a/docs/source/getstarted.rst b/docs/source/getstarted.rst
index 3dcfe8c0bdd..90a0e5caec9 100644
--- a/docs/source/getstarted.rst
+++ b/docs/source/getstarted.rst
@@ -22,7 +22,7 @@ in ``~/.sentry/sentry.conf.py``::
 And then use::
-    sentry devservices up --exclude=snuba
+    devservices up --exclude=snuba
 Note that Snuba assumes that everything is running on UTC time. Otherwise you may experience issues with timezone mismatches.
diff --git a/docs/source/migrations/modes.rst b/docs/source/migrations/modes.rst
index 94b8b98f178..ca4cc805eb0 100644
--- a/docs/source/migrations/modes.rst
+++ b/docs/source/migrations/modes.rst
@@ -27,7 +27,7 @@ Enabling Local Mode
 In your local ``server.py``, set ``SENTRY_DISTRIBUTED_CLICKHOUSE_TABLES`` to False. This is the default setting, so configuration is already set up for local mode migrations. Start up the corresponding ClickHouse
-container (``sentry devservices up clickhouse``).
+container (``devservices up clickhouse``).
 Now, run migrations as expected (``snuba migrations migrate --force``).
@@ -36,7 +36,7 @@ Enabling Distributed Mode
 ============================
 In your local ``server.py``, set ``SENTRY_DISTRIBUTED_CLICKHOUSE_TABLES``
-to True. Start up the corresponding ClickHouse container (``sentry devservices up clickhouse``).
+to True. Start up the corresponding ClickHouse container (``devservices up clickhouse``).
 Make sure that the Zookeeper container is also running; without it, distributed migrations will not work properly.
diff --git a/requirements.txt b/requirements.txt index b3d6a79ece8..edd034b29fd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ black==22.6.0 blinker==1.5 click==8.1.7 -clickhouse-driver==0.2.6 +clickhouse-driver==0.2.9 confluent-kafka==2.7.0 datadog==0.21.0 devservices==1.0.16 diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs index 631e616803b..d8dd4b4de5a 100644 --- a/rust_snuba/benches/processors.rs +++ b/rust_snuba/benches/processors.rs @@ -99,6 +99,7 @@ fn create_factory( }, stop_at_timestamp: None, batch_write_timeout: None, + custom_envoy_request_timeout: None, }; Box::new(factory) } diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 8b93c9efb97..72c5a8dbe2b 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -46,6 +46,7 @@ pub fn consumer( stop_at_timestamp: Option, batch_write_timeout_ms: Option, max_dlq_buffer_length: Option, + custom_envoy_request_timeout: Option, ) -> usize { py.allow_threads(|| { consumer_impl( @@ -65,6 +66,7 @@ pub fn consumer( batch_write_timeout_ms, mutations_mode, max_dlq_buffer_length, + custom_envoy_request_timeout, ) }) } @@ -87,6 +89,7 @@ pub fn consumer_impl( batch_write_timeout_ms: Option, mutations_mode: bool, max_dlq_buffer_length: Option, + custom_envoy_request_timeout: Option, ) -> usize { setup_logging(); @@ -281,6 +284,7 @@ pub fn consumer_impl( accountant_topic_config: consumer_config.accountant_topic, stop_at_timestamp, batch_write_timeout, + custom_envoy_request_timeout, }; StreamProcessor::with_kafka(config, factory, topic, dlq_policy) diff --git a/rust_snuba/src/factory.rs b/rust_snuba/src/factory.rs index 0ac543bc1fe..f65f5f442b3 100644 --- a/rust_snuba/src/factory.rs +++ b/rust_snuba/src/factory.rs @@ -57,6 +57,7 @@ pub struct ConsumerStrategyFactory { pub accountant_topic_config: config::TopicConfig, pub stop_at_timestamp: Option, pub batch_write_timeout: Option, + pub custom_envoy_request_timeout: Option, } impl ProcessingStrategyFactory for ConsumerStrategyFactory { @@ -117,6 +118,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactory { &self.storage_config.clickhouse_cluster.password, self.async_inserts, self.batch_write_timeout, + self.custom_envoy_request_timeout, ); let accumulator = Arc::new( diff --git a/rust_snuba/src/strategies/clickhouse/batch.rs b/rust_snuba/src/strategies/clickhouse/batch.rs index b43e5acc495..1524d8b7171 100644 --- a/rust_snuba/src/strategies/clickhouse/batch.rs +++ b/rust_snuba/src/strategies/clickhouse/batch.rs @@ -43,8 +43,9 @@ impl BatchFactory { clickhouse_password: &str, async_inserts: bool, batch_write_timeout: Option, + custom_envoy_request_timeout: Option, ) -> Self { - let mut headers = HeaderMap::with_capacity(5); + let mut headers = HeaderMap::with_capacity(6); headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate")); headers.insert( @@ -59,6 +60,12 @@ impl BatchFactory { "X-ClickHouse-Database", HeaderValue::from_str(database).unwrap(), ); + if let Some(custom_envoy_request_timeout) = custom_envoy_request_timeout { + headers.insert( + "x-envoy-upstream-rq-per-try-timeout-ms", + HeaderValue::from_str(&custom_envoy_request_timeout.to_string()).unwrap(), + ); + } let mut query_params = String::new(); query_params.push_str("load_balancing=in_order&insert_distributed_sync=1"); @@ -268,6 +275,7 @@ mod tests { "", false, None, + None, ); let mut batch = factory.new_batch(); @@ -303,6 +311,7 @@ mod tests { "", 
true, None, + None, ); let mut batch = factory.new_batch(); @@ -337,6 +346,7 @@ mod tests { "", false, None, + None, ); let mut batch = factory.new_batch(); @@ -369,6 +379,7 @@ mod tests { "", false, None, + None, ); let mut batch = factory.new_batch(); @@ -405,6 +416,7 @@ mod tests { // pass in an unreasonably short timeout // which prevents the client request from reaching Clickhouse Some(Duration::from_millis(0)), + None, ); let mut batch = factory.new_batch(); @@ -439,6 +451,7 @@ mod tests { true, // pass in a reasonable timeout Some(Duration::from_millis(1000)), + None, ); let mut batch = factory.new_batch(); diff --git a/snuba/admin/README.md b/snuba/admin/README.md index 0d4e9c776f0..28eb2ea8a99 100644 --- a/snuba/admin/README.md +++ b/snuba/admin/README.md @@ -14,7 +14,7 @@ snuba admin The server should be running on http://127.0.0.1:1219 -note: please ensure that sentry devservices are up via `sentry devservices up --exclude=snuba` from within the sentry repository +note: please ensure that sentry devservices are up via `devservices up --exclude=snuba` from within the sentry repository # Developing the Javascript diff --git a/snuba/admin/static/tracing/query_display.tsx b/snuba/admin/static/tracing/query_display.tsx index 37f3ee045c9..8166257ea73 100644 --- a/snuba/admin/static/tracing/query_display.tsx +++ b/snuba/admin/static/tracing/query_display.tsx @@ -29,8 +29,10 @@ function QueryDisplay(props: { predefinedQueryOptions: Array; }) { const [storages, setStorages] = useState([]); + const [checkedGatherProfileEvents, setCheckedGatherProfileEvents] = useState(true); const [query, setQuery] = useState({ storage: getParamFromStorage("storage"), + gather_profile_events: checkedGatherProfileEvents }); const [queryResultHistory, setQueryResultHistory] = useState( getRecentHistory(HISTORY_KEY) @@ -53,6 +55,7 @@ function QueryDisplay(props: { } function executeQuery() { + query.gather_profile_events = checkedGatherProfileEvents; return props.api .executeTracingQuery(query as TracingRequest) .then((result) => { @@ -105,12 +108,10 @@ function QueryDisplay(props: {
) => - setQuery((prevQuery) => ({ - ...prevQuery, - gather_profile_events: evt.currentTarget.checked, - })) + checked={checkedGatherProfileEvents} + onChange={(evt: React.ChangeEvent) => { + setCheckedGatherProfileEvents(evt.currentTarget.checked); + } } onLabel="PROFILE" offLabel="NO PROFILE" diff --git a/snuba/admin/static/tracing/types.tsx b/snuba/admin/static/tracing/types.tsx index ae6c0f7bc7b..1dccd8f8f6b 100644 --- a/snuba/admin/static/tracing/types.tsx +++ b/snuba/admin/static/tracing/types.tsx @@ -1,7 +1,7 @@ export type TracingRequest = { sql: string; storage: string; - gather_profile_events?: boolean; + gather_profile_events: boolean; }; type TracingResult = { diff --git a/snuba/cli/rust_consumer.py b/snuba/cli/rust_consumer.py index 82ff3e6c79e..4d5a790435d 100644 --- a/snuba/cli/rust_consumer.py +++ b/snuba/cli/rust_consumer.py @@ -178,6 +178,12 @@ default=None, help="Optional timeout for batch writer client connecting and sending request to Clickhouse", ) +@click.option( + "--custom-envoy-request-timeout", + type=int, + default=None, + help="Optional request timeout value for Snuba -> Envoy -> Clickhouse connection", +) @click.option( "--quantized-rebalance-consumer-group-delay-secs", type=int, @@ -216,6 +222,7 @@ def rust_consumer( mutations_mode: bool, max_dlq_buffer_length: Optional[int], quantized_rebalance_consumer_group_delay_secs: Optional[int], + custom_envoy_request_timeout: Optional[int], ) -> None: """ Experimental alternative to `snuba consumer` @@ -236,6 +243,7 @@ def rust_consumer( slice_id=slice_id, group_instance_id=group_instance_id, quantized_rebalance_consumer_group_delay_secs=quantized_rebalance_consumer_group_delay_secs, + custom_envoy_request_timeout=custom_envoy_request_timeout, ) consumer_config_raw = json.dumps(asdict(consumer_config)) @@ -269,6 +277,7 @@ def rust_consumer( stop_at_timestamp, batch_write_timeout_ms, max_dlq_buffer_length, + custom_envoy_request_timeout, ) sys.exit(exitcode) diff --git a/snuba/clickhouse/native.py b/snuba/clickhouse/native.py index 6d65d124e84..bbf3b22747a 100644 --- a/snuba/clickhouse/native.py +++ b/snuba/clickhouse/native.py @@ -206,11 +206,11 @@ def query_execute() -> Any: result_data = query_execute() profile_data = ClickhouseProfile( - bytes=conn.last_query.profile_info.bytes or 0, - progress_bytes=conn.last_query.progress.bytes or 0, - blocks=conn.last_query.profile_info.blocks or 0, - rows=conn.last_query.profile_info.rows or 0, + blocks=getattr(conn.last_query.profile_info, "blocks", 0), + bytes=getattr(conn.last_query.profile_info, "bytes", 0), elapsed=conn.last_query.elapsed or 0.0, + progress_bytes=getattr(conn.last_query.progress, "bytes", 0), + rows=getattr(conn.last_query.profile_info, "rows", 0), ) if with_column_types: result = ClickhouseResult( diff --git a/snuba/consumers/consumer_config.py b/snuba/consumers/consumer_config.py index a8915324a81..03885f2a44a 100644 --- a/snuba/consumers/consumer_config.py +++ b/snuba/consumers/consumer_config.py @@ -157,6 +157,7 @@ def resolve_consumer_config( queued_min_messages: Optional[int] = None, group_instance_id: Optional[str] = None, quantized_rebalance_consumer_group_delay_secs: Optional[int] = None, + custom_envoy_request_timeout: Optional[int] = None, ) -> ConsumerConfig: """ Resolves the ClickHouse cluster and Kafka brokers, and the physical topic name diff --git a/snuba/datasets/configuration/events/storages/errors_ro.yaml b/snuba/datasets/configuration/events/storages/errors_ro.yaml index 789de54aee2..8bf039e65fd 100644 --- 
a/snuba/datasets/configuration/events/storages/errors_ro.yaml +++ b/snuba/datasets/configuration/events/storages/errors_ro.yaml @@ -279,15 +279,6 @@ allocation_policies: getsentry.tasks.backfill_grouping_records: max_threads: 7 concurrent_limit: 60 - - name: BytesScannedWindowAllocationPolicy - args: - required_tenant_types: - - organization_id - - referrer - default_config_overrides: - is_enforced: 1 - throttled_thread_number: 1 - org_limit_bytes_scanned: 10000000 query_processors: - processor: UniqInSelectAndHavingProcessor - processor: TupleUnaliaser diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml index 60523fa799c..268408fa587 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml @@ -6,7 +6,7 @@ storage: key: eap_spans set_key: events_analytics_platform -readiness_state: partial +readiness_state: complete schema: columns: diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/spans_num_attrs.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/spans_num_attrs.yaml index 0f7435e75ff..3ba712e32d3 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/spans_num_attrs.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/spans_num_attrs.yaml @@ -6,7 +6,7 @@ storage: key: spans_num_attrs set_key: events_analytics_platform -readiness_state: partial +readiness_state: complete schema: columns: diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/spans_str_attrs.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/spans_str_attrs.yaml index b0cffb91df7..e12f7e5fb5b 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/spans_str_attrs.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/spans_str_attrs.yaml @@ -6,7 +6,7 @@ storage: key: spans_str_attrs set_key: events_analytics_platform -readiness_state: partial +readiness_state: complete schema: columns: diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml index b9b1dcedc08..aed55bf6e64 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml @@ -4,7 +4,7 @@ name: uptime_monitor_checks storage: key: uptime_monitor_checks set_key: events_analytics_platform -readiness_state: partial +readiness_state: complete schema: columns: [ diff --git a/snuba/datasets/configuration/generic_metrics/entities/distributions_meta_tag_values.yaml b/snuba/datasets/configuration/generic_metrics/entities/distributions_meta_tag_values.yaml deleted file mode 100644 index 3671cbbaf43..00000000000 --- a/snuba/datasets/configuration/generic_metrics/entities/distributions_meta_tag_values.yaml +++ /dev/null @@ -1,51 +0,0 @@ -version: v1 -# This entity stores meta data about metrics, specifically the unique tag values for a given metric and tag key. 
- -kind: entity -name: generic_metrics_distributions_meta_tag_values - -schema: - [ - { name: project_id, type: UInt, args: { size: 64 } }, - { name: metric_id, type: UInt, args: { size: 64 } }, - { name: tag_key, type: UInt, args: { size: 64 } }, - { name: tag_value, type: String}, - { name: timestamp, type: DateTime }, - { name: retention_days, type: UInt, args: { size: 16 } }, - { - name: count, - type: AggregateFunction, - args: - { - func: sum, - arg_types: [{ type: Float, args: { size: 64 } }], - }, - }, - ] - -storages: - - storage: generic_metrics_distributions_meta_tag_values - translation_mappers: - functions: - - mapper: FunctionNameMapper - args: - from_name: sum - to_name: sumMerge - - mapper: FunctionNameMapper - args: - from_name: sumIf - to_name: sumMergeIf - -storage_selector: - selector: SimpleQueryStorageSelector - args: - storage: generic_metrics_distributions_meta_tag_values - -validators: - - validator: EntityRequiredColumnValidator - args: - required_filter_columns: ["project_id", "metric_id", "tag_key"] -required_time_column: timestamp -partition_key_column_name: project_id - -query_processors: [] diff --git a/snuba/datasets/configuration/generic_metrics/storages/distributions_meta_tag_values.yaml b/snuba/datasets/configuration/generic_metrics/storages/distributions_meta_tag_values.yaml deleted file mode 100644 index ccb1d4c0a73..00000000000 --- a/snuba/datasets/configuration/generic_metrics/storages/distributions_meta_tag_values.yaml +++ /dev/null @@ -1,59 +0,0 @@ -version: v1 -# This table stores meta data about metrics, specifically the unique tag values for a given metric and tag key. - -kind: readable_storage -name: generic_metrics_distributions_meta_tag_values -storage: - key: generic_metrics_distributions_meta_tag_values - set_key: generic_metrics_distributions -readiness_state: complete -schema: - columns: - [ - { name: project_id, type: UInt, args: { size: 64 } }, - { name: metric_id, type: UInt, args: { size: 64 } }, - { name: tag_key, type: UInt, args: { size: 64 } }, - { name: tag_value, type: String}, - { name: timestamp, type: DateTime }, - { name: retention_days, type: UInt, args: { size: 16 } }, - { - name: count, - type: AggregateFunction, - args: - { - func: sum, - arg_types: [{ type: Float, args: { size: 64 } }], - }, - }, - ] - local_table_name: generic_metric_distributions_meta_tag_values_local - dist_table_name: generic_metric_distributions_meta_tag_values_dist -required_time_column: timestamp -allocation_policies: - - name: ConcurrentRateLimitAllocationPolicy - args: - required_tenant_types: - - referrer - - organization_id - - project_id - default_config_overrides: - is_enforced: 1 - - name: BytesScannedWindowAllocationPolicy - args: - required_tenant_types: - - referrer - - organization_id - default_config_overrides: - is_enforced: 1 - throttled_thread_number: 1 - org_limit_bytes_scanned: 10000000 - - name: ReferrerGuardRailPolicy - args: - required_tenant_types: - - referrer - default_config_overrides: - is_enforced: 1 - is_active: 1 - -query_processors: - - processor: TupleUnaliaser diff --git a/snuba/datasets/storages/tags_hash_map.py b/snuba/datasets/storages/tags_hash_map.py index a0ab6bd2b2d..4cfdec05e6c 100644 --- a/snuba/datasets/storages/tags_hash_map.py +++ b/snuba/datasets/storages/tags_hash_map.py @@ -33,6 +33,10 @@ ) +def get_array_vals_hash(col_name: str) -> str: + return f"arrayMap(k -> cityHash64(k), {col_name})" + + def hash_map_int_column_definition(key_column_name: str, value_column_name: str) -> str: return ( 
f"arrayMap((k, v) -> cityHash64(concat(toString(k), '=', toString(v))), " diff --git a/snuba/manual_jobs/recreate_missing_eap_spans_materialized_views.py b/snuba/manual_jobs/recreate_missing_eap_spans_materialized_views.py new file mode 100755 index 00000000000..d83dc757789 --- /dev/null +++ b/snuba/manual_jobs/recreate_missing_eap_spans_materialized_views.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 + +from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster +from snuba.clusters.storage_sets import StorageSetKey +from snuba.manual_jobs import Job, JobLogger, JobSpec + +materialized_views = [ + "CREATE MATERIALIZED VIEW IF NOT EXISTS spans_num_attrs_3_mv TO spans_num_attrs_3_local (`organization_id` UInt64, `project_id` UInt64, `attr_key` String CODEC(ZSTD(1)), `attr_min_value` SimpleAggregateFunction(min, Float64), `attr_max_value` SimpleAggregateFunction(max, Float64), `timestamp` DateTime CODEC(DoubleDelta, ZSTD(1)), `retention_days` UInt16, `count` SimpleAggregateFunction(sum, UInt64)) AS SELECT organization_id, project_id, attrs.1 AS attr_key, attrs.2 AS attr_min_value, attrs.2 AS attr_max_value, toStartOfDay(_sort_timestamp) AS timestamp, retention_days, 1 AS count FROM eap_spans_2_local LEFT ARRAY JOIN arrayConcat(CAST(attr_num_0, 'Array(Tuple(String, Float64))'), CAST(attr_num_1, 'Array(Tuple(String, Float64))'), CAST(attr_num_2, 'Array(Tuple(String, Float64))'), CAST(attr_num_3, 'Array(Tuple(String, Float64))'), CAST(attr_num_4, 'Array(Tuple(String, Float64))'), CAST(attr_num_5, 'Array(Tuple(String, Float64))'), CAST(attr_num_6, 'Array(Tuple(String, Float64))'), CAST(attr_num_7, 'Array(Tuple(String, Float64))'), CAST(attr_num_8, 'Array(Tuple(String, Float64))'), CAST(attr_num_9, 'Array(Tuple(String, Float64))'), CAST(attr_num_10, 'Array(Tuple(String, Float64))'), CAST(attr_num_11, 'Array(Tuple(String, Float64))'), CAST(attr_num_12, 'Array(Tuple(String, Float64))'), CAST(attr_num_13, 'Array(Tuple(String, Float64))'), CAST(attr_num_14, 'Array(Tuple(String, Float64))'), CAST(attr_num_15, 'Array(Tuple(String, Float64))'), CAST(attr_num_16, 'Array(Tuple(String, Float64))'), CAST(attr_num_17, 'Array(Tuple(String, Float64))'), CAST(attr_num_18, 'Array(Tuple(String, Float64))'), CAST(attr_num_19, 'Array(Tuple(String, Float64))'), [('sentry.duration_ms', duration_micro / 1000)]) AS attrs GROUP BY organization_id, project_id, attrs.1, attrs.2, timestamp, retention_days", + "CREATE MATERIALIZED VIEW IF NOT EXISTS spans_str_attrs_3_mv TO spans_str_attrs_3_local (`organization_id` UInt64, `project_id` UInt64, `attr_key` String CODEC(ZSTD(1)), `attr_value` String CODEC(ZSTD(1)), `timestamp` DateTime CODEC(DoubleDelta, ZSTD(1)), `retention_days` UInt16, `count` SimpleAggregateFunction(sum, UInt64)) AS SELECT organization_id, project_id, attrs.1 AS attr_key, attrs.2 AS attr_value, toStartOfDay(_sort_timestamp) AS timestamp, retention_days, 1 AS count FROM eap_spans_2_local LEFT ARRAY JOIN arrayConcat(CAST(attr_str_0, 'Array(Tuple(String, String))'), CAST(attr_str_1, 'Array(Tuple(String, String))'), CAST(attr_str_2, 'Array(Tuple(String, String))'), CAST(attr_str_3, 'Array(Tuple(String, String))'), CAST(attr_str_4, 'Array(Tuple(String, String))'), CAST(attr_str_5, 'Array(Tuple(String, String))'), CAST(attr_str_6, 'Array(Tuple(String, String))'), CAST(attr_str_7, 'Array(Tuple(String, String))'), CAST(attr_str_8, 'Array(Tuple(String, String))'), CAST(attr_str_9, 'Array(Tuple(String, String))'), CAST(attr_str_10, 'Array(Tuple(String, String))'), CAST(attr_str_11, 'Array(Tuple(String, 
String))'), CAST(attr_str_12, 'Array(Tuple(String, String))'), CAST(attr_str_13, 'Array(Tuple(String, String))'), CAST(attr_str_14, 'Array(Tuple(String, String))'), CAST(attr_str_15, 'Array(Tuple(String, String))'), CAST(attr_str_16, 'Array(Tuple(String, String))'), CAST(attr_str_17, 'Array(Tuple(String, String))'), CAST(attr_str_18, 'Array(Tuple(String, String))'), CAST(attr_str_19, 'Array(Tuple(String, String))'), [('sentry.service', service), ('sentry.segment_name', segment_name), ('sentry.name', name)]) AS attrs GROUP BY organization_id, project_id, attr_key, attr_value, timestamp, retention_days", +] + + +class RecreateMissingEAPSpansMaterializedViews(Job): + def __init__(self, job_spec: JobSpec) -> None: + super().__init__(job_spec) + + def execute(self, logger: JobLogger) -> None: + cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM) + + for storage_node in cluster.get_local_nodes(): + connection = cluster.get_node_connection( + ClickhouseClientSettings.MIGRATE, + storage_node, + ) + for query in materialized_views: + logger.info("Executing query: {query}") + connection.execute(query=query) + + logger.info("complete") diff --git a/snuba/migrations/groups.py b/snuba/migrations/groups.py index 20078b1d2f3..0a5dcd3e75e 100644 --- a/snuba/migrations/groups.py +++ b/snuba/migrations/groups.py @@ -172,7 +172,7 @@ def __init__( MigrationGroup.EVENTS_ANALYTICS_PLATFORM: _MigrationGroup( loader=EventsAnalyticsPlatformLoader(), storage_sets_keys={StorageSetKey.EVENTS_ANALYTICS_PLATFORM}, - readiness_state=ReadinessState.PARTIAL, + readiness_state=ReadinessState.COMPLETE, ), MigrationGroup.GROUP_ATTRIBUTES: _MigrationGroup( loader=GroupAttributesLoader(), diff --git a/snuba/snuba_migrations/events_analytics_platform/0024_items.py b/snuba/snuba_migrations/events_analytics_platform/0024_items.py new file mode 100644 index 00000000000..5e697f8acd2 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0024_items.py @@ -0,0 +1,145 @@ +from typing import List, Sequence + +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations, table_engines +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import AddIndicesData, OperationTarget, SqlOperation +from snuba.utils.schemas import ( + UUID, + Bool, + Column, + DateTime, + Float, + Int, + Map, + String, + UInt, +) + +storage_set_name = StorageSetKey.EVENTS_ANALYTICS_PLATFORM +local_table_name = "eap_items_1_local" +dist_table_name = "eap_items_1_dist" +num_attr_buckets = 40 + +columns: List[Column[Modifiers]] = [ + Column("organization_id", UInt(64)), + Column("project_id", UInt(64)), + Column("item_type", UInt(8)), + Column("timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))), + Column("trace_id", UUID()), + Column("item_id", UInt(128)), + Column("sampling_weight", UInt(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), + Column( + "retention_days", + UInt(16, modifiers=Modifiers(codecs=["T64", "ZSTD(1)"])), + ), + Column( + "attributes_bool", + Map( + String(), + Bool(), + ), + ), + Column( + "attributes_int", + Map( + String(), + Int(64), + ), + ), +] + +columns.extend( + [ + Column( + f"attributes_string_{i}", + Map( + String(), + String(), + modifiers=Modifiers( + codecs=["ZSTD(1)"], + ), + ), + ) + for i in range(num_attr_buckets) + ] +) + +columns.extend( + [ + Column( + f"attributes_float_{i}", + Map( + String(), + Float(64), + modifiers=Modifiers( + codecs=["ZSTD(1)"], + ), + ), + ) + for i in 
range(num_attr_buckets) + ] +) + + +indices: Sequence[AddIndicesData] = [ + AddIndicesData( + name="bf_trace_id", + expression="trace_id", + type="bloom_filter", + granularity=1, + ) +] + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[SqlOperation]: + res: List[SqlOperation] = [ + operations.CreateTable( + storage_set=storage_set_name, + table_name=local_table_name, + columns=columns, + engine=table_engines.ReplacingMergeTree( + primary_key="(organization_id, project_id, item_type, timestamp)", + order_by="(organization_id, project_id, item_type, timestamp, trace_id, item_id)", + partition_by="(retention_days, toMonday(timestamp))", + settings={"index_granularity": "8192"}, + storage_set=storage_set_name, + ttl="timestamp + toIntervalDay(retention_days)", + ), + target=OperationTarget.LOCAL, + ), + operations.CreateTable( + storage_set=storage_set_name, + table_name=dist_table_name, + columns=columns, + engine=table_engines.Distributed( + local_table_name=local_table_name, + sharding_key="cityHash64(reinterpretAsUInt128(trace_id))", + ), + target=OperationTarget.DISTRIBUTED, + ), + operations.AddIndices( + storage_set=storage_set_name, + table_name=local_table_name, + indices=indices, + target=OperationTarget.LOCAL, + ), + ] + return res + + def backwards_ops(self) -> Sequence[SqlOperation]: + return [ + operations.DropTable( + storage_set=storage_set_name, + table_name=local_table_name, + target=OperationTarget.LOCAL, + ), + operations.DropTable( + storage_set=storage_set_name, + table_name=dist_table_name, + target=OperationTarget.DISTRIBUTED, + ), + ] diff --git a/snuba/snuba_migrations/events_analytics_platform/0025_smart_autocomplete_index.py b/snuba/snuba_migrations/events_analytics_platform/0025_smart_autocomplete_index.py new file mode 100644 index 00000000000..533d0d98620 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0025_smart_autocomplete_index.py @@ -0,0 +1,143 @@ +from typing import Sequence + +from snuba.clusters.storage_sets import StorageSetKey +from snuba.datasets.storages.tags_hash_map import get_array_vals_hash +from snuba.migrations import migration, operations +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.utils.schemas import Array, Column, UInt + + +class Migration(migration.ClickhouseNodeMigration): + + blocking = False + storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM + granularity = "8192" + + local_table_name = "eap_trace_item_attrs_local" + dist_table_name = "eap_trace_item_attrs_dist" + mv_name = "eap_trace_item_attrs_mv" + + str_hash_map_col = "_str_attr_keys_hashes" + float_hash_map_col = "_float64_attr_keys_hashes" + + def forwards_ops(self) -> Sequence[operations.SqlOperation]: + return [ + # --- Str attrs ----- + operations.AddColumn( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + column=Column( + name=self.str_hash_map_col, + type=Array( + UInt(64), + Modifiers( + materialized=get_array_vals_hash("mapKeys(attrs_string)") + ), + ), + ), + after="attrs_string", + target=operations.OperationTarget.LOCAL, + ), + operations.AddColumn( + storage_set=self.storage_set_key, + table_name=self.dist_table_name, + column=Column( + self.str_hash_map_col, + type=Array( + UInt(64), + Modifiers( + materialized=get_array_vals_hash("mapKeys(attrs_string)") + ), + ), + ), + after="attrs_string", + target=operations.OperationTarget.DISTRIBUTED, + ), + operations.AddIndex( + storage_set=self.storage_set_key, + 
table_name=self.local_table_name, + index_name=f"bf_{self.str_hash_map_col}", + index_expression=self.str_hash_map_col, + index_type="bloom_filter", + granularity=1, + target=operations.OperationTarget.LOCAL, + ), + # --- Num attrs ----- + operations.AddColumn( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + column=Column( + name=self.float_hash_map_col, + type=Array( + UInt(64), + Modifiers(materialized=get_array_vals_hash("attrs_float64")), + ), + ), + after="attrs_float64", + target=operations.OperationTarget.LOCAL, + ), + operations.AddColumn( + storage_set=self.storage_set_key, + table_name=self.dist_table_name, + column=Column( + self.float_hash_map_col, + type=Array( + UInt(64), + Modifiers(materialized=get_array_vals_hash("attrs_float64")), + ), + ), + after="attrs_float64", + target=operations.OperationTarget.DISTRIBUTED, + ), + operations.AddIndex( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + index_name=f"bf_{self.float_hash_map_col}", + index_expression=self.float_hash_map_col, + index_type="bloom_filter", + granularity=1, + target=operations.OperationTarget.LOCAL, + ), + ] + + def backwards_ops(self) -> Sequence[operations.SqlOperation]: + return [ + # --- Str attrs ----- + operations.DropColumn( + storage_set=self.storage_set_key, + table_name=self.dist_table_name, + column_name=self.str_hash_map_col, + target=operations.OperationTarget.DISTRIBUTED, + ), + operations.DropIndex( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + index_name=f"bf_{self.str_hash_map_col}", + target=operations.OperationTarget.LOCAL, + ), + operations.DropColumn( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + column_name=self.str_hash_map_col, + target=operations.OperationTarget.LOCAL, + ), + # --- Num attrs ----- + operations.DropColumn( + storage_set=self.storage_set_key, + table_name=self.dist_table_name, + column_name=self.float_hash_map_col, + target=operations.OperationTarget.DISTRIBUTED, + ), + operations.DropIndex( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + index_name=f"bf_{self.float_hash_map_col}", + target=operations.OperationTarget.LOCAL, + ), + operations.DropColumn( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + column_name=self.float_hash_map_col, + target=operations.OperationTarget.LOCAL, + ), + ] diff --git a/snuba/snuba_migrations/events_analytics_platform/0026_items_add_attributes_hash_map.py b/snuba/snuba_migrations/events_analytics_platform/0026_items_add_attributes_hash_map.py new file mode 100644 index 00000000000..03b5255b1c2 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0026_items_add_attributes_hash_map.py @@ -0,0 +1,61 @@ +from typing import Sequence + +from snuba.clickhouse.columns import Array, Column, UInt +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget + +storage_set_name = StorageSetKey.EVENTS_ANALYTICS_PLATFORM +local_table_name = "eap_items_1_local" +dist_table_name = "eap_items_1_dist" +num_attr_buckets = 40 + + +def hash_map_column_name(attribute_type: str, i: int) -> str: + return f"_hash_map_{attribute_type}_{i}" + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[operations.SqlOperation]: + return [ + operations.AddColumn( + 
storage_set=storage_set_name, + table_name=table_name, + column=Column( + hash_map_column_name(attribute_type, i), + Array( + UInt(64), + Modifiers( + materialized=f"arrayMap(k -> cityHash64(k), mapKeys(attributes_{attribute_type}_{i}))", + ), + ), + ), + after=f"attributes_{attribute_type}_{i}", + target=target, + ) + for i in range(num_attr_buckets) + for attribute_type in {"string", "float"} + for (table_name, target) in [ + (local_table_name, OperationTarget.LOCAL), + (dist_table_name, OperationTarget.DISTRIBUTED), + ] + ] + + def backwards_ops(self) -> Sequence[operations.SqlOperation]: + return [ + operations.DropColumn( + storage_set=storage_set_name, + table_name=table_name, + column_name=hash_map_column_name(attribute_type, i), + target=target, + ) + for i in range(num_attr_buckets) + for attribute_type in {"string", "float"} + for (table_name, target) in [ + (dist_table_name, OperationTarget.DISTRIBUTED), + (local_table_name, OperationTarget.LOCAL), + ] + ] diff --git a/snuba/snuba_migrations/events_analytics_platform/0027_uptime_checks_add_column_in_incident.py b/snuba/snuba_migrations/events_analytics_platform/0027_uptime_checks_add_column_in_incident.py new file mode 100644 index 00000000000..a8a853826e5 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0027_uptime_checks_add_column_in_incident.py @@ -0,0 +1,45 @@ +from typing import Sequence + +from snuba.clickhouse.columns import Column, UInt +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations +from snuba.migrations.operations import OperationTarget + +storage_set_name = StorageSetKey.EVENTS_ANALYTICS_PLATFORM +local_table_name = "uptime_monitor_checks_v2_local" +dist_table_name = "uptime_monitor_checks_v2_dist" + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[operations.SqlOperation]: + return [ + operations.AddColumn( + storage_set=storage_set_name, + table_name=table_name, + column=Column( + "incident_status", + UInt(16), + ), + target=target, + ) + for (table_name, target) in [ + (local_table_name, OperationTarget.LOCAL), + (dist_table_name, OperationTarget.DISTRIBUTED), + ] + ] + + def backwards_ops(self) -> Sequence[operations.SqlOperation]: + return [ + operations.DropColumn( + storage_set=storage_set_name, + table_name=table_name, + column_name="incident_status", + target=target, + ) + for (table_name, target) in [ + (dist_table_name, OperationTarget.DISTRIBUTED), + (local_table_name, OperationTarget.LOCAL), + ] + ] diff --git a/snuba/snuba_migrations/generic_metrics/0061_remove_distribution_meta_tag_values_mv.py b/snuba/snuba_migrations/generic_metrics/0061_remove_distribution_meta_tag_values_mv.py new file mode 100644 index 00000000000..119a01b8b2a --- /dev/null +++ b/snuba/snuba_migrations/generic_metrics/0061_remove_distribution_meta_tag_values_mv.py @@ -0,0 +1,67 @@ +from typing import Sequence + +from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget +from snuba.utils.schemas import Float + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + granularity = "2048" + tag_value_view_name = "generic_metric_distributions_meta_tag_values_mv" + tag_value_local_table_name = 
"generic_metric_distributions_meta_tag_values_local" + tag_value_dist_table_name = "generic_metric_distributions_meta_tag_values_dist" + tag_value_table_columns: Sequence[Column[Modifiers]] = [ + Column("project_id", UInt(64)), + Column("metric_id", UInt(64)), + Column("tag_key", UInt(64)), + Column("tag_value", String()), + Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))), + Column("retention_days", UInt(16)), + Column("count", AggregateFunction("sum", [Float(64)])), + ] + storage_set_key = StorageSetKey.GENERIC_METRICS_DISTRIBUTIONS + + def forwards_ops(self) -> Sequence[operations.SqlOperation]: + return [ + operations.DropTable( + storage_set=self.storage_set_key, + table_name=self.tag_value_view_name, + target=OperationTarget.LOCAL, + ) + ] + + def backwards_ops(self) -> Sequence[operations.SqlOperation]: + return [ + operations.CreateMaterializedView( + storage_set=self.storage_set_key, + view_name=self.tag_value_view_name, + columns=self.tag_value_table_columns, + destination_table_name=self.tag_value_local_table_name, + target=OperationTarget.LOCAL, + query=""" + SELECT + project_id, + metric_id, + tag_key, + tag_value, + toMonday(timestamp) as timestamp, + retention_days, + sumState(count_value) as count + FROM generic_metric_distributions_raw_local + ARRAY JOIN + tags.key AS tag_key, tags.raw_value AS tag_value + WHERE record_meta = 1 + GROUP BY + project_id, + metric_id, + tag_key, + tag_value, + timestamp, + retention_days + """, + ) + ] diff --git a/snuba/web/rpc/v1/endpoint_time_series.py b/snuba/web/rpc/v1/endpoint_time_series.py index 1b6cc1a755b..5e9702440bc 100644 --- a/snuba/web/rpc/v1/endpoint_time_series.py +++ b/snuba/web/rpc/v1/endpoint_time_series.py @@ -3,6 +3,7 @@ from typing import Type from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( + Expression, TimeSeriesRequest, TimeSeriesResponse, ) @@ -75,6 +76,19 @@ def _validate_time_buckets(request: TimeSeriesRequest) -> None: ) +def _convert_aggregations_to_expressions( + request: TimeSeriesRequest, +) -> TimeSeriesRequest: + if len(request.aggregations) > 0: + new_req = TimeSeriesRequest() + new_req.CopyFrom(request) + new_req.ClearField("aggregations") + for agg in request.aggregations: + new_req.expressions.append(Expression(aggregation=agg, label=agg.label)) + return new_req + return request + + class EndpointTimeSeries(RPCEndpoint[TimeSeriesRequest, TimeSeriesResponse]): @classmethod def version(cls) -> str: @@ -107,5 +121,6 @@ def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse: raise BadSnubaRPCRequestException( "This endpoint requires meta.trace_item_type to be set (are you requesting spans? 
logs?)" ) + in_msg = _convert_aggregations_to_expressions(in_msg) resolver = self.get_resolver(in_msg.meta.trace_item_type) return resolver.resolve(in_msg) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py index eaf0139a7ae..748b9b091b7 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py @@ -1,12 +1,16 @@ import uuid from collections import defaultdict +from dataclasses import replace from datetime import datetime from typing import Any, Dict, Iterable from google.protobuf.json_format import MessageToDict from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.endpoint_time_series_pb2 import DataPoint +from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( + Expression as ProtoExpression, +) from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( - DataPoint, TimeSeries, TimeSeriesRequest, TimeSeriesResponse, @@ -23,6 +27,7 @@ from snuba.query.data_source.simple import Entity from snuba.query.dsl import Functions as f from snuba.query.dsl import column +from snuba.query.expressions import Expression from snuba.query.logical import Query from snuba.query.query_settings import HTTPQuerySettings from snuba.request import Request as SnubaRequest @@ -48,6 +53,13 @@ attribute_key_to_expression, ) +OP_TO_EXPR = { + ProtoExpression.BinaryFormula.OP_ADD: f.plus, + ProtoExpression.BinaryFormula.OP_SUBTRACT: f.minus, + ProtoExpression.BinaryFormula.OP_MULTIPLY: f.multiply, + ProtoExpression.BinaryFormula.OP_DIVIDE: f.divide, +} + def _convert_result_timeseries( request: TimeSeriesRequest, data: list[Dict[str, Any]] @@ -94,7 +106,8 @@ def _convert_result_timeseries( # to convert the results, need to know which were the groupby columns and which ones # were aggregations - aggregation_labels = set([agg.label for agg in request.aggregations]) + aggregation_labels = set([expr.label for expr in request.expressions]) + group_by_labels = set([attr.name for attr in request.group_by]) # create a mapping with (all the group by attribute key,val pairs as strs, label name) @@ -154,7 +167,7 @@ def _convert_result_timeseries( extrapolation_context = ExtrapolationContext.from_row( timeseries.label, row_data ) - if extrapolation_context.is_data_present: + if row_data.get(timeseries.label, None) is not None: timeseries.data_points.append( DataPoint( data=row_data[timeseries.label], @@ -169,23 +182,19 @@ def _convert_result_timeseries( return result_timeseries.values() -def _build_query(request: TimeSeriesRequest) -> Query: - # TODO: This is hardcoded still - entity = Entity( - key=EntityKey("eap_spans"), - schema=get_entity(EntityKey("eap_spans")).get_data_model(), - sample=None, - ) +def _get_reliability_context_columns( + expressions: Iterable[ProtoExpression], +) -> list[SelectedExpression]: + # this reliability logic ignores formulas, meaning formulas may not properly support reliability + additional_context_columns = [] - aggregation_columns = [ - SelectedExpression( - name=aggregation.label, expression=aggregation_to_expression(aggregation) - ) - for aggregation in request.aggregations - ] + aggregates = [] + for e in expressions: + if e.WhichOneof("expression") == "aggregation": + # ignore formulas + aggregates.append(e.aggregation) - additional_context_columns = [] - for aggregation in request.aggregations: + for aggregation in aggregates: if ( aggregation.extrapolation_mode == 
ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED @@ -211,6 +220,41 @@ def _build_query(request: TimeSeriesRequest) -> Query: additional_context_columns.append( SelectedExpression(name=count_column.alias, expression=count_column) ) + return additional_context_columns + + +def _proto_expression_to_ast_expression(expr: ProtoExpression) -> Expression: + match expr.WhichOneof("expression"): + case "aggregation": + return aggregation_to_expression(expr.aggregation) + case "formula": + formula_expr = OP_TO_EXPR[expr.formula.op]( + _proto_expression_to_ast_expression(expr.formula.left), + _proto_expression_to_ast_expression(expr.formula.right), + ) + formula_expr = replace(formula_expr, alias=expr.label) + return formula_expr + case default: + raise ValueError(f"Unknown expression type: {default}") + + +def _build_query(request: TimeSeriesRequest) -> Query: + # TODO: This is hardcoded still + entity = Entity( + key=EntityKey("eap_spans"), + schema=get_entity(EntityKey("eap_spans")).get_data_model(), + sample=None, + ) + + aggregation_columns = [ + SelectedExpression( + name=expr.label, + expression=_proto_expression_to_ast_expression(expr), + ) + for expr in request.expressions + ] + + additional_context_columns = _get_reliability_context_columns(request.expressions) groupby_columns = [ SelectedExpression( @@ -302,6 +346,10 @@ def trace_item_type(cls) -> TraceItemType.ValueType: return TraceItemType.TRACE_ITEM_TYPE_SPAN def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse: + # aggregations field is deprecated, it gets converted to request.expressions + # if the user passes it in + assert len(in_msg.aggregations) == 0 + snuba_request = _build_snuba_request(in_msg) res = run_query( dataset=PluggableDataset(name="eap", all_entities=[]), diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_stats.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_stats.py index 04356c935e0..65652d9671e 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_stats.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_stats.py @@ -37,12 +37,13 @@ setup_trace_query_settings, ) from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException -from snuba.web.rpc.v1.endpoint_get_traces import _DEFAULT_ROW_LIMIT from snuba.web.rpc.v1.resolvers import ResolverTraceItemStats from snuba.web.rpc.v1.resolvers.R_eap_spans.common.common import ( attribute_key_to_expression, ) +_DEFAULT_ROW_LIMIT = 10_000 + MAX_BUCKETS = 100 DEFAULT_BUCKETS = 10 diff --git a/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py b/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py index 051b04355dc..f41b7710501 100644 --- a/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py +++ b/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py @@ -7,6 +7,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( DataPoint, + Expression, TimeSeries, TimeSeriesRequest, TimeSeriesResponse, @@ -31,6 +32,7 @@ extract_response_meta, setup_trace_query_settings, ) +from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.resolvers import ResolverTimeSeries from snuba.web.rpc.v1.resolvers.common.aggregation import aggregation_to_expression from snuba.web.rpc.v1.resolvers.R_uptime_checks.common.common import ( @@ -40,6 +42,18 @@ ) +def _get_aggregation_label(expr: Expression) -> str: + match expr.WhichOneof("expression"): + case 
"aggregation": + return expr.aggregation.label + case "formula": + raise BadSnubaRPCRequestException( + "formulas are not supported for uptime checks" + ) + case default: + raise BadSnubaRPCRequestException(f"Unknown expression type: {default}") + + def _convert_result_timeseries( request: TimeSeriesRequest, data: list[Dict[str, Any]] ) -> Iterable[TimeSeries]: @@ -85,7 +99,10 @@ def _convert_result_timeseries( # to convert the results, need to know which were the groupby columns and which ones # were aggregations - aggregation_labels = set([agg.label for agg in request.aggregations]) + aggregation_labels = set( + [_get_aggregation_label(expr) for expr in request.expressions] + ) + group_by_labels = set([attr.name for attr in request.group_by]) # create a mapping with (all the group by attribute key,val pairs as strs, label name) @@ -158,16 +175,25 @@ def _build_query(request: TimeSeriesRequest) -> Query: sample=None, ) - aggregation_columns = [ - SelectedExpression( - name=aggregation.label, - expression=aggregation_to_expression( - aggregation, - attribute_key_to_expression(aggregation.key), - ), - ) - for aggregation in request.aggregations - ] + aggregation_columns = [] + for expr in request.expressions: + match expr.WhichOneof("expression"): + case "aggregation": + aggregation_columns.append( + SelectedExpression( + name=expr.aggregation.label, + expression=aggregation_to_expression( + expr.aggregation, + attribute_key_to_expression(expr.aggregation.key), + ), + ) + ) + case "formula": + raise BadSnubaRPCRequestException( + "formulas are not supported for uptime checks" + ) + case default: + raise BadSnubaRPCRequestException(f"Unknown expression type: {default}") groupby_columns = [ SelectedExpression( diff --git a/snuba/web/rpc/v1/resolvers/common/aggregation.py b/snuba/web/rpc/v1/resolvers/common/aggregation.py index 0e655bbda29..f57ffd1d23a 100644 --- a/snuba/web/rpc/v1/resolvers/common/aggregation.py +++ b/snuba/web/rpc/v1/resolvers/common/aggregation.py @@ -60,16 +60,12 @@ class ExtrapolationContext(ABC): confidence_interval: Any average_sample_rate: float sample_count: int + is_extrapolated: bool @property def is_data_present(self) -> bool: return self.sample_count > 0 - @property - @abstractmethod - def is_extrapolated(self) -> bool: - raise NotImplementedError - @property @abstractmethod def reliability(self) -> Reliability.ValueType: @@ -81,6 +77,7 @@ def from_row( row_data: Dict[str, Any], ) -> ExtrapolationContext: value = row_data[column_label] + is_extrapolated = False confidence_interval = None average_sample_rate = 0 @@ -105,6 +102,7 @@ def from_row( continue if custom_column_information.custom_column_id == "confidence_interval": + is_extrapolated = True confidence_interval = col_value is_percentile = custom_column_information.metadata.get( @@ -133,6 +131,7 @@ def from_row( percentile=percentile, granularity=granularity, width=width, + is_extrapolated=is_extrapolated, ) return GenericExtrapolationContext( @@ -140,18 +139,12 @@ def from_row( confidence_interval=confidence_interval, average_sample_rate=average_sample_rate, sample_count=sample_count, + is_extrapolated=is_extrapolated, ) @dataclass(frozen=True) class GenericExtrapolationContext(ExtrapolationContext): - @property - def is_extrapolated(self) -> bool: - # We infer if a column is extrapolated or not by the presence of the - # confidence interval. It will be present for extrapolated aggregates - # but not for non-extrapolated aggregates and scalars. 
- return self.confidence_interval is not None - @cached_property def reliability(self) -> Reliability.ValueType: if not self.is_extrapolated or not self.is_data_present: @@ -178,13 +171,6 @@ class PercentileExtrapolationContext(ExtrapolationContext): granularity: float width: float - @property - def is_extrapolated(self) -> bool: - # We infer if a column is extrapolated or not by the presence of the - # confidence interval. It will be present for extrapolated aggregates - # but not for non-extrapolated aggregates and scalars. - return self.confidence_interval is not None - @cached_property def reliability(self) -> Reliability.ValueType: if not self.is_extrapolated or not self.is_data_present: diff --git a/snuba/web/rpc/v1/resolvers/common/trace_item_table.py b/snuba/web/rpc/v1/resolvers/common/trace_item_table.py index b7ce8803980..d7a3a4d9815 100644 --- a/snuba/web/rpc/v1/resolvers/common/trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/common/trace_item_table.py @@ -44,14 +44,14 @@ def convert_results( extrapolation_context = ExtrapolationContext.from_row(column_name, row) res[column_name].attribute_name = column_name if value is None: - res[column_name].results.append(AttributeValue(is_null=True)) else: res[column_name].results.append(converters[column_name](value)) - if extrapolation_context.is_extrapolated: - res[column_name].reliabilities.append( - extrapolation_context.reliability - ) + + if extrapolation_context.is_extrapolated: + res[column_name].reliabilities.append( + extrapolation_context.reliability + ) column_ordering = {column.label: i for i, column in enumerate(request.columns)} diff --git a/tests/datasets/test_entity_factory.py b/tests/datasets/test_entity_factory.py index 9b12a747b8a..a14e883049c 100644 --- a/tests/datasets/test_entity_factory.py +++ b/tests/datasets/test_entity_factory.py @@ -50,7 +50,6 @@ EntityKey.GENERIC_METRICS_SETS_META, EntityKey.GENERIC_METRICS_SETS_META_TAG_VALUES, EntityKey.GENERIC_METRICS_DISTRIBUTIONS_META, - EntityKey.GENERIC_METRICS_DISTRIBUTIONS_META_TAG_VALUES, EntityKey.GENERIC_METRICS_COUNTERS_META, EntityKey.GENERIC_METRICS_COUNTERS_META_TAG_VALUES, EntityKey.UPTIME_CHECKS, diff --git a/tests/manual_jobs/recreate_missing_eap_spans_materialized_views.py b/tests/manual_jobs/recreate_missing_eap_spans_materialized_views.py new file mode 100644 index 00000000000..630906e41d1 --- /dev/null +++ b/tests/manual_jobs/recreate_missing_eap_spans_materialized_views.py @@ -0,0 +1,36 @@ +import uuid + +import pytest + +from snuba.clickhouse.errors import ClickhouseError +from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster +from snuba.clusters.storage_sets import StorageSetKey +from snuba.manual_jobs import JobSpec +from snuba.manual_jobs.job_status import JobStatus +from snuba.manual_jobs.runner import get_job_status, run_job + +_VIEWS = {"spans_num_attrs_3_mv", "spans_str_attrs_3_mv"} + + +@pytest.mark.redis_db +@pytest.mark.clickhouse_db +def test() -> None: + cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM) + storage_node = cluster.get_local_nodes()[0] + connection = cluster.get_node_connection( + ClickhouseClientSettings.QUERY, + storage_node, + ) + + for view in _VIEWS: + connection.execute(f"DROP VIEW {view}") + with pytest.raises(ClickhouseError): + connection.execute(f"SELECT * FROM {view}") + + job_id = uuid.uuid4().hex + run_job(JobSpec(job_id, "RecreateMissingEAPSpansMaterializedViews")) + + assert get_job_status(job_id) == JobStatus.FINISHED + + for view in _VIEWS: + connection.execute(f"SELECT * 
FROM {view}") diff --git a/tests/test_metrics_meta_api.py b/tests/test_metrics_meta_api.py index c87f8f8ce13..e0d7fff5d6a 100644 --- a/tests/test_metrics_meta_api.py +++ b/tests/test_metrics_meta_api.py @@ -163,73 +163,6 @@ def test_retrieve_tag_keys(self, test_entity: str) -> None: assert data["data"][0]["tag_key"] == 112358 assert data["data"][1]["tag_key"] == 132134 - def test_retrieve_tag_values(self, test_entity: str) -> None: - entity_name = f"{test_entity}_meta_tag_values" - query_str = f"""MATCH ({entity_name}) - SELECT tag_value - BY tag_value - WHERE project_id = {self.project_id} - AND metric_id = {self.metric_ids[0]} - AND tag_key = 112358 - AND timestamp >= toDateTime('{self.start_time.isoformat()}') - AND timestamp < toDateTime('{self.end_time.isoformat()}') - ORDER BY tag_value ASC - """ - response = self.app.post( - SNQL_ROUTE, - data=json.dumps( - { - "query": query_str, - "dataset": "generic_metrics", - "tenant_ids": {"referrer": "tests", "organization_id": 1}, - } - ), - ) - data = json.loads(response.data) - assert response.status_code == 200 - assert len(data["data"]) == 4, data - assert data["data"][0]["tag_value"] == "dev" - assert data["data"][1]["tag_value"] == "prod" - assert data["data"][2]["tag_value"] == "staging" - assert data["data"][3]["tag_value"] == "test" - - def test_retrieve_tag_values_with_count(self, test_entity: str) -> None: - entity_name = f"{test_entity}_meta_tag_values" - query_str = f"""MATCH ({entity_name}) - SELECT tag_value, sum(count) AS rank - BY tag_value - WHERE project_id = {self.project_id} - AND metric_id = {self.metric_ids[0]} - AND tag_key = 112358 - AND timestamp >= toDateTime('{self.start_time.isoformat()}') - AND timestamp < toDateTime('{self.end_time.isoformat()}') - ORDER BY tag_value ASC - """ - response = self.app.post( - SNQL_ROUTE, - data=json.dumps( - { - "query": query_str, - "dataset": "generic_metrics", - "tenant_ids": {"referrer": "tests", "organization_id": 1}, - } - ), - ) - data = json.loads(response.data) - assert response.status_code == 200 - assert len(data["data"]) == 4, data - - # in tests, only the counters table populates the count field - if test_entity == "generic_metrics_counters": - values = [12.0, 12.0, 8.0, 8.0] - else: - values = [0.0] * 4 - - assert data["data"][0] == {"tag_value": "dev", "rank": values[0]} - assert data["data"][1] == {"tag_value": "prod", "rank": values[1]} - assert data["data"][2] == {"tag_value": "staging", "rank": values[2]} - assert data["data"][3] == {"tag_value": "test", "rank": values[3]} - @pytest.mark.clickhouse_db @pytest.mark.redis_db diff --git a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py index 2b5b609a4b1..cef2ad1fc17 100644 --- a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py +++ b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py @@ -9,6 +9,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( DataPoint, + Expression, TimeSeries, TimeSeriesRequest, ) @@ -903,6 +904,98 @@ def test_OOM(self, monkeypatch: Any) -> None: sentry_sdk_mock.assert_called_once() assert metrics_mock.increment.call_args_list.count(call("OOM_query")) == 1 + def test_formula(self) -> None: + # store a a test metric with a value of 1, every second of one hour + granularity_secs = 300 + query_duration = 60 * 30 + store_spans_timeseries( + BASE_TIME, + 1, + 3600, + 
metrics=[DummyMetric("test_metric", get_value=lambda x: 1)], + ) + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp( + seconds=int(BASE_TIME.timestamp() + query_duration) + ), + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, + ), + expressions=[ + Expression( + formula=Expression.BinaryFormula( + op=Expression.BinaryFormula.OP_ADD, + left=Expression( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey( + type=AttributeKey.TYPE_FLOAT, name="test_metric" + ), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ) + ), + right=Expression( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_AVG, + key=AttributeKey( + type=AttributeKey.TYPE_FLOAT, name="test_metric" + ), + label="avg", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ) + ), + ), + label="sum + avg", + ), + ], + granularity_secs=granularity_secs, + ) + response = EndpointTimeSeries().execute(message) + expected_buckets = [ + Timestamp(seconds=int(BASE_TIME.timestamp()) + secs) + for secs in range(0, query_duration, granularity_secs) + ] + expected_avg_timeseries = TimeSeries( + label="avg", + buckets=expected_buckets, + data_points=[ + DataPoint(data=1, data_present=True, sample_count=300) + for _ in range(len(expected_buckets)) + ], + ) + expected_sum_timeseries = TimeSeries( + label="sum", + buckets=expected_buckets, + data_points=[ + DataPoint(data=300, data_present=True) + for _ in range(len(expected_buckets)) + ], + ) + expected_formula_timeseries = TimeSeries( + label="sum + avg", + buckets=expected_buckets, + data_points=[ + DataPoint( + data=sum_datapoint.data + avg_datapoint.data, + data_present=True, + sample_count=sum_datapoint.sample_count, + ) + for sum_datapoint, avg_datapoint in zip( + expected_sum_timeseries.data_points, + expected_avg_timeseries.data_points, + ) + ], + ) + assert sorted(response.result_timeseries, key=lambda x: x.label) == [ + expected_formula_timeseries + ] + class TestUtils: def test_no_duplicate_labels(self) -> None: diff --git a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_extrapolation.py b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_extrapolation.py index 0efe8157f7e..6ba62aa34c6 100644 --- a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_extrapolation.py +++ b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_extrapolation.py @@ -7,6 +7,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( DataPoint, + Expression, TimeSeries, TimeSeriesRequest, ) @@ -669,3 +670,84 @@ def test_average_sampling_rate(self) -> None: ], ), ] + + def test_formula(self) -> None: + # store a a test metric with a value of 1, every second for an hour + granularity_secs = 120 + query_duration = 3600 + sample_rate = 0.5 + metric_value = 10 + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[DummyMetric("my_test_metric", get_value=lambda x: metric_value)], + measurements=[ + DummyMeasurement( + "client_sample_rate", + get_value=lambda s: sample_rate, + ) + ], + ) + + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + 
+                start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
+                end_timestamp=Timestamp(
+                    seconds=int(BASE_TIME.timestamp() + query_duration)
+                ),
+                trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
+            ),
+            expressions=[
+                Expression(
+                    formula=Expression.BinaryFormula(
+                        op=Expression.BinaryFormula.OP_ADD,
+                        left=Expression(
+                            aggregation=AttributeAggregation(
+                                aggregate=Function.FUNCTION_SUM,
+                                key=AttributeKey(
+                                    type=AttributeKey.TYPE_FLOAT, name="my_test_metric"
+                                ),
+                                label="sum(test_metric)",
+                                extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED,
+                            )
+                        ),
+                        right=Expression(
+                            aggregation=AttributeAggregation(
+                                aggregate=Function.FUNCTION_SUM,
+                                key=AttributeKey(
+                                    type=AttributeKey.TYPE_FLOAT, name="my_test_metric"
+                                ),
+                                label="sum(test_metric)",
+                                extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED,
+                            )
+                        ),
+                    ),
+                    label="sum + sum",
+                ),
+            ],
+            granularity_secs=granularity_secs,
+        )
+        response = EndpointTimeSeries().execute(message)
+        expected_buckets = [
+            Timestamp(seconds=int(BASE_TIME.timestamp()) + secs)
+            for secs in range(0, query_duration, granularity_secs)
+        ]
+        expected_sum_plus_sum = (granularity_secs * metric_value * 2) / sample_rate
+        assert sorted(response.result_timeseries, key=lambda x: x.label) == [
+            TimeSeries(
+                label="sum + sum",
+                buckets=expected_buckets,
+                data_points=[
+                    DataPoint(
+                        data=expected_sum_plus_sum,
+                        data_present=True,
+                    )
+                    for _ in range(len(expected_buckets))
+                ],
+            ),
+        ]
diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py
index 50c18ff4bf5..642bf92b3a1 100644
--- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py
+++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py
@@ -881,3 +881,105 @@ def test_formula(self) -> None:
             ],
         ),
     ]
+
+    def test_aggregation_with_nulls(self) -> None:
+        spans_storage = get_storage(StorageKey("eap_spans"))
+        start = BASE_TIME
+        messages_a = [
+            gen_message(
+                start - timedelta(minutes=i),
+                measurements={
+                    "custom_measurement": {"value": 1},
+                    "server_sample_rate": {"value": 1.0},
+                },
+                tags={"custom_tag": "a"},
+            )
+            for i in range(5)
+        ]
+        messages_b = [
+            gen_message(
+                start - timedelta(minutes=i),
+                measurements={
+                    "custom_measurement2": {"value": 1},
+                    "server_sample_rate": {"value": 1.0},
+                },
+                tags={"custom_tag": "b"},
+            )
+            for i in range(5)
+        ]
+        write_raw_unprocessed_events(spans_storage, messages_a + messages_b)  # type: ignore
+
+        ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
+        hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
+        message = TraceItemTableRequest(
+            meta=RequestMeta(
+                project_ids=[1],
+                organization_id=1,
+                cogs_category="something",
+                referrer="something",
+                start_timestamp=Timestamp(seconds=hour_ago),
+                end_timestamp=ts,
+                trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
+            ),
+            columns=[
+                Column(
+                    key=AttributeKey(type=AttributeKey.TYPE_STRING, name="custom_tag")
+                ),
+                Column(
+                    aggregation=AttributeAggregation(
+                        aggregate=Function.FUNCTION_SUM,
+                        key=AttributeKey(
+                            type=AttributeKey.TYPE_DOUBLE, name="custom_measurement"
+                        ),
+                        label="sum(custom_measurement)",
+                        extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED,
+                    )
+                ),
+                Column(
+                    aggregation=AttributeAggregation(
+                        aggregate=Function.FUNCTION_SUM,
+                        key=AttributeKey(
+                            type=AttributeKey.TYPE_DOUBLE, name="custom_measurement2"
+                        ),
+                        label="sum(custom_measurement2)",
+                        extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED,
+                    )
+                ),
+            ],
+            group_by=[
+                AttributeKey(type=AttributeKey.TYPE_STRING, name="custom_tag"),
+            ],
+            order_by=[
+                TraceItemTableRequest.OrderBy(
+                    column=Column(
+                        key=AttributeKey(
+                            type=AttributeKey.TYPE_STRING, name="custom_tag"
+                        )
+                    ),
+                ),
+            ],
+            limit=5,
+        )
+        response = EndpointTraceItemTable().execute(message)
+        assert response.column_values == [
+            TraceItemColumnValues(
+                attribute_name="custom_tag",
+                results=[AttributeValue(val_str="a"), AttributeValue(val_str="b")],
+            ),
+            TraceItemColumnValues(
+                attribute_name="sum(custom_measurement)",
+                results=[AttributeValue(val_double=5), AttributeValue(is_null=True)],
+                reliabilities=[
+                    Reliability.RELIABILITY_LOW,
+                    Reliability.RELIABILITY_UNSPECIFIED,
+                ],
+            ),
+            TraceItemColumnValues(
+                attribute_name="sum(custom_measurement2)",
+                results=[AttributeValue(is_null=True), AttributeValue(val_double=5)],
+                reliabilities=[
+                    Reliability.RELIABILITY_UNSPECIFIED,
+                    Reliability.RELIABILITY_LOW,
+                ],
+            ),
+        ]
diff --git a/tests/web/test_db_query.py b/tests/web/test_db_query.py
index ec7fa4586ef..e7d87b91422 100644
--- a/tests/web/test_db_query.py
+++ b/tests/web/test_db_query.py
@@ -374,17 +374,6 @@ def test_db_query_success() -> None:
             "quota_unit": NO_UNITS,
             "suggestion": NO_SUGGESTION,
         },
-        "BytesScannedWindowAllocationPolicy": {
-            "can_run": True,
-            "max_threads": 10,
-            "explanation": {"storage_key": "StorageKey.ERRORS_RO"},
-            "is_throttled": False,
-            "throttle_threshold": 10000000,
-            "rejection_threshold": MAX_THRESHOLD,
-            "quota_used": 0,
-            "quota_unit": "bytes",
-            "suggestion": "The feature, organization/project is scanning too many bytes, this usually means they are abusing that API",
-        },
     },
 }