Skip to content

Commit

Permalink
switch to using a new coprocessor field
Browse files Browse the repository at this point in the history
  • Loading branch information
lennyburdette committed Nov 4, 2024
1 parent f8e98f7 commit 6c06def
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 51 deletions.
36 changes: 0 additions & 36 deletions apollo-router/src/plugins/connectors/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,21 @@ use serde::Deserialize;
use serde::Serialize;
use serde_json_bytes::json;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt as TowerServiceExt;

use super::query_plans::get_connectors;
use crate::layers::ServiceExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugins::connectors::configuration::ConnectorsConfig;
use crate::plugins::connectors::request_limit::RequestLimits;
use crate::register_plugin;
use crate::services::connector_service::ConnectorSourceRef;
use crate::services::execution;
use crate::services::router::body::RouterBody;
use crate::services::supergraph;

const CONNECTORS_DEBUG_HEADER_NAME: &str = "Apollo-Connectors-Debugging";
const CONNECTORS_DEBUG_ENV: &str = "APOLLO_CONNECTORS_DEBUGGING";
const CONNECTORS_DEBUG_KEY: &str = "apolloConnectorsDebugging";
const CONNECTORS_MAX_REQUESTS_ENV: &str = "APOLLO_CONNECTORS_MAX_REQUESTS_PER_OPERATION";
const CONNECTOR_SOURCES_IN_QUERY_PLAN: &str = "apollo_connectors::sources_in_query_plan";

static LAST_DEBUG_ENABLED_VALUE: AtomicBool = AtomicBool::new(false);

Expand Down Expand Up @@ -149,37 +144,6 @@ impl Plugin for Connectors {
)
.boxed()
}

fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
ServiceBuilder::new()
.map_request(|req: execution::Request| {
let Some(connectors) = get_connectors(&req.context) else {
return req;
};

// add [{"subgraph_name": "", "source_name": ""}] to the context
// for connectors with sources in the query plan.
let list = req
.query_plan
.root
.service_usage()
.unique()
.flat_map(|service_name| {
connectors
.get(service_name)
.map(|connector| ConnectorSourceRef::try_from(connector).ok())
})
.unique()
.collect_vec();

req.context
.insert(CONNECTOR_SOURCES_IN_QUERY_PLAN, list)
.unwrap();
req
})
.service(service)
.boxed()
}
}

pub(crate) const PLUGIN_NAME: &str = "preview_connectors";
Expand Down
17 changes: 3 additions & 14 deletions apollo-router/src/plugins/connectors/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1401,7 +1401,7 @@ async fn test_sources_in_context() {
"url": format!("{}/coprocessor", mock_server.uri()),
"execution": {
"request": {
"context": true
"sources": true
}
}
}
Expand All @@ -1416,19 +1416,8 @@ async fn test_sources_in_context() {
.body_json::<serde_json_bytes::Value>()
.unwrap();
pretty_assertions::assert_eq!(
body.get("context")
.unwrap()
.as_object()
.unwrap()
.get("entries")
.unwrap()
.as_object()
.unwrap()
.get("apollo_connectors::sources_in_query_plan")
.unwrap(),
&serde_json_bytes::json!([
{ "subgraph_name": "connectors", "source_name": "jsonPlaceholder" }
])
body.get("sources").unwrap(),
&serde_json_bytes::json!(["connectors.jsonPlaceholder"])
);
}

Expand Down
33 changes: 33 additions & 0 deletions apollo-router/src/plugins/coprocessor/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use std::sync::Arc;

use futures::future;
use futures::stream;
use itertools::Itertools;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use services::execution::QueryPlan;
use tower::BoxError;
use tower::ServiceBuilder;
use tower_service::Service;
Expand All @@ -15,8 +17,10 @@ use super::*;
use crate::graphql;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
use crate::plugins::connectors::query_plans::get_connectors;
use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME;
use crate::services::execution;
use crate::Context;

/// What information is passed to a router request/response stage
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)]
Expand All @@ -34,6 +38,8 @@ pub(super) struct ExecutionRequestConf {
pub(super) method: bool,
/// Send the query plan
pub(super) query_plan: bool,
/// Send the sources in the query plan (subgraphs and connectors)
pub(super) sources: bool,
}

/// What information is passed to a router request/response stage
Expand Down Expand Up @@ -218,6 +224,9 @@ where
let query_plan = request_config
.query_plan
.then(|| request.query_plan.clone());
let sources = request_config
.sources
.then(|| make_list_of_services(&request.query_plan, &request.context));

