Skip to content

Commit

Permalink
feat(kafka): Cache schemas for validation (#2028)
Browse files Browse the repository at this point in the history
Add a caching layer to #2013 such
that the schema does not have to be parsed from disk for every message.

---------

Co-authored-by: Markus Unterwaditzer <[email protected]>
  • Loading branch information
jjbayer and untitaker authored Apr 13, 2023
1 parent 4b38eab commit 66a9f7d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 23 deletions.
2 changes: 1 addition & 1 deletion relay-kafka/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum ConfigError {
}

/// Define the topics over which Relay communicates with Sentry.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
/// Simple events (without attachments) topic.
Events,
Expand Down
12 changes: 11 additions & 1 deletion relay-kafka/src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(debug_assertions)]
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::Arc;
Expand All @@ -8,6 +10,8 @@ use relay_statsd::metric;
use thiserror::Error;

use crate::config::{KafkaConfig, KafkaParams, KafkaTopic};
#[cfg(debug_assertions)]
use crate::producer::schemas::Validator;
use crate::statsd::KafkaHistograms;

mod utils;
Expand Down Expand Up @@ -134,6 +138,8 @@ impl fmt::Debug for ShardedProducer {
/// Kafka client holding one producer per configured topic.
#[derive(Debug)]
pub struct KafkaClient {
    /// Producers keyed by the topic they publish to.
    producers: HashMap<KafkaTopic, Producer>,
    // Debug builds only: validates payloads against the topic schema before
    // sending. Wrapped in a `RefCell` so it can be mutated (`borrow_mut`)
    // from methods that take `&self`.
    #[cfg(debug_assertions)]
    schema_validator: RefCell<schemas::Validator>,
}

impl KafkaClient {
Expand All @@ -151,7 +157,9 @@ impl KafkaClient {
) -> Result<(), ClientError> {
let serialized = message.serialize()?;
#[cfg(debug_assertions)]
schemas::validate_message_schema(topic, &serialized)
self.schema_validator
.borrow_mut()
.validate_message_schema(topic, &serialized)
.map_err(ClientError::SchemaValidationFailed)?;
let key = message.key();
self.send(topic, organization_id, &key, message.variant(), &serialized)
Expand Down Expand Up @@ -284,6 +292,8 @@ impl KafkaClientBuilder {
/// Consumes the builder and creates the finished [`KafkaClient`].
pub fn build(self) -> KafkaClient {
    KafkaClient {
        producers: self.producers,
        // Debug builds only: `.into()` wraps the fresh validator in the
        // `RefCell` expected by the `schema_validator` field.
        #[cfg(debug_assertions)]
        schema_validator: Validator::default().into(),
    }
}
}
Expand Down
72 changes: 51 additions & 21 deletions relay-kafka/src/producer/schemas.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::Write;

use jsonschema::JSONSchema;
Expand Down Expand Up @@ -42,31 +44,59 @@ pub enum SchemaError {
Message(String),
}

pub fn validate_message_schema(topic: KafkaTopic, message: &[u8]) -> Result<(), SchemaError> {
let default_assignments = TopicAssignments::default();
let logical_topic_name = match default_assignments.get(topic) {
TopicAssignment::Primary(logical_topic_name) => logical_topic_name,
_ => return Err(SchemaError::LogicalTopic),
};
/// Validates payloads against their given topic's schema.
///
/// Compiled schemas are cached so schema loading and compilation happen at
/// most once per topic rather than once per message.
#[derive(Debug, Default)]
pub struct Validator {
    /// Caches the compiled schema for each topic; `None` marks a topic for
    /// which no schema is defined.
    schemas: BTreeMap<KafkaTopic, Option<JSONSchema>>,
}

impl Validator {
/// Validate a message for a given topic's schema.
pub fn validate_message_schema(
&mut self,
topic: KafkaTopic,
message: &[u8],
) -> Result<(), SchemaError> {
let Some(schema) = self.get_schema(topic)? else { return Ok(())};
let message_value = serde_json::from_slice(message).map_err(SchemaError::MessageJson)?;

let schema = match sentry_kafka_schemas::get_schema(logical_topic_name, None) {
Ok(schema) => schema,
// No topic found
Err(_) => return Ok(()),
};
let schema = serde_json::from_str(&schema.schema).map_err(SchemaError::SchemaJson)?;
let schema =
JSONSchema::compile(&schema).map_err(|e| SchemaError::SchemaCompiled(e.to_string()))?;
let message_value = serde_json::from_slice(message).map_err(SchemaError::MessageJson)?;
if let Err(e) = schema.validate(&message_value) {
let mut result = String::new();
for error in e {
writeln!(result, "{}", error).unwrap();
}

if let Err(e) = schema.validate(&message_value) {
let mut result = String::new();
for error in e {
writeln!(result, "{}", error).unwrap();
return Err(SchemaError::Message(result));
}

return Err(SchemaError::Message(result));
Ok(())
}

Ok(())
fn get_schema(&mut self, topic: KafkaTopic) -> Result<Option<&JSONSchema>, SchemaError> {
Ok(match self.schemas.entry(topic) {
Entry::Vacant(entry) => {
entry.insert({
let default_assignments = TopicAssignments::default();
let logical_topic_name = match default_assignments.get(topic) {
TopicAssignment::Primary(logical_topic_name) => logical_topic_name,
_ => return Err(SchemaError::LogicalTopic),
};

let schema = match sentry_kafka_schemas::get_schema(logical_topic_name, None) {
Ok(schema) => schema,
// No topic found
Err(_) => return Ok(None),
};
let schema =
serde_json::from_str(&schema.schema).map_err(SchemaError::SchemaJson)?;
let schema = JSONSchema::compile(&schema)
.map_err(|e| SchemaError::SchemaCompiled(e.to_string()))?;
Some(schema)
})
}
Entry::Occupied(entry) => entry.into_mut(),
}
.as_ref())
}
}

0 comments on commit 66a9f7d

Please sign in to comment.