Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add list of subgraphs/sources to context #6215

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2175,6 +2175,11 @@ expression: "&schema"
"description": "Enables connector debugging information on response extensions if the feature is enabled",
"type": "boolean"
},
"expose_sources_in_context": {
"default": false,
"description": "When enabled, adds an entry to the context for use in coprocessors ```json { \"context\": { \"entries\": { \"apollo_connectors::sources_in_query_plan\": [ { \"subgraph_name\": \"subgraph\", \"source_name\": \"source\" } ] } } } ```",
"type": "boolean"
},
"max_requests_per_operation_per_source": {
"default": null,
"description": "The maximum number of requests for a connector source",
Expand Down
15 changes: 15 additions & 0 deletions apollo-router/src/plugins/connectors/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ pub(crate) struct ConnectorsConfig {
/// The maximum number of requests for a connector source
#[serde(default)]
pub(crate) max_requests_per_operation_per_source: Option<usize>,

/// When enabled, adds an entry to the context for use in coprocessors
/// ```json
/// {
/// "context": {
/// "entries": {
/// "apollo_connectors::sources_in_query_plan": [
/// { "subgraph_name": "subgraph", "source_name": "source" }
/// ]
/// }
/// }
/// }
/// ```
#[serde(default)]
pub(crate) expose_sources_in_context: bool,
}

/// Configuration for a connector subgraph
Expand Down
45 changes: 44 additions & 1 deletion apollo-router/src/plugins/connectors/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,34 @@ use serde::Deserialize;
use serde::Serialize;
use serde_json_bytes::json;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt as TowerServiceExt;

use super::query_plans::get_connectors;
use crate::layers::ServiceExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugins::connectors::configuration::ConnectorsConfig;
use crate::plugins::connectors::request_limit::RequestLimits;
use crate::register_plugin;
use crate::services::connector_service::ConnectorSourceRef;
use crate::services::execution;
use crate::services::router::body::RouterBody;
use crate::services::supergraph;

const CONNECTORS_DEBUG_HEADER_NAME: &str = "Apollo-Connectors-Debugging";
const CONNECTORS_DEBUG_ENV: &str = "APOLLO_CONNECTORS_DEBUGGING";
const CONNECTORS_DEBUG_KEY: &str = "apolloConnectorsDebugging";
const CONNECTORS_MAX_REQUESTS_ENV: &str = "APOLLO_CONNECTORS_MAX_REQUESTS_PER_OPERATION";
const CONNECTOR_SOURCES_IN_QUERY_PLAN: &str = "apollo_connectors::sources_in_query_plan";

static LAST_DEBUG_ENABLED_VALUE: AtomicBool = AtomicBool::new(false);

#[derive(Debug, Clone)]
struct Connectors {
debug_extensions: bool,
max_requests: Option<usize>,
expose_sources_in_context: bool,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -70,6 +77,7 @@ impl Plugin for Connectors {
Ok(Connectors {
debug_extensions,
max_requests,
expose_sources_in_context: init.config.expose_sources_in_context,
})
}

Expand Down Expand Up @@ -121,7 +129,7 @@ impl Plugin for Connectors {
if let Some(first) = &mut first {
if let Some(inner) = Arc::into_inner(debug) {
first.extensions.insert(
"apolloConnectorsDebugging",
CONNECTORS_DEBUG_KEY,
json!({"version": "1", "data": inner.into_inner().serialize() }),
);
}
Expand All @@ -143,6 +151,41 @@ impl Plugin for Connectors {
)
.boxed()
}

fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
if !self.expose_sources_in_context {
return service;
}

ServiceBuilder::new()
.map_request(|req: execution::Request| {
let Some(connectors) = get_connectors(&req.context) else {
return req;
};

// add [{"subgraph_name": "", "source_name": ""}] to the context
// for connectors with sources in the query plan.
let list = req
.query_plan
.root
.service_usage_set()
.into_iter()
.flat_map(|service_name| {
connectors
.get(service_name)
.map(|connector| ConnectorSourceRef::try_from(connector).ok())
})
.unique()
.collect_vec();

req.context
.insert(CONNECTOR_SOURCES_IN_QUERY_PLAN, list)
.unwrap();
req
})
.service(service)
.boxed()
}
}

pub(crate) const PLUGIN_NAME: &str = "preview_connectors";
Expand Down
28 changes: 23 additions & 5 deletions apollo-router/src/plugins/connectors/query_plans.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
use std::sync::Arc;

use apollo_federation::sources::connect::Connector;
use indexmap::IndexMap;

use crate::query_planner::PlanNode;
use crate::Context;

type ConnectorsContext = Arc<IndexMap<Arc<str>, String>>;
type ConnectorsByServiceName = Arc<IndexMap<Arc<str>, Connector>>;

pub(crate) fn store_connectors_context(
pub(crate) fn store_connectors(
context: &Context,
connectors_by_service_name: Arc<IndexMap<Arc<str>, Connector>>,
) {
context
.extensions()
.with_lock(|mut lock| lock.insert::<ConnectorsByServiceName>(connectors_by_service_name));
}

pub(crate) fn get_connectors(context: &Context) -> Option<ConnectorsByServiceName> {
context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsByServiceName>().cloned())
}

type ConnectorLabels = Arc<IndexMap<Arc<str>, String>>;

pub(crate) fn store_connectors_labels(
context: &Context,
labels_by_service_name: Arc<IndexMap<Arc<str>, String>>,
) {
context
.extensions()
.with_lock(|mut lock| lock.insert::<ConnectorsContext>(labels_by_service_name));
.with_lock(|mut lock| lock.insert::<ConnectorLabels>(labels_by_service_name));
}

pub(crate) fn replace_connector_service_names_text(
Expand All @@ -22,7 +40,7 @@ pub(crate) fn replace_connector_service_names_text(
) -> Option<Arc<String>> {
let replacements = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsContext>().cloned());
.with_lock(|lock| lock.get::<ConnectorLabels>().cloned());
if let Some(replacements) = replacements {
text.as_ref().map(|text| {
let mut text = text.to_string();
Expand All @@ -42,7 +60,7 @@ pub(crate) fn replace_connector_service_names(
) -> Arc<PlanNode> {
let replacements = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsContext>().cloned());
.with_lock(|lock| lock.get::<ConnectorLabels>().cloned());

return if let Some(replacements) = replacements {
let mut plan = plan.clone();
Expand Down
75 changes: 75 additions & 0 deletions apollo-router/src/plugins/connectors/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,81 @@ async fn test_interface_object() {
);
}

#[tokio::test]
async fn test_sources_in_context() {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/coprocessor"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"control": "continue",
"version": 1,
"stage": "ExecutionRequest"
})))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/posts"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([
{ "userId": 1, "id": 1, "title": "title", "body": "body" },
{ "userId": 1, "id": 2, "title": "title", "body": "body" }]
)))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/users/1"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"id": 1,
"name": "Leanne Graham",
"username": "Bret"
})))
.mount(&mock_server)
.await;
let uri = mock_server.uri();

