Skip to content

Commit

Permalink
Adding back in doc comments for UStreamer
Browse files Browse the repository at this point in the history
  • Loading branch information
PLeVasseur committed Apr 4, 2024
1 parent ec6ff04 commit 08fab3d
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 2 deletions.
3 changes: 1 addition & 2 deletions up-streamer/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,10 @@ const ROUTEFN_NEW_TAG: &str = "new():";
/// # _listener: Arc<dyn UListener>,
/// # ) -> Result<(), UStatus> {
/// # println!("UPClientFoo: registering topic: {:?}", topic);
/// # let uuid = UUIDBuilder::build();
/// # Ok(())
/// # }
/// #
/// # async fn unregister_listener(&self, topic: UUri, listener: Arc<dyn UListener>) -> Result<(), UStatus> {
/// # async fn unregister_listener(&self, topic: UUri, _listener: Arc<dyn UListener>) -> Result<(), UStatus> {
/// # println!(
/// # "UPClientFoo: unregistering topic: {topic:?}"
/// # );
Expand Down
219 changes: 219 additions & 0 deletions up-streamer/src/ustreamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,181 @@ pub struct UStreamer {
forwarding_listeners: ForwardingListenersMap,
}

/// A [`UStreamer`] is used to coordinate the addition and deletion of forwarding rules between
/// [`Route`][crate::Route]s
///
/// Essentially, it's a means of setting up rules so that messages from one transport (e.g. Zenoh)
/// are bridged onto another transport (e.g. SOME/IP).
///
/// # Examples
///
/// ## Typical usage
/// ```
/// use std::sync::Arc;
/// use async_std::sync::Mutex;
/// use up_rust::{Number, UAuthority, UListener, UTransport};
/// use up_streamer::{Route, UStreamer};
/// # pub mod up_client_foo {
/// # use std::sync::Arc;
/// use async_trait::async_trait;
/// # use up_rust::{UListener, UMessage, UStatus, UUIDBuilder, UUri};
/// # use up_rust::UTransport;
/// #
/// # pub struct UPClientFoo;
/// #
/// # #[async_trait]
/// # impl UTransport for UPClientFoo {
/// # async fn send(&self, _message: UMessage) -> Result<(), UStatus> {
/// # todo!()
/// # }
/// #
/// # async fn receive(&self, _topic: UUri) -> Result<UMessage, UStatus> {
/// # todo!()
/// # }
/// #
/// # async fn register_listener(
/// # &self,
/// # topic: UUri,
/// # _listener: Arc<dyn UListener>,
/// # ) -> Result<(), UStatus> {
/// # println!("UPClientFoo: registering topic: {:?}", topic);
/// # Ok(())
/// # }
/// #
/// # async fn unregister_listener(&self, topic: UUri, _listener: Arc<dyn UListener>) -> Result<(), UStatus> {
/// # println!(
/// # "UPClientFoo: unregistering topic: {topic:?}"
/// # );
/// # Ok(())
/// # }
/// # }
/// #
/// # impl UPClientFoo {
/// # pub fn new() -> Self {
/// # Self {}
/// # }
/// # }
/// # }
/// #
/// # pub mod up_client_bar {
/// # use std::sync::Arc;
/// # use async_trait::async_trait;
/// # use up_rust::{UListener, UMessage, UStatus, UTransport, UUIDBuilder, UUri};
/// # pub struct UPClientBar;
/// #
/// # #[async_trait]
/// # impl UTransport for UPClientBar {
/// # async fn send(&self, _message: UMessage) -> Result<(), UStatus> {
/// # todo!()
/// # }
/// #
/// # async fn receive(&self, _topic: UUri) -> Result<UMessage, UStatus> {
/// # todo!()
/// # }
/// #
/// # async fn register_listener(
/// # &self,
/// # topic: UUri,
/// # _listener: Arc<dyn UListener>,
/// # ) -> Result<(), UStatus> {
/// # println!("UPClientBar: registering topic: {:?}", topic);
/// # Ok(())
/// # }
/// #
/// # async fn unregister_listener(&self, topic: UUri, _listener: Arc<dyn UListener>) -> Result<(), UStatus> {
/// # println!(
/// # "UPClientFoo: unregistering topic: {topic:?}"
/// # );
/// # Ok(())
/// # }
/// # }
/// #
/// # impl UPClientBar {
/// # pub fn new() -> Self {
/// # Self {}
/// # }
/// # }
/// # }
/// #
/// # async fn async_main() {
///
/// // Local transport
/// let local_transport: Arc<Mutex<Box<dyn UTransport>>> = Arc::new(Mutex::new(Box::new(up_client_foo::UPClientFoo::new())));
///
/// // Remote transport router
/// let remote_transport: Arc<Mutex<Box<dyn UTransport>>> = Arc::new(Mutex::new(Box::new(up_client_bar::UPClientBar::new())));
///
/// // Local route
/// let local_authority = UAuthority {
/// name: Some("local".to_string()),
/// number: Some(Number::Ip(vec![192, 168, 1, 100])),
/// ..Default::default()
/// };
/// let local_route = Route::new(local_authority, local_transport.clone());
///
/// // A remote route
/// let remote_authority = UAuthority {
/// name: Some("remote".to_string()),
/// number: Some(Number::Ip(vec![192, 168, 1, 200])),
/// ..Default::default()
/// };
/// let remote_route = Route::new(remote_authority, remote_transport.clone());
///
/// let streamer = UStreamer::new("hoge");
///
/// // Add forwarding rules to route local<->remote
/// assert_eq!(
/// streamer
/// .add_forwarding_rule(local_route.clone(), remote_route.clone())
/// .await,
/// Ok(())
/// );
/// assert_eq!(
/// streamer
/// .add_forwarding_rule(remote_route.clone(), local_route.clone())
/// .await,
/// Ok(())
/// );
///
/// // Add forwarding rules to route local<->local, should report an error
/// assert!(streamer
/// .add_forwarding_rule(local_route.clone(), local_route.clone())
/// .await
/// .is_err());
///
/// // Rule already exists so it should report an error
/// assert!(streamer
/// .add_forwarding_rule(local_route.clone(), remote_route.clone())
/// .await
/// .is_err());
///
/// // Try and remove an invalid rule
/// assert!(streamer
/// .delete_forwarding_rule(remote_route.clone(), remote_route.clone())
/// .await
/// .is_err());
///
/// // remove valid routing rules
/// assert_eq!(
/// streamer
/// .delete_forwarding_rule(local_route.clone(), remote_route.clone())
/// .await,
/// Ok(())
/// );
/// assert_eq!(
/// streamer
/// .delete_forwarding_rule(remote_route.clone(), local_route.clone())
/// .await,
/// Ok(())
/// );
///
/// // Try and remove a rule that doesn't exist, should report an error
/// assert!(streamer
/// .delete_forwarding_rule(local_route.clone(), remote_route.clone())
/// .await
/// .is_err());
/// # }
/// ```
impl UStreamer {
pub fn new(name: &str) -> Self {
let name = format!("{USTREAMER_TAG}:{name}:");
Expand All @@ -52,6 +227,28 @@ impl UStreamer {
}
}

/// Adds a forwarding rule to the [`UStreamer`] based on an in [`Route`][crate::Route] and an
/// out [`Route`][crate::Route]
///
/// Works for any [`UMessage`][up_rust::UMessage] type which has a destination / sink contained
/// in its attributes, i.e.
/// * [`UMessageType::UMESSAGE_TYPE_NOTIFICATION`][up_rust::UMessageType::UMESSAGE_TYPE_NOTIFICATION]
/// * [`UMessageType::UMESSAGE_TYPE_REQUEST`][up_rust::UMessageType::UMESSAGE_TYPE_REQUEST]
/// * [`UMessageType::UMESSAGE_TYPE_RESPONSE`][up_rust::UMessageType::UMESSAGE_TYPE_RESPONSE]
///
/// # Parameters
///
/// * `in` - [`Route`][crate::Route] we will bridge _from_
/// * `out` - [`Route`][crate::Route] we will bridge _onto_
///
/// # Errors
///
/// If unable to add this forwarding rule, we return a [`UStatus`][up_rust::UStatus] noting
/// the error.
///
/// Typical errors include
/// * already have this forwarding rule registered
/// * attempting to forward onto the same [`Route`][crate::Route]
pub async fn add_forwarding_rule(&self, r#in: Route, out: Route) -> Result<(), UStatus> {
debug!("adding forwarding rule");

Expand Down Expand Up @@ -83,6 +280,28 @@ impl UStreamer {
}
}

/// Deletes a forwarding rule from the [`UStreamer`] based on an in [`Route`][crate::Route] and an
/// out [`Route`][crate::Route]
///
/// Works for any [`UMessage`][up_rust::UMessage] type which has a destination / sink contained
/// in its attributes, i.e.
/// * [`UMessageType::UMESSAGE_TYPE_NOTIFICATION`][up_rust::UMessageType::UMESSAGE_TYPE_NOTIFICATION]
/// * [`UMessageType::UMESSAGE_TYPE_REQUEST`][up_rust::UMessageType::UMESSAGE_TYPE_REQUEST]
/// * [`UMessageType::UMESSAGE_TYPE_RESPONSE`][up_rust::UMessageType::UMESSAGE_TYPE_RESPONSE]
///
/// # Parameters
///
/// * `in` - [`Route`][crate::Route] we will bridge _from_
/// * `out` - [`Route`][crate::Route] we will bridge _onto_
///
/// # Errors
///
/// If unable to delete this forwarding rule, we return a [`UStatus`][up_rust::UStatus] noting
/// the error.
///
/// Typical errors include
/// * No such route has been added
/// * attempting to delete a forwarding rule where we would forward onto the same [`Route`][crate::Route]
pub async fn delete_forwarding_rule(&self, r#in: Route, out: Route) -> Result<(), UStatus> {
if r#in.authority == out.authority {
return Err(UStatus::fail_with_code(
Expand Down

0 comments on commit 08fab3d

Please sign in to comment.