Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(eap): Clean up the mutations interface #6344

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust_snuba/src/mutations/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use sentry::SentryFutureExt;
use serde::Deserialize;

use crate::arroyo_utils::invalid_message_err;
use crate::processors::eap_spans::PrimaryKey;
use crate::processors::eap_spans::{FromPrimaryKey, PrimaryKey};

#[derive(Debug, Default, Deserialize, JsonSchema)]
pub(crate) struct Update {
Expand All @@ -32,7 +32,7 @@ impl Update {
#[derive(Debug, Default, Deserialize, JsonSchema)]
pub(crate) struct MutationMessage {
// primary key, the mutation only applies on the rows that match this filter
pub filter: PrimaryKey,
pub filter: FromPrimaryKey,

// the span attributes to update
pub update: Update,
Expand Down
4 changes: 3 additions & 1 deletion rust_snuba/src/mutations_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::config;
use crate::metrics::global_tags::set_global_tag;
use crate::mutations::clickhouse::ClickhouseWriter;
use crate::mutations::parser::{MutationBatch, MutationMessage, MutationParser};
use crate::processors::eap_spans::PrimaryKey;

pub struct MutConsumerStrategyFactory {
pub storage_config: config::StorageConfig,
Expand Down Expand Up @@ -67,7 +68,8 @@ impl ProcessingStrategyFactory<KafkaPayload> for MutConsumerStrategyFactory {
Arc::new(
move |mut batch: MutationBatch, message: Message<MutationMessage>| {
let message = message.into_payload();
match batch.0.entry(message.filter) {
let filter: PrimaryKey = message.filter.into();
match batch.0.entry(filter) {
std::collections::btree_map::Entry::Occupied(mut entry) => {
entry.get_mut().merge(message.update);
}
Expand Down
32 changes: 25 additions & 7 deletions rust_snuba/src/processors/eap_spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ impl AttributeMap {
}
}

#[derive(
Debug, Default, Deserialize, Serialize, JsonSchema, Ord, PartialOrd, Eq, PartialEq, Clone,
)]
#[derive(Debug, Default, Serialize, Ord, PartialOrd, Eq, PartialEq, Clone)]
pub(crate) struct PrimaryKey {
pub organization_id: u64,
pub _sort_timestamp: u32,
Expand Down Expand Up @@ -131,15 +129,35 @@ fn fnv_1a(input: &[u8]) -> u32 {
res
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
pub(crate) struct FromPrimaryKey {
pub organization_id: u64,
pub start_timestamp_ms: u64,
pub trace_id: Uuid,
pub span_id: String,
}

impl From<FromPrimaryKey> for PrimaryKey {
fn from(from: FromPrimaryKey) -> PrimaryKey {
PrimaryKey {
organization_id: from.organization_id,
_sort_timestamp: (from.start_timestamp_ms / 1000) as u32,
trace_id: from.trace_id,
span_id: u64::from_str_radix(&from.span_id, 16).unwrap_or_default(),
}
}
}

impl From<FromSpanMessage> for EAPSpan {
fn from(from: FromSpanMessage) -> EAPSpan {
let mut res = Self {
primary_key: PrimaryKey {
primary_key: FromPrimaryKey {
organization_id: from.organization_id,
_sort_timestamp: (from.start_timestamp_ms / 1000) as u32,
start_timestamp_ms: from.start_timestamp_ms,
trace_id: from.trace_id,
span_id: u64::from_str_radix(&from.span_id, 16).unwrap_or_default(),
},
span_id: from.span_id,
}
.into(),
project_id: from.project_id,
service: from.project_id.to_string(),
parent_span_id: from
Expand Down
Loading