Skip to content

Commit

Permalink
Merge branch 'master' into rachel/aggregateIf
Browse files Browse the repository at this point in the history
  • Loading branch information
xurui-c authored Feb 18, 2025
2 parents ce2571e + dfbaae0 commit 2c9aeeb
Show file tree
Hide file tree
Showing 43 changed files with 994 additions and 277 deletions.
4 changes: 2 additions & 2 deletions Brewfile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# necessary for rust-snuba
brew 'cmake'
brew 'cmake' # for rust-snuba
brew 'protobuf' # for rust-snuba > sentry_protos
3 changes: 2 additions & 1 deletion docs/source/contributing/environment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ and, in another terminal::
cd ../sentry
git checkout master
git pull
sentry devservices up --exclude=snuba
devservices up
docker stop snuba-snuba-1 snuba-clickhouse-1

This will get the most recent version of Sentry on master and bring up all of Snuba's dependencies.

Expand Down
2 changes: 1 addition & 1 deletion docs/source/getstarted.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ in ``~/.sentry/sentry.conf.py``::

And then use::

sentry devservices up --exclude=snuba
devservices up --exclude=snuba

Note that Snuba assumes that everything is running on UTC time. Otherwise
you may experience issues with timezone mismatches.
Expand Down
4 changes: 2 additions & 2 deletions docs/source/migrations/modes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Enabling Local Mode
In your local ``server.py``, set ``SENTRY_DISTRIBUTED_CLICKHOUSE_TABLES``
to False. This is the default setting, so configuration is already
set up for local mode migrations. Start up the corresponding ClickHouse
container (``sentry devservices up clickhouse``).
container (``devservices up clickhouse``).

Now, run migrations as expected (``snuba migrations migrate --force``).

Expand All @@ -36,7 +36,7 @@ Enabling Distributed Mode
============================

