From b2da267b60cb3a1486aa6dc252b93e87496ea15a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 5 Apr 2023 16:50:35 +0200 Subject: [PATCH 1/7] feat: Apply schema validation to all topics in local development Alternative to https://github.com/getsentry/relay/pull/1981 -- apply schema validation to all outgoing messages when Relay runs in development mode. This means that messages are validated as part of integration tests. The additional dependencies are compiled unconditionally, but unused in release builds. The only way to conditionally compile them is to introduce yet another featureflag, but that seems burdensome to use since we use `--all-features` everywhere right now. We have had similar issues introducing optional SSL support in Kafka a long time ago, IIRC --- Cargo.lock | 246 +++++++++++++++++++++++++++- relay-kafka/Cargo.toml | 2 + relay-kafka/src/producer/mod.rs | 10 ++ relay-kafka/src/producer/schemas.rs | 73 +++++++++ 4 files changed, 325 insertions(+), 6 deletions(-) create mode 100644 relay-kafka/src/producer/schemas.rs diff --git a/Cargo.lock b/Cargo.lock index 97f46af1a6..984b0a382f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,7 +35,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", + "getrandom", "once_cell", + "serde", "version_check", ] @@ -316,6 +318,21 @@ dependencies = [ "which", ] +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -1182,6 +1199,16 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "fancy-regex" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95f7c0680e4142284cf8b22c14a476e87d61b004a3a0861872b32ef7ead40a2" +dependencies = [ + "bit-set", + "regex", +] + [[package]] name = "fastrand" version = "1.8.0" @@ -1267,6 +1294,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fraction" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3027ae1df8d41b4bed2241c8fdad4acc1e7af60c8e17743534b545e77182d678" +dependencies = [ + "lazy_static", + "num", +] + [[package]] name = "futures" version = "0.3.26" @@ -1394,8 +1431,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1802,6 +1841,15 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "iso8601" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924e5d73ea28f59011fec52a0d12185d496a9b075d360657aed2a5707f701153" +dependencies = [ + "nom", +] + [[package]] name = "itertools" version = "0.10.5" @@ -1841,6 +1889,36 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "674456280908e444408d2370a6706371f0fe2b4fc9999814ba35e71c6e99c5ce" +[[package]] +name = "jsonschema" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48354c4c4f088714424ddf090de1ff84acc82b2f08c192d46d226ae2529a465" +dependencies = [ + "ahash 0.8.3", + "anyhow", + "base64 0.21.0", + "bytecount", + "clap 4.1.4", + "fancy-regex", + "fraction", + "getrandom", + "iso8601", + "itoa", + "memchr", + "num-cmp", + "once_cell", + "parking_lot 0.12.1", + "percent-encoding", + "regex", + "reqwest", + "serde", + "serde_json", + "time", + "url", + "uuid", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -2140,6 +2218,46 @@ dependencies = [ "memchr", ] +[[package]] +name = "num" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-cmp" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa" + +[[package]] +name = "num-complex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d" +dependencies = [ + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -2150,6 +2268,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -2512,6 +2653,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6fa0831dd7cc608c38a5e323422a0077678fa5744aa2be4ad91c4ece8eec8d5" +[[package]] +name = "prettyplease" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ceca8aaf45b5c46ec7ed39fff75f57290368c1846d33d24a122ca81416ab058" +dependencies = [ + "proc-macro2", + "syn 2.0.11", +] + [[package]] name = "proc-macro-crate" version = "1.3.0" @@ -2741,6 +2892,16 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" +[[package]] +name = "regress" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d995d590bd8ec096d1893f414bf3f5e8b0ee4c9eed9a5642b9766ef2c8e2e8e9" +dependencies = [ + "hashbrown 0.13.2", + "memchr", +] + [[package]] name = "relay" version = "23.3.1" @@ -2954,11 +3115,13 @@ dependencies = [ name = "relay-kafka" version = "23.3.1" dependencies = [ + "jsonschema", "rdkafka", "rdkafka-sys", "relay-log", "relay-statsd", "rmp-serde", + "sentry-kafka-schemas", "serde", "serde_json", "serde_yaml 0.9.17", @@ -3349,9 +3512,9 @@ dependencies = [ [[package]] name = "schemars" -version = "0.8.10" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1847b767a3d62d95cbf3d8a9f0e421cf57a0d8aa4f411d4b16525afb0284d4ed" +checksum = "02c613288622e5f0c3fdc5dbd4db1c5fbe752746b1d1a56a0630b78fd00de44f" dependencies = [ "chrono", "dyn-clone", @@ -3363,9 +3526,9 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "0.8.10" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af4d7e1b012cb3d9129567661a63755ea4b8a7386d339dc945ae187e403c6743" +checksum = "109da1e6b197438deb6db99952990c7f959572794b80ff93707d55a232545e7c" dependencies = [ "proc-macro2", "quote", @@ -3513,6 +3676,22 @@ dependencies = [ "sentry-core", ] +[[package]] +name = "sentry-kafka-schemas" +version = "0.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "596d636984768ec6c22f740c9d6825bdbc88b23de7ce0b253a088ff9580cbc2f" +dependencies = [ + "prettyplease", + "regress", + "schemars", + "serde", + "serde_json", + "serde_yaml 0.9.17", + "syn 2.0.11", + "typify", +] + [[package]] name = "sentry-log" version = "0.30.0" @@ -3641,6 +3820,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_tokenstream" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "797ba1d80299b264f3aac68ab5d12e5825a561749db4df7cd7c8083900c5d4e9" +dependencies = [ + "proc-macro2", + "serde", + "syn 1.0.109", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4358,6 +4548,50 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "typify" +version = "0.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30bfde96849e25d7feef1bbf652e9cfc51deb63203fdc07b115b8bc3bcfe20b9" +dependencies = [ + "typify-impl", + "typify-macro", +] + +[[package]] +name = "typify-impl" +version = "0.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95d27d749378ceab6ec22188ed7ad102205c89ddb92ab662371c850ffc71aa1a" +dependencies = [ + "heck", + "log", + "proc-macro2", + "quote", + "regress", + "schemars", + "serde_json", + "syn 1.0.109", + "thiserror", + "unicode-ident", +] + +[[package]] +name = "typify-macro" +version = "0.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35db6fc2bd9220ecdac6eeb88158824b83610de3dda0c6d0f2142b49efd858b0" +dependencies = [ + "proc-macro2", + "quote", + "schemars", + "serde", + "serde_json", + "serde_tokenstream", + "syn 1.0.109", + "typify-impl", +] + [[package]] name = "uaparser" version = "0.6.0" @@ -4404,9 +4638,9 @@ checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" [[package]] name = "unicode-ident" -version = "1.0.6" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" +checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" [[package]] name = "unicode-normalization" diff --git a/relay-kafka/Cargo.toml b/relay-kafka/Cargo.toml index 4aa511927e..6731b3cdad 100644 --- a/relay-kafka/Cargo.toml +++ b/relay-kafka/Cargo.toml @@ -18,6 +18,8 @@ rmp-serde = { version = "1.1.1", optional = true } serde = { version = "1.0.114", features = ["derive"] } serde_json = { version = "1.0.55", optional = true } thiserror = "1.0.38" +sentry-kafka-schemas = "0.0.25" +jsonschema = "0.17.0" [dev-dependencies] serde_yaml = "0.9.17" diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs index c90521ce9e..5cc62422d3 100644 --- a/relay-kafka/src/producer/mod.rs +++ b/relay-kafka/src/producer/mod.rs @@ -13,6 +13,9 @@ use crate::statsd::KafkaHistograms; mod utils; use utils::{CaptureErrorContext, ThreadedProducer}; +#[cfg(debug_assertions)] +mod schemas; + /// Kafka producer errors. #[derive(Error, Debug)] pub enum ClientError { @@ -36,6 +39,10 @@ pub enum ClientError { #[error("failed to serialize json message")] InvalidJson(#[source] serde_json::Error), + /// Failed to run schema validation on message. + #[error("failed to run schema validation on message")] + SchemaValidationFailed(#[source] schemas::SchemaError), + /// Configuration is wrong and it cannot be used to identify the number of a shard. #[error("invalid kafka shard")] InvalidShard, @@ -142,6 +149,9 @@ impl KafkaClient { message: &impl Message, ) -> Result<(), ClientError> { let serialized = message.serialize()?; + #[cfg(debug_assertions)] + schemas::validate_message_schema(topic, &serialized) + .map_err(ClientError::SchemaValidationFailed)?; let key = message.key(); self.send(topic, organization_id, &key, message.variant(), &serialized) } diff --git a/relay-kafka/src/producer/schemas.rs b/relay-kafka/src/producer/schemas.rs new file mode 100644 index 0000000000..436e618a91 --- /dev/null +++ b/relay-kafka/src/producer/schemas.rs @@ -0,0 +1,73 @@ +use std::fmt::Write; + +use jsonschema::JSONSchema; +use thiserror::Error; + +use crate::config::{KafkaTopic, TopicAssignment, TopicAssignments}; + +#[derive(Debug, Error)] +pub enum SchemaError { + /// The "logical topic" is a concept in sentry-kafka-schemas and Snuba that identifies topics + /// irrespective of how they are structured in production. Snuba has a mapping from "logical + /// topic" to "physical topic" that is very similar to our `TopicAssignments`. + /// + /// The name of a logical topic is almost always equal to the Kafka topic's name in local + /// development. So, in order to determine a logical topic name from a `KafkaTopic`, we get + /// `TopicAssignments::default()` and try to resolve the name through that. + /// + /// When doing that we assume that Relay's default topic assignments don't use slicing/sharding + /// and no custom clusters. + /// + /// If somebody changes `impl Default for TopicAssignments` to have more complex defaults, this + /// error will start occurring. But it should not happen in prod. + #[error("failed to determine logical topic")] + InvalidLogicalTopic, + + /// Failed to deserialize message, potentially because it isn't JSON? + #[error("failed to deserialize message")] + InvalidMessageJson(#[source] serde_json::Error), + + /// Failed to deserialize schema as JSON + #[error("failed to deserialize schema")] + InvalidSchemaJson(#[source] serde_json::Error), + + /// Failed to compile schema + // We stringify the inner error because `jsonschema::ValidationError` has weird lifetimes + #[error("failed to compile schema: {0}")] + InvalidSchemaCompiled(String), + + /// Failed to validate message JSON against schema + // We stringify the inner error because `jsonschema::ValidationError` has weird lifetimes + #[error("message violates schema: {0}")] + InvalidMessage(String), +} + +pub fn validate_message_schema(topic: KafkaTopic, message: &[u8]) -> Result<(), SchemaError> { + let default_assignments = TopicAssignments::default(); + let logical_topic_name = match default_assignments.get(topic) { + TopicAssignment::Primary(logical_topic_name) => logical_topic_name, + _ => return Err(SchemaError::InvalidLogicalTopic), + }; + + let schema = match sentry_kafka_schemas::get_schema(logical_topic_name, None) { + Ok(schema) => schema, + // No topic found + Err(_) => return Ok(()), + }; + let schema = serde_json::from_str(&schema.schema).map_err(SchemaError::InvalidSchemaJson)?; + let schema = JSONSchema::compile(&schema) + .map_err(|e| SchemaError::InvalidSchemaCompiled(e.to_string()))?; + let message_value = + serde_json::from_slice(&message).map_err(SchemaError::InvalidMessageJson)?; + + if let Err(e) = schema.validate(&message_value) { + let mut result = String::new(); + for error in e { + writeln!(result, "{}", error).unwrap(); + } + + return Err(SchemaError::InvalidMessage(result)); + } + + Ok(()) +} From 76f2ded4b99d9cfbdd6b96f1d29b3ac0a062dfab Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 6 Apr 2023 14:35:14 +0200 Subject: [PATCH 2/7] add missing cfg debug_assertions --- relay-kafka/src/producer/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs index 5cc62422d3..70b8455b7f 100644 --- a/relay-kafka/src/producer/mod.rs +++ b/relay-kafka/src/producer/mod.rs @@ -40,6 +40,7 @@ pub enum ClientError { InvalidJson(#[source] serde_json::Error), /// Failed to run schema validation on message. + #[cfg(debug_assertions)] #[error("failed to run schema validation on message")] SchemaValidationFailed(#[source] schemas::SchemaError), From 13301a0eb0482dc5165504c948a9e4d9ac73baa6 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 6 Apr 2023 18:31:23 +0200 Subject: [PATCH 3/7] bump sentry-kafka-schemas --- Cargo.lock | 85 ++++-------------------------------------- relay-kafka/Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 984b0a382f..1393d6aa92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2653,16 +2653,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6fa0831dd7cc608c38a5e323422a0077678fa5744aa2be4ad91c4ece8eec8d5" -[[package]] -name = "prettyplease" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ceca8aaf45b5c46ec7ed39fff75f57290368c1846d33d24a122ca81416ab058" -dependencies = [ - "proc-macro2", - "syn 2.0.11", -] - [[package]] name = "proc-macro-crate" version = "1.3.0" @@ -3512,9 +3502,9 @@ dependencies = [ [[package]] name = "schemars" -version = "0.8.12" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02c613288622e5f0c3fdc5dbd4db1c5fbe752746b1d1a56a0630b78fd00de44f" +checksum = "1847b767a3d62d95cbf3d8a9f0e421cf57a0d8aa4f411d4b16525afb0284d4ed" dependencies = [ "chrono", "dyn-clone", @@ -3526,9 +3516,9 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "0.8.12" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109da1e6b197438deb6db99952990c7f959572794b80ff93707d55a232545e7c" +checksum = "af4d7e1b012cb3d9129567661a63755ea4b8a7386d339dc945ae187e403c6743" dependencies = [ "proc-macro2", "quote", @@ -3678,18 +3668,14 @@ dependencies = [ [[package]] name = "sentry-kafka-schemas" -version = "0.0.25" +version = "0.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "596d636984768ec6c22f740c9d6825bdbc88b23de7ce0b253a088ff9580cbc2f" +checksum = "983b6a74344b3c7c4c997ee56de6b25dbc6f173c6aabb69069bb10239c581537" dependencies = [ - "prettyplease", "regress", - "schemars", "serde", "serde_json", "serde_yaml 0.9.17", - "syn 2.0.11", - "typify", ] [[package]] @@ -3820,17 +3806,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_tokenstream" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "797ba1d80299b264f3aac68ab5d12e5825a561749db4df7cd7c8083900c5d4e9" -dependencies = [ - "proc-macro2", - "serde", - "syn 1.0.109", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4548,50 +4523,6 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" -[[package]] -name = "typify" -version = "0.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bfde96849e25d7feef1bbf652e9cfc51deb63203fdc07b115b8bc3bcfe20b9" -dependencies = [ - "typify-impl", - "typify-macro", -] - -[[package]] -name = "typify-impl" -version = "0.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95d27d749378ceab6ec22188ed7ad102205c89ddb92ab662371c850ffc71aa1a" -dependencies = [ - "heck", - "log", - "proc-macro2", - "quote", - "regress", - "schemars", - "serde_json", - "syn 1.0.109", - "thiserror", - "unicode-ident", -] - -[[package]] -name = "typify-macro" -version = "0.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35db6fc2bd9220ecdac6eeb88158824b83610de3dda0c6d0f2142b49efd858b0" -dependencies = [ - "proc-macro2", - "quote", - "schemars", - "serde", - "serde_json", - "serde_tokenstream", - "syn 1.0.109", - "typify-impl", -] - [[package]] name = "uaparser" version = "0.6.0" @@ -4638,9 +4569,9 @@ checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" [[package]] name = "unicode-ident" -version = "1.0.8" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" +checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" [[package]] name = "unicode-normalization" diff --git a/relay-kafka/Cargo.toml b/relay-kafka/Cargo.toml index 6731b3cdad..6a82af4639 100644 --- a/relay-kafka/Cargo.toml +++ b/relay-kafka/Cargo.toml @@ -18,7 +18,7 @@ rmp-serde = { version = "1.1.1", optional = true } serde = { version = "1.0.114", features = ["derive"] } serde_json = { version = "1.0.55", optional = true } thiserror = "1.0.38" -sentry-kafka-schemas = "0.0.25" +sentry-kafka-schemas = { version = "0.0.26", default_features = false } jsonschema = "0.17.0" [dev-dependencies] From 6d484e34c39985588bf9fd39c895f44df8dd7614 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 6 Apr 2023 18:49:58 +0200 Subject: [PATCH 4/7] fix clippy --- relay-kafka/src/producer/schemas.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/relay-kafka/src/producer/schemas.rs b/relay-kafka/src/producer/schemas.rs index 436e618a91..1cbde0a94c 100644 --- a/relay-kafka/src/producer/schemas.rs +++ b/relay-kafka/src/producer/schemas.rs @@ -21,32 +21,32 @@ pub enum SchemaError { /// If somebody changes `impl Default for TopicAssignments` to have more complex defaults, this /// error will start occurring. But it should not happen in prod. #[error("failed to determine logical topic")] - InvalidLogicalTopic, + LogicalTopic, /// Failed to deserialize message, potentially because it isn't JSON? #[error("failed to deserialize message")] - InvalidMessageJson(#[source] serde_json::Error), + MessageJson(#[source] serde_json::Error), /// Failed to deserialize schema as JSON #[error("failed to deserialize schema")] - InvalidSchemaJson(#[source] serde_json::Error), + SchemaJson(#[source] serde_json::Error), /// Failed to compile schema // We stringify the inner error because `jsonschema::ValidationError` has weird lifetimes #[error("failed to compile schema: {0}")] - InvalidSchemaCompiled(String), + SchemaCompiled(String), /// Failed to validate message JSON against schema // We stringify the inner error because `jsonschema::ValidationError` has weird lifetimes #[error("message violates schema: {0}")] - InvalidMessage(String), + Message(String), } pub fn validate_message_schema(topic: KafkaTopic, message: &[u8]) -> Result<(), SchemaError> { let default_assignments = TopicAssignments::default(); let logical_topic_name = match default_assignments.get(topic) { TopicAssignment::Primary(logical_topic_name) => logical_topic_name, - _ => return Err(SchemaError::InvalidLogicalTopic), + _ => return Err(SchemaError::LogicalTopic), }; let schema = match sentry_kafka_schemas::get_schema(logical_topic_name, None) { @@ -54,11 +54,10 @@ pub fn validate_message_schema(topic: KafkaTopic, message: &[u8]) -> Result<(), // No topic found Err(_) => return Ok(()), }; - let schema = serde_json::from_str(&schema.schema).map_err(SchemaError::InvalidSchemaJson)?; - let schema = JSONSchema::compile(&schema) - .map_err(|e| SchemaError::InvalidSchemaCompiled(e.to_string()))?; - let message_value = - serde_json::from_slice(&message).map_err(SchemaError::InvalidMessageJson)?; + let schema = serde_json::from_str(&schema.schema).map_err(SchemaError::SchemaJson)?; + let schema = + JSONSchema::compile(&schema).map_err(|e| SchemaError::SchemaCompiled(e.to_string()))?; + let message_value = serde_json::from_slice(message).map_err(SchemaError::MessageJson)?; if let Err(e) = schema.validate(&message_value) { let mut result = String::new(); @@ -66,7 +65,7 @@ pub fn validate_message_schema(topic: KafkaTopic, message: &[u8]) -> Result<(), writeln!(result, "{}", error).unwrap(); } - return Err(SchemaError::InvalidMessage(result)); + return Err(SchemaError::Message(result)); } Ok(()) From 4b38eaba88e82c935e8dc60e5927c8dc5da2f0a6 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 6 Apr 2023 19:10:28 +0200 Subject: [PATCH 5/7] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdcd5888fe..9a0580a154 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Metrics: - Apply transaction clustering rules before UUID scrubbing rules. ([#1964](https://github.com/getsentry/relay/pull/1964)) - Use exposed device-class-synthesis feature flag to gate device.class synthesis in light normalization. ([#1974](https://github.com/getsentry/relay/pull/1974)) - Pin schemars dependency to un-break schema docs generation. ([#2014](https://github.com/getsentry/relay/pull/2014)) +- Apply schema validation to all topics in local development. ([#2013](https://github.com/getsentry/relay/pull/2013)) ## 23.3.1 From 66a9f7da608594804670178e3354068f3636e4e5 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 13 Apr 2023 15:44:05 +0200 Subject: [PATCH 6/7] feat(kafka): Cache schemas for validation (#2028) Add a caching layer to https://github.com/getsentry/relay/pull/2013 such that the schema does not have to be parsed from disk for every message. --------- Co-authored-by: Markus Unterwaditzer --- relay-kafka/src/config.rs | 2 +- relay-kafka/src/producer/mod.rs | 12 ++++- relay-kafka/src/producer/schemas.rs | 72 ++++++++++++++++++++--------- 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index e82f054d8b..6edd16cb6b 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -15,7 +15,7 @@ pub enum ConfigError { } /// Define the topics over which Relay communicates with Sentry. -#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum KafkaTopic { /// Simple events (without attachments) topic. Events, diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs index 70b8455b7f..dd825a36d0 100644 --- a/relay-kafka/src/producer/mod.rs +++ b/relay-kafka/src/producer/mod.rs @@ -1,3 +1,5 @@ +#[cfg(debug_assertions)] +use std::cell::RefCell; use std::collections::{BTreeMap, HashMap}; use std::fmt; use std::sync::Arc; @@ -8,6 +10,8 @@ use relay_statsd::metric; use thiserror::Error; use crate::config::{KafkaConfig, KafkaParams, KafkaTopic}; +#[cfg(debug_assertions)] +use crate::producer::schemas::Validator; use crate::statsd::KafkaHistograms; mod utils; @@ -134,6 +138,8 @@ impl fmt::Debug for ShardedProducer { #[derive(Debug)] pub struct KafkaClient { producers: HashMap, + #[cfg(debug_assertions)] + schema_validator: RefCell, } impl KafkaClient { @@ -151,7 +157,9 @@ impl KafkaClient { ) -> Result<(), ClientError> { let serialized = message.serialize()?; #[cfg(debug_assertions)] - schemas::validate_message_schema(topic, &serialized) + self.schema_validator + .borrow_mut() + .validate_message_schema(topic, &serialized) .map_err(ClientError::SchemaValidationFailed)?; let key = message.key(); self.send(topic, organization_id, &key, message.variant(), &serialized) @@ -284,6 +292,8 @@ impl KafkaClientBuilder { pub fn build(self) -> KafkaClient { KafkaClient { producers: self.producers, + #[cfg(debug_assertions)] + schema_validator: Validator::default().into(), } } } diff --git a/relay-kafka/src/producer/schemas.rs b/relay-kafka/src/producer/schemas.rs index 1cbde0a94c..c44d747436 100644 --- a/relay-kafka/src/producer/schemas.rs +++ b/relay-kafka/src/producer/schemas.rs @@ -1,3 +1,5 @@ +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; use std::fmt::Write; use jsonschema::JSONSchema; @@ -42,31 +44,59 @@ pub enum SchemaError { Message(String), } -pub fn validate_message_schema(topic: KafkaTopic, message: &[u8]) -> Result<(), SchemaError> { - let default_assignments = TopicAssignments::default(); - let logical_topic_name = match default_assignments.get(topic) { - TopicAssignment::Primary(logical_topic_name) => logical_topic_name, - _ => return Err(SchemaError::LogicalTopic), - }; +/// Validates payloads for their given topic's schema. +#[derive(Debug, Default)] +pub struct Validator { + /// Caches the schema for given topics. + schemas: BTreeMap>, +} + +impl Validator { + /// Validate a message for a given topic's schema. + pub fn validate_message_schema( + &mut self, + topic: KafkaTopic, + message: &[u8], + ) -> Result<(), SchemaError> { + let Some(schema) = self.get_schema(topic)? else { return Ok(())}; + let message_value = serde_json::from_slice(message).map_err(SchemaError::MessageJson)?; - let schema = match sentry_kafka_schemas::get_schema(logical_topic_name, None) { - Ok(schema) => schema, - // No topic found - Err(_) => return Ok(()), - }; - let schema = serde_json::from_str(&schema.schema).map_err(SchemaError::SchemaJson)?; - let schema = - JSONSchema::compile(&schema).map_err(|e| SchemaError::SchemaCompiled(e.to_string()))?; - let message_value = serde_json::from_slice(message).map_err(SchemaError::MessageJson)?; + if let Err(e) = schema.validate(&message_value) { + let mut result = String::new(); + for error in e { + writeln!(result, "{}", error).unwrap(); + } - if let Err(e) = schema.validate(&message_value) { - let mut result = String::new(); - for error in e { - writeln!(result, "{}", error).unwrap(); + return Err(SchemaError::Message(result)); } - return Err(SchemaError::Message(result)); + Ok(()) } - Ok(()) + fn get_schema(&mut self, topic: KafkaTopic) -> Result, SchemaError> { + Ok(match self.schemas.entry(topic) { + Entry::Vacant(entry) => { + entry.insert({ + let default_assignments = TopicAssignments::default(); + let logical_topic_name = match default_assignments.get(topic) { + TopicAssignment::Primary(logical_topic_name) => logical_topic_name, + _ => return Err(SchemaError::LogicalTopic), + }; + + let schema = match sentry_kafka_schemas::get_schema(logical_topic_name, None) { + Ok(schema) => schema, + // No topic found + Err(_) => return Ok(None), + }; + let schema = + serde_json::from_str(&schema.schema).map_err(SchemaError::SchemaJson)?; + let schema = JSONSchema::compile(&schema) + .map_err(|e| SchemaError::SchemaCompiled(e.to_string()))?; + Some(schema) + }) + } + Entry::Occupied(entry) => entry.into_mut(), + } + .as_ref()) + } } From 226cd44c8dbf695cb900af9c01c53fb6f9017e98 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 13 Apr 2023 19:28:11 +0200 Subject: [PATCH 7/7] bump sentry-kafka-schemas and improve metrics logging --- Cargo.lock | 15 ++------------- relay-kafka/Cargo.toml | 2 +- relay-server/src/actors/envelopes.rs | 2 +- 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1393d6aa92..a74a6b2b9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2882,16 +2882,6 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" -[[package]] -name = "regress" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d995d590bd8ec096d1893f414bf3f5e8b0ee4c9eed9a5642b9766ef2c8e2e8e9" -dependencies = [ - "hashbrown 0.13.2", - "memchr", -] - [[package]] name = "relay" version = "23.3.1" @@ -3668,11 +3658,10 @@ dependencies = [ [[package]] name = "sentry-kafka-schemas" -version = "0.0.26" +version = "0.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "983b6a74344b3c7c4c997ee56de6b25dbc6f173c6aabb69069bb10239c581537" +checksum = "5de282337a0c5e35a16e75c7849528b4fbaa64ac5e782fcb3ee48dc3063478a3" dependencies = [ - "regress", "serde", "serde_json", "serde_yaml 0.9.17", diff --git a/relay-kafka/Cargo.toml b/relay-kafka/Cargo.toml index 6a82af4639..e83c8dd9d5 100644 --- a/relay-kafka/Cargo.toml +++ b/relay-kafka/Cargo.toml @@ -18,7 +18,7 @@ rmp-serde = { version = "1.1.1", optional = true } serde = { version = "1.0.114", features = ["derive"] } serde_json = { version = "1.0.55", optional = true } thiserror = "1.0.38" -sentry-kafka-schemas = { version = "0.0.26", default_features = false } +sentry-kafka-schemas = { version = "0.0.29", default_features = false } jsonschema = "0.17.0" [dev-dependencies] diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index 1df7a3999d..2c283438ef 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -348,7 +348,7 @@ impl EnvelopeManagerService { if let Err(err) = result { relay_log::trace!( "failed to submit the envelope, merging buckets back: {}", - err + LogError(&err) ); self.aggregator .send(MergeBuckets::new(scoping.project_key, buckets));