Skip to content

Commit

Permalink
Consolidate subgraph resolution for oneshot and watchers (#2329)
Browse files Browse the repository at this point in the history
  • Loading branch information
dotdat authored Jan 3, 2025
1 parent bb194da commit 98f43a2
Show file tree
Hide file tree
Showing 34 changed files with 1,783 additions and 916 deletions.
6 changes: 4 additions & 2 deletions crates/rover-client/src/blocking/studio_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,16 @@ impl StudioClient {
.client(self.reqwest_client.clone())
.build()?,
);

Ok(service)
}

pub fn http_service(&self) -> Result<HttpService, RoverClientError> {
let service = ReqwestService::builder()
.client(self.reqwest_client.clone())
.build()
.map_err(|err| RoverClientError::ServiceReady(Box::new(err)))?;
Ok(service.boxed_clone())
.map_err(|err| RoverClientError::ServiceReady(Box::new(err)))?
.boxed_clone();
Ok(service)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,5 @@ mod service;
pub(crate) mod types;

pub use runner::run;
pub use service::{
SubgraphIntrospect, SubgraphIntrospectError, SubgraphIntrospectLayer,
SubgraphIntrospectLayerError,
};
pub use service::{SubgraphIntrospect, SubgraphIntrospectError};
pub use types::{SubgraphIntrospectInput, SubgraphIntrospectResponse};
51 changes: 36 additions & 15 deletions crates/rover-client/src/operations/subgraph/introspect/runner.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,52 @@
use http::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Client;
use rover_graphql::GraphQLLayer;
use rover_http::extend_headers::ExtendHeadersLayer;
use rover_http::retry::RetryPolicy;
use rover_http::ReqwestService;
use tower::retry::RetryLayer;
use tower::{Service, ServiceBuilder, ServiceExt};

use crate::operations::subgraph::introspect::types::*;
use crate::RoverClientError;

use super::service::SubgraphIntrospectLayer;
use super::SubgraphIntrospect;

pub async fn run(
input: SubgraphIntrospectInput,
client: &Client,
) -> Result<SubgraphIntrospectResponse, RoverClientError> {
let subgraph_introspect_layer = SubgraphIntrospectLayer::builder()
.endpoint(input.endpoint)
.headers(input.headers)
.should_retry(input.should_retry)
.retry_period(input.retry_period)
.build()?;
let mut service = ServiceBuilder::new()
.layer(subgraph_introspect_layer)
.service(
ReqwestService::builder()
.client(client.clone())
.build()
.map_err(|err| RoverClientError::ServiceReady(Box::new(err)))?
.boxed_clone(),
let retry_layer = if input.should_retry {
Some(RetryLayer::new(RetryPolicy::new(input.retry_period)))
} else {
None
};

let http_service = ReqwestService::builder()
.client(client.clone())
.build()
.map_err(|err| RoverClientError::ServiceReady(Box::new(err)))?;

let mut header_map = HeaderMap::new();

for (header_key, header_value) in input.headers {
header_map.insert(
HeaderName::from_bytes(header_key.as_bytes())?,
HeaderValue::from_str(&header_value)?,
);
}

let http_service_stack = ServiceBuilder::new()
.boxed_clone()
.option_layer(retry_layer)
.layer(ExtendHeadersLayer::new(header_map))
.service(http_service);

let mut service = ServiceBuilder::new()
.layer_fn(SubgraphIntrospect::new)
.layer(GraphQLLayer::new(input.endpoint.clone()))
.service(http_service_stack);

let service = service.ready().await?;
let resp = service.call(()).await?;
Ok(resp)
Expand Down
89 changes: 9 additions & 80 deletions crates/rover-client/src/operations/subgraph/introspect/service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
use std::{collections::HashMap, future::Future, pin::Pin, time::Duration};
use std::{future::Future, pin::Pin};

use buildstructor::buildstructor;
use graphql_client::GraphQLQuery;
use http::{
header::{InvalidHeaderName, InvalidHeaderValue},
HeaderMap, HeaderName, HeaderValue,
};
use rover_graphql::{GraphQLLayer, GraphQLRequest, GraphQLService, GraphQLServiceError};
use rover_http::{extend_headers::ExtendHeadersLayer, retry::RetryPolicy, HttpService};
use tower::{retry::RetryLayer, Layer, Service, ServiceBuilder};
use rover_graphql::{GraphQLRequest, GraphQLServiceError};
use tower::Service;

use crate::{EndpointKind, RoverClientError};

Expand Down Expand Up @@ -53,82 +47,17 @@ impl From<SubgraphIntrospectError> for RoverClientError {
}
}

#[derive(thiserror::Error, Debug)]
pub enum SubgraphIntrospectLayerError {
#[error(transparent)]
HeaderName(#[from] InvalidHeaderName),
#[error(transparent)]
HeaderValue(#[from] InvalidHeaderValue),
}

impl From<SubgraphIntrospectLayerError> for RoverClientError {
fn from(value: SubgraphIntrospectLayerError) -> Self {
match value {
SubgraphIntrospectLayerError::HeaderName(err) => RoverClientError::from(err),
SubgraphIntrospectLayerError::HeaderValue(err) => RoverClientError::from(err),
}
}
}

pub struct SubgraphIntrospectLayer {
endpoint: url::Url,
headers: HeaderMap,
should_retry: bool,
retry_period: Duration,
}

#[buildstructor]
impl SubgraphIntrospectLayer {
#[builder]
pub fn new(
endpoint: url::Url,
headers: HashMap<String, String>,
should_retry: bool,
retry_period: Duration,
) -> Result<SubgraphIntrospectLayer, SubgraphIntrospectLayerError> {
let mut header_map = HeaderMap::new();
for (header_key, header_value) in headers {
header_map.insert(
HeaderName::from_bytes(header_key.as_bytes())?,
HeaderValue::from_str(&header_value)?,
);
}
Ok(SubgraphIntrospectLayer {
endpoint,
headers: header_map,
should_retry,
retry_period,
})
}
}

impl Layer<HttpService> for SubgraphIntrospectLayer {
type Service = SubgraphIntrospect<GraphQLService<HttpService>>;
fn layer(&self, inner: HttpService) -> Self::Service {
let retry_layer = if self.should_retry {
Some(RetryLayer::new(RetryPolicy::new(self.retry_period)))
} else {
None
};
let http_service_stack = ServiceBuilder::new()
.boxed_clone()
.option_layer(retry_layer)
.layer(ExtendHeadersLayer::new(self.headers.clone()))
.service(inner);
let graphql_service_stack = ServiceBuilder::new()
.layer(GraphQLLayer::new(self.endpoint.clone()))
.service(http_service_stack);
SubgraphIntrospect {
inner: graphql_service_stack,
}
}
}

#[derive(Clone)]
pub struct SubgraphIntrospect<S: Clone> {
inner: S,
}

impl<S: Clone> SubgraphIntrospect<S> {
pub fn new(inner: S) -> SubgraphIntrospect<S> {
SubgraphIntrospect { inner }
}
}

impl<S, Fut> Service<()> for SubgraphIntrospect<S>
where
S: Service<
Expand Down
30 changes: 17 additions & 13 deletions src/command/dev/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,18 @@ use houston::{Config, Profile};
use router::{install::InstallRouter, run::RunRouter, watchers::file::FileWatcher};
use rover_client::operations::config::who_am_i::WhoAmI;
use rover_std::{errln, infoln, warnln};
use tap::TapFallible;
use tower::ServiceExt;

use self::router::config::{RouterAddress, RunRouterConfig};
use crate::{
command::{dev::OVERRIDE_DEV_COMPOSITION_VERSION, dev::OVERRIDE_DEV_ROUTER_VERSION, Dev},
composition::{
pipeline::CompositionPipeline,
supergraph::{
config::resolver::{
supergraph::config::{
full::introspect::MakeResolveIntrospectSubgraph,
resolver::{
fetch_remote_subgraph::MakeFetchRemoteSubgraph,
fetch_remote_subgraphs::MakeFetchRemoteSubgraphs,
},
version::SupergraphVersion,
},
},
utils::{
Expand All @@ -36,6 +35,8 @@ use crate::{
RoverError, RoverOutput, RoverResult,
};

use self::router::config::{RouterAddress, RunRouterConfig};

mod router;

impl Dev {
Expand Down Expand Up @@ -79,14 +80,17 @@ impl Dev {
.studio_graphql_service()?;
let service = WhoAmI::new(service);

let make_fetch_remote_subgraphs = MakeFetchRemoteSubgraphs::builder()
let fetch_remote_subgraphs_factory = MakeFetchRemoteSubgraphs::builder()
.studio_client_config(client_config.clone())
.profile(profile.clone())
.build();
let make_fetch_remote_subgraph = MakeFetchRemoteSubgraph::builder()
let fetch_remote_subgraph_factory = MakeFetchRemoteSubgraph::builder()
.studio_client_config(client_config.clone())
.profile(profile.clone())
.build();
.build()
.boxed_clone();
let resolve_introspect_subgraph_factory =
MakeResolveIntrospectSubgraph::new(client_config.service()?).boxed_clone();

// We resolve supergraph binary overrides (ie, composition version) in this order:
//
Expand Down Expand Up @@ -116,14 +120,14 @@ impl Dev {
let composition_pipeline = CompositionPipeline::default()
.init(
&mut stdin(),
make_fetch_remote_subgraphs,
fetch_remote_subgraphs_factory,
supergraph_config_path.clone(),
graph_ref.clone(),
)
.await?
.resolve_federation_version(
&client_config,
make_fetch_remote_subgraph,
resolve_introspect_subgraph_factory.clone(),
fetch_remote_subgraph_factory.clone(),
federation_version,
)
.await?
Expand Down Expand Up @@ -153,8 +157,8 @@ impl Dev {
exec_command_impl,
read_file_impl.clone(),
write_file_impl.clone(),
profile,
&client_config,
client_config.service()?,
fetch_remote_subgraph_factory.boxed_clone(),
self.opts.subgraph_opts.subgraph_polling_interval,
tmp_config_dir_path.clone(),
)
Expand Down
5 changes: 4 additions & 1 deletion src/command/dev/next/router/config/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ health_check:
let config_yaml = serde_yaml::from_str(&config_yaml_str)?;
let router_config = RouterConfigParser { yaml: &config_yaml };
let health_check = router_config.health_check_endpoint();
assert_that!(health_check).is_equal_to(Ok(Uri::from_str("http://127.0.0.1:8088/health")?))
assert_that!(health_check)
.is_ok()
.is_equal_to(Uri::from_str("http://127.0.0.1:8088/health")?);
Ok(())
}

#[rstest]
Expand Down
4 changes: 2 additions & 2 deletions src/command/dev/next/router/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ mod tests {
use crate::{
options::LicenseAccepter,
utils::{
client::{ClientBuilder, StudioClientConfig},
client::{ClientBuilder, ClientTimeout, StudioClientConfig},
effect::install::InstallBinary,
},
};
Expand Down Expand Up @@ -166,7 +166,7 @@ mod tests {
config,
false,
ClientBuilder::default(),
None,
ClientTimeout::default(),
)
}

Expand Down
26 changes: 17 additions & 9 deletions src/command/supergraph/compose/do_compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use rover_client::{shared::GraphRef, RoverClientError};
use rover_std::warnln;
use semver::Version;
use serde::Serialize;
#[cfg(feature = "composition-rewrite")]
use tower::ServiceExt;

// TODO: remove once we're no longer using the composition-rewrite feature flag
#[allow(unused_imports)]
Expand Down Expand Up @@ -164,9 +166,12 @@ impl Compose {
) -> RoverResult<RoverOutput> {
use crate::composition::{
pipeline::CompositionPipeline,
supergraph::config::resolver::{
fetch_remote_subgraph::MakeFetchRemoteSubgraph,
fetch_remote_subgraphs::MakeFetchRemoteSubgraphs,
supergraph::config::{
full::introspect::MakeResolveIntrospectSubgraph,
resolver::{
fetch_remote_subgraph::MakeFetchRemoteSubgraph,
fetch_remote_subgraphs::MakeFetchRemoteSubgraphs,
},
},
};

Expand All @@ -183,27 +188,30 @@ impl Compose {
let profile = self.opts.plugin_opts.profile.clone();
let graph_ref = self.opts.supergraph_config_source.graph_ref.clone();

let make_fetch_remote_subgraphs = MakeFetchRemoteSubgraphs::builder()
let fetch_remote_subgraphs_factory = MakeFetchRemoteSubgraphs::builder()
.studio_client_config(client_config.clone())
.profile(profile.clone())
.build();

let make_fetch_remote_subgraph = MakeFetchRemoteSubgraph::builder()
let fetch_remote_subgraph_factory = MakeFetchRemoteSubgraph::builder()
.studio_client_config(client_config.clone())
.profile(profile.clone())
.build();
.build()
.boxed_clone();
let resolve_introspect_subgraph_factory =
MakeResolveIntrospectSubgraph::new(client_config.service()?).boxed_clone();

let composition_pipeline = CompositionPipeline::default()
.init(
&mut stdin(),
make_fetch_remote_subgraphs,
fetch_remote_subgraphs_factory,
supergraph_yaml,
graph_ref.clone(),
)
.await?
.resolve_federation_version(
&client_config,
make_fetch_remote_subgraph,
resolve_introspect_subgraph_factory,
fetch_remote_subgraph_factory,
self.opts.federation_version.clone(),
)
.await?
Expand Down
Loading

0 comments on commit 98f43a2

Please sign in to comment.