Skip to content

Commit

Permalink
Move cost context values into DashMap (#5972)
Browse files Browse the repository at this point in the history
  • Loading branch information
tninesling authored Sep 10, 2024
1 parent 3f03c43 commit d4ab58d
Show file tree
Hide file tree
Showing 14 changed files with 457 additions and 176 deletions.
5 changes: 5 additions & 0 deletions .changesets/feat_tninesling_coprocessor_cost_access.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Move cost context values into DashMap ([PR #5972](https://github.com/apollographql/router/pull/5972))

Allow Rhai scripts and coprocessors to access demand control information via context.

By [@tninesling](https://github.com/tninesling) in https://github.com/apollographql/router/pull/5972
100 changes: 68 additions & 32 deletions apollo-router/src/plugins/demand_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,11 @@ use crate::Context;
pub(crate) mod cost_calculator;
pub(crate) mod strategy;

/// The cost calculation information stored in context for use in telemetry and other plugins that need to know what cost was calculated.
#[derive(Debug, Clone)]
pub(crate) struct CostContext {
pub(crate) estimated: f64,
pub(crate) actual: f64,
pub(crate) result: &'static str,
pub(crate) strategy: &'static str,
}

impl Default for CostContext {
fn default() -> Self {
Self {
estimated: 0.0,
actual: 0.0,
result: "COST_OK",
strategy: "COST_STRATEGY_UNKNOWN",
}
}
}

impl CostContext {
pub(crate) fn delta(&self) -> f64 {
self.estimated - self.actual
}

pub(crate) fn result(&mut self, error: DemandControlError) -> DemandControlError {
self.result = error.code();
error
}
}
pub(crate) static COST_ESTIMATED_KEY: &str = "cost.estimated";
pub(crate) static COST_ACTUAL_KEY: &str = "cost.actual";
pub(crate) static COST_DELTA_KEY: &str = "cost.delta";
pub(crate) static COST_RESULT_KEY: &str = "cost.result";
pub(crate) static COST_STRATEGY_KEY: &str = "cost.strategy";

/// Algorithm for calculating the cost of an incoming query.
#[derive(Clone, Debug, Deserialize, JsonSchema)]
Expand Down Expand Up @@ -146,6 +121,8 @@ pub(crate) enum DemandControlError {
QueryParseFailure(String),
/// {0}
SubgraphOperationNotInitialized(crate::query_planner::fetch::SubgraphOperationNotInitialized),
/// {0}
ContextSerializationError(String),
}

impl IntoGraphQLErrors for DemandControlError {
Expand Down Expand Up @@ -182,6 +159,10 @@ impl IntoGraphQLErrors for DemandControlError {
.message(self.to_string())
.build()]),
DemandControlError::SubgraphOperationNotInitialized(e) => Ok(e.into_graphql_errors()),
DemandControlError::ContextSerializationError(_) => Ok(vec![graphql::Error::builder()
.extension_code(self.code())
.message(self.to_string())
.build()]),
}
}
}
Expand All @@ -193,6 +174,7 @@ impl DemandControlError {
DemandControlError::ActualCostTooExpensive { .. } => "COST_ACTUAL_TOO_EXPENSIVE",
DemandControlError::QueryParseFailure(_) => "COST_QUERY_PARSE_FAILURE",
DemandControlError::SubgraphOperationNotInitialized(e) => e.code(),
DemandControlError::ContextSerializationError(_) => "COST_CONTEXT_SERIALIZATION_ERROR",
}
}
}
Expand All @@ -219,6 +201,58 @@ impl<'a> From<FieldLookupError<'a>> for DemandControlError {
}
}

impl Context {
pub(crate) fn insert_estimated_cost(&self, cost: f64) -> Result<(), DemandControlError> {
self.insert(COST_ESTIMATED_KEY, cost)
.map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))?;
Ok(())
}

pub(crate) fn get_estimated_cost(&self) -> Result<Option<f64>, DemandControlError> {
self.get::<&str, f64>(COST_ESTIMATED_KEY)
.map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))
}

pub(crate) fn insert_actual_cost(&self, cost: f64) -> Result<(), DemandControlError> {
self.insert(COST_ACTUAL_KEY, cost)
.map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))?;
Ok(())
}

