diff --git a/apollo-router/src/lib.rs b/apollo-router/src/lib.rs index fb5befb06e..4d135b2c01 100644 --- a/apollo-router/src/lib.rs +++ b/apollo-router/src/lib.rs @@ -83,7 +83,7 @@ pub use crate::configuration::ListenAddr; pub use crate::context::Context; pub use crate::executable::main; pub use crate::executable::Executable; -pub use crate::notification::Notify; +pub use crate::notification::{HandleStream, Notify, NotifyError}; pub use crate::router::ApolloRouterError; pub use crate::router::ConfigurationSource; pub use crate::router::LicenseSource; diff --git a/apollo-router/src/notification.rs b/apollo-router/src/notification.rs index 90652de818..2386ec4ff9 100644 --- a/apollo-router/src/notification.rs +++ b/apollo-router/src/notification.rs @@ -32,14 +32,19 @@ use crate::Configuration; static NOTIFY_CHANNEL_SIZE: usize = 1024; static DEFAULT_MSG_CHANNEL_SIZE: usize = 128; +/// Error that can happen when using the pubsub #[derive(Error, Debug)] -pub(crate) enum NotifyError { +pub enum NotifyError { + /// Error when sending data to the pubsub #[error("cannot send data to pubsub")] SendError(#[from] SendError), + /// Error sending data to response stream #[error("cannot send data to response stream")] BroadcastSendError(#[from] broadcast::error::SendError), + /// Channel has been closed #[error("cannot send data to pubsub because channel has been closed")] Canceled(#[from] Canceled), + /// Topic doesn't exist #[error("this topic doesn't exist")] UnknownTopic, } @@ -116,8 +121,9 @@ where K: Send + Hash + Eq + Clone + 'static, V: Send + Sync + Clone + 'static, { + /// Create a new pubsub #[builder] - pub(crate) fn new( + pub fn new( ttl: Option, heartbeat_error_message: Option, queue_size: Option, @@ -175,8 +181,9 @@ where self } - // boolean in the tuple means `created` - pub(crate) async fn create_or_subscribe( + /// Create a new topic if it doesn't exist or subscribe to an existing one + /// The boolean in the result tuple means `created` + pub async fn create_or_subscribe( &mut self, topic: K, heartbeat_enabled: bool, @@ -205,7 +212,8 @@ where Ok((handle, created)) } - pub(crate) async fn subscribe(&mut self, topic: K) -> Result, NotifyError> { + /// Subscribe to an existing topic + pub async fn subscribe(&mut self, topic: K) -> Result, NotifyError> { let (sender, receiver) = oneshot::channel(); self.sender @@ -228,7 +236,8 @@ where Ok(handle) } - pub(crate) async fn subscribe_if_exist( + /// Subscribe to an existing topic if it exists + pub async fn subscribe_if_exist( &mut self, topic: K, ) -> Result>, NotifyError> { @@ -254,7 +263,8 @@ where Ok(handle.into()) } - pub(crate) async fn exist(&mut self, topic: K) -> Result> { + /// Check if the topic exists + pub async fn exist(&mut self, topic: K) -> Result> { // Channel to check if the topic still exists or not let (response_tx, response_rx) = oneshot::channel(); @@ -270,7 +280,8 @@ where Ok(resp) } - pub(crate) async fn invalid_ids( + /// Check if topics exist for the given ids + pub async fn invalid_ids( &mut self, topics: Vec, ) -> Result<(Vec, Vec), NotifyError> { @@ -290,7 +301,7 @@ where } /// Delete the topic even if several subscribers are still listening - pub(crate) async fn force_delete(&mut self, topic: K) -> Result<(), NotifyError> { + pub async fn force_delete(&mut self, topic: K) -> Result<(), NotifyError> { // if disconnected, we don't care (the task was stopped) self.sender .send(Notification::ForceDelete { topic }) @@ -428,14 +439,14 @@ where } } - pub(crate) fn into_stream(self) -> HandleStream { + pub fn into_stream(self) -> HandleStream { HandleStream { handle_guard: self.handle_guard, msg_receiver: self.msg_receiver, } } - pub(crate) fn into_sink(self) -> HandleSink { + pub fn into_sink(self) -> HandleSink { HandleSink { handle_guard: self.handle_guard, msg_sender: self.msg_sender, @@ -458,6 +469,7 @@ where } pin_project! { +/// Stream of data from the pubsub pub struct HandleStream where K: Clone, @@ -508,7 +520,7 @@ where V: Clone + 'static + Send, { /// Send data to the subscribed topic - pub(crate) fn send_sync(&mut self, data: V) -> Result<(), NotifyError> { + pub fn send_sync(&mut self, data: V) -> Result<(), NotifyError> { self.msg_sender.send(data.into()).map_err(|err| { NotifyError::BroadcastSendError(broadcast::error::SendError(err.0.unwrap())) })?; @@ -874,7 +886,7 @@ where } } -pub(crate) struct RouterBroadcasts { +pub struct RouterBroadcasts { configuration: ( broadcast::Sender>, broadcast::Receiver>, diff --git a/apollo-router/src/plugin/mod.rs b/apollo-router/src/plugin/mod.rs index 038fe3d903..5a0979f44f 100644 --- a/apollo-router/src/plugin/mod.rs +++ b/apollo-router/src/plugin/mod.rs @@ -66,8 +66,8 @@ pub struct PluginInit { pub config: T, /// Router Supergraph Schema (schema definition language) pub supergraph_sdl: Arc, - - pub(crate) notify: Notify, + /// Pubsub for subscriptions + pub notify: Notify, } impl PluginInit @@ -168,7 +168,8 @@ where /// Factories for plugin schema and configuration. #[derive(Clone)] pub struct PluginFactory { - pub(crate) name: String, + /// The name of the plugin. + pub name: String, instance_factory: InstanceFactory, schema_factory: SchemaFactory, pub(crate) type_id: TypeId, @@ -212,8 +213,9 @@ impl PluginFactory { type_id: TypeId::of::