let _ = execute(
&QUICKSTART_SCHEMA.replace("https://jsonplaceholder.typicode.com", &mock_server.uri()),
&uri,
"query Posts { posts { id body title author { name username } } }",
Default::default(),
Some(json!({
"preview_connectors": {
"expose_sources_in_context": true
},
"coprocessor": {
"url": format!("{}/coprocessor", mock_server.uri()),
"execution": {
"request": {
"context": true
}
}
}
})),
|_| {},
)
.await;

let requests = &mock_server.received_requests().await.unwrap();
let coprocessor_request = requests.first().unwrap();
let body = coprocessor_request
.body_json::<serde_json_bytes::Value>()
.unwrap();
pretty_assertions::assert_eq!(
body.get("context")
.unwrap()
.as_object()
.unwrap()
.get("entries")
.unwrap()
.as_object()
.unwrap()
.get("apollo_connectors::sources_in_query_plan")
.unwrap(),
&serde_json_bytes::json!([
{ "subgraph_name": "connectors", "source_name": "jsonPlaceholder" }
])
);
}

mod quickstart_tests {
use super::*;

Expand Down
46 changes: 46 additions & 0 deletions apollo-router/src/query_planner/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use apollo_compiler::collections::HashSet;
use apollo_compiler::validation::Valid;
use router_bridge::planner::PlanOptions;
use router_bridge::planner::UsageReporting;
Expand Down Expand Up @@ -503,6 +504,51 @@ impl PlanNode {
}
}

