feat: Apply schema validation to all topics in local development #2013

Merged: 10 commits, Apr 14, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@ Metrics:
- Adds iPad support for device.class synthesis in light normalization. ([#2008](https://github.com/getsentry/relay/pull/2008))
- Pin schemars dependency to un-break schema docs generation. ([#2014](https://github.com/getsentry/relay/pull/2014))
- Remove global service registry. ([#2022](https://github.com/getsentry/relay/pull/2022))
- Apply schema validation to all topics in local development. ([#2013](https://github.com/getsentry/relay/pull/2013))

Monitors:

154 changes: 154 additions & 0 deletions Cargo.lock

2 changes: 2 additions & 0 deletions relay-kafka/Cargo.toml
@@ -18,6 +18,8 @@ rmp-serde = { version = "1.1.1", optional = true }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = { version = "1.0.55", optional = true }
thiserror = "1.0.38"
sentry-kafka-schemas = { version = "0.0.29", default_features = false }
jsonschema = "0.17.0"
Member

I wonder if it would make sense to hide the jsonschema dependency in sentry-kafka-schemas. That is, if we removed this dependency here, and sentry-kafka-schemas exposed a validate(topic, message) function.

That way, the library client (Relay) would not have to deal with compiling schemas, and the way that sentry-kafka-schemas validates would be completely encapsulated (i.e. you could change the library or even the format it uses for schema definition and validation).

Member Author

See my reply to the local cache comment. For now I would like to avoid that. There are still many unsolved questions about what the workflow in Relay will look like, what performance requirements we have for the entire system, etc.

Member

OK! Definitely not a blocker.

> As such it doesn't have many dependencies. This is a good thing if you're only using the generated types in your service.

Makes sense, though this could be solved with Cargo features: there could be a `codegen` feature and a `validation` feature, and clients would enable whichever they need. My gut feeling is that 90% of clients that want to perform validation will not care which schema definition language or validation library is used under the hood, hence the idea to hide it inside the library.

Member Author

@untitaker, Apr 7, 2023

i'm reconsidering my response now. one of the reasons i was hesitant to change this is that python currently works the same way (it just exposes the raw schema and doesn't prescribe a schema validation library)

but at the same time it would be good to have a vetted recommended library that everybody ought to use (for both python and rust)
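
To make the `validate(topic, message)` idea from this thread concrete, here is a minimal, std-only sketch of such a facade. Everything in it is hypothetical: the `ValidationError` type, the `events` topic name, the `event_id` key, and the substring check that stands in for real JSON Schema validation. It is not the API of `sentry-kafka-schemas`; it only illustrates how callers could stay unaware of the validation library.

```rust
/// Error type of the hypothetical facade. It deliberately exposes no types
/// from whatever validation library is used internally.
#[derive(Debug, PartialEq)]
pub enum ValidationError {
    /// No schema is registered for the topic.
    UnknownTopic(String),
    /// The message did not satisfy the topic's schema.
    InvalidMessage(String),
}

/// Stand-in for a compiled schema (assumption: a real implementation would
/// hold a compiled JSON Schema here instead of a list of required keys).
struct Schema {
    required_keys: &'static [&'static str],
}

/// Looks up the schema for a topic. The topic name is made up for this sketch.
fn schema_for(topic: &str) -> Option<Schema> {
    match topic {
        "events" => Some(Schema { required_keys: &["event_id"] }),
        _ => None,
    }
}

/// The only function a client would need to call.
pub fn validate(topic: &str, message: &[u8]) -> Result<(), ValidationError> {
    let schema = schema_for(topic)
        .ok_or_else(|| ValidationError::UnknownTopic(topic.to_owned()))?;
    let text = std::str::from_utf8(message)
        .map_err(|err| ValidationError::InvalidMessage(err.to_string()))?;

    // Placeholder check: require each key to appear as a quoted string.
    for key in schema.required_keys {
        let quoted = format!("\"{key}\"");
        if !text.contains(quoted.as_str()) {
            return Err(ValidationError::InvalidMessage(format!(
                "missing required key `{key}`"
            )));
        }
    }
    Ok(())
}
```

With a facade of this shape, swapping the validation library or the schema format would only change the body of `validate`, not its callers.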


[dev-dependencies]
serde_yaml = "0.9.17"
2 changes: 1 addition & 1 deletion relay-kafka/src/config.rs
@@ -15,7 +15,7 @@ pub enum ConfigError {
}

/// Define the topics over which Relay communicates with Sentry.
-#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
/// Simple events (without attachments) topic.
Events,
21 changes: 21 additions & 0 deletions relay-kafka/src/producer/mod.rs
@@ -1,3 +1,5 @@
#[cfg(debug_assertions)]
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::Arc;
@@ -8,11 +10,16 @@ use relay_statsd::metric;
use thiserror::Error;

use crate::config::{KafkaConfig, KafkaParams, KafkaTopic};
#[cfg(debug_assertions)]
use crate::producer::schemas::Validator;
use crate::statsd::KafkaHistograms;

mod utils;
use utils::{CaptureErrorContext, ThreadedProducer};

#[cfg(debug_assertions)]
mod schemas;

/// Kafka producer errors.
#[derive(Error, Debug)]
pub enum ClientError {
@@ -36,6 +43,11 @@ pub enum ClientError {
#[error("failed to serialize json message")]
InvalidJson(#[source] serde_json::Error),

/// Failed to run schema validation on message.
Member

Can this be conditional?

Suggested change
-/// Failed to run schema validation on message.
+#[cfg(debug_assertions)]
+/// Failed to run schema validation on message.

Member Author

yes, it has to be, fixed
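
A small sketch of why the variant has to be conditional: the `schemas` module only exists when `debug_assertions` is on, so an ungated variant that names `schemas::SchemaError` would not compile in release builds. The module body, the JSON-object check, and the simplified `send_message` signature below are assumptions made for illustration, not the code from this PR.

```rust
// Stand-in for the debug-only `schemas` module. Its contents are hypothetical.
#[cfg(debug_assertions)]
mod schemas {
    #[derive(Debug)]
    pub struct SchemaError(pub String);

    /// Placeholder check: accept only payloads that look like a JSON object.
    pub fn validate(payload: &[u8]) -> Result<(), SchemaError> {
        if payload.first() == Some(&b'{') && payload.last() == Some(&b'}') {
            Ok(())
        } else {
            Err(SchemaError("payload is not a JSON object".to_owned()))
        }
    }
}

#[derive(Debug)]
pub enum ClientError {
    /// Always present, in debug and release builds.
    InvalidShard,
    /// Only present in debug builds. Without the attribute, release builds
    /// would reference a `schemas` module that was never compiled.
    #[cfg(debug_assertions)]
    SchemaValidationFailed(schemas::SchemaError),
}

/// Simplified send path: validates in debug builds only, then "sends" the
/// payload by returning its length.
pub fn send_message(payload: &[u8]) -> Result<usize, ClientError> {
    #[cfg(debug_assertions)]
    schemas::validate(payload).map_err(ClientError::SchemaValidationFailed)?;

    Ok(payload.len())
}
```

In a release build the validation statement and the variant both disappear, so `send_message` accepts any payload there.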

#[cfg(debug_assertions)]
#[error("failed to run schema validation on message")]
SchemaValidationFailed(#[source] schemas::SchemaError),

/// Configuration is wrong and it cannot be used to identify the number of a shard.
#[error("invalid kafka shard")]
InvalidShard,
@@ -126,6 +138,8 @@ impl fmt::Debug for ShardedProducer {
#[derive(Debug)]
pub struct KafkaClient {
producers: HashMap<KafkaTopic, Producer>,
#[cfg(debug_assertions)]
schema_validator: RefCell<schemas::Validator>,
}

impl KafkaClient {
@@ -142,6 +156,11 @@ impl KafkaClient {
message: &impl Message,
) -> Result<(), ClientError> {
let serialized = message.serialize()?;
#[cfg(debug_assertions)]
self.schema_validator
.borrow_mut()
.validate_message_schema(topic, &serialized)
.map_err(ClientError::SchemaValidationFailed)?;
let key = message.key();
self.send(topic, organization_id, &key, message.variant(), &serialized)
}
@@ -273,6 +292,8 @@ impl KafkaClientBuilder {
pub fn build(self) -> KafkaClient {
KafkaClient {
producers: self.producers,
#[cfg(debug_assertions)]
schema_validator: Validator::default().into(),
}
}
}
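
The `schemas` module itself is not rendered on this page, so the following is only a guess at the general pattern the diff wires up: a validator behind a `RefCell`, so that `send_message(&self, ..)` can fill a per-topic cache lazily, keyed by `KafkaTopic` (which is one plausible reason for the new `Ord` and `PartialOrd` derives). The two-variant enum, `CompiledSchema`, `compile`, the `event_id` key, and the `String` error type are all assumptions; a real implementation would compile JSON Schemas instead.

```rust
#[cfg(debug_assertions)]
use std::cell::RefCell;
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    Events,
    Metrics,
}

/// Stand-in for a compiled schema (hypothetical).
#[derive(Debug)]
struct CompiledSchema {
    required_key: &'static str,
}

/// Pretend compilation step. `None` means "no schema for this topic".
fn compile(topic: KafkaTopic) -> Option<CompiledSchema> {
    match topic {
        KafkaTopic::Events => Some(CompiledSchema { required_key: "event_id" }),
        KafkaTopic::Metrics => None,
    }
}

#[derive(Debug, Default)]
pub struct Validator {
    // `Ord` on `KafkaTopic` is what allows it to be a `BTreeMap` key.
    schemas: BTreeMap<KafkaTopic, Option<CompiledSchema>>,
    compilations: usize,
}

impl Validator {
    /// Validates a serialized message, compiling the topic's schema on first use.
    pub fn validate_message_schema(
        &mut self,
        topic: KafkaTopic,
        message: &[u8],
    ) -> Result<(), String> {
        let compilations = &mut self.compilations;
        let schema = self.schemas.entry(topic).or_insert_with(|| {
            *compilations += 1;
            compile(topic)
        });
        let schema = match schema {
            Some(schema) => schema,
            None => return Ok(()), // topics without a schema are not validated
        };
        // Placeholder check instead of real schema validation.
        if String::from_utf8_lossy(message).contains(schema.required_key) {
            Ok(())
        } else {
            Err(format!("missing `{}`", schema.required_key))
        }
    }

    /// Number of schemas compiled so far, exposed to show the caching.
    pub fn compilations(&self) -> usize {
        self.compilations
    }
}

#[derive(Debug, Default)]
pub struct KafkaClient {
    #[cfg(debug_assertions)]
    schema_validator: RefCell<Validator>,
}

impl KafkaClient {
    pub fn send_message(&self, topic: KafkaTopic, serialized: &[u8]) -> Result<(), String> {
        // Interior mutability: `&self` is enough to update the cache.
        #[cfg(debug_assertions)]
        self.schema_validator
            .borrow_mut()
            .validate_message_schema(topic, serialized)?;
        let _ = (topic, serialized); // the actual send is omitted in this sketch
        Ok(())
    }
}
```

One trade-off of this shape is that a `RefCell` field makes the containing type `!Sync`, so in a debug build a client like this cannot be shared across threads by reference.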