In your local ``server.py``, set ``SENTRY_DISTRIBUTED_CLICKHOUSE_TABLES``
to True. Start up the corresponding ClickHouse container (``sentry devservices up clickhouse``).
to True. Start up the corresponding ClickHouse container (``devservices up clickhouse``).
Make sure that the Zookeeper container is also running; without it, distributed migrations
will not work properly.

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
black==22.6.0
blinker==1.5
click==8.1.7
clickhouse-driver==0.2.6
clickhouse-driver==0.2.9
confluent-kafka==2.7.0
datadog==0.21.0
devservices==1.0.16
Expand Down
1 change: 1 addition & 0 deletions rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ fn create_factory(
},
stop_at_timestamp: None,
batch_write_timeout: None,
custom_envoy_request_timeout: None,
};
Box::new(factory)
}
Expand Down
4 changes: 4 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub fn consumer(
stop_at_timestamp: Option<i64>,
batch_write_timeout_ms: Option<u64>,
max_dlq_buffer_length: Option<usize>,
custom_envoy_request_timeout: Option<u64>,
) -> usize {
py.allow_threads(|| {
consumer_impl(
Expand All @@ -65,6 +66,7 @@ pub fn consumer(
batch_write_timeout_ms,
mutations_mode,
max_dlq_buffer_length,
custom_envoy_request_timeout,
)
})
}
Expand All @@ -87,6 +89,7 @@ pub fn consumer_impl(
batch_write_timeout_ms: Option<u64>,
mutations_mode: bool,
max_dlq_buffer_length: Option<usize>,
custom_envoy_request_timeout: Option<u64>,
) -> usize {
setup_logging();

Expand Down Expand Up @@ -281,6 +284,7 @@ pub fn consumer_impl(
accountant_topic_config: consumer_config.accountant_topic,
stop_at_timestamp,
batch_write_timeout,
custom_envoy_request_timeout,
};

StreamProcessor::with_kafka(config, factory, topic, dlq_policy)
Expand Down
2 changes: 2 additions & 0 deletions rust_snuba/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct ConsumerStrategyFactory {
pub accountant_topic_config: config::TopicConfig,
pub stop_at_timestamp: Option<i64>,
pub batch_write_timeout: Option<Duration>,
pub custom_envoy_request_timeout: Option<u64>,
}

impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
Expand Down Expand Up @@ -117,6 +118,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
&self.storage_config.clickhouse_cluster.password,
self.async_inserts,
self.batch_write_timeout,
self.custom_envoy_request_timeout,
);

let accumulator = Arc::new(
Expand Down
15 changes: 14 additions & 1 deletion rust_snuba/src/strategies/clickhouse/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ impl BatchFactory {
clickhouse_password: &str,
async_inserts: bool,
batch_write_timeout: Option<Duration>,
custom_envoy_request_timeout: Option<u64>,
) -> Self {
let mut headers = HeaderMap::with_capacity(5);
let mut headers = HeaderMap::with_capacity(6);
headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate"));
headers.insert(
Expand All @@ -59,6 +60,12 @@ impl BatchFactory {
"X-ClickHouse-Database",
HeaderValue::from_str(database).unwrap(),
);
if let Some(custom_envoy_request_timeout) = custom_envoy_request_timeout {
headers.insert(
"x-envoy-upstream-rq-per-try-timeout-ms",
HeaderValue::from_str(&custom_envoy_request_timeout.to_string()).unwrap(),
);
}

let mut query_params = String::new();
query_params.push_str("load_balancing=in_order&insert_distributed_sync=1");
Expand Down Expand Up @@ -268,6 +275,7 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -303,6 +311,7 @@ mod tests {
"",
true,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -337,6 +346,7 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -369,6 +379,7 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -405,6 +416,7 @@ mod tests {
// pass in an unreasonably short timeout
// which prevents the client request from reaching Clickhouse
Some(Duration::from_millis(0)),
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -439,6 +451,7 @@ mod tests {
true,
// pass in a reasonable timeout
Some(Duration::from_millis(1000)),
None,
);

let mut batch = factory.new_batch();
Expand Down
2 changes: 1 addition & 1 deletion snuba/admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ snuba admin

The server should be running on http://127.0.0.1:1219

note: please ensure that sentry devservices are up via `sentry devservices up --exclude=snuba` from within the sentry repository
Note: please ensure that the Sentry devservices are up by running `devservices up --exclude=snuba` from within the sentry repository.

# Developing the Javascript

Expand Down
13 changes: 7 additions & 6 deletions snuba/admin/static/tracing/query_display.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ function QueryDisplay(props: {
predefinedQueryOptions: Array<PredefinedQuery>;
}) {
const [storages, setStorages] = useState<string[]>([]);
const [checkedGatherProfileEvents, setCheckedGatherProfileEvents] = useState<boolean>(true);
const [query, setQuery] = useState<QueryState>({
storage: getParamFromStorage("storage"),
gather_profile_events: checkedGatherProfileEvents
});
const [queryResultHistory, setQueryResultHistory] = useState<TracingResult[]>(
getRecentHistory(HISTORY_KEY)
Expand All @@ -53,6 +55,7 @@ function QueryDisplay(props: {
}

function executeQuery() {
query.gather_profile_events = checkedGatherProfileEvents;
return props.api
.executeTracingQuery(query as TracingRequest)
.then((result) => {
Expand Down Expand Up @@ -105,12 +108,10 @@ function QueryDisplay(props: {
</div>
<div style={{ display: "flex", alignItems: "center", gap: "1rem" }}>
<Switch
checked={query.gather_profile_events ?? true}
onChange={(evt: React.ChangeEvent<HTMLInputElement>) =>
setQuery((prevQuery) => ({
...prevQuery,
gather_profile_events: evt.currentTarget.checked,
}))
checked={checkedGatherProfileEvents}
onChange={(evt: React.ChangeEvent<HTMLInputElement>) => {
setCheckedGatherProfileEvents(evt.currentTarget.checked);
}
}
onLabel="PROFILE"
offLabel="NO PROFILE"
Expand Down
2 changes: 1 addition & 1 deletion snuba/admin/static/tracing/types.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export type TracingRequest = {
sql: string;
storage: string;
gather_profile_events?: boolean;
gather_profile_events: boolean;
};

type TracingResult = {
Expand Down
9 changes: 9 additions & 0 deletions snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@
default=None,
help="Optional timeout for batch writer client connecting and sending request to Clickhouse",
)
@click.option(
"--custom-envoy-request-timeout",
type=int,
default=None,
help="Optional request timeout value for Snuba -> Envoy -> Clickhouse connection",
)
@click.option(
"--quantized-rebalance-consumer-group-delay-secs",
type=int,
Expand Down Expand Up @@ -216,6 +222,7 @@ def rust_consumer(
mutations_mode: bool,
max_dlq_buffer_length: Optional[int],
quantized_rebalance_consumer_group_delay_secs: Optional[int],
custom_envoy_request_timeout: Optional[int],
) -> None:
"""
Experimental alternative to `snuba consumer`
Expand All @@ -236,6 +243,7 @@ def rust_consumer(
slice_id=slice_id,
group_instance_id=group_instance_id,
quantized_rebalance_consumer_group_delay_secs=quantized_rebalance_consumer_group_delay_secs,
custom_envoy_request_timeout=custom_envoy_request_timeout,
)

consumer_config_raw = json.dumps(asdict(consumer_config))
Expand Down Expand Up @@ -269,6 +277,7 @@ def rust_consumer(
stop_at_timestamp,
batch_write_timeout_ms,
max_dlq_buffer_length,
custom_envoy_request_timeout,
)

sys.exit(exitcode)
8 changes: 4 additions & 4 deletions snuba/clickhouse/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ def query_execute() -> Any:
result_data = query_execute()

profile_data = ClickhouseProfile(
bytes=conn.last_query.profile_info.bytes or 0,
progress_bytes=conn.last_query.progress.bytes or 0,
blocks=conn.last_query.profile_info.blocks or 0,
rows=conn.last_query.profile_info.rows or 0,
blocks=getattr(conn.last_query.profile_info, "blocks", 0),
bytes=getattr(conn.last_query.profile_info, "bytes", 0),
elapsed=conn.last_query.elapsed or 0.0,
progress_bytes=getattr(conn.last_query.progress, "bytes", 0),
rows=getattr(conn.last_query.profile_info, "rows", 0),
)
if with_column_types:
result = ClickhouseResult(
Expand Down
1 change: 1 addition & 0 deletions snuba/consumers/consumer_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def resolve_consumer_config(
queued_min_messages: Optional[int] = None,
group_instance_id: Optional[str] = None,
quantized_rebalance_consumer_group_delay_secs: Optional[int] = None,
custom_envoy_request_timeout: Optional[int] = None,
) -> ConsumerConfig:
"""
Resolves the ClickHouse cluster and Kafka brokers, and the physical topic name
Expand Down
9 changes: 0 additions & 9 deletions snuba/datasets/configuration/events/storages/errors_ro.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,6 @@ allocation_policies:
getsentry.tasks.backfill_grouping_records:
max_threads: 7
concurrent_limit: 60
- name: BytesScannedWindowAllocationPolicy
args:
required_tenant_types:
- organization_id
- referrer
default_config_overrides:
is_enforced: 1
throttled_thread_number: 1
org_limit_bytes_scanned: 10000000
query_processors:
- processor: UniqInSelectAndHavingProcessor
- processor: TupleUnaliaser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ storage:
key: eap_spans
set_key: events_analytics_platform

readiness_state: partial
readiness_state: complete

schema:
columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ storage:
key: spans_num_attrs
set_key: events_analytics_platform

readiness_state: partial
readiness_state: complete

schema:
columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ storage:
key: spans_str_attrs
set_key: events_analytics_platform

readiness_state: partial
readiness_state: complete

schema:
columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ name: uptime_monitor_checks
storage:
key: uptime_monitor_checks
set_key: events_analytics_platform
readiness_state: partial
readiness_state: complete
schema:
columns:
[
Expand Down

This file was deleted.

Loading

0 comments on commit 2c9aeeb

Please sign in to comment.