let payload = Externalizable::execution_builder()
.stage(PipelineStep::ExecutionRequest)
Expand All @@ -229,6 +238,7 @@ where
.and_method(method)
.and_sdl(sdl_to_send)
.and_query_plan(query_plan)
.and_sources(sources)
.build();

tracing::debug!(?payload, "externalized output");
Expand Down Expand Up @@ -503,6 +513,27 @@ where
})
}

fn make_list_of_services(query_plan: &QueryPlan, context: &Context) -> Vec<String> {
let connectors = get_connectors(context);

query_plan
.root
.service_usage()
.flat_map(|service_name| {
if let Some(connector) = connectors.as_ref().and_then(|c| c.get(service_name)) {
connector
.id
.source_name
.as_ref()
.map(|source_name| format!("{}.{}", connector.id.subgraph_name, source_name))
} else {
Some(service_name.to_string())
}
})
.unique()
.collect()
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -576,6 +607,7 @@ mod tests {
sdl: false,
method: false,
query_plan: false,
sources: false,
},
response: Default::default(),
};
Expand Down Expand Up @@ -710,6 +742,7 @@ mod tests {
sdl: false,
method: false,
query_plan: false,
sources: false,
},
response: Default::default(),
};
Expand Down
7 changes: 7 additions & 0 deletions apollo-router/src/services/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub(crate) struct Externalizable<T> {
pub(crate) has_next: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
query_plan: Option<Arc<QueryPlan>>,
#[serde(skip_serializing_if = "Option::is_none")]
sources: Option<Vec<String>>,
}

#[buildstructor::buildstructor]
Expand Down Expand Up @@ -145,6 +147,7 @@ where
service_name: None,
has_next: None,
query_plan: None,
sources: None,
}
}

Expand Down Expand Up @@ -184,6 +187,7 @@ where
service_name: None,
has_next,
query_plan: None,
sources: None,
}
}

Expand All @@ -203,6 +207,7 @@ where
sdl: Option<String>,
has_next: Option<bool>,
query_plan: Option<Arc<QueryPlan>>,
sources: Option<Vec<String>>,
) -> Self {
assert!(matches!(
stage,
Expand All @@ -224,6 +229,7 @@ where
service_name: None,
has_next,
query_plan,
sources,
}
}

Expand Down Expand Up @@ -263,6 +269,7 @@ where
service_name,
has_next: None,
query_plan: None,
sources: None,
}
}

Expand Down
20 changes: 19 additions & 1 deletion docs/source/customizations/coprocessor.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,10 @@ Properties of the JSON body are divided into two high-level categories:
"formatted_query_plan":"QueryPlan {\n Fetch(service: \"accounts\") {\n {\n me {\n name\n username\n }\n }\n },\n}",
"query":{
"string":"query Me {\n me {\n name\n username\n }\n}\n","fragments":{"map":{}},"operations":[{"name":"Me","kind":"query","type_name":"Query","selection_set":[{"Field":{"name":"me","alias":null,"selection_set":[{"Field":{"name":"name","alias":null,"selection_set":null,"field_type":{"Named":"String"},"include_skip":{"include":"Yes","skip":"No"}}},{"Field":{"name":"username","alias":null,"selection_set":null,"field_type":{"Named":"String"},"include_skip":{"include":"Yes","skip":"No"}}}],"field_type":{"Named":"User"},"include_skip":{"include":"Yes","skip":"No"}}}],"variables":{}}],"subselections":{},"unauthorized":{"paths":[],"errors":{"log":true,"response":"errors"}},"filtered_query":null,"defer_stats":{"has_defer":false,"has_unconditional_defer":false,"conditional_defer_variable_names":[]},"is_original":true}
}
},
"sources": [
"accounts"
]
}
```
Expand Down Expand Up @@ -1009,6 +1012,21 @@ When `stage` is `ExecutionRequest`, this contains the query plan for the client
</td>
</tr>

<tr>
<td>

##### `sources`

`string`

</td>
<td>

When `stage` is `ExecutionRequest`, this contains the subgraph names and connector source names in the query plan for the client query. It cannot be modified by the coprocessor.

</td>
</tr>

</tbody>
</table>

Expand Down

0 comments on commit 6c06def

Please sign in to comment.