Skip to content

Commit

Permalink
Merge branch 'master' into rachel/aggregateIf
Browse files Browse the repository at this point in the history
  • Loading branch information
xurui-c authored Feb 14, 2025
2 parents dca9e59 + 2dea118 commit c8d8a92
Show file tree
Hide file tree
Showing 27 changed files with 869 additions and 87 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
black==22.6.0
blinker==1.5
click==8.1.7
clickhouse-driver==0.2.6
clickhouse-driver==0.2.9
confluent-kafka==2.7.0
datadog==0.21.0
devservices==1.0.16
Expand Down
1 change: 1 addition & 0 deletions rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ fn create_factory(
},
stop_at_timestamp: None,
batch_write_timeout: None,
custom_envoy_request_timeout: None,
};
Box::new(factory)
}
Expand Down
4 changes: 4 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub fn consumer(
stop_at_timestamp: Option<i64>,
batch_write_timeout_ms: Option<u64>,
max_dlq_buffer_length: Option<usize>,
custom_envoy_request_timeout: Option<u64>,
) -> usize {
py.allow_threads(|| {
consumer_impl(
Expand All @@ -65,6 +66,7 @@ pub fn consumer(
batch_write_timeout_ms,
mutations_mode,
max_dlq_buffer_length,
custom_envoy_request_timeout,
)
})
}
Expand All @@ -87,6 +89,7 @@ pub fn consumer_impl(
batch_write_timeout_ms: Option<u64>,
mutations_mode: bool,
max_dlq_buffer_length: Option<usize>,
custom_envoy_request_timeout: Option<u64>,
) -> usize {
setup_logging();

Expand Down Expand Up @@ -281,6 +284,7 @@ pub fn consumer_impl(
accountant_topic_config: consumer_config.accountant_topic,
stop_at_timestamp,
batch_write_timeout,
custom_envoy_request_timeout,
};

StreamProcessor::with_kafka(config, factory, topic, dlq_policy)
Expand Down
2 changes: 2 additions & 0 deletions rust_snuba/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct ConsumerStrategyFactory {
pub accountant_topic_config: config::TopicConfig,
pub stop_at_timestamp: Option<i64>,
pub batch_write_timeout: Option<Duration>,
pub custom_envoy_request_timeout: Option<u64>,
}

impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
Expand Down Expand Up @@ -117,6 +118,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
&self.storage_config.clickhouse_cluster.password,
self.async_inserts,
self.batch_write_timeout,
self.custom_envoy_request_timeout,
);

let accumulator = Arc::new(
Expand Down
15 changes: 14 additions & 1 deletion rust_snuba/src/strategies/clickhouse/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ impl BatchFactory {
clickhouse_password: &str,
async_inserts: bool,
batch_write_timeout: Option<Duration>,
custom_envoy_request_timeout: Option<u64>,
) -> Self {
let mut headers = HeaderMap::with_capacity(5);
let mut headers = HeaderMap::with_capacity(6);
headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate"));
headers.insert(
Expand All @@ -59,6 +60,12 @@ impl BatchFactory {
"X-ClickHouse-Database",
HeaderValue::from_str(database).unwrap(),
);
if let Some(custom_envoy_request_timeout) = custom_envoy_request_timeout {
headers.insert(
"x-envoy-upstream-rq-per-try-timeout-ms",
HeaderValue::from_str(&custom_envoy_request_timeout.to_string()).unwrap(),
);
}

let mut query_params = String::new();
query_params.push_str("load_balancing=in_order&insert_distributed_sync=1");
Expand Down Expand Up @@ -268,6 +275,7 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -303,6 +311,7 @@ mod tests {
"",
true,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -337,6 +346,7 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -369,6 +379,7 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -405,6 +416,7 @@ mod tests {
// pass in an unreasonably short timeout
// which prevents the client request from reaching Clickhouse
Some(Duration::from_millis(0)),
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -439,6 +451,7 @@ mod tests {
true,
// pass in a reasonable timeout
Some(Duration::from_millis(1000)),
None,
);

let mut batch = factory.new_batch();
Expand Down
13 changes: 7 additions & 6 deletions snuba/admin/static/tracing/query_display.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ function QueryDisplay(props: {
predefinedQueryOptions: Array<PredefinedQuery>;
}) {
const [storages, setStorages] = useState<string[]>([]);
const [checkedGatherProfileEvents, setCheckedGatherProfileEvents] = useState<boolean>(true);
const [query, setQuery] = useState<QueryState>({
storage: getParamFromStorage("storage"),
gather_profile_events: checkedGatherProfileEvents
});
const [queryResultHistory, setQueryResultHistory] = useState<TracingResult[]>(
getRecentHistory(HISTORY_KEY)
Expand All @@ -53,6 +55,7 @@ function QueryDisplay(props: {
}

function executeQuery() {
query.gather_profile_events = checkedGatherProfileEvents;
return props.api
.executeTracingQuery(query as TracingRequest)
.then((result) => {
Expand Down Expand Up @@ -105,12 +108,10 @@ function QueryDisplay(props: {
</div>
<div style={{ display: "flex", alignItems: "center", gap: "1rem" }}>
<Switch
checked={query.gather_profile_events ?? true}
onChange={(evt: React.ChangeEvent<HTMLInputElement>) =>
setQuery((prevQuery) => ({
...prevQuery,
gather_profile_events: evt.currentTarget.checked,
}))
checked={checkedGatherProfileEvents}
onChange={(evt: React.ChangeEvent<HTMLInputElement>) => {
setCheckedGatherProfileEvents(evt.currentTarget.checked);
}
}
onLabel="PROFILE"
offLabel="NO PROFILE"
Expand Down
2 changes: 1 addition & 1 deletion snuba/admin/static/tracing/types.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export type TracingRequest = {
sql: string;
storage: string;
gather_profile_events?: boolean;
gather_profile_events: boolean;
};

type TracingResult = {
Expand Down
9 changes: 9 additions & 0 deletions snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@
default=None,
help="Optional timeout for batch writer client connecting and sending request to Clickhouse",
)
@click.option(
"--custom-envoy-request-timeout",
type=int,
default=None,
help="Optional request timeout value for Snuba -> Envoy -> Clickhouse connection",
)
@click.option(
"--quantized-rebalance-consumer-group-delay-secs",
type=int,
Expand Down Expand Up @@ -216,6 +222,7 @@ def rust_consumer(
mutations_mode: bool,
max_dlq_buffer_length: Optional[int],
quantized_rebalance_consumer_group_delay_secs: Optional[int],
custom_envoy_request_timeout: Optional[int],
) -> None:
"""
Experimental alternative to `snuba consumer`
Expand All @@ -236,6 +243,7 @@ def rust_consumer(
slice_id=slice_id,
group_instance_id=group_instance_id,
quantized_rebalance_consumer_group_delay_secs=quantized_rebalance_consumer_group_delay_secs,
custom_envoy_request_timeout=custom_envoy_request_timeout,
)

consumer_config_raw = json.dumps(asdict(consumer_config))
Expand Down Expand Up @@ -269,6 +277,7 @@ def rust_consumer(
stop_at_timestamp,
batch_write_timeout_ms,
max_dlq_buffer_length,
custom_envoy_request_timeout,
)

sys.exit(exitcode)
8 changes: 4 additions & 4 deletions snuba/clickhouse/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ def query_execute() -> Any:
result_data = query_execute()

profile_data = ClickhouseProfile(
bytes=conn.last_query.profile_info.bytes or 0,
progress_bytes=conn.last_query.progress.bytes or 0,
blocks=conn.last_query.profile_info.blocks or 0,
rows=conn.last_query.profile_info.rows or 0,
blocks=getattr(conn.last_query.profile_info, "blocks", 0),
bytes=getattr(conn.last_query.profile_info, "bytes", 0),
elapsed=conn.last_query.elapsed or 0.0,
progress_bytes=getattr(conn.last_query.progress, "bytes", 0),
rows=getattr(conn.last_query.profile_info, "rows", 0),
)
if with_column_types:
result = ClickhouseResult(
Expand Down
1 change: 1 addition & 0 deletions snuba/consumers/consumer_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def resolve_consumer_config(
queued_min_messages: Optional[int] = None,
group_instance_id: Optional[str] = None,
quantized_rebalance_consumer_group_delay_secs: Optional[int] = None,
custom_envoy_request_timeout: Optional[int] = None,
) -> ConsumerConfig:
"""
Resolves the ClickHouse cluster and Kafka brokers, and the physical topic name
Expand Down
9 changes: 0 additions & 9 deletions snuba/datasets/configuration/events/storages/errors_ro.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,6 @@ allocation_policies:
getsentry.tasks.backfill_grouping_records:
max_threads: 7
concurrent_limit: 60
- name: BytesScannedWindowAllocationPolicy
args:
required_tenant_types:
- organization_id
- referrer
default_config_overrides:
is_enforced: 1
throttled_thread_number: 1
org_limit_bytes_scanned: 10000000
query_processors:
- processor: UniqInSelectAndHavingProcessor
- processor: TupleUnaliaser
Expand Down
4 changes: 4 additions & 0 deletions snuba/datasets/storages/tags_hash_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
)


def get_array_vals_hash(col_name: str) -> str:
    """Return a SQL expression that applies cityHash64 to each element of an array.

    :param col_name: Text spliced verbatim into the expression as the array
        argument of ``arrayMap``; it is not quoted or escaped here.
    :return: The string ``arrayMap(k -> cityHash64(k), <col_name>)``.
    """
    hashed_elements_expr = f"arrayMap(k -> cityHash64(k), {col_name})"
    return hashed_elements_expr


def hash_map_int_column_definition(key_column_name: str, value_column_name: str) -> str:
return (
f"arrayMap((k, v) -> cityHash64(concat(toString(k), '=', toString(v))), "
Expand Down
29 changes: 29 additions & 0 deletions snuba/manual_jobs/recreate_missing_eap_spans_materialized_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env python3

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec

# DDL statements run by RecreateMissingEAPSpansMaterializedViews.execute, in order.
# Each entry creates one materialized view that selects from eap_spans_2_local:
#   1. spans_num_attrs_3_mv -> spans_num_attrs_3_local: array-joins the
#      attr_num_0..attr_num_19 columns plus a synthetic 'sentry.duration_ms'
#      entry (duration_micro / 1000), bucketed by toStartOfDay(_sort_timestamp).
#   2. spans_str_attrs_3_mv -> spans_str_attrs_3_local: array-joins the
#      attr_str_0..attr_str_19 columns plus 'sentry.service',
#      'sentry.segment_name' and 'sentry.name', with the same daily bucketing.
# Both statements use CREATE ... IF NOT EXISTS.
materialized_views = [
"CREATE MATERIALIZED VIEW IF NOT EXISTS spans_num_attrs_3_mv TO spans_num_attrs_3_local (`organization_id` UInt64, `project_id` UInt64, `attr_key` String CODEC(ZSTD(1)), `attr_min_value` SimpleAggregateFunction(min, Float64), `attr_max_value` SimpleAggregateFunction(max, Float64), `timestamp` DateTime CODEC(DoubleDelta, ZSTD(1)), `retention_days` UInt16, `count` SimpleAggregateFunction(sum, UInt64)) AS SELECT organization_id, project_id, attrs.1 AS attr_key, attrs.2 AS attr_min_value, attrs.2 AS attr_max_value, toStartOfDay(_sort_timestamp) AS timestamp, retention_days, 1 AS count FROM eap_spans_2_local LEFT ARRAY JOIN arrayConcat(CAST(attr_num_0, 'Array(Tuple(String, Float64))'), CAST(attr_num_1, 'Array(Tuple(String, Float64))'), CAST(attr_num_2, 'Array(Tuple(String, Float64))'), CAST(attr_num_3, 'Array(Tuple(String, Float64))'), CAST(attr_num_4, 'Array(Tuple(String, Float64))'), CAST(attr_num_5, 'Array(Tuple(String, Float64))'), CAST(attr_num_6, 'Array(Tuple(String, Float64))'), CAST(attr_num_7, 'Array(Tuple(String, Float64))'), CAST(attr_num_8, 'Array(Tuple(String, Float64))'), CAST(attr_num_9, 'Array(Tuple(String, Float64))'), CAST(attr_num_10, 'Array(Tuple(String, Float64))'), CAST(attr_num_11, 'Array(Tuple(String, Float64))'), CAST(attr_num_12, 'Array(Tuple(String, Float64))'), CAST(attr_num_13, 'Array(Tuple(String, Float64))'), CAST(attr_num_14, 'Array(Tuple(String, Float64))'), CAST(attr_num_15, 'Array(Tuple(String, Float64))'), CAST(attr_num_16, 'Array(Tuple(String, Float64))'), CAST(attr_num_17, 'Array(Tuple(String, Float64))'), CAST(attr_num_18, 'Array(Tuple(String, Float64))'), CAST(attr_num_19, 'Array(Tuple(String, Float64))'), [('sentry.duration_ms', duration_micro / 1000)]) AS attrs GROUP BY organization_id, project_id, attrs.1, attrs.2, timestamp, retention_days",
"CREATE MATERIALIZED VIEW IF NOT EXISTS spans_str_attrs_3_mv TO spans_str_attrs_3_local (`organization_id` UInt64, `project_id` UInt64, `attr_key` String CODEC(ZSTD(1)), `attr_value` String CODEC(ZSTD(1)), `timestamp` DateTime CODEC(DoubleDelta, ZSTD(1)), `retention_days` UInt16, `count` SimpleAggregateFunction(sum, UInt64)) AS SELECT organization_id, project_id, attrs.1 AS attr_key, attrs.2 AS attr_value, toStartOfDay(_sort_timestamp) AS timestamp, retention_days, 1 AS count FROM eap_spans_2_local LEFT ARRAY JOIN arrayConcat(CAST(attr_str_0, 'Array(Tuple(String, String))'), CAST(attr_str_1, 'Array(Tuple(String, String))'), CAST(attr_str_2, 'Array(Tuple(String, String))'), CAST(attr_str_3, 'Array(Tuple(String, String))'), CAST(attr_str_4, 'Array(Tuple(String, String))'), CAST(attr_str_5, 'Array(Tuple(String, String))'), CAST(attr_str_6, 'Array(Tuple(String, String))'), CAST(attr_str_7, 'Array(Tuple(String, String))'), CAST(attr_str_8, 'Array(Tuple(String, String))'), CAST(attr_str_9, 'Array(Tuple(String, String))'), CAST(attr_str_10, 'Array(Tuple(String, String))'), CAST(attr_str_11, 'Array(Tuple(String, String))'), CAST(attr_str_12, 'Array(Tuple(String, String))'), CAST(attr_str_13, 'Array(Tuple(String, String))'), CAST(attr_str_14, 'Array(Tuple(String, String))'), CAST(attr_str_15, 'Array(Tuple(String, String))'), CAST(attr_str_16, 'Array(Tuple(String, String))'), CAST(attr_str_17, 'Array(Tuple(String, String))'), CAST(attr_str_18, 'Array(Tuple(String, String))'), CAST(attr_str_19, 'Array(Tuple(String, String))'), [('sentry.service', service), ('sentry.segment_name', segment_name), ('sentry.name', name)]) AS attrs GROUP BY organization_id, project_id, attr_key, attr_value, timestamp, retention_days",
]


class RecreateMissingEAPSpansMaterializedViews(Job):
    """Manual job that runs every statement in ``materialized_views`` on each
    local node of the EVENTS_ANALYTICS_PLATFORM cluster.
    """

    def __init__(self, job_spec: JobSpec) -> None:
        super().__init__(job_spec)

    def execute(self, logger: JobLogger) -> None:
        """Run the materialized-view DDL against every local storage node.

        :param logger: Job logger; receives one line per statement executed
            and a final "complete" line.
        """
        cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)

        for storage_node in cluster.get_local_nodes():
            # One connection per node, opened with the MIGRATE client settings.
            connection = cluster.get_node_connection(
                ClickhouseClientSettings.MIGRATE,
                storage_node,
            )
            for query in materialized_views:
                # f-string so the log shows the actual statement; the previous
                # plain string logged the literal text "{query}".
                logger.info(f"Executing query: {query}")
                connection.execute(query=query)

        logger.info("complete")
Loading

0 comments on commit c8d8a92

Please sign in to comment.