diff --git a/apollo-router/src/plugins/connectors/plugin.rs b/apollo-router/src/plugins/connectors/plugin.rs index 411a8dec5fa..3d27d7dd734 100644 --- a/apollo-router/src/plugins/connectors/plugin.rs +++ b/apollo-router/src/plugins/connectors/plugin.rs @@ -15,18 +15,14 @@ use serde::Deserialize; use serde::Serialize; use serde_json_bytes::json; use tower::BoxError; -use tower::ServiceBuilder; use tower::ServiceExt as TowerServiceExt; -use super::query_plans::get_connectors; use crate::layers::ServiceExt; use crate::plugin::Plugin; use crate::plugin::PluginInit; use crate::plugins::connectors::configuration::ConnectorsConfig; use crate::plugins::connectors::request_limit::RequestLimits; use crate::register_plugin; -use crate::services::connector_service::ConnectorSourceRef; -use crate::services::execution; use crate::services::router::body::RouterBody; use crate::services::supergraph; @@ -34,7 +30,6 @@ const CONNECTORS_DEBUG_HEADER_NAME: &str = "Apollo-Connectors-Debugging"; const CONNECTORS_DEBUG_ENV: &str = "APOLLO_CONNECTORS_DEBUGGING"; const CONNECTORS_DEBUG_KEY: &str = "apolloConnectorsDebugging"; const CONNECTORS_MAX_REQUESTS_ENV: &str = "APOLLO_CONNECTORS_MAX_REQUESTS_PER_OPERATION"; -const CONNECTOR_SOURCES_IN_QUERY_PLAN: &str = "apollo_connectors::sources_in_query_plan"; static LAST_DEBUG_ENABLED_VALUE: AtomicBool = AtomicBool::new(false); @@ -149,37 +144,6 @@ impl Plugin for Connectors { ) .boxed() } - - fn execution_service(&self, service: execution::BoxService) -> execution::BoxService { - ServiceBuilder::new() - .map_request(|req: execution::Request| { - let Some(connectors) = get_connectors(&req.context) else { - return req; - }; - - // add [{"subgraph_name": "", "source_name": ""}] to the context - // for connectors with sources in the query plan. - let list = req - .query_plan - .root - .service_usage() - .unique() - .flat_map(|service_name| { - connectors - .get(service_name) - .map(|connector| ConnectorSourceRef::try_from(connector).ok()) - }) - .unique() - .collect_vec(); - - req.context - .insert(CONNECTOR_SOURCES_IN_QUERY_PLAN, list) - .unwrap(); - req - }) - .service(service) - .boxed() - } } pub(crate) const PLUGIN_NAME: &str = "preview_connectors"; diff --git a/apollo-router/src/plugins/connectors/tests.rs b/apollo-router/src/plugins/connectors/tests.rs index 9e3c640e0a4..e3c2a9eb7ce 100644 --- a/apollo-router/src/plugins/connectors/tests.rs +++ b/apollo-router/src/plugins/connectors/tests.rs @@ -1401,7 +1401,7 @@ async fn test_sources_in_context() { "url": format!("{}/coprocessor", mock_server.uri()), "execution": { "request": { - "context": true + "sources": true } } } @@ -1416,19 +1416,8 @@ async fn test_sources_in_context() { .body_json::() .unwrap(); pretty_assertions::assert_eq!( - body.get("context") - .unwrap() - .as_object() - .unwrap() - .get("entries") - .unwrap() - .as_object() - .unwrap() - .get("apollo_connectors::sources_in_query_plan") - .unwrap(), - &serde_json_bytes::json!([ - { "subgraph_name": "connectors", "source_name": "jsonPlaceholder" } - ]) + body.get("sources").unwrap(), + &serde_json_bytes::json!(["connectors.jsonPlaceholder"]) ); } diff --git a/apollo-router/src/plugins/coprocessor/execution.rs b/apollo-router/src/plugins/coprocessor/execution.rs index c5342db5bdb..47977bcaf98 100644 --- a/apollo-router/src/plugins/coprocessor/execution.rs +++ b/apollo-router/src/plugins/coprocessor/execution.rs @@ -3,9 +3,11 @@ use std::sync::Arc; use futures::future; use futures::stream; +use itertools::Itertools; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; +use services::execution::QueryPlan; use tower::BoxError; use tower::ServiceBuilder; use tower_service::Service; @@ -15,8 +17,10 @@ use super::*; use crate::graphql; use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer; use crate::layers::ServiceBuilderExt; +use crate::plugins::connectors::query_plans::get_connectors; use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; use crate::services::execution; +use crate::Context; /// What information is passed to a router request/response stage #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] @@ -34,6 +38,8 @@ pub(super) struct ExecutionRequestConf { pub(super) method: bool, /// Send the query plan pub(super) query_plan: bool, + /// Send the sources in the query plan (subgraphs and connectors) + pub(super) sources: bool, } /// What information is passed to a router request/response stage @@ -218,6 +224,9 @@ where let query_plan = request_config .query_plan .then(|| request.query_plan.clone()); + let sources = request_config + .sources + .then(|| make_list_of_services(&request.query_plan, &request.context)); let payload = Externalizable::execution_builder() .stage(PipelineStep::ExecutionRequest) @@ -229,6 +238,7 @@ where .and_method(method) .and_sdl(sdl_to_send) .and_query_plan(query_plan) + .and_sources(sources) .build(); tracing::debug!(?payload, "externalized output"); @@ -503,6 +513,27 @@ where }) } +fn make_list_of_services(query_plan: &QueryPlan, context: &Context) -> Vec { + let connectors = get_connectors(context); + + query_plan + .root + .service_usage() + .flat_map(|service_name| { + if let Some(connector) = connectors.as_ref().and_then(|c| c.get(service_name)) { + connector + .id + .source_name + .as_ref() + .map(|source_name| format!("{}.{}", connector.id.subgraph_name, source_name)) + } else { + Some(service_name.to_string()) + } + }) + .unique() + .collect() +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -576,6 +607,7 @@ mod tests { sdl: false, method: false, query_plan: false, + sources: false, }, response: Default::default(), }; @@ -710,6 +742,7 @@ mod tests { sdl: false, method: false, query_plan: false, + sources: false, }, response: Default::default(), }; diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs index bf3ff9fb9c4..f0f7ff6dbec 100644 --- a/apollo-router/src/services/external.rs +++ b/apollo-router/src/services/external.rs @@ -102,6 +102,8 @@ pub(crate) struct Externalizable { pub(crate) has_next: Option, #[serde(skip_serializing_if = "Option::is_none")] query_plan: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + sources: Option>, } #[buildstructor::buildstructor] @@ -145,6 +147,7 @@ where service_name: None, has_next: None, query_plan: None, + sources: None, } } @@ -184,6 +187,7 @@ where service_name: None, has_next, query_plan: None, + sources: None, } } @@ -203,6 +207,7 @@ where sdl: Option, has_next: Option, query_plan: Option>, + sources: Option>, ) -> Self { assert!(matches!( stage, @@ -224,6 +229,7 @@ where service_name: None, has_next, query_plan, + sources, } } @@ -263,6 +269,7 @@ where service_name, has_next: None, query_plan: None, + sources: None, } } diff --git a/docs/source/customizations/coprocessor.mdx b/docs/source/customizations/coprocessor.mdx index 935468ce9c0..92ee7334269 100644 --- a/docs/source/customizations/coprocessor.mdx +++ b/docs/source/customizations/coprocessor.mdx @@ -453,7 +453,10 @@ Properties of the JSON body are divided into two high-level categories: "formatted_query_plan":"QueryPlan {\n Fetch(service: \"accounts\") {\n {\n me {\n name\n username\n }\n }\n },\n}", "query":{ "string":"query Me {\n me {\n name\n username\n }\n}\n","fragments":{"map":{}},"operations":[{"name":"Me","kind":"query","type_name":"Query","selection_set":[{"Field":{"name":"me","alias":null,"selection_set":[{"Field":{"name":"name","alias":null,"selection_set":null,"field_type":{"Named":"String"},"include_skip":{"include":"Yes","skip":"No"}}},{"Field":{"name":"username","alias":null,"selection_set":null,"field_type":{"Named":"String"},"include_skip":{"include":"Yes","skip":"No"}}}],"field_type":{"Named":"User"},"include_skip":{"include":"Yes","skip":"No"}}}],"variables":{}}],"subselections":{},"unauthorized":{"paths":[],"errors":{"log":true,"response":"errors"}},"filtered_query":null,"defer_stats":{"has_defer":false,"has_unconditional_defer":false,"conditional_defer_variable_names":[]},"is_original":true} - } + }, + "sources": [ + "accounts" + ] } ``` @@ -1009,6 +1012,21 @@ When `stage` is `ExecutionRequest`, this contains the query plan for the client + + + +##### `sources` + +`string` + + + + +When `stage` is `ExecutionRequest`, this contains the subgraph names and connector source names in the query plan for the client query. It cannot be modified by the coprocessor. + + + +