(), } } - - pub(crate) async fn create_instance( + + /// Create a plugin factory. + pub async fn create_instance( &self, configuration: &serde_json::Value, supergraph_sdl: Arc, @@ -237,7 +239,7 @@ impl PluginFactory { // If we wanted to create a custom subset of plugins, this is where we would do it /// Get a copy of the registered plugin factories. -pub(crate) fn plugins() -> impl Iterator> { +pub fn plugins() -> impl Iterator> { PLUGINS.iter() } @@ -324,7 +326,7 @@ fn get_type_of(_: &T) -> &'static str { /// The trait also provides a default implementations for each hook, which returns the associated service unmodified. /// For more information about the plugin lifecycle please check this documentation #[async_trait] -pub(crate) trait DynPlugin: Send + Sync + 'static { +pub trait DynPlugin: Send + Sync + 'static { /// This service runs at the very beginning and very end of the request lifecycle. /// It's the entrypoint of every requests and also the last hook before sending the response. /// Define supergraph_service if your customization needs to interact at the earliest or latest point possible. diff --git a/apollo-router/src/plugins/subscription.rs b/apollo-router/src/plugins/subscription.rs index afd7e5a6d1..4bd5521b7a 100644 --- a/apollo-router/src/plugins/subscription.rs +++ b/apollo-router/src/plugins/subscription.rs @@ -440,7 +440,7 @@ impl Service for CallbackService { return Ok(router::Response { response: http::Response::builder() .status(StatusCode::NOT_FOUND) - .body("suscription doesn't exist".into()) + .body("subscription doesn't exist".into()) .map_err(BoxError::from)?, context: req.context, }); @@ -511,7 +511,7 @@ impl Service for CallbackService { Ok(router::Response { response: http::Response::builder() .status(StatusCode::NOT_FOUND) - .body("suscriptions don't exist".into()) + .body("subscriptions don't exist".into()) .map_err(BoxError::from)?, context: req.context, }) diff --git a/apollo-router/src/query_planner/execution.rs b/apollo-router/src/query_planner/execution.rs index 35945844cc..7a4aa73791 100644 --- a/apollo-router/src/query_planner/execution.rs +++ b/apollo-router/src/query_planner/execution.rs @@ -91,7 +91,7 @@ impl QueryPlan { } } -// holds the query plan executon arguments that do not change between calls +// holds the query plan execution arguments that do not change between calls pub(crate) struct ExecutionParameters<'a> { pub(crate) context: &'a Context, pub(crate) service_factory: &'a Arc, diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index 55b44a8edd..7ee8f5f66c 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -91,7 +91,9 @@ impl Endpoint { handler: Handler::new(handler), } } - pub(crate) fn into_router(self) -> axum::Router { + + /// Consume self and produce axum Router with the endpoint + pub fn into_router(self) -> axum::Router { let handler = move |req: http::Request| { let endpoint = self.handler.clone(); async move { diff --git a/apollo-router/src/services/subgraph.rs b/apollo-router/src/services/subgraph.rs index e926bda188..dd89f229ee 100644 --- a/apollo-router/src/services/subgraph.rs +++ b/apollo-router/src/services/subgraph.rs @@ -20,7 +20,7 @@ use crate::json_ext::Object; use crate::json_ext::Path; use crate::notification::HandleStream; use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS; -use crate::query_planner::fetch::OperationKind; +pub use crate::query_planner::fetch::OperationKind; use crate::Context; pub type BoxService = tower::util::BoxService; @@ -40,7 +40,7 @@ pub struct Request { pub context: Context, /// Channel to send the subscription stream to listen on events coming from subgraph in a task - pub(crate) subscription_stream: Option>>, + pub subscription_stream: Option>>, /// Channel triggered when the client connection has been dropped pub(crate) connection_closed_signal: Option>, }