/// A version of `service_usage` that doesn't use recursion
/// and returns a `HashSet` instead of an `Iterator`.
pub(crate) fn service_usage_set(&self) -> HashSet<&str> {
let mut services = HashSet::default();
let mut stack = vec![self];
while let Some(node) = stack.pop() {
match node {
Self::Sequence { nodes } | Self::Parallel { nodes } => {
stack.extend(nodes.iter());
}
Self::Fetch(fetch) => {
services.insert(fetch.service_name.as_ref());
}
Self::Subscription { primary, rest } => {
services.insert(primary.service_name.as_ref());
if let Some(rest) = rest {
stack.push(rest);
}
}
Self::Flatten(flatten) => {
stack.push(&flatten.node);
}
Self::Defer { primary, deferred } => {
if let Some(primary) = primary.node.as_ref() {
stack.push(primary);
}
stack.extend(deferred.iter().flat_map(|d| d.node.as_deref()));
}
Self::Condition {
if_clause,
else_clause,
..
} => {
if let Some(if_clause) = if_clause {
stack.push(if_clause);
}
if let Some(else_clause) = else_clause {
stack.push(else_clause);
}
}
}
}
services
}

pub(crate) fn extract_authorization_metadata(
&mut self,
schema: &Valid<apollo_compiler::Schema>,
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/src/router_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,9 +709,9 @@ pub(crate) async fn create_plugins(
add_optional_apollo_plugin!("demand_control");

// This relative ordering is documented in `docs/source/customizations/native.mdx`:
add_optional_apollo_plugin!("preview_connectors");
add_optional_apollo_plugin!("rhai");
add_optional_apollo_plugin!("coprocessor");
add_optional_apollo_plugin!("preview_connectors");
add_user_plugins!();

// Macros above remove from `apollo_plugin_factories`, so anything left at the end
Expand Down
13 changes: 12 additions & 1 deletion apollo-router/src/services/connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl From<&Connector> for ConnectorInfo {
}

/// A reference to a unique Connector source.
#[derive(Hash, Eq, PartialEq, Clone)]
#[derive(Hash, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub(crate) struct ConnectorSourceRef {
pub(crate) subgraph_name: String,
pub(crate) source_name: String,
Expand Down Expand Up @@ -117,6 +117,17 @@ impl FromStr for ConnectorSourceRef {
}
}

impl TryFrom<&Connector> for ConnectorSourceRef {
type Error = ();

fn try_from(value: &Connector) -> Result<Self, Self::Error> {
Ok(Self {
subgraph_name: value.id.subgraph_name.to_string(),
source_name: value.id.source_name.clone().ok_or(())?,
})
}
}

impl tower::Service<ConnectRequest> for ConnectorService {
type Response = ConnectResponse;
type Error = BoxError;
Expand Down
6 changes: 4 additions & 2 deletions apollo-router/src/services/supergraph/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use crate::graphql;
use crate::graphql::IntoGraphQLErrors;
use crate::graphql::Response;
use crate::plugin::DynPlugin;
use crate::plugins::connectors::query_plans::store_connectors_context;
use crate::plugins::connectors::query_plans::store_connectors;
use crate::plugins::connectors::query_plans::store_connectors_labels;
use crate::plugins::subscription::Subscription;
use crate::plugins::subscription::SubscriptionConfig;
use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
Expand Down Expand Up @@ -130,7 +131,8 @@ impl Service<SupergraphRequest> for SupergraphService {

fn call(&mut self, req: SupergraphRequest) -> Self::Future {
if let Some(connectors) = &self.schema.connectors {
store_connectors_context(&req.context, connectors.labels_by_service_name.clone());
store_connectors_labels(&req.context, connectors.labels_by_service_name.clone());
store_connectors(&req.context, connectors.by_service_name.clone());
}

// Consume our cloned services and allow ownership to be transferred to the async block.
Expand Down