diff --git a/CHANGELOG.md b/CHANGELOG.md index e4b3fba60d..2394b0d160 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ Metrics: - Adds iPad support for device.class synthesis in light normalization. ([#2008](https://github.com/getsentry/relay/pull/2008)) - Pin schemars dependency to un-break schema docs generation. ([#2014](https://github.com/getsentry/relay/pull/2014)) - Remove global service registry. ([#2022](https://github.com/getsentry/relay/pull/2022)) +- Apply schema validation to all topics in local development. ([#2013](https://github.com/getsentry/relay/pull/2013)) Monitors: diff --git a/Cargo.lock b/Cargo.lock index cdf83ebf54..842443e3f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,7 +35,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", + "getrandom", "once_cell", + "serde", "version_check", ] @@ -316,6 +318,21 @@ dependencies = [ "which", ] +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -1182,6 +1199,16 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "fancy-regex" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95f7c0680e4142284cf8b22c14a476e87d61b004a3a0861872b32ef7ead40a2" +dependencies = [ + "bit-set", + "regex", +] + [[package]] name = "fastrand" version = "1.8.0" @@ -1267,6 +1294,16 @@ dependencies 
= [ "percent-encoding", ] +[[package]] +name = "fraction" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3027ae1df8d41b4bed2241c8fdad4acc1e7af60c8e17743534b545e77182d678" +dependencies = [ + "lazy_static", + "num", +] + [[package]] name = "futures" version = "0.3.26" @@ -1394,8 +1431,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1802,6 +1841,15 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "iso8601" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924e5d73ea28f59011fec52a0d12185d496a9b075d360657aed2a5707f701153" +dependencies = [ + "nom", +] + [[package]] name = "itertools" version = "0.10.5" @@ -1841,6 +1889,36 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "674456280908e444408d2370a6706371f0fe2b4fc9999814ba35e71c6e99c5ce" +[[package]] +name = "jsonschema" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48354c4c4f088714424ddf090de1ff84acc82b2f08c192d46d226ae2529a465" +dependencies = [ + "ahash 0.8.3", + "anyhow", + "base64 0.21.0", + "bytecount", + "clap 4.1.4", + "fancy-regex", + "fraction", + "getrandom", + "iso8601", + "itoa", + "memchr", + "num-cmp", + "once_cell", + "parking_lot 0.12.1", + "percent-encoding", + "regex", + "reqwest", + "serde", + "serde_json", + "time", + "url", + "uuid", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -2140,6 +2218,46 @@ dependencies = [ "memchr", ] +[[package]] +name = "num" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + 
"num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-cmp" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa" + +[[package]] +name = "num-complex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d" +dependencies = [ + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -2150,6 +2268,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -2954,11 +3095,13 @@ dependencies = [ name = "relay-kafka" version = "23.3.1" dependencies = [ + "jsonschema", "rdkafka", "rdkafka-sys", "relay-log", "relay-statsd", "rmp-serde", + "sentry-kafka-schemas", "serde", "serde_json", "serde_yaml 0.9.17", @@ -3513,6 +3656,17 @@ dependencies = [ "sentry-core", ] +[[package]] +name = "sentry-kafka-schemas" +version = "0.0.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de282337a0c5e35a16e75c7849528b4fbaa64ac5e782fcb3ee48dc3063478a3" +dependencies = [ 
+ "serde",
+ "serde_json",
+ "serde_yaml 0.9.17",
+]
+
 [[package]]
 name = "sentry-log"
 version = "0.30.0"
diff --git a/relay-kafka/Cargo.toml b/relay-kafka/Cargo.toml
index 4aa511927e..e83c8dd9d5 100644
--- a/relay-kafka/Cargo.toml
+++ b/relay-kafka/Cargo.toml
@@ -18,6 +18,8 @@ rmp-serde = { version = "1.1.1", optional = true }
 serde = { version = "1.0.114", features = ["derive"] }
 serde_json = { version = "1.0.55", optional = true }
 thiserror = "1.0.38"
+sentry-kafka-schemas = { version = "0.0.29", default-features = false }
+jsonschema = "0.17.0"
 
 [dev-dependencies]
 serde_yaml = "0.9.17"
diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs
index e82f054d8b..6edd16cb6b 100644
--- a/relay-kafka/src/config.rs
+++ b/relay-kafka/src/config.rs
@@ -15,7 +15,7 @@ pub enum ConfigError {
 }
 
 /// Define the topics over which Relay communicates with Sentry.
-#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
 pub enum KafkaTopic {
     /// Simple events (without attachments) topic.
     Events,
diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs
index c90521ce9e..dd825a36d0 100644
--- a/relay-kafka/src/producer/mod.rs
+++ b/relay-kafka/src/producer/mod.rs
@@ -1,3 +1,5 @@
+#[cfg(debug_assertions)]
+use std::cell::RefCell;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt;
 use std::sync::Arc;
@@ -8,11 +10,16 @@ use relay_statsd::metric;
 use thiserror::Error;
 
 use crate::config::{KafkaConfig, KafkaParams, KafkaTopic};
+#[cfg(debug_assertions)]
+use crate::producer::schemas::Validator;
 use crate::statsd::KafkaHistograms;
 
 mod utils;
 use utils::{CaptureErrorContext, ThreadedProducer};
 
+#[cfg(debug_assertions)]
+mod schemas;
+
 /// Kafka producer errors.
 #[derive(Error, Debug)]
 pub enum ClientError {
@@ -36,6 +43,11 @@ pub enum ClientError {
     #[error("failed to serialize json message")]
     InvalidJson(#[source] serde_json::Error),
 
+    /// Failed to run schema validation on message.
+    #[cfg(debug_assertions)]
+    #[error("failed to run schema validation on message")]
+    SchemaValidationFailed(#[source] schemas::SchemaError),
+
     /// Configuration is wrong and it cannot be used to identify the number of a shard.
     #[error("invalid kafka shard")]
     InvalidShard,
@@ -126,6 +138,8 @@ impl fmt::Debug for ShardedProducer {
 #[derive(Debug)]
 pub struct KafkaClient {
     producers: HashMap,
+    #[cfg(debug_assertions)]
+    schema_validator: RefCell<Validator>,
 }
 
 impl KafkaClient {
@@ -142,6 +156,11 @@ impl KafkaClient {
         message: &impl Message,
     ) -> Result<(), ClientError> {
         let serialized = message.serialize()?;
+        #[cfg(debug_assertions)]
+        self.schema_validator
+            .borrow_mut()
+            .validate_message_schema(topic, &serialized)
+            .map_err(ClientError::SchemaValidationFailed)?;
         let key = message.key();
         self.send(topic, organization_id, &key, message.variant(), &serialized)
     }
@@ -273,6 +292,8 @@ impl KafkaClientBuilder {
     pub fn build(self) -> KafkaClient {
         KafkaClient {
             producers: self.producers,
+            #[cfg(debug_assertions)]
+            schema_validator: Validator::default().into(),
         }
     }
 }
diff --git a/relay-kafka/src/producer/schemas.rs b/relay-kafka/src/producer/schemas.rs
new file mode 100644
index 0000000000..c44d747436
--- /dev/null
+++ b/relay-kafka/src/producer/schemas.rs
@@ -0,0 +1,114 @@
+use std::collections::btree_map::Entry;
+use std::collections::BTreeMap;
+use std::fmt::Write;
+
+use jsonschema::JSONSchema;
+use thiserror::Error;
+
+use crate::config::{KafkaTopic, TopicAssignment, TopicAssignments};
+
+/// Errors that can occur while validating a message against the schema of its topic.
+#[derive(Debug, Error)]
+pub enum SchemaError {
+    /// The "logical topic" is a concept in sentry-kafka-schemas and Snuba that identifies topics
+    /// irrespective of how they are structured in production. Snuba has a mapping from "logical
+    /// topic" to "physical topic" that is very similar to our `TopicAssignments`.
+    ///
+    /// The name of a logical topic is almost always equal to the Kafka topic's name in local
+    /// development. So, in order to determine a logical topic name from a `KafkaTopic`, we get
+    /// `TopicAssignments::default()` and try to resolve the name through that.
+    ///
+    /// When doing that we assume that Relay's default topic assignments don't use slicing/sharding
+    /// and no custom clusters.
+    ///
+    /// If somebody changes `impl Default for TopicAssignments` to have more complex defaults, this
+    /// error will start occurring. But it should not happen in prod.
+    #[error("failed to determine logical topic")]
+    LogicalTopic,
+
+    /// Failed to deserialize message, potentially because it isn't JSON?
+    #[error("failed to deserialize message")]
+    MessageJson(#[source] serde_json::Error),
+
+    /// Failed to deserialize schema as JSON
+    #[error("failed to deserialize schema")]
+    SchemaJson(#[source] serde_json::Error),
+
+    /// Failed to compile schema
+    // We stringify the inner error because `jsonschema::ValidationError` has weird lifetimes
+    #[error("failed to compile schema: {0}")]
+    SchemaCompiled(String),
+
+    /// Failed to validate message JSON against schema
+    // We stringify the inner error because `jsonschema::ValidationError` has weird lifetimes
+    #[error("message violates schema: {0}")]
+    Message(String),
+}
+
+/// Validates payloads for their given topic's schema.
+#[derive(Debug, Default)]
+pub struct Validator {
+    /// Caches the schema for given topics.
+    ///
+    /// `None` marks a topic for which no schema could be found.
+    schemas: BTreeMap<KafkaTopic, Option<JSONSchema>>,
+}
+
+impl Validator {
+    /// Validate a message for a given topic's schema.
+    ///
+    /// Messages on topics without a known schema are accepted without inspection.
+    pub fn validate_message_schema(
+        &mut self,
+        topic: KafkaTopic,
+        message: &[u8],
+    ) -> Result<(), SchemaError> {
+        let Some(schema) = self.get_schema(topic)? else {
+            return Ok(());
+        };
+        let message_value = serde_json::from_slice(message).map_err(SchemaError::MessageJson)?;
+
+        if let Err(e) = schema.validate(&message_value) {
+            let mut result = String::new();
+            for error in e {
+                writeln!(result, "{}", error).unwrap();
+            }
+
+            return Err(SchemaError::Message(result));
+        }
+
+        Ok(())
+    }
+
+    /// Returns the compiled schema for `topic`, compiling and caching it on first use.
+    ///
+    /// Returns `Ok(None)` if no schema is available for the topic.
+    fn get_schema(&mut self, topic: KafkaTopic) -> Result<Option<&JSONSchema>, SchemaError> {
+        Ok(match self.schemas.entry(topic) {
+            Entry::Vacant(entry) => {
+                entry.insert({
+                    let default_assignments = TopicAssignments::default();
+                    let logical_topic_name = match default_assignments.get(topic) {
+                        TopicAssignment::Primary(logical_topic_name) => logical_topic_name,
+                        _ => return Err(SchemaError::LogicalTopic),
+                    };
+
+                    match sentry_kafka_schemas::get_schema(logical_topic_name, None) {
+                        Ok(schema) => {
+                            let schema = serde_json::from_str(&schema.schema)
+                                .map_err(SchemaError::SchemaJson)?;
+                            let schema = JSONSchema::compile(&schema)
+                                .map_err(|e| SchemaError::SchemaCompiled(e.to_string()))?;
+                            Some(schema)
+                        }
+                        // No schema found for this topic. Evaluate to `None` instead of
+                        // returning early so that the miss is cached as well.
+                        Err(_) => None,
+                    }
+                })
+            }
+            Entry::Occupied(entry) => entry.into_mut(),
+        }
+        .as_ref())
+    }
+}
diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs
index 1df7a3999d..2c283438ef 100644
--- a/relay-server/src/actors/envelopes.rs
+++ b/relay-server/src/actors/envelopes.rs
@@ -348,7 +348,7 @@ impl EnvelopeManagerService {
             if let Err(err) = result {
                 relay_log::trace!(
                     "failed to submit the envelope, merging buckets back: {}",
-                    err
+                    LogError(&err)
                 );
                 self.aggregator
                     .send(MergeBuckets::new(scoping.project_key, buckets));