diff --git a/.changesets/feat_tninesling_coprocessor_cost_access.md b/.changesets/feat_tninesling_coprocessor_cost_access.md new file mode 100644 index 0000000000..a8c1a7a6f5 --- /dev/null +++ b/.changesets/feat_tninesling_coprocessor_cost_access.md @@ -0,0 +1,5 @@ +### Move cost context values into DashMap ([PR #5972](https://github.com/apollographql/router/pull/5972)) + +Allow Rhai scripts and coprocessors to access demand control information via context. + +By [@tninesling](https://github.com/tninesling) in https://github.com/apollographql/router/pull/5972 diff --git a/apollo-router/src/plugins/demand_control/mod.rs b/apollo-router/src/plugins/demand_control/mod.rs index b3faaef747..0e779cad01 100644 --- a/apollo-router/src/plugins/demand_control/mod.rs +++ b/apollo-router/src/plugins/demand_control/mod.rs @@ -42,36 +42,11 @@ use crate::Context; pub(crate) mod cost_calculator; pub(crate) mod strategy; -/// The cost calculation information stored in context for use in telemetry and other plugins that need to know what cost was calculated. -#[derive(Debug, Clone)] -pub(crate) struct CostContext { - pub(crate) estimated: f64, - pub(crate) actual: f64, - pub(crate) result: &'static str, - pub(crate) strategy: &'static str, -} - -impl Default for CostContext { - fn default() -> Self { - Self { - estimated: 0.0, - actual: 0.0, - result: "COST_OK", - strategy: "COST_STRATEGY_UNKNOWN", - } - } -} - -impl CostContext { - pub(crate) fn delta(&self) -> f64 { - self.estimated - self.actual - } - - pub(crate) fn result(&mut self, error: DemandControlError) -> DemandControlError { - self.result = error.code(); - error - } -} +pub(crate) static COST_ESTIMATED_KEY: &str = "cost.estimated"; +pub(crate) static COST_ACTUAL_KEY: &str = "cost.actual"; +pub(crate) static COST_DELTA_KEY: &str = "cost.delta"; +pub(crate) static COST_RESULT_KEY: &str = "cost.result"; +pub(crate) static COST_STRATEGY_KEY: &str = "cost.strategy"; /// Algorithm for calculating the cost of an incoming query. #[derive(Clone, Debug, Deserialize, JsonSchema)] @@ -146,6 +121,8 @@ pub(crate) enum DemandControlError { QueryParseFailure(String), /// {0} SubgraphOperationNotInitialized(crate::query_planner::fetch::SubgraphOperationNotInitialized), + /// {0} + ContextSerializationError(String), } impl IntoGraphQLErrors for DemandControlError { @@ -182,6 +159,10 @@ impl IntoGraphQLErrors for DemandControlError { .message(self.to_string()) .build()]), DemandControlError::SubgraphOperationNotInitialized(e) => Ok(e.into_graphql_errors()), + DemandControlError::ContextSerializationError(_) => Ok(vec![graphql::Error::builder() + .extension_code(self.code()) + .message(self.to_string()) + .build()]), } } } @@ -193,6 +174,7 @@ impl DemandControlError { DemandControlError::ActualCostTooExpensive { .. } => "COST_ACTUAL_TOO_EXPENSIVE", DemandControlError::QueryParseFailure(_) => "COST_QUERY_PARSE_FAILURE", DemandControlError::SubgraphOperationNotInitialized(e) => e.code(), + DemandControlError::ContextSerializationError(_) => "COST_CONTEXT_SERIALIZATION_ERROR", } } } @@ -219,6 +201,58 @@ impl<'a> From> for DemandControlError { } } +impl Context { + pub(crate) fn insert_estimated_cost(&self, cost: f64) -> Result<(), DemandControlError> { + self.insert(COST_ESTIMATED_KEY, cost) + .map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))?; + Ok(()) + } + + pub(crate) fn get_estimated_cost(&self) -> Result, DemandControlError> { + self.get::<&str, f64>(COST_ESTIMATED_KEY) + .map_err(|e| DemandControlError::ContextSerializationError(e.to_string())) + } + + pub(crate) fn insert_actual_cost(&self, cost: f64) -> Result<(), DemandControlError> { + self.insert(COST_ACTUAL_KEY, cost) + .map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))?; + Ok(()) + } + + pub(crate) fn get_actual_cost(&self) -> Result, DemandControlError> { + self.get::<&str, f64>(COST_ACTUAL_KEY) + .map_err(|e| DemandControlError::ContextSerializationError(e.to_string())) + } + + pub(crate) fn get_cost_delta(&self) -> Result, DemandControlError> { + let estimated = self.get_estimated_cost()?; + let actual = self.get_actual_cost()?; + Ok(estimated.zip(actual).map(|(est, act)| est - act)) + } + + pub(crate) fn insert_cost_result(&self, result: String) -> Result<(), DemandControlError> { + self.insert(COST_RESULT_KEY, result) + .map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))?; + Ok(()) + } + + pub(crate) fn get_cost_result(&self) -> Result, DemandControlError> { + self.get::<&str, String>(COST_RESULT_KEY) + .map_err(|e| DemandControlError::ContextSerializationError(e.to_string())) + } + + pub(crate) fn insert_cost_strategy(&self, strategy: String) -> Result<(), DemandControlError> { + self.insert(COST_STRATEGY_KEY, strategy) + .map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))?; + Ok(()) + } + + pub(crate) fn get_cost_strategy(&self) -> Result, DemandControlError> { + self.get::<&str, String>(COST_STRATEGY_KEY) + .map_err(|e| DemandControlError::ContextSerializationError(e.to_string())) + } +} + pub(crate) struct DemandControl { config: DemandControlConfig, strategy_factory: StrategyFactory, @@ -227,8 +261,10 @@ pub(crate) struct DemandControl { impl DemandControl { fn report_operation_metric(context: Context) { let result = context - .extensions() - .with_lock(|lock| lock.get::().map_or("NO_CONTEXT", |c| c.result)); + .get(COST_RESULT_KEY) + .ok() + .flatten() + .unwrap_or("NO_CONTEXT".to_string()); u64_counter!( "apollo.router.operations.demand_control", "Total operations with demand control enabled", diff --git a/apollo-router/src/plugins/demand_control/strategy/static_estimated.rs b/apollo-router/src/plugins/demand_control/strategy/static_estimated.rs index 41de6926b4..3ee1894473 100644 --- a/apollo-router/src/plugins/demand_control/strategy/static_estimated.rs +++ b/apollo-router/src/plugins/demand_control/strategy/static_estimated.rs @@ -3,7 +3,6 @@ use apollo_compiler::ExecutableDocument; use crate::graphql; use crate::plugins::demand_control::cost_calculator::static_cost::StaticCostCalculator; use crate::plugins::demand_control::strategy::StrategyImpl; -use crate::plugins::demand_control::CostContext; use crate::plugins::demand_control::DemandControlError; use crate::services::execution; use crate::services::subgraph; @@ -20,21 +19,24 @@ impl StrategyImpl for StaticEstimated { self.cost_calculator .planned(&request.query_plan) .and_then(|cost| { - request.context.extensions().with_lock(|mut lock| { - let cost_result = lock.get_or_default_mut::(); - cost_result.strategy = "static_estimated"; - cost_result.estimated = cost; - if cost > self.max { - Err( - cost_result.result(DemandControlError::EstimatedCostTooExpensive { - estimated_cost: cost, - max_cost: self.max, - }), - ) - } else { - Ok(()) - } - }) + request + .context + .insert_cost_strategy("static_estimated".to_string())?; + request.context.insert_cost_result("COST_OK".to_string())?; + request.context.insert_estimated_cost(cost)?; + + if cost > self.max { + let error = DemandControlError::EstimatedCostTooExpensive { + estimated_cost: cost, + max_cost: self.max, + }; + request + .context + .insert_cost_result(error.code().to_string())?; + Err(error) + } else { + Ok(()) + } }) } @@ -58,9 +60,7 @@ impl StrategyImpl for StaticEstimated { ) -> Result<(), DemandControlError> { if response.data.is_some() { let cost = self.cost_calculator.actual(request, response)?; - context - .extensions() - .with_lock(|mut lock| lock.get_or_default_mut::().actual = cost); + context.insert_actual_cost(cost)?; } Ok(()) } diff --git a/apollo-router/src/plugins/demand_control/strategy/test.rs b/apollo-router/src/plugins/demand_control/strategy/test.rs index 347f77d79f..265613472d 100644 --- a/apollo-router/src/plugins/demand_control/strategy/test.rs +++ b/apollo-router/src/plugins/demand_control/strategy/test.rs @@ -3,7 +3,6 @@ use apollo_compiler::ExecutableDocument; use crate::plugins::demand_control::strategy::StrategyImpl; use crate::plugins::demand_control::test::TestError; use crate::plugins::demand_control::test::TestStage; -use crate::plugins::demand_control::CostContext; use crate::plugins::demand_control::DemandControlError; use crate::services::execution::Request; use crate::services::subgraph::Response; @@ -17,32 +16,38 @@ pub(crate) struct Test { impl StrategyImpl for Test { fn on_execution_request(&self, request: &Request) -> Result<(), DemandControlError> { - request.context.extensions().with_lock(|mut lock| { - let cost_context = lock.get_or_default_mut::(); - match self { - Test { - stage: TestStage::ExecutionRequest, - error, - } => Err(cost_context.result(error.into())), - _ => Ok(()), + match self { + Test { + stage: TestStage::ExecutionRequest, + error, + } => { + let error: DemandControlError = error.into(); + request + .context + .insert_cost_result(error.code().to_string())?; + Err(error) } - }) + _ => Ok(()), + } } fn on_subgraph_request( &self, request: &crate::services::subgraph::Request, ) -> Result<(), DemandControlError> { - request.context.extensions().with_lock(|mut lock| { - let cost_context = lock.get_or_default_mut::(); - match self { - Test { - stage: TestStage::SubgraphRequest, - error, - } => Err(cost_context.result(error.into())), - _ => Ok(()), + match self { + Test { + stage: TestStage::SubgraphRequest, + error, + } => { + let error: DemandControlError = error.into(); + request + .context + .insert_cost_result(error.code().to_string())?; + Err(error) } - }) + _ => Ok(()), + } } fn on_subgraph_response( @@ -50,16 +55,19 @@ impl StrategyImpl for Test { _request: &ExecutableDocument, response: &Response, ) -> Result<(), DemandControlError> { - response.context.extensions().with_lock(|mut lock| { - let cost_context = lock.get_or_default_mut::(); - match self { - Test { - stage: TestStage::SubgraphResponse, - error, - } => Err(cost_context.result(error.into())), - _ => Ok(()), + match self { + Test { + stage: TestStage::SubgraphResponse, + error, + } => { + let error: DemandControlError = error.into(); + response + .context + .insert_cost_result(error.code().to_string())?; + Err(error) } - }) + _ => Ok(()), + } } fn on_execution_response( @@ -68,15 +76,16 @@ impl StrategyImpl for Test { _request: &ExecutableDocument, _response: &crate::graphql::Response, ) -> Result<(), DemandControlError> { - context.extensions().with_lock(|mut lock| { - let cost_context = lock.get_or_default_mut::(); - match self { - Test { - stage: TestStage::ExecutionResponse, - error, - } => Err(cost_context.result(error.into())), - _ => Ok(()), + match self { + Test { + stage: TestStage::ExecutionResponse, + error, + } => { + let error: DemandControlError = error.into(); + context.insert_cost_result(error.code().to_string())?; + Err(error) } - }) + _ => Ok(()), + } } } diff --git a/apollo-router/src/plugins/rhai/engine.rs b/apollo-router/src/plugins/rhai/engine.rs index da87655c17..062143711f 100644 --- a/apollo-router/src/plugins/rhai/engine.rs +++ b/apollo-router/src/plugins/rhai/engine.rs @@ -46,6 +46,10 @@ use crate::graphql::Response; use crate::http_ext; use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS; use crate::plugins::cache::entity::CONTEXT_CACHE_KEY; +use crate::plugins::demand_control::COST_ACTUAL_KEY; +use crate::plugins::demand_control::COST_ESTIMATED_KEY; +use crate::plugins::demand_control::COST_RESULT_KEY; +use crate::plugins::demand_control::COST_STRATEGY_KEY; use crate::plugins::subscription::SUBSCRIPTION_WS_CUSTOM_CONNECTION_PARAMS; use crate::query_planner::APOLLO_OPERATION_ID; use crate::Context; @@ -1777,6 +1781,14 @@ impl Rhai { ); global_variables.insert("APOLLO_ENTITY_CACHE_KEY".into(), CONTEXT_CACHE_KEY.into()); global_variables.insert("APOLLO_OPERATION_ID".into(), APOLLO_OPERATION_ID.into()); + // Demand Control Context Keys + global_variables.insert( + "APOLLO_COST_ESTIMATED_KEY".into(), + COST_ESTIMATED_KEY.into(), + ); + global_variables.insert("APOLLO_COST_ACTUAL_KEY".into(), COST_ACTUAL_KEY.into()); + global_variables.insert("APOLLO_COST_STRATEGY_KEY".into(), COST_STRATEGY_KEY.into()); + global_variables.insert("APOLLO_COST_RESULT_KEY".into(), COST_RESULT_KEY.into()); let shared_globals = Arc::new(global_variables); diff --git a/apollo-router/src/plugins/rhai/tests.rs b/apollo-router/src/plugins/rhai/tests.rs index b47c25774d..11c4f3a03a 100644 --- a/apollo-router/src/plugins/rhai/tests.rs +++ b/apollo-router/src/plugins/rhai/tests.rs @@ -785,3 +785,63 @@ async fn test_router_service_adds_timestamp_header() -> Result<(), BoxError> { Ok(()) } + +#[tokio::test] +async fn it_can_access_demand_control_context() -> Result<(), BoxError> { + let mut mock_service = MockSupergraphService::new(); + mock_service + .expect_call() + .times(1) + .returning(move |req: SupergraphRequest| { + Ok(SupergraphResponse::fake_builder() + .context(req.context) + .build() + .unwrap()) + }); + + let dyn_plugin: Box = crate::plugin::plugins() + .find(|factory| factory.name == "apollo.rhai") + .expect("Plugin not found") + .create_instance_without_schema( + &Value::from_str(r#"{"scripts":"tests/fixtures", "main":"demand_control.rhai"}"#) + .unwrap(), + ) + .await + .unwrap(); + + let mut router_service = dyn_plugin.supergraph_service(BoxService::new(mock_service)); + let context = Context::new(); + context.insert_estimated_cost(50.0).unwrap(); + context.insert_actual_cost(35.0).unwrap(); + context + .insert_cost_strategy("test_strategy".to_string()) + .unwrap(); + context.insert_cost_result("COST_OK".to_string()).unwrap(); + let supergraph_req = SupergraphRequest::fake_builder().context(context).build()?; + + let service_response = router_service.ready().await?.call(supergraph_req).await?; + assert_eq!(StatusCode::OK, service_response.response.status()); + + let headers = service_response.response.headers().clone(); + let demand_control_header = headers + .get("demand-control-estimate") + .map(|h| h.to_str().unwrap()); + assert_eq!(demand_control_header, Some("50.0")); + + let demand_control_header = headers + .get("demand-control-actual") + .map(|h| h.to_str().unwrap()); + assert_eq!(demand_control_header, Some("35.0")); + + let demand_control_header = headers + .get("demand-control-strategy") + .map(|h| h.to_str().unwrap()); + assert_eq!(demand_control_header, Some("test_strategy")); + + let demand_control_header = headers + .get("demand-control-result") + .map(|h| h.to_str().unwrap()); + assert_eq!(demand_control_header, Some("COST_OK")); + + Ok(()) +} diff --git a/apollo-router/src/plugins/telemetry/config_new/cost/mod.rs b/apollo-router/src/plugins/telemetry/config_new/cost/mod.rs index 31693b6a31..8341790eac 100644 --- a/apollo-router/src/plugins/telemetry/config_new/cost/mod.rs +++ b/apollo-router/src/plugins/telemetry/config_new/cost/mod.rs @@ -13,7 +13,10 @@ use super::attributes::StandardAttribute; use super::instruments::Increment; use super::instruments::StaticInstrument; use crate::metrics; -use crate::plugins::demand_control::CostContext; +use crate::plugins::demand_control::COST_ACTUAL_KEY; +use crate::plugins::demand_control::COST_DELTA_KEY; +use crate::plugins::demand_control::COST_ESTIMATED_KEY; +use crate::plugins::demand_control::COST_RESULT_KEY; use crate::plugins::telemetry::config::AttributeValue; use crate::plugins::telemetry::config_new::attributes::SupergraphAttributes; use crate::plugins::telemetry::config_new::conditions::Condition; @@ -38,10 +41,6 @@ pub(crate) const APOLLO_PRIVATE_COST_STRATEGY: Key = pub(crate) const APOLLO_PRIVATE_COST_RESULT: Key = Key::from_static_str("apollo_private.cost.result"); -static COST_ESTIMATED: &str = "cost.estimated"; -static COST_ACTUAL: &str = "cost.actual"; -static COST_DELTA: &str = "cost.delta"; - /// Attributes for Cost #[derive(Deserialize, JsonSchema, Clone, Default, Debug, PartialEq)] #[serde(deny_unknown_fields, default)] @@ -79,43 +78,60 @@ impl Selectors for SupergraphCostAttributes { fn on_response_event(&self, _response: &Self::EventResponse, ctx: &Context) -> Vec { let mut attrs = Vec::with_capacity(4); - let cost_result = ctx - .extensions() - .with_lock(|lock| lock.get::().cloned()); - if let Some(cost_result) = cost_result { - if let Some(key) = self - .cost_estimated - .as_ref() - .and_then(|a| a.key(Key::from_static_str("cost.estimated"))) - { - attrs.push(KeyValue::new(key, cost_result.estimated)); - } - if let Some(key) = self - .cost_actual - .as_ref() - .and_then(|a| a.key(Key::from_static_str("cost.actual"))) - { - attrs.push(KeyValue::new(key, cost_result.actual)); - } - if let Some(key) = self - .cost_delta - .as_ref() - .and_then(|a| a.key(Key::from_static_str("cost.delta"))) - { - attrs.push(KeyValue::new(key, cost_result.delta())); - } - if let Some(key) = self - .cost_result - .as_ref() - .and_then(|a| a.key(Key::from_static_str("cost.result"))) - { - attrs.push(KeyValue::new(key, cost_result.result)); - } + if let Some(estimated_cost) = self.estimated_cost_if_configured(ctx) { + attrs.push(estimated_cost); + } + if let Some(actual_cost) = self.actual_cost_if_configured(ctx) { + attrs.push(actual_cost); + } + if let Some(cost_delta) = self.cost_delta_if_configured(ctx) { + attrs.push(cost_delta); + } + if let Some(cost_result) = self.cost_result_if_configured(ctx) { + attrs.push(cost_result); } attrs } } +impl SupergraphCostAttributes { + fn estimated_cost_if_configured(&self, ctx: &Context) -> Option { + let key = self + .cost_estimated + .as_ref()? + .key(Key::from_static_str(COST_ESTIMATED_KEY))?; + let value = ctx.get_estimated_cost().ok()??; + Some(KeyValue::new(key, value)) + } + + fn actual_cost_if_configured(&self, ctx: &Context) -> Option { + let key = self + .cost_actual + .as_ref()? + .key(Key::from_static_str(COST_ACTUAL_KEY))?; + let value = ctx.get_actual_cost().ok()??; + Some(KeyValue::new(key, value)) + } + + fn cost_delta_if_configured(&self, ctx: &Context) -> Option { + let key = self + .cost_delta + .as_ref()? + .key(Key::from_static_str("cost.delta"))?; + let value = ctx.get_cost_delta().ok()??; + Some(KeyValue::new(key, value)) + } + + fn cost_result_if_configured(&self, ctx: &Context) -> Option { + let key = self + .cost_result + .as_ref()? + .key(Key::from_static_str(COST_RESULT_KEY))?; + let value = ctx.get_cost_result().ok()??; + Some(KeyValue::new(key, value)) + } +} + #[derive(Deserialize, JsonSchema, Clone, Default, Debug)] #[serde(deny_unknown_fields, default)] pub(crate) struct CostInstrumentsConfig { @@ -139,14 +155,14 @@ impl CostInstrumentsConfig { .meter(crate::plugins::telemetry::config_new::instruments::METER_NAME); [( - COST_ESTIMATED.to_string(), - StaticInstrument::Histogram(meter.f64_histogram(COST_ESTIMATED).with_description("Estimated cost of the operation using the currently configured cost model").init()), + COST_ESTIMATED_KEY.to_string(), + StaticInstrument::Histogram(meter.f64_histogram(COST_ESTIMATED_KEY).with_description("Estimated cost of the operation using the currently configured cost model").init()), ),( - COST_ACTUAL.to_string(), - StaticInstrument::Histogram(meter.f64_histogram(COST_ACTUAL).with_description("Actual cost of the operation using the currently configured cost model").init()), + COST_ACTUAL_KEY.to_string(), + StaticInstrument::Histogram(meter.f64_histogram(COST_ACTUAL_KEY).with_description("Actual cost of the operation using the currently configured cost model").init()), ),( - COST_DELTA.to_string(), - StaticInstrument::Histogram(meter.f64_histogram(COST_DELTA).with_description("Delta between the estimated and actual cost of the operation using the currently configured cost model").init()), + COST_DELTA_KEY.to_string(), + StaticInstrument::Histogram(meter.f64_histogram(COST_DELTA_KEY).with_description("Delta between the estimated and actual cost of the operation using the currently configured cost model").init()), )] .into_iter() .collect() @@ -158,7 +174,7 @@ impl CostInstrumentsConfig { ) -> CostInstruments { let cost_estimated = self.cost_estimated.is_enabled().then(|| { Self::histogram( - COST_ESTIMATED, + COST_ESTIMATED_KEY, &self.cost_estimated, SupergraphSelector::Cost { cost: CostValue::Estimated, @@ -169,7 +185,7 @@ impl CostInstrumentsConfig { let cost_actual = self.cost_actual.is_enabled().then(|| { Self::histogram( - COST_ACTUAL, + COST_ACTUAL_KEY, &self.cost_actual, SupergraphSelector::Cost { cost: CostValue::Actual, @@ -180,7 +196,7 @@ impl CostInstrumentsConfig { let cost_delta = self.cost_delta.is_enabled().then(|| { Self::histogram( - COST_DELTA, + COST_DELTA_KEY, &self.cost_delta, SupergraphSelector::Cost { cost: CostValue::Delta, @@ -331,26 +347,30 @@ pub(crate) enum CostValue { } pub(crate) fn add_cost_attributes(context: &Context, custom_attributes: &mut Vec) { - context.extensions().with_lock(|c| { - if let Some(cost) = c.get::() { - custom_attributes.push(KeyValue::new( - APOLLO_PRIVATE_COST_ESTIMATED.clone(), - AttributeValue::F64(cost.estimated), - )); - custom_attributes.push(KeyValue::new( - APOLLO_PRIVATE_COST_ACTUAL.clone(), - AttributeValue::F64(cost.actual), - )); - custom_attributes.push(KeyValue::new( - APOLLO_PRIVATE_COST_RESULT.clone(), - AttributeValue::String(cost.result.into()), - )); - custom_attributes.push(KeyValue::new( - APOLLO_PRIVATE_COST_STRATEGY.clone(), - AttributeValue::String(cost.strategy.into()), - )); - } - }); + if let Ok(Some(cost)) = context.get_estimated_cost() { + custom_attributes.push(KeyValue::new( + APOLLO_PRIVATE_COST_ESTIMATED.clone(), + AttributeValue::F64(cost), + )); + } + if let Ok(Some(cost)) = context.get_actual_cost() { + custom_attributes.push(KeyValue::new( + APOLLO_PRIVATE_COST_ACTUAL.clone(), + AttributeValue::F64(cost), + )); + } + if let Ok(Some(result)) = context.get_cost_result() { + custom_attributes.push(KeyValue::new( + APOLLO_PRIVATE_COST_RESULT.clone(), + AttributeValue::String(result), + )); + } + if let Ok(Some(strategy)) = context.get_cost_strategy() { + custom_attributes.push(KeyValue::new( + APOLLO_PRIVATE_COST_STRATEGY.clone(), + AttributeValue::String(strategy), + )); + } } #[cfg(test)] @@ -358,7 +378,6 @@ mod test { use std::sync::Arc; use crate::context::OPERATION_NAME; - use crate::plugins::demand_control::CostContext; use crate::plugins::telemetry::config_new::cost::CostInstruments; use crate::plugins::telemetry::config_new::cost::CostInstrumentsConfig; use crate::plugins::telemetry::config_new::instruments::Instrumented; @@ -463,13 +482,11 @@ mod test { fn make_request(instruments: &CostInstruments) { let context = Context::new(); - context.extensions().with_lock(|mut lock| { - lock.insert(CostContext::default()); - let cost_result = lock.get_or_default_mut::(); - cost_result.estimated = 100.0; - cost_result.actual = 10.0; - cost_result.result = "COST_TOO_EXPENSIVE" - }); + context.insert_estimated_cost(100.0).unwrap(); + context.insert_actual_cost(10.0).unwrap(); + context + .insert_cost_result("COST_TOO_EXPENSIVE".to_string()) + .unwrap(); let _ = context.insert(OPERATION_NAME, "Test".to_string()).unwrap(); instruments.on_request( &supergraph::Request::fake_builder() diff --git a/apollo-router/src/plugins/telemetry/config_new/selectors.rs b/apollo-router/src/plugins/telemetry/config_new/selectors.rs index 9047764a80..8c9c8dde7a 100644 --- a/apollo-router/src/plugins/telemetry/config_new/selectors.rs +++ b/apollo-router/src/plugins/telemetry/config_new/selectors.rs @@ -14,7 +14,6 @@ use crate::plugin::serde::deserialize_json_query; use crate::plugin::serde::deserialize_jsonpath; use crate::plugins::cache::entity::CacheSubgraph; use crate::plugins::cache::metrics::CacheMetricContextKey; -use crate::plugins::demand_control::CostContext; use crate::plugins::telemetry::config::AttributeValue; use crate::plugins::telemetry::config::TraceIdFormat; use crate::plugins::telemetry::config_new::cost::CostValue; @@ -1090,14 +1089,28 @@ impl Selector for SupergraphSelector { val.maybe_to_otel_value() } .or_else(|| default.maybe_to_otel_value()), - SupergraphSelector::Cost { cost } => ctx.extensions().with_lock(|lock| { - lock.get::().map(|cost_result| match cost { - CostValue::Estimated => cost_result.estimated.into(), - CostValue::Actual => cost_result.actual.into(), - CostValue::Delta => cost_result.delta().into(), - CostValue::Result => cost_result.result.into(), - }) - }), + SupergraphSelector::Cost { cost } => match cost { + CostValue::Estimated => ctx + .get_estimated_cost() + .ok() + .flatten() + .map(opentelemetry::Value::from), + CostValue::Actual => ctx + .get_actual_cost() + .ok() + .flatten() + .map(opentelemetry::Value::from), + CostValue::Delta => ctx + .get_cost_delta() + .ok() + .flatten() + .map(opentelemetry::Value::from), + CostValue::Result => ctx + .get_cost_result() + .ok() + .flatten() + .map(opentelemetry::Value::from), + }, SupergraphSelector::OnGraphQLError { on_graphql_error } if *on_graphql_error => { if ctx.get_json_value(CONTAINS_GRAPHQL_ERROR) == Some(serde_json_bytes::Value::Bool(true)) diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 7fdc8cf496..886bebea1a 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -1516,12 +1516,11 @@ impl Telemetry { let root_error_stats = Self::per_path_error_stats(&traces); let limits_stats = context.extensions().with_lock(|guard| { let strategy = guard.get::(); - let cost_ctx = guard.get::(); let query_limits = guard.get::>(); SingleLimitsStats { strategy: strategy.and_then(|s| serde_json::to_string(&s.mode).ok()), - cost_estimated: cost_ctx.map(|ctx| ctx.estimated), - cost_actual: cost_ctx.map(|ctx| ctx.actual), + cost_estimated: context.get_estimated_cost().ok().flatten(), + cost_actual: context.get_actual_cost().ok().flatten(), // These limits are related to the Traffic Shaping feature, unrelated to the Demand Control plugin depth: query_limits.map_or(0, |ql| ql.depth as u64), @@ -2176,8 +2175,11 @@ mod tests { use crate::plugin::test::MockSubgraphService; use crate::plugin::test::MockSupergraphService; use crate::plugin::DynPlugin; - use crate::plugins::demand_control::CostContext; use crate::plugins::demand_control::DemandControlError; + use crate::plugins::demand_control::COST_ACTUAL_KEY; + use crate::plugins::demand_control::COST_ESTIMATED_KEY; + use crate::plugins::demand_control::COST_RESULT_KEY; + use crate::plugins::demand_control::COST_STRATEGY_KEY; use crate::plugins::telemetry::config::TraceIdFormat; use crate::plugins::telemetry::handle_error_internal; use crate::services::router::body::get_body_bytes; @@ -3249,6 +3251,14 @@ mod tests { ); } + #[derive(Clone)] + struct CostContext { + pub(crate) estimated: f64, + pub(crate) actual: f64, + pub(crate) result: &'static str, + pub(crate) strategy: &'static str, + } + async fn make_failed_demand_control_request(plugin: &dyn DynPlugin, cost_details: CostContext) { let mut mock_service = MockSupergraphService::new(); mock_service @@ -3258,6 +3268,18 @@ mod tests { req.context.extensions().with_lock(|mut lock| { lock.insert(cost_details.clone()); }); + req.context + .insert(COST_ESTIMATED_KEY, cost_details.estimated) + .unwrap(); + req.context + .insert(COST_ACTUAL_KEY, cost_details.actual) + .unwrap(); + req.context + .insert(COST_RESULT_KEY, cost_details.result.to_string()) + .unwrap(); + req.context + .insert(COST_STRATEGY_KEY, cost_details.strategy.to_string()) + .unwrap(); let errors = if cost_details.result == "COST_ESTIMATED_TOO_EXPENSIVE" { DemandControlError::EstimatedCostTooExpensive { diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index 640cbc9aa7..bb4c4a1522 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -682,11 +682,11 @@ pub(crate) async fn create_plugins( add_optional_apollo_plugin!("preview_file_uploads"); add_optional_apollo_plugin!("preview_entity_cache"); add_mandatory_apollo_plugin!("progressive_override"); + add_optional_apollo_plugin!("demand_control"); // This relative ordering is documented in `docs/source/customizations/native.mdx`: add_optional_apollo_plugin!("rhai"); add_optional_apollo_plugin!("coprocessor"); - add_optional_apollo_plugin!("demand_control"); add_user_plugins!(); // Macros above remove from `apollo_plugin_factories`, so anything left at the end diff --git a/apollo-router/tests/fixtures/demand_control.rhai b/apollo-router/tests/fixtures/demand_control.rhai new file mode 100644 index 0000000000..8232b1ad28 --- /dev/null +++ b/apollo-router/tests/fixtures/demand_control.rhai @@ -0,0 +1,18 @@ +fn supergraph_service(service) { + const response_callback = Fn("process_response"); + service.map_response(response_callback); +} + +fn process_response(response) { + const estimate = response.context[Router.APOLLO_COST_ESTIMATED_KEY]; + response.headers["demand-control-estimate"] = to_string(estimate); + + const actual = response.context[Router.APOLLO_COST_ACTUAL_KEY]; + response.headers["demand-control-actual"] = to_string(actual); + + const strategy = response.context[Router.APOLLO_COST_STRATEGY_KEY]; + response.headers["demand-control-strategy"] = strategy; + + const result = response.context[Router.APOLLO_COST_RESULT_KEY]; + response.headers["demand-control-result"] = result; +} \ No newline at end of file diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 213947eae4..d1492fbc87 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -82,3 +82,71 @@ async fn test_coprocessor_limit_payload() -> Result<(), BoxError> { router.graceful_shutdown().await; Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +async fn test_coprocessor_demand_control_access() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + + let mock_server = wiremock::MockServer::start().await; + let coprocessor_address = mock_server.uri(); + + // Assert the execution request stage has access to the estimated cost + Mock::given(method("POST")) + .and(path("/")) + .and(body_partial_json(json!({ + "stage": "ExecutionRequest", + "context": { + "entries": { + "cost.estimated": 10.0, + "cost.result": "COST_OK", + "cost.strategy": "static_estimated" + }}}))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "version":1, + "stage":"ExecutionRequest", + "control":"continue", + }))) + .expect(1) + .mount(&mock_server) + .await; + + // Assert the supergraph response stage also includes the actual cost + Mock::given(method("POST")) + .and(path("/")) + .and(body_partial_json(json!({ + "stage": "SupergraphResponse", + "context": {"entries": { + "cost.actual": 3.0, + "cost.estimated": 10.0, + "cost.result": "COST_OK", + "cost.strategy": "static_estimated" + }}}))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "version":1, + "stage":"SupergraphResponse", + "control":"continue", + }))) + .expect(1) + .mount(&mock_server) + .await; + + let mut router = IntegrationTest::builder() + .config( + include_str!("fixtures/coprocessor_demand_control.router.yaml") + .replace("", &coprocessor_address), + ) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + + router.graceful_shutdown().await; + + Ok(()) +} diff --git a/apollo-router/tests/integration/fixtures/coprocessor_demand_control.router.yaml b/apollo-router/tests/integration/fixtures/coprocessor_demand_control.router.yaml new file mode 100644 index 0000000000..6535dd92bf --- /dev/null +++ b/apollo-router/tests/integration/fixtures/coprocessor_demand_control.router.yaml @@ -0,0 +1,17 @@ +# This coprocessor url will be updated to a test-scoped mock server +coprocessor: + url: "" + execution: + request: + context: true + supergraph: + response: + context: true + +demand_control: + enabled: true + mode: measure + strategy: + static_estimated: + list_size: 10 + max: 1000 \ No newline at end of file diff --git a/docs/source/customizations/rhai-api.mdx b/docs/source/customizations/rhai-api.mdx index c4e4a316f6..f0c0959d00 100644 --- a/docs/source/customizations/rhai-api.mdx +++ b/docs/source/customizations/rhai-api.mdx @@ -359,6 +359,10 @@ Router.APOLLO_AUTHENTICATION_JWT_CLAIMS // Context key to access authentication Router.APOLLO_SUBSCRIPTION_WS_CUSTOM_CONNECTION_PARAMS // Context key to modify or access the custom connection params when using subscriptions in WebSocket to subgraphs (cf subscription docs) Router.APOLLO_ENTITY_CACHE_KEY // Context key to access the entity cache key Router.APOLLO_OPERATION_ID // Context key to get the value of apollo operation id (studio trace id) from the context +Router.APOLLO_COST_ESTIMATED_KEY // Context key to get the estimated cost of an operation +Router.APOLLO_COST_ACTUAL_KEY // Context key to get the actual cost of an operation +Router.APOLLO_COST_STRATEGY_KEY // Context key to get the strategy used to calculate cost +Router.APOLLO_COST_RESULT_KEY // Context key to get the cost result of an operation ``` ## `Request` interface