Skip to content

Commit

Permalink
expose for external subscription impl
Browse files Browse the repository at this point in the history
  • Loading branch information
fotoetienne committed Oct 17, 2023
1 parent 8587d1d commit c8ee3bf
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 27 deletions.
2 changes: 1 addition & 1 deletion apollo-router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub use crate::configuration::ListenAddr;
pub use crate::context::Context;
pub use crate::executable::main;
pub use crate::executable::Executable;
pub use crate::notification::Notify;
pub use crate::notification::{HandleStream, Notify, NotifyError};
pub use crate::router::ApolloRouterError;
pub use crate::router::ConfigurationSource;
pub use crate::router::LicenseSource;
Expand Down
38 changes: 25 additions & 13 deletions apollo-router/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,19 @@ use crate::Configuration;
static NOTIFY_CHANNEL_SIZE: usize = 1024;
static DEFAULT_MSG_CHANNEL_SIZE: usize = 128;

/// Error that can happen when using the pubsub
#[derive(Error, Debug)]
pub(crate) enum NotifyError<V> {
pub enum NotifyError<V> {
/// Error when sending data to the pubsub
#[error("cannot send data to pubsub")]
SendError(#[from] SendError),
/// Error sending data to response stream
#[error("cannot send data to response stream")]
BroadcastSendError(#[from] broadcast::error::SendError<V>),
/// Channel has been closed
#[error("cannot send data to pubsub because channel has been closed")]
Canceled(#[from] Canceled),
/// Topic doesn't exist
#[error("this topic doesn't exist")]
UnknownTopic,
}
Expand Down Expand Up @@ -116,8 +121,9 @@ where
K: Send + Hash + Eq + Clone + 'static,
V: Send + Sync + Clone + 'static,
{
/// Create a new pubsub
#[builder]
pub(crate) fn new(
pub fn new(
ttl: Option<Duration>,
heartbeat_error_message: Option<V>,
queue_size: Option<usize>,
Expand Down Expand Up @@ -175,8 +181,9 @@ where
self
}

// boolean in the tuple means `created`
pub(crate) async fn create_or_subscribe(
/// Create a new topic if it doesn't exist or subscribe to an existing one
/// The boolean in the result tuple means `created`
pub async fn create_or_subscribe(
&mut self,
topic: K,
heartbeat_enabled: bool,
Expand Down Expand Up @@ -205,7 +212,8 @@ where
Ok((handle, created))
}

pub(crate) async fn subscribe(&mut self, topic: K) -> Result<Handle<K, V>, NotifyError<V>> {
/// Subscribe to an existing topic
pub async fn subscribe(&mut self, topic: K) -> Result<Handle<K, V>, NotifyError<V>> {
let (sender, receiver) = oneshot::channel();

self.sender
Expand All @@ -228,7 +236,8 @@ where
Ok(handle)
}

pub(crate) async fn subscribe_if_exist(
/// Subscribe to an existing topic if it exists
pub async fn subscribe_if_exist(
&mut self,
topic: K,
) -> Result<Option<Handle<K, V>>, NotifyError<V>> {
Expand All @@ -254,7 +263,8 @@ where
Ok(handle.into())
}

pub(crate) async fn exist(&mut self, topic: K) -> Result<bool, NotifyError<V>> {
/// Check if the topic exists
pub async fn exist(&mut self, topic: K) -> Result<bool, NotifyError<V>> {
// Channel to check if the topic still exists or not
let (response_tx, response_rx) = oneshot::channel();

Expand All @@ -270,7 +280,8 @@ where
Ok(resp)
}

pub(crate) async fn invalid_ids(
/// Check if topics exist for the given ids
pub async fn invalid_ids(
&mut self,
topics: Vec<K>,
) -> Result<(Vec<K>, Vec<K>), NotifyError<V>> {
Expand All @@ -290,7 +301,7 @@ where
}

/// Delete the topic even if several subscribers are still listening
pub(crate) async fn force_delete(&mut self, topic: K) -> Result<(), NotifyError<V>> {
pub async fn force_delete(&mut self, topic: K) -> Result<(), NotifyError<V>> {
// if disconnected, we don't care (the task was stopped)
self.sender
.send(Notification::ForceDelete { topic })
Expand Down Expand Up @@ -428,14 +439,14 @@ where
}
}

pub(crate) fn into_stream(self) -> HandleStream<K, V> {
pub fn into_stream(self) -> HandleStream<K, V> {
HandleStream {
handle_guard: self.handle_guard,
msg_receiver: self.msg_receiver,
}
}

pub(crate) fn into_sink(self) -> HandleSink<K, V> {
pub fn into_sink(self) -> HandleSink<K, V> {
HandleSink {
handle_guard: self.handle_guard,
msg_sender: self.msg_sender,
Expand All @@ -458,6 +469,7 @@ where
}

pin_project! {
/// Stream of data from the pubsub
pub struct HandleStream<K, V>
where
K: Clone,
Expand Down Expand Up @@ -508,7 +520,7 @@ where
V: Clone + 'static + Send,
{
/// Send data to the subscribed topic
pub(crate) fn send_sync(&mut self, data: V) -> Result<(), NotifyError<V>> {
pub fn send_sync(&mut self, data: V) -> Result<(), NotifyError<V>> {
self.msg_sender.send(data.into()).map_err(|err| {
NotifyError::BroadcastSendError(broadcast::error::SendError(err.0.unwrap()))
})?;
Expand Down Expand Up @@ -874,7 +886,7 @@ where
}
}

pub(crate) struct RouterBroadcasts {
pub struct RouterBroadcasts {
configuration: (
broadcast::Sender<Weak<Configuration>>,
broadcast::Receiver<Weak<Configuration>>,
Expand Down
16 changes: 9 additions & 7 deletions apollo-router/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ pub struct PluginInit<T> {
pub config: T,
/// Router Supergraph Schema (schema definition language)
pub supergraph_sdl: Arc<String>,

pub(crate) notify: Notify<String, graphql::Response>,
/// Pubsub for subscriptions
pub notify: Notify<String, graphql::Response>,
}

impl<T> PluginInit<T>
Expand Down Expand Up @@ -168,7 +168,8 @@ where
/// Factories for plugin schema and configuration.
#[derive(Clone)]
pub struct PluginFactory {
pub(crate) name: String,
/// The name of the plugin.
pub name: String,
instance_factory: InstanceFactory,
schema_factory: SchemaFactory,
pub(crate) type_id: TypeId,
Expand Down Expand Up @@ -212,8 +213,9 @@ impl PluginFactory {
type_id: TypeId::of::<P>(),
}
}

pub(crate) async fn create_instance(

/// Create a plugin factory.
pub async fn create_instance(
&self,
configuration: &serde_json::Value,
supergraph_sdl: Arc<String>,
Expand All @@ -237,7 +239,7 @@ impl PluginFactory {

// If we wanted to create a custom subset of plugins, this is where we would do it
/// Get a copy of the registered plugin factories.
pub(crate) fn plugins() -> impl Iterator<Item = &'static Lazy<PluginFactory>> {
pub fn plugins() -> impl Iterator<Item = &'static Lazy<PluginFactory>> {
PLUGINS.iter()
}

Expand Down Expand Up @@ -324,7 +326,7 @@ fn get_type_of<T>(_: &T) -> &'static str {
/// The trait also provides a default implementations for each hook, which returns the associated service unmodified.
/// For more information about the plugin lifecycle please check this documentation <https://www.apollographql.com/docs/router/customizations/native/#plugin-lifecycle>
#[async_trait]
pub(crate) trait DynPlugin: Send + Sync + 'static {
pub trait DynPlugin: Send + Sync + 'static {
/// This service runs at the very beginning and very end of the request lifecycle.
/// It's the entrypoint of every requests and also the last hook before sending the response.
/// Define supergraph_service if your customization needs to interact at the earliest or latest point possible.
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/src/plugins/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ impl Service<router::Request> for CallbackService {
return Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::NOT_FOUND)
.body("suscription doesn't exist".into())
.body("subscription doesn't exist".into())
.map_err(BoxError::from)?,
context: req.context,
});
Expand Down Expand Up @@ -511,7 +511,7 @@ impl Service<router::Request> for CallbackService {
Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::NOT_FOUND)
.body("suscriptions don't exist".into())
.body("subscriptions don't exist".into())
.map_err(BoxError::from)?,
context: req.context,
})
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl QueryPlan {
}
}

// holds the query plan executon arguments that do not change between calls
// holds the query plan execution arguments that do not change between calls
pub(crate) struct ExecutionParameters<'a> {
pub(crate) context: &'a Context,
pub(crate) service_factory: &'a Arc<SubgraphServiceFactory>,
Expand Down
4 changes: 3 additions & 1 deletion apollo-router/src/router_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ impl Endpoint {
handler: Handler::new(handler),
}
}
pub(crate) fn into_router(self) -> axum::Router {

/// Consume self and produce axum Router with the endpoint
pub fn into_router(self) -> axum::Router {
let handler = move |req: http::Request<hyper::Body>| {
let endpoint = self.handler.clone();
async move {
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/src/services/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::json_ext::Object;
use crate::json_ext::Path;
use crate::notification::HandleStream;
use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
use crate::query_planner::fetch::OperationKind;
pub use crate::query_planner::fetch::OperationKind;
use crate::Context;

pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
Expand All @@ -40,7 +40,7 @@ pub struct Request {
pub context: Context,

/// Channel to send the subscription stream to listen on events coming from subgraph in a task
pub(crate) subscription_stream: Option<mpsc::Sender<HandleStream<String, graphql::Response>>>,
pub subscription_stream: Option<mpsc::Sender<HandleStream<String, graphql::Response>>>,
/// Channel triggered when the client connection has been dropped
pub(crate) connection_closed_signal: Option<broadcast::Receiver<()>>,
}
Expand Down

0 comments on commit c8ee3bf

Please sign in to comment.