pub(crate) fn get_actual_cost(&self) -> Result<Option<f64>, DemandControlError> {
self.get::<&str, f64>(COST_ACTUAL_KEY)
.map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))
}

pub(crate) fn get_cost_delta(&self) -> Result<Option<f64>, DemandControlError> {
let estimated = self.get_estimated_cost()?;
let actual = self.get_actual_cost()?;
Ok(estimated.zip(actual).map(|(est, act)| est - act))
}

pub(crate) fn insert_cost_result(&self, result: String) -> Result<(), DemandControlError> {
self.insert(COST_RESULT_KEY, result)
.map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))?;
Ok(())
}

pub(crate) fn get_cost_result(&self) -> Result<Option<String>, DemandControlError> {
self.get::<&str, String>(COST_RESULT_KEY)
.map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))
}

pub(crate) fn insert_cost_strategy(&self, strategy: String) -> Result<(), DemandControlError> {
self.insert(COST_STRATEGY_KEY, strategy)
.map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))?;
Ok(())
}

pub(crate) fn get_cost_strategy(&self) -> Result<Option<String>, DemandControlError> {
self.get::<&str, String>(COST_STRATEGY_KEY)
.map_err(|e| DemandControlError::ContextSerializationError(e.to_string()))
}
}

pub(crate) struct DemandControl {
config: DemandControlConfig,
strategy_factory: StrategyFactory,
Expand All @@ -227,8 +261,10 @@ pub(crate) struct DemandControl {
impl DemandControl {
fn report_operation_metric(context: Context) {
let result = context
.extensions()
.with_lock(|lock| lock.get::<CostContext>().map_or("NO_CONTEXT", |c| c.result));
.get(COST_RESULT_KEY)
.ok()
.flatten()
.unwrap_or("NO_CONTEXT".to_string());
u64_counter!(
"apollo.router.operations.demand_control",
"Total operations with demand control enabled",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use apollo_compiler::ExecutableDocument;
use crate::graphql;
use crate::plugins::demand_control::cost_calculator::static_cost::StaticCostCalculator;
use crate::plugins::demand_control::strategy::StrategyImpl;
use crate::plugins::demand_control::CostContext;
use crate::plugins::demand_control::DemandControlError;
use crate::services::execution;
use crate::services::subgraph;
Expand All @@ -20,21 +19,24 @@ impl StrategyImpl for StaticEstimated {
self.cost_calculator
.planned(&request.query_plan)
.and_then(|cost| {
request.context.extensions().with_lock(|mut lock| {
let cost_result = lock.get_or_default_mut::<CostContext>();
cost_result.strategy = "static_estimated";
cost_result.estimated = cost;
if cost > self.max {
Err(
cost_result.result(DemandControlError::EstimatedCostTooExpensive {
estimated_cost: cost,
max_cost: self.max,
}),
)
} else {
Ok(())
}
})
request
.context
.insert_cost_strategy("static_estimated".to_string())?;
request.context.insert_cost_result("COST_OK".to_string())?;
request.context.insert_estimated_cost(cost)?;

if cost > self.max {
let error = DemandControlError::EstimatedCostTooExpensive {
estimated_cost: cost,
max_cost: self.max,
};
request
.context
.insert_cost_result(error.code().to_string())?;
Err(error)
} else {
Ok(())
}
})
}

Expand All @@ -58,9 +60,7 @@ impl StrategyImpl for StaticEstimated {
) -> Result<(), DemandControlError> {
if response.data.is_some() {
let cost = self.cost_calculator.actual(request, response)?;
context
.extensions()
.with_lock(|mut lock| lock.get_or_default_mut::<CostContext>().actual = cost);
context.insert_actual_cost(cost)?;
}
Ok(())
}
Expand Down
83 changes: 46 additions & 37 deletions apollo-router/src/plugins/demand_control/strategy/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use apollo_compiler::ExecutableDocument;
use crate::plugins::demand_control::strategy::StrategyImpl;
use crate::plugins::demand_control::test::TestError;
use crate::plugins::demand_control::test::TestStage;
use crate::plugins::demand_control::CostContext;
use crate::plugins::demand_control::DemandControlError;
use crate::services::execution::Request;
use crate::services::subgraph::Response;
Expand All @@ -17,49 +16,58 @@ pub(crate) struct Test {

impl StrategyImpl for Test {
fn on_execution_request(&self, request: &Request) -> Result<(), DemandControlError> {
request.context.extensions().with_lock(|mut lock| {
let cost_context = lock.get_or_default_mut::<CostContext>();
match self {
Test {
stage: TestStage::ExecutionRequest,
error,
} => Err(cost_context.result(error.into())),
_ => Ok(()),
match self {
Test {
stage: TestStage::ExecutionRequest,
error,
} => {
let error: DemandControlError = error.into();
request
.context
.insert_cost_result(error.code().to_string())?;
Err(error)
}
})
_ => Ok(()),
}
}

fn on_subgraph_request(
&self,
request: &crate::services::subgraph::Request,
) -> Result<(), DemandControlError> {
request.context.extensions().with_lock(|mut lock| {
let cost_context = lock.get_or_default_mut::<CostContext>();
match self {
Test {
stage: TestStage::SubgraphRequest,
error,
} => Err(cost_context.result(error.into())),
_ => Ok(()),
match self {
Test {
stage: TestStage::SubgraphRequest,
error,
} => {
let error: DemandControlError = error.into();
request
.context
.insert_cost_result(error.code().to_string())?;
Err(error)
}
})
_ => Ok(()),
}
}

fn on_subgraph_response(
&self,
_request: &ExecutableDocument,
response: &Response,
) -> Result<(), DemandControlError> {
response.context.extensions().with_lock(|mut lock| {
let cost_context = lock.get_or_default_mut::<CostContext>();
match self {
Test {
stage: TestStage::SubgraphResponse,
error,
} => Err(cost_context.result(error.into())),
_ => Ok(()),
match self {
Test {
stage: TestStage::SubgraphResponse,
error,
} => {
let error: DemandControlError = error.into();
response
.context
.insert_cost_result(error.code().to_string())?;
Err(error)
}
})
_ => Ok(()),
}
}

fn on_execution_response(
Expand All @@ -68,15 +76,16 @@ impl StrategyImpl for Test {
_request: &ExecutableDocument,
_response: &crate::graphql::Response,
) -> Result<(), DemandControlError> {
context.extensions().with_lock(|mut lock| {
let cost_context = lock.get_or_default_mut::<CostContext>();
match self {
Test {
stage: TestStage::ExecutionResponse,
error,
} => Err(cost_context.result(error.into())),
_ => Ok(()),
match self {
Test {
stage: TestStage::ExecutionResponse,
error,
} => {
let error: DemandControlError = error.into();
context.insert_cost_result(error.code().to_string())?;
Err(error)
}
})
_ => Ok(()),
}
}
}
12 changes: 12 additions & 0 deletions apollo-router/src/plugins/rhai/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ use crate::graphql::Response;
use crate::http_ext;
use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
use crate::plugins::cache::entity::CONTEXT_CACHE_KEY;
use crate::plugins::demand_control::COST_ACTUAL_KEY;
use crate::plugins::demand_control::COST_ESTIMATED_KEY;
use crate::plugins::demand_control::COST_RESULT_KEY;
use crate::plugins::demand_control::COST_STRATEGY_KEY;
use crate::plugins::subscription::SUBSCRIPTION_WS_CUSTOM_CONNECTION_PARAMS;
use crate::query_planner::APOLLO_OPERATION_ID;
use crate::Context;
Expand Down Expand Up @@ -1777,6 +1781,14 @@ impl Rhai {
);
global_variables.insert("APOLLO_ENTITY_CACHE_KEY".into(), CONTEXT_CACHE_KEY.into());
global_variables.insert("APOLLO_OPERATION_ID".into(), APOLLO_OPERATION_ID.into());
// Demand Control Context Keys
global_variables.insert(
"APOLLO_COST_ESTIMATED_KEY".into(),
COST_ESTIMATED_KEY.into(),
);
global_variables.insert("APOLLO_COST_ACTUAL_KEY".into(), COST_ACTUAL_KEY.into());
global_variables.insert("APOLLO_COST_STRATEGY_KEY".into(), COST_STRATEGY_KEY.into());
global_variables.insert("APOLLO_COST_RESULT_KEY".into(), COST_RESULT_KEY.into());

let shared_globals = Arc::new(global_variables);

Expand Down
Loading

0 comments on commit d4ab58d

Please sign in to comment.