diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs
index e82f054d8b..6edd16cb6b 100644
--- a/relay-kafka/src/config.rs
+++ b/relay-kafka/src/config.rs
@@ -15,7 +15,7 @@ pub enum ConfigError {
 }
 
 /// Define the topics over which Relay communicates with Sentry.
-#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
 pub enum KafkaTopic {
     /// Simple events (without attachments) topic.
     Events,
diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs
index 70b8455b7f..dd825a36d0 100644
--- a/relay-kafka/src/producer/mod.rs
+++ b/relay-kafka/src/producer/mod.rs
@@ -1,3 +1,5 @@
+#[cfg(debug_assertions)]
+use std::cell::RefCell;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt;
 use std::sync::Arc;
@@ -8,6 +10,8 @@ use relay_statsd::metric;
 use thiserror::Error;
 
 use crate::config::{KafkaConfig, KafkaParams, KafkaTopic};
+#[cfg(debug_assertions)]
+use crate::producer::schemas::Validator;
 use crate::statsd::KafkaHistograms;
 
 mod utils;
@@ -134,6 +138,8 @@ impl fmt::Debug for ShardedProducer {
 #[derive(Debug)]
 pub struct KafkaClient {
     producers: HashMap<KafkaTopic, Producer>,
+    #[cfg(debug_assertions)]
+    schema_validator: RefCell<Validator>,
 }
 
 impl KafkaClient {
@@ -151,7 +157,9 @@ impl KafkaClient {
     ) -> Result<(), ClientError> {
         let serialized = message.serialize()?;
         #[cfg(debug_assertions)]
-        schemas::validate_message_schema(topic, &serialized)
+        self.schema_validator
+            .borrow_mut()
+            .validate_message_schema(topic, &serialized)
             .map_err(ClientError::SchemaValidationFailed)?;
         let key = message.key();
         self.send(topic, organization_id, &key, message.variant(), &serialized)
@@ -284,6 +292,8 @@ impl KafkaClientBuilder {
     pub fn build(self) -> KafkaClient {
         KafkaClient {
             producers: self.producers,
+            #[cfg(debug_assertions)]
+            schema_validator: Validator::default().into(),
         }
     }
 }
diff --git a/relay-kafka/src/producer/schemas.rs b/relay-kafka/src/producer/schemas.rs
index 1cbde0a94c..c44d747436 100644
--- a/relay-kafka/src/producer/schemas.rs
+++ b/relay-kafka/src/producer/schemas.rs
@@ -1,3 +1,5 @@
+use std::collections::btree_map::Entry;
+use std::collections::BTreeMap;
 use std::fmt::Write;
 
 use jsonschema::JSONSchema;
@@ -42,31 +44,59 @@ pub enum SchemaError {
     Message(String),
 }
 
-pub fn validate_message_schema(topic: KafkaTopic, message: &[u8]) -> Result<(), SchemaError> {
-    let default_assignments = TopicAssignments::default();
-    let logical_topic_name = match default_assignments.get(topic) {
-        TopicAssignment::Primary(logical_topic_name) => logical_topic_name,
-        _ => return Err(SchemaError::LogicalTopic),
-    };
+/// Validates payloads for their given topic's schema.
+#[derive(Debug, Default)]
+pub struct Validator {
+    /// Caches the schema for given topics.
+    schemas: BTreeMap<KafkaTopic, Option<JSONSchema>>,
+}
+
+impl Validator {
+    /// Validate a message for a given topic's schema.
+    pub fn validate_message_schema(
+        &mut self,
+        topic: KafkaTopic,
+        message: &[u8],
+    ) -> Result<(), SchemaError> {
+        let Some(schema) = self.get_schema(topic)? else { return Ok(()) };
+        let message_value = serde_json::from_slice(message).map_err(SchemaError::MessageJson)?;
 
-    let schema = match sentry_kafka_schemas::get_schema(logical_topic_name, None) {
-        Ok(schema) => schema,
-        // No topic found
-        Err(_) => return Ok(()),
-    };
-    let schema = serde_json::from_str(&schema.schema).map_err(SchemaError::SchemaJson)?;
-    let schema =
-        JSONSchema::compile(&schema).map_err(|e| SchemaError::SchemaCompiled(e.to_string()))?;
-    let message_value = serde_json::from_slice(message).map_err(SchemaError::MessageJson)?;
+        if let Err(e) = schema.validate(&message_value) {
+            let mut result = String::new();
+            for error in e {
+                writeln!(result, "{}", error).unwrap();
+            }
 
-    if let Err(e) = schema.validate(&message_value) {
-        let mut result = String::new();
-        for error in e {
-            writeln!(result, "{}", error).unwrap();
+            return Err(SchemaError::Message(result));
         }
 
-        return Err(SchemaError::Message(result));
+        Ok(())
     }
 
-    Ok(())
+    fn get_schema(&mut self, topic: KafkaTopic) -> Result<Option<&JSONSchema>, SchemaError> {
+        Ok(match self.schemas.entry(topic) {
+            Entry::Vacant(entry) => {
+                entry.insert({
+                    let default_assignments = TopicAssignments::default();
+                    let logical_topic_name = match default_assignments.get(topic) {
+                        TopicAssignment::Primary(logical_topic_name) => logical_topic_name,
+                        _ => return Err(SchemaError::LogicalTopic),
+                    };
+
+                    let schema = match sentry_kafka_schemas::get_schema(logical_topic_name, None) {
+                        Ok(schema) => schema,
+                        // No topic found
+                        Err(_) => return Ok(None),
+                    };
+                    let schema =
+                        serde_json::from_str(&schema.schema).map_err(SchemaError::SchemaJson)?;
+                    let schema = JSONSchema::compile(&schema)
+                        .map_err(|e| SchemaError::SchemaCompiled(e.to_string()))?;
+                    Some(schema)
+                })
+            }
+            Entry::Occupied(entry) => entry.into_mut(),
+        }
+        .as_ref())
+    }
 }
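The schemas.rs change above replaces the per-call schema lookup and compile with a per-topic cache. Below is a minimal, self-contained sketch of that memoization pattern; `SchemaCache`, `CompiledSchema`, and `lookup_schema_source` are hypothetical stand-ins for illustration only (not the relay-kafka API), but the entry-based caching of an `Option` mirrors what `Validator::get_schema` does: the expensive lookup runs once per topic, and a missing schema is cached as `None` so the miss is not retried for every message.

```rust
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

/// Stand-in for a compiled schema; hypothetical, for illustration only.
#[derive(Debug)]
struct CompiledSchema(String);

/// Hypothetical source lookup; returns `None` when no schema is defined.
fn lookup_schema_source(topic: &str) -> Option<String> {
    (topic == "events").then(|| r#"{"type": "object"}"#.to_owned())
}

#[derive(Debug, Default)]
struct SchemaCache {
    /// `None` records "no schema exists for this topic", so the miss
    /// is cached too and the lookup is not repeated per message.
    schemas: BTreeMap<&'static str, Option<CompiledSchema>>,
}

impl SchemaCache {
    /// Compile on first use, then serve the cached result (hit or miss).
    fn get(&mut self, topic: &'static str) -> Option<&CompiledSchema> {
        match self.schemas.entry(topic) {
            Entry::Vacant(entry) => {
                // The expensive work happens only once per topic.
                entry.insert(lookup_schema_source(topic).map(CompiledSchema))
            }
            Entry::Occupied(entry) => entry.into_mut(),
        }
        .as_ref()
    }
}

fn main() {
    let mut cache = SchemaCache::default();
    assert!(cache.get("events").is_some()); // compiled and inserted
    assert!(cache.get("events").is_some()); // served from the cache
    assert!(cache.get("profiles").is_none()); // the miss is cached as `None`
}
```

Storing `Option` values rather than bailing out early on a failed lookup is what allows the "no schema defined" case to also become a single map lookup on subsequent calls.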