Skip to content

Commit

Permalink
feat(ourlogs): Read and write ourlogs from the new items table (#6911)
Browse files Browse the repository at this point in the history
- Creates a new temporary storage to bridge the existing kafka topic
with the new clickhouse table
- Modifies consumer to point to the new items table
- Modifies querying code to point to new items table
  • Loading branch information
colin-sentry authored and volokluev committed Feb 27, 2025
1 parent 1d9752f commit 18fee20
Show file tree
Hide file tree
Showing 21 changed files with 546 additions and 238 deletions.
2 changes: 1 addition & 1 deletion gocd/templates/bash/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}"
--container-name="eap-spans-subscriptions-executor" \
--container-name="uptime-results-consumer" \
--container-name="lw-deletions-search-issues-consumer" \
--container-name="eap-logs-consumer" \
--container-name="eap-items-log-consumer" \
--container-name="eap-items-span-consumer" \
&& /devinfra/scripts/k8s/k8s-deploy.py \
--label-selector="${LABEL_SELECTOR}" \
Expand Down
7 changes: 7 additions & 0 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tokio = { version = "1.19.2", features = ["full"] }
statsdproxy = { version = "0.1.2", features = ["cadence-adapter", "sentry"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
uuid = "1.5.0"
uuid = { version = "1.5.0", features = ["v4", "v7"] }
parking_lot = "0.12.1"
sentry_usage_accountant = { version = "0.1.0", features = ["kafka"] }
adler = "1.0.2"
Expand Down
14 changes: 7 additions & 7 deletions rust_snuba/src/processors/eap_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,17 @@ pub(crate) struct PrimaryKey {
}

#[derive(Debug, Default, Serialize)]
struct EAPItem {
pub(crate) struct EAPItem {
#[serde(flatten)]
primary_key: PrimaryKey,
pub(crate) primary_key: PrimaryKey,

trace_id: Uuid,
item_id: u128,
sampling_weight: u64,
retention_days: Option<u16>,
pub(crate) trace_id: Uuid,
pub(crate) item_id: u128,
pub(crate) sampling_weight: u64,
pub(crate) retention_days: Option<u16>,

#[serde(flatten)]
attributes: AttributeMap,
pub(crate) attributes: AttributeMap,
}

fn fnv_1a(input: &[u8]) -> u32 {
Expand Down
4 changes: 4 additions & 0 deletions rust_snuba/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ mod tests {
settings.add_redaction(".*.message_timestamp", "<event timestamp>");
}

if *topic_name == "snuba-ourlogs" {
settings.add_redaction(".*.item_id", "<item ID>") //UUID7 has timestamp in it
}

settings.set_description(std::str::from_utf8(example.payload()).unwrap());
let _guard = settings.bind_to_scope();

Expand Down
84 changes: 43 additions & 41 deletions rust_snuba/src/processors/ourlogs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context;
use chrono::DateTime;
use serde::{de, Deserialize, Deserializer, Serialize};
use serde::{de, Deserialize, Deserializer};
use std::collections::BTreeMap;
use std::fmt;
use uuid::Uuid;
Expand All @@ -10,6 +10,7 @@ use sentry_arroyo::backends::kafka::types::KafkaPayload;
use serde::de::{MapAccess, Visitor};

use crate::config::ProcessorConfig;
use crate::processors::eap_items::{EAPItem, PrimaryKey};
use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, KafkaMessageMetadata};

Expand Down Expand Up @@ -107,64 +108,65 @@ pub fn process_message(
(msg.observed_timestamp_nanos / 1_000_000_000) as i64,
(msg.observed_timestamp_nanos % 1_000_000_000) as u32,
);
let mut ourlog: Ourlog = msg.into();
let mut item: EAPItem = msg.into();

ourlog.retention_days = enforce_retention(Some(ourlog.retention_days), &config.env_config);
item.retention_days = Some(enforce_retention(item.retention_days, &config.env_config));

InsertBatch::from_rows([ourlog], origin_timestamp)
InsertBatch::from_rows([item], origin_timestamp)
}

#[derive(Debug, Default, Serialize)]
struct Ourlog {
organization_id: u64,
project_id: u64,
trace_id: Uuid,
span_id: u64,
severity_text: String,
severity_number: u8,
retention_days: u16,
timestamp: u64,
body: String,
attr_string: BTreeMap<String, String>,
attr_int: BTreeMap<String, i64>,
attr_double: BTreeMap<String, f64>,
attr_bool: BTreeMap<String, bool>,
}

impl From<FromLogMessage> for Ourlog {
fn from(from: FromLogMessage) -> Ourlog {
impl From<FromLogMessage> for EAPItem {
fn from(from: FromLogMessage) -> EAPItem {
let mut res = Self {
organization_id: from.organization_id,
project_id: from.project_id,
primary_key: PrimaryKey {
organization_id: from.organization_id,
project_id: from.project_id,
item_type: 3, // TRACE_ITEM_TYPE_LOG
timestamp: (from.timestamp_nanos / 1_000_000_000) as u32,
},

trace_id: from.trace_id.unwrap_or_default(),
span_id: from
.span_id
.map_or(0, |s| u64::from_str_radix(&s, 16).unwrap_or(0)),
severity_text: from.severity_text.unwrap_or_else(|| "INFO".into()),
severity_number: from.severity_number.unwrap_or_default(),
retention_days: from.retention_days,
timestamp: from.timestamp_nanos,
body: from.body,
attr_string: BTreeMap::new(),
attr_int: BTreeMap::new(),
attr_double: BTreeMap::new(),
attr_bool: BTreeMap::new(),
item_id: u128::from_be_bytes(
*Uuid::new_v7(uuid::Timestamp::from_unix(
uuid::NoContext,
from.timestamp_nanos / 1_000_000_000,
(from.timestamp_nanos % 1_000_000_000) as u32,
))
.as_bytes(),
),
sampling_weight: 1,
retention_days: Some(from.retention_days),
attributes: Default::default(),
};
res.attributes.insert_str(
"sentry.severity_text".to_string(),
from.severity_text.unwrap_or_else(|| "INFO".into()),
);
res.attributes.insert_int(
"sentry.severity_number".to_string(),
from.severity_number.unwrap_or_default().into(),
);
res.attributes
.insert_str("sentry.body".to_string(), from.body);
if let Some(span_id) = from.span_id {
res.attributes
.insert_str("sentry.span_id".to_string(), span_id)
}

if let Some(attributes) = from.attributes {
for (k, v) in attributes {
match v {
FromAttribute::String(s) => {
res.attr_string.insert(k, s);
res.attributes.insert_str(k, s);
}
FromAttribute::Int(i) => {
res.attr_int.insert(k, i);
res.attributes.insert_int(k, i);
}
FromAttribute::Double(d) => {
res.attr_double.insert(k, d);
res.attributes.insert_float(k, d);
}
FromAttribute::Bool(b) => {
res.attr_bool.insert(k, b);
res.attributes.insert_bool(k, b);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,44 @@ expression: snapshot_payload
---
[
{
"attr_bool": {
"attributes_bool": {
"bool.user.tag": true
},
"attr_double": {
"attributes_float_15": {
"double.user.tag": -10.59
},
"attr_int": {
"another.user.tag": 10
"attributes_float_24": {
"bool.user.tag": 1.0
},
"attr_string": {
"attributes_float_35": {
"another.user.tag": 10.0
},
"attributes_float_37": {
"sentry.severity_number": 1.0
},
"attributes_int": {
"another.user.tag": 10,
"sentry.severity_number": 1
},
"attributes_string_0": {
"sentry.body": "hello world!"
},
"attributes_string_12": {
"sentry.span_id": "11002233AABBCCDD"
},
"attributes_string_28": {
"some.user.tag": "hello"
},
"body": "hello world!",
"attributes_string_35": {
"sentry.severity_text": "WARNING"
},
"item_id": "<item ID>",
"item_type": 3,
"organization_id": 69,
"project_id": 1,
"retention_days": 90,
"severity_number": 1,
"severity_text": "WARNING",
"span_id": 1225016703947885789,
"timestamp": 1715868485371000,
"sampling_weight": 1,
"timestamp": 1715868,
"trace_id": "3c8c20d5-0a54-4a1c-ba10-f76f574d856f"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,26 @@ expression: snapshot_payload
---
[
{
"attr_bool": {},
"attr_double": {},
"attr_int": {},
"attr_string": {},
"body": "hello world!",
"attributes_bool": {},
"attributes_float_37": {
"sentry.severity_number": 0.0
},
"attributes_int": {
"sentry.severity_number": 0
},
"attributes_string_0": {
"sentry.body": "hello world!"
},
"attributes_string_35": {
"sentry.severity_text": "INFO"
},
"item_id": "<item ID>",
"item_type": 3,
"organization_id": 69,
"project_id": 1,
"retention_days": 90,
"severity_number": 0,
"severity_text": "INFO",
"span_id": 0,
"timestamp": 1715868485371000,
"sampling_weight": 1,
"timestamp": 1715868,
"trace_id": "00000000-0000-0000-0000-000000000000"
}
]
2 changes: 1 addition & 1 deletion snuba/cli/devserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None:
[
"snuba",
"rust-consumer",
"--storage=ourlogs",
"--storage=eap_items_log",
"--consumer-group=ourlogs_group",
"--use-rust-processor",
*COMMON_RUST_CONSUMER_DEV_OPTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ entities:
- eap_spans
- spans_num_attrs
- spans_str_attrs
- ourlogs
- uptime_checks
- eap_items
- eap_items_log
Loading

0 comments on commit 18fee20

Please sign in to comment.