Skip to content

Commit

Permalink
Added more doc comments and unit tests back in
Browse files Browse the repository at this point in the history
  • Loading branch information
PLeVasseur committed Apr 4, 2024
1 parent 6a0c771 commit 3ef0986
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 1 deletion.
63 changes: 63 additions & 0 deletions up-streamer/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,69 @@ use up_rust::{UAuthority, UTransport};
const ROUTE_TAG: &str = "Route:";
const ROUTEFN_NEW_TAG: &str = "new():";

///
/// [`Route`] is defined as a combination of [`UAuthority`][up_rust::UAuthority] and
/// [`Arc<Mutex<Box<dyn UTransport>>>`][up_rust::UTransport] as routes are at the [`UAuthority`][up_rust::UAuthority] level.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_std::sync::Mutex;
/// use up_rust::{Number, UAuthority, UTransport};
/// use up_streamer::Route;
///
/// # pub mod up_client_foo {
/// # use std::sync::Arc;
/// # use up_rust::{UMessage, UTransport, UStatus, UUIDBuilder, UUri, UListener};
/// # use async_trait::async_trait;
/// # pub struct UPClientFoo;
/// #
/// # #[async_trait]
/// # impl UTransport for UPClientFoo {
/// # async fn send(&self, _message: UMessage) -> Result<(), UStatus> {
/// # todo!()
/// # }
/// #
/// # async fn receive(&self, _topic: UUri) -> Result<UMessage, UStatus> {
/// # todo!()
/// # }
/// #
/// # async fn register_listener(
/// # &self,
/// # topic: UUri,
/// # _listener: &Arc<dyn UListener>,
/// # ) -> Result<(), UStatus> {
/// # println!("UPClientFoo: registering topic: {:?}", topic);
/// # let uuid = UUIDBuilder::build();
/// # Ok(())
/// # }
/// #
/// # async fn unregister_listener(&self, topic: UUri, listener: Arc<dyn UListener>) -> Result<(), UStatus> {
/// # println!(
/// # "UPClientFoo: unregistering topic: {topic:?}"
/// # );
/// # Ok(())
/// # }
/// # }
/// #
/// # impl UPClientFoo {
/// # pub fn new() -> Self {
/// # Self {}
/// # }
/// # }
/// # }
///
/// let local_transport: Arc<Mutex<Box<dyn UTransport>>> = Arc::new(Mutex::new(Box::new(up_client_foo::UPClientFoo::new())));
///
/// let authority_foo = UAuthority {
/// name: Some("foo_name".to_string()).into(),
/// number: Some(Number::Ip(vec![192, 168, 1, 100])),
/// ..Default::default()
/// };
///
/// let local_route = Route::new(authority_foo, local_transport);
/// ```
#[derive(Clone)]
pub struct Route {
pub(crate) authority: UAuthority,
Expand Down
128 changes: 127 additions & 1 deletion up-streamer/src/ustreamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ impl UStreamer {
r#in.transport
.lock()
.await
.register_listener(Self::uauthority_to_uuri(out.authority), &forwarding_listener)
.register_listener(
Self::uauthority_to_uuri(out.authority),
&forwarding_listener,
)
.await
}
}
Expand Down Expand Up @@ -120,6 +123,9 @@ impl ForwardingListener {
#[async_trait]
impl UListener for ForwardingListener {
async fn on_receive(&self, msg: UMessage) {
// TODO: This will currently just immediately upon receipt of the message send it back out
// We may want to implement some kind of queueing mechanism here explicitly to handle
// if we're busy still receiving but another message is available
let out_transport = self.out_transport.lock().await;
let _res = out_transport.send(msg).await;
}
Expand Down Expand Up @@ -266,4 +272,124 @@ mod tests {
.await
.is_err());
}

#[async_std::test]
async fn test_advanced_where_there_is_a_local_route_and_two_remote_routes() {
// Local route
let local_authority = UAuthority {
name: Some("local".to_string()),
number: Some(Number::Ip(vec![192, 168, 1, 100])),
..Default::default()
};
let local_transport: Arc<Mutex<Box<dyn UTransport>>> =
Arc::new(Mutex::new(Box::new(UPClientFoo)));
let local_route = Route::new(local_authority.clone(), local_transport.clone());

// Remote route - A
let remote_authority_a = UAuthority {
name: Some("remote_a".to_string()),
number: Some(Number::Ip(vec![192, 168, 1, 200])),
..Default::default()
};
let remote_transport_a: Arc<Mutex<Box<dyn UTransport>>> =
Arc::new(Mutex::new(Box::new(UPClientBar)));
let remote_route_a = Route::new(remote_authority_a.clone(), remote_transport_a.clone());

// Remote route - B
let remote_authority_b = UAuthority {
name: Some("remote_b".to_string()),
number: Some(Number::Ip(vec![192, 168, 1, 201])),
..Default::default()
};
let remote_transport_b: Arc<Mutex<Box<dyn UTransport>>> =
Arc::new(Mutex::new(Box::new(UPClientBar)));
let remote_route_b = Route::new(remote_authority_b.clone(), remote_transport_b.clone());

let ustreamer = UStreamer::new("foo_bar_streamer");

// Add forwarding rules to route local<->remote_a
assert!(ustreamer
.add_forwarding_rule(local_route.clone(), remote_route_a.clone())
.await
.is_ok());
assert!(ustreamer
.add_forwarding_rule(remote_route_a.clone(), local_route.clone())
.await
.is_ok());

// Add forwarding rules to route local<->remote_b
assert!(ustreamer
.add_forwarding_rule(local_route.clone(), remote_route_b.clone())
.await
.is_ok());
assert!(ustreamer
.add_forwarding_rule(remote_route_b.clone(), local_route.clone())
.await
.is_ok());

// Add forwarding rules to route remote_a<->remote_b
assert!(ustreamer
.add_forwarding_rule(remote_route_a.clone(), remote_route_b.clone())
.await
.is_ok());
assert!(ustreamer
.add_forwarding_rule(remote_route_b.clone(), remote_route_a.clone())
.await
.is_ok());
}

#[async_std::test]
async fn test_advanced_where_there_is_a_local_route_and_two_remote_routes_but_the_remote_routes_have_the_same_instance_of_utransport(
) {
// Local route
let local_authority = UAuthority {
name: Some("local".to_string()),
number: Some(Number::Ip(vec![192, 168, 1, 100])),
..Default::default()
};
let local_transport: Arc<Mutex<Box<dyn UTransport>>> =
Arc::new(Mutex::new(Box::new(UPClientFoo)));
let local_route = Route::new(local_authority.clone(), local_transport.clone());

let remote_transport: Arc<Mutex<Box<dyn UTransport>>> =
Arc::new(Mutex::new(Box::new(UPClientBar)));

// Remote route - A
let remote_authority_a = UAuthority {
name: Some("remote_a".to_string()),
number: Some(Number::Ip(vec![192, 168, 1, 200])),
..Default::default()
};
let remote_route_a = Route::new(remote_authority_a.clone(), remote_transport.clone());

// Remote route - B
let remote_authority_b = UAuthority {
name: Some("remote_b".to_string()),
number: Some(Number::Ip(vec![192, 168, 1, 201])),
..Default::default()
};
let remote_route_b = Route::new(remote_authority_b.clone(), remote_transport.clone());

let ustreamer = UStreamer::new("foo_bar_streamer");

// Add forwarding rules to route local<->remote_a
assert!(ustreamer
.add_forwarding_rule(local_route.clone(), remote_route_a.clone())
.await
.is_ok());
assert!(ustreamer
.add_forwarding_rule(remote_route_a.clone(), local_route.clone())
.await
.is_ok());

// Add forwarding rules to route local<->remote_b
assert!(ustreamer
.add_forwarding_rule(local_route.clone(), remote_route_b.clone())
.await
.is_ok());
assert!(ustreamer
.add_forwarding_rule(remote_route_b.clone(), local_route.clone())
.await
.is_ok());
}
}

0 comments on commit 3ef0986

Please sign in to comment.