Skip to content

Commit

Permalink
ref(project): Treat invalid project states as pending and unify state…
Browse files Browse the repository at this point in the history
… types (#3770)

We currently deal with different envelope states inconsistently: Because
the state of a project is encoded into many separate fields and types,
it's hard to enforce that the config is validated correctly everywhere
that it is used.

For example, invalid (i.e. unparsable) project configs are sometimes
treated the same as pending, sometimes the same as disabled.
  • Loading branch information
jjbayer committed Jul 22, 2024
1 parent 1c49b90 commit 332619c
Show file tree
Hide file tree
Showing 24 changed files with 915 additions and 845 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

**Bug Fixes**:

- Do not drop envelopes for unparsable project configs. ([#3770](https://github.com/getsentry/relay/pull/3770))

**Features**:

- "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825))
Expand Down
2 changes: 1 addition & 1 deletion relay-dynamic-config/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub struct ProjectConfig {

impl ProjectConfig {
/// Validates fields in this project config and removes values that are partially invalid.
pub fn sanitize(&mut self) {
pub fn sanitized(&mut self) {
self.quotas.retain(Quota::is_valid);

metrics::convert_conditional_tagging(self);
Expand Down
4 changes: 3 additions & 1 deletion relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,18 @@ fn queue_envelope(
}

// Split off the envelopes by item type.
let scoping = managed_envelope.scoping();
let envelopes = ProcessingGroup::split_envelope(*managed_envelope.take_envelope());
for (group, envelope) in envelopes {
let envelope = buffer_guard
let mut envelope = buffer_guard
.enter(
envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
group,
)
.map_err(BadStoreRequest::QueueFailed)?;
envelope.scope(scoping);
state.project_cache().send(ValidateEnvelope::new(envelope));
}
// The entire envelope is taken for a split above, and it's empty at this point, we can just
Expand Down
36 changes: 24 additions & 12 deletions relay-server/src/endpoints/project_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::endpoints::forward;
use crate::extractors::SignedJson;
use crate::service::ServiceState;
use crate::services::global_config::{self, StatusResponse};
use crate::services::project::{LimitedProjectState, ProjectState};
use crate::services::project::{LimitedParsedProjectState, ParsedProjectState, ProjectState};
use crate::services::project_cache::{GetCachedProjectState, GetProjectState};

/// V2 version of this endpoint.
Expand Down Expand Up @@ -49,13 +49,13 @@ struct VersionQuery {
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
enum ProjectStateWrapper {
Full(ProjectState),
Limited(#[serde(with = "LimitedProjectState")] ProjectState),
Full(ParsedProjectState),
Limited(#[serde(with = "LimitedParsedProjectState")] ParsedProjectState),
}

impl ProjectStateWrapper {
/// Create a wrapper which forces serialization into external or internal format
pub fn new(state: ProjectState, full: bool) -> Self {
pub fn new(state: ParsedProjectState, full: bool) -> Self {
if full {
Self::Full(state)
} else {
Expand All @@ -76,7 +76,7 @@ impl ProjectStateWrapper {
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct GetProjectStatesResponseWrapper {
configs: HashMap<ProjectKey, Option<ProjectStateWrapper>>,
configs: HashMap<ProjectKey, ProjectStateWrapper>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pending: Vec<ProjectKey>,
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -123,7 +123,6 @@ async fn inner(
project_cache
.send(GetProjectState::new(project_key).no_cache(no_cache))
.await
.map(Some)
};

(project_key, state_result)
Expand All @@ -146,23 +145,36 @@ async fn inner(
let mut pending = Vec::with_capacity(keys_len);
let mut configs = HashMap::with_capacity(keys_len);
for (project_key, state_result) in future::join_all(futures).await {
let Some(project_state) = state_result? else {
pending.push(project_key);
continue;
let project_info = match state_result? {
ProjectState::Enabled(info) => info,
ProjectState::Disabled => {
// Don't insert project config. Downstream Relay will consider it disabled.
continue;
}
ProjectState::Pending => {
pending.push(project_key);
continue;
}
};

// If public key is known (even if rate-limited, which is Some(false)), it has
// access to the project config
let has_access = relay.internal
|| project_state
|| project_info
.config
.trusted_relays
.contains(&relay.public_key);

if has_access {
let full = relay.internal && inner.full_config;
let wrapper = ProjectStateWrapper::new((*project_state).clone(), full);
configs.insert(project_key, Some(wrapper));
let wrapper = ProjectStateWrapper::new(
ParsedProjectState {
disabled: false,
info: project_info.as_ref().clone(),
},
full,
);
configs.insert(project_key, wrapper);
} else {
relay_log::debug!(
relay = %relay.public_key,
Expand Down
19 changes: 19 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//!
//! ```
use relay_base_schema::project::ProjectKey;
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
Expand Down Expand Up @@ -1218,6 +1219,24 @@ impl Envelope {
self.headers.sent_at
}

/// Returns the project key defined in the `trace` header of the envelope.
///
/// This function returns `None` if:
/// - there is no [`DynamicSamplingContext`] in the envelope headers.
/// - there are no transactions or events in the envelope, since in this case sampling by trace is redundant.
pub fn sampling_key(&self) -> Option<ProjectKey> {
// If the envelope item is not of type transaction or event, we will not return a sampling key
// because it doesn't make sense to load the root project state if we don't perform trace
// sampling.
self.get_item_by(|item| {
matches!(
item.ty(),
ItemType::Transaction | ItemType::Event | ItemType::Span
)
})?;
self.dsc().map(|dsc| dsc.public_key)
}

/// Sets the event id on the envelope.
pub fn set_event_id(&mut self, event_id: EventId) {
self.headers.event_id = Some(event_id);
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/metrics_extraction/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
metric_extraction: metric_extraction.map(ErrorBoundary::Ok).unwrap_or_default(),
..ProjectConfig::default()
};
project.sanitize(); // enables metrics extraction rules
project.sanitized(); // enables metrics extraction rules
let project = project.metric_extraction.ok().unwrap();

OwnedConfig { global, project }
Expand Down
Loading

0 comments on commit 332619c

Please sign in to comment.