diff --git a/Cargo.lock b/Cargo.lock index a43f12ad..e1fc76d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1141,6 +1141,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "parking" version = "2.2.0" @@ -1276,6 +1282,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58678a64de2fced2bdec6bca052a6716a0efe692d6e3f53d1bda6a1def64cfc0" dependencies = [ + "bytes", "once_cell", "protobuf-support", "thiserror", @@ -1879,7 +1886,7 @@ dependencies = [ [[package]] name = "up-rust" version = "0.1.5" -source = "git+https://github.com/eclipse-uprotocol/up-rust?rev=1bb08ba7a3666e58c316489fbcf3da3e29dee611#1bb08ba7a3666e58c316489fbcf3da3e29dee611" +source = "git+https://github.com/eclipse-uprotocol/up-rust?rev=f5248a89cf1db6232f463ee3ce7b1cb20d79cfdb#f5248a89cf1db6232f463ee3ce7b1cb20d79cfdb" dependencies = [ "async-trait", "bytes", @@ -1892,8 +1899,8 @@ dependencies = [ "rand", "regex", "reqwest", - "url", - "uuid", + "uriparse", + "uuid-simd", ] [[package]] @@ -1914,6 +1921,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "uriparse" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0200d0fc04d809396c2ad43f3c95da3582a2556eba8d453c1087f4120ee352ff" +dependencies = [ + "fnv", + "lazy_static", +] + [[package]] name = "url" version = "2.5.0" @@ -1931,6 +1948,16 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +[[package]] +name = "uuid-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b082222b4f6619906941c17eb2297fff4c2fb96cb60164170522942a200bd8" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "value-bag" version = "1.8.1" @@ -1943,6 +1970,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "waker-fn" version = "1.1.1" diff --git a/Cargo.toml b/Cargo.toml index f74ed1e5..0a55dc9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ prost = { version = "0.12" } prost-types = { version = "0.12" } serde_json = { version = "1.0.111" } uuid = { version = "1.7.0" } -up-rust = { default-features = false, git = "https://github.com/eclipse-uprotocol/up-rust", rev = "1bb08ba7a3666e58c316489fbcf3da3e29dee611" } +up-rust = { default-features = false, git = "https://github.com/eclipse-uprotocol/up-rust", rev = "f5248a89cf1db6232f463ee3ce7b1cb20d79cfdb" } [profile.dev] diff --git a/example-utils/integration-test-utils/src/integration_test_utils.rs b/example-utils/integration-test-utils/src/integration_test_utils.rs index 4de1158a..db69ed1a 100644 --- a/example-utils/integration-test-utils/src/integration_test_utils.rs +++ b/example-utils/integration-test-utils/src/integration_test_utils.rs @@ -139,6 +139,16 @@ pub struct ClientControl { pub client_command: Arc>, } +fn any_uuri() -> UUri { + UUri { + authority_name: "*".to_string(), + ue_id: 0x0000_FFFF, // any instance, any service + ue_version_major: 0xFF, // any + resource_id: 0xFFFF, // any + ..Default::default() + } +} + async fn configure_client(client_configuration: &ClientConfiguration) -> UPClientFoo { let name = client_configuration.name.clone(); let rx = client_configuration.rx.clone(); @@ -149,7 +159,7 @@ async fn configure_client(client_configuration: &ClientConfiguration) -> UPClien let client = UPClientFoo::new(&name, rx, tx).await; let register_res = client - .register_listener(my_client_uuri.clone(), listener) + .register_listener(&any_uuri(), Some(&my_client_uuri.clone()), listener) .await; let Ok(_registration_string) = register_res else { panic!("Unable to register!"); @@ -185,7 +195,7 @@ async fn poll_for_new_command( ClientCommand::NoOp => {} ClientCommand::ConnectedToStreamer(active_connections) => { debug!("{} commmand: ConnectedToStreamer", &name); - *active_connection_listing = active_connections.clone(); + active_connection_listing.clone_from(active_connections); debug!( "{} set connected_to_streamer to: {:?}", &name, active_connection_listing @@ -193,7 +203,7 @@ async fn poll_for_new_command( } ClientCommand::DisconnectedFromStreamer(active_connections) => { debug!("{} commmand: DisconnectedFromStreamer", &name); - *active_connection_listing = active_connections.clone(); + active_connection_listing.clone_from(active_connections); debug!( "{} set connected_to_streamer to: {:?}", &name, active_connection_listing diff --git a/example-utils/integration-test-utils/src/integration_test_uuris.rs b/example-utils/integration-test-utils/src/integration_test_uuris.rs index b7689908..1b79443d 100644 --- a/example-utils/integration-test-utils/src/integration_test_uuris.rs +++ b/example-utils/integration-test-utils/src/integration_test_uuris.rs @@ -1,53 +1,33 @@ -use up_rust::{Number, UAuthority, UEntity, UUri}; +use up_rust::UUri; -pub fn local_authority() -> UAuthority { - UAuthority { - name: Some("local_authority".to_string()), - number: Number::Ip(vec![192, 168, 1, 100]).into(), - ..Default::default() - } +pub fn local_authority() -> String { + "local_authority".to_string() } -pub fn remote_authority_a() -> UAuthority { - UAuthority { - name: Some("remote_authority_a".to_string()), - number: Number::Ip(vec![192, 168, 1, 200]).into(), - ..Default::default() - } +pub fn remote_authority_a() -> String { + "remote_authority_a".to_string() } -pub fn remote_authority_b() -> UAuthority { - UAuthority { - name: Some("remote_authority_b".to_string()), - number: Number::Ip(vec![192, 168, 1, 201]).into(), - ..Default::default() - } +pub fn remote_authority_b() -> String { + "remote_authority_b".to_string() } pub fn local_client_uuri(id: u32) -> UUri { UUri { - authority: Some(local_authority()).into(), - entity: Some(UEntity { - name: format!("local_entity_{id}").to_string(), - id: Some(id), - version_major: Some(1), - ..Default::default() - }) - .into(), + authority_name: local_authority(), + ue_id: id, + ue_version_major: 1, + resource_id: 2, ..Default::default() } } -pub fn remote_client_uuri(authority: UAuthority, id: u32) -> UUri { +pub fn remote_client_uuri(authority: String, id: u32) -> UUri { UUri { - authority: Some(authority).into(), - entity: Some(UEntity { - name: format!("remote_entity_{id}").to_string(), - id: Some(id), - version_major: Some(1), - ..Default::default() - }) - .into(), + authority_name: authority, + ue_id: id, + ue_version_major: 1, + resource_id: 2, ..Default::default() } } diff --git a/example-utils/integration-test-utils/src/up_client_foo.rs b/example-utils/integration-test-utils/src/up_client_foo.rs index 18611117..1c5e1b87 100644 --- a/example-utils/integration-test-utils/src/up_client_foo.rs +++ b/example-utils/integration-test-utils/src/up_client_foo.rs @@ -21,16 +21,19 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::thread; use up_rust::{ - ComparableListener, UAttributes, UAuthority, UCode, UListener, UMessage, UMessageType, UStatus, - UTransport, UUri, + ComparableListener, UAttributes, UCode, UListener, UMessage, UMessageType, UStatus, UTransport, + UUri, }; +type TopicListenerMap = Arc), HashSet>>>; +type AuthorityListenerMap = Arc>>>; + pub struct UPClientFoo { name: Arc, protocol_receiver: Receiver>, protocol_sender: Sender>, - listeners: Arc>>>, - authority_listeners: Arc>>>, + listeners: TopicListenerMap, + authority_listeners: AuthorityListenerMap, pub times_received: Arc, } @@ -41,10 +44,8 @@ impl UPClientFoo { protocol_sender: Sender>, ) -> Self { let name = Arc::new(name.to_string()); - let listeners: Arc>>> = - Arc::new(Mutex::new(HashMap::new())); - let authority_listeners: Arc>>> = - Arc::new(Mutex::new(HashMap::new())); + let listeners = Arc::new(Mutex::new(HashMap::new())); + let authority_listeners = Arc::new(Mutex::new(HashMap::new())); let times_received = Arc::new(AtomicU64::new(0)); @@ -90,7 +91,7 @@ impl UPClientFoo { authority_listeners.clone(), times_received.clone(), ) - .await; + .await; } UMessageType::UMESSAGE_TYPE_PUBLISH => { unimplemented!("Still need to handle Publish messages"); @@ -105,7 +106,7 @@ impl UPClientFoo { authority_listeners.clone(), times_received.clone(), ) - .await; + .await; } UMessageType::UMESSAGE_TYPE_RESPONSE => { UPClientFoo::process_message( @@ -117,7 +118,7 @@ impl UPClientFoo { authority_listeners.clone(), times_received.clone(), ) - .await; + .await; } _ => { debug!("No matching type or an error occurred!"); @@ -138,57 +139,65 @@ impl UPClientFoo { msg: &UMessage, attr: &UAttributes, msg_type: &str, - listeners: Arc>>>, - authority_listeners: Arc>>>, + listeners: TopicListenerMap, + authority_listeners: AuthorityListenerMap, times_received: Arc, ) { let sink_uuri = attr.sink.as_ref(); debug!("{}: {msg_type} sink uuri: {sink_uuri:?}", name); match sink_uuri { None => { - debug!("{}: No sink uuri!", name); + debug!("{}: No source uuri!", name); } - Some(topic) => { + Some(sink) => { + let authority_name = sink.authority_name.clone(); let authority_listeners = authority_listeners.lock().await; - if let Some(authority) = topic.authority.as_ref() { - debug!("{}: {msg_type}: authority: {authority:?}", name); - - let authority_listeners = authority_listeners.get(authority); + debug!("{}: {msg_type}: authority_name: {authority_name}", name); - if let Some(authority_listeners) = authority_listeners { - debug!( - "{}: {msg_type}: authority listeners found: {authority:?}", - name - ); + let authority_listeners = authority_listeners.get(&authority_name); + if let Some(authority_listeners) = authority_listeners { + debug!( + "{}: {msg_type}: authority listeners found: {authority_name:?}", + name + ); - for (authority_listener_num, al) in authority_listeners.iter().enumerate() { - debug!( - "{}: {msg_type}: Authority listener num: {}", - name, authority_listener_num - ); - al.on_receive(msg.clone()).await; - } - } else { + for (authority_listener_num, al) in authority_listeners.iter().enumerate() { debug!( - "{}: {msg_type}: authority no listeners: {authority:?}", - name + "{}: {msg_type}: Authority listener num: {}", + name, authority_listener_num ); + al.on_receive(msg.clone()).await; } + } else { + debug!( + "{}: {msg_type}: authority no listeners: {authority_name:?}", + name + ); } let listeners = listeners.lock().await; - let topic_listeners = listeners.get(topic); + let topic_listeners = listeners.get(&( + attr.source.as_ref().cloned().unwrap(), + attr.sink.as_ref().cloned(), + )); if let Some(topic_listeners) = topic_listeners { - debug!("{}: {msg_type}: topic: {topic:?} -- listeners found", name); + debug!( + "{}: {msg_type}: source: {:?} sink: {:?} -- topic listeners found", + name, + attr.source.as_ref(), + attr.sink.as_ref() + ); times_received.fetch_add(1, Ordering::SeqCst); for tl in topic_listeners.iter() { tl.on_receive(msg.clone()).await; } } else { debug!( - "{}: {msg_type}: topic: {topic:?} -- listeners not found", - name + "{}: {msg_type}: source: {:?} sink: {:?} -- listeners not found", + name, + attr.source.as_ref(), + attr.sink.as_ref() ); } } @@ -209,48 +218,78 @@ impl UTransport for UPClientFoo { } } - async fn receive(&self, _topic: UUri) -> Result { + async fn receive( + &self, + _source_filter: &UUri, + _sink_filter: Option<&UUri>, + ) -> Result { unimplemented!() } async fn register_listener( &self, - topic: UUri, + source_filter: &UUri, + sink_filter: Option<&UUri>, listener: Arc, ) -> Result<(), UStatus> { - debug!("{}: registering listener for: {topic:?}", &self.name); + debug!( + "{}: registering listener for: source: {:?} sink: {:?}", + self.name, source_filter, sink_filter + ); - return if topic.resource.is_none() && topic.entity.is_none() { - debug!("{}: registering authority listener", &self.name); + let sink_for_specific = { + if let Some(sink) = sink_filter { + sink.authority_name != "*" + } else { + false + } + }; + return if source_filter.authority_name == "*" && sink_for_specific { + let sink_authority = sink_filter.unwrap().clone().authority_name; let mut authority_listeners = self.authority_listeners.lock().await; - let Some(authority) = topic.authority.as_ref() else { - return Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "No authority provided!", - )); - }; + let authority = sink_authority; + debug!( + "{}: registering authority listener on authority: {}", + &self.name, authority + ); let authority_listeners = authority_listeners.entry(authority.clone()).or_default(); let comparable_listener = ComparableListener::new(listener); let inserted = authority_listeners.insert(comparable_listener); match inserted { - true => Ok(()), + true => { + debug!("{}: successfully registered authority listener for: authority: {}", &self.name, authority); + + Ok(()) + }, false => Err(UStatus::fail_with_code( UCode::ALREADY_EXISTS, - "UUri and listener already registered!", + format!("{}: UUri and listener already registered! failed to register authority listener for: authority: {}", &self.name, authority) )), } } else { - debug!("{}: registering regular listener", &self.name); + debug!( + "{}: registering regular listener for: source: {:?} sink: {:?}", + &self.name, source_filter, sink_filter + ); let mut listeners = self.listeners.lock().await; - let topic_listeners = listeners.entry(topic).or_default(); + let topic_listeners = listeners + .entry((source_filter.clone(), sink_filter.cloned())) + .or_default(); let comparable_listener = ComparableListener::new(listener); let inserted = topic_listeners.insert(comparable_listener); match inserted { - true => Ok(()), + true => { + debug!( + "{}: successfully registered regular listener for: source: {:?} sink: {:?}", + &self.name, source_filter, sink_filter + ); + + Ok(()) + } false => Err(UStatus::fail_with_code( UCode::ALREADY_EXISTS, "UUri and listener already registered!", @@ -261,29 +300,34 @@ impl UTransport for UPClientFoo { async fn unregister_listener( &self, - topic: UUri, + source_filter: &UUri, + sink_filter: Option<&UUri>, listener: Arc, ) -> Result<(), UStatus> { - debug!("{} unregistering listener for topic: {topic:?}", &self.name); + debug!( + "{} unregistering listener for source_filter: {source_filter:?}", + &self.name + ); - return if topic.resource.is_none() && topic.entity.is_none() { + let sink_for_any = { + if let Some(sink) = sink_filter { + sink.authority_name == "*" + } else { + false + } + }; + + return if source_filter.authority_name != "*" && sink_for_any { debug!("{}: unregistering authority listener", &self.name); let mut authority_listeners = self.authority_listeners.lock().await; - let Some(authority) = topic.authority.as_ref() else { - let err = UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - format!("Missing authority portion of topic: {topic:?}"), - ); - error!("{} {err:?}", &self.name); - return Err(err); - }; + let authority = source_filter.authority_name.clone(); - let Some(authority_listeners) = authority_listeners.get_mut(authority) else { + let Some(authority_listeners) = authority_listeners.get_mut(&authority) else { let err = UStatus::fail_with_code( UCode::NOT_FOUND, - format!("No authority listeners for topic: {topic:?}"), + format!("{} No authority listeners for: source: {:?} sink: {:?} -- unable to unregister", self.name, source_filter, sink_filter) ); error!("{} {err:?}", &self.name); return Err(err); @@ -296,7 +340,7 @@ impl UTransport for UPClientFoo { false => { let err = UStatus::fail_with_code( UCode::NOT_FOUND, - format!("Unable to find authority listener for topic: {topic:?}"), + format!("{} Unable to find authority listener for: source: {:?} sink: {:?} -- unable to unregister", self.name, source_filter, sink_filter) ); error!("{} {err:?}", &self.name); Err(err) @@ -304,7 +348,9 @@ impl UTransport for UPClientFoo { } } else { let mut listeners = self.listeners.lock().await; - let Some(topic_listeners) = listeners.get_mut(&topic) else { + let Some(topic_listeners) = + listeners.get_mut(&(source_filter.clone(), sink_filter.cloned())) + else { return Err(UStatus::fail_with_code( UCode::NOT_FOUND, "No listeners registered for topic!", diff --git a/up-streamer/src/endpoint.rs b/up-streamer/src/endpoint.rs index b057566a..52ab6ee2 100644 --- a/up-streamer/src/endpoint.rs +++ b/up-streamer/src/endpoint.rs @@ -13,21 +13,21 @@ use async_std::sync::Arc; use log::*; -use up_rust::{UAuthority, UTransport}; +use up_rust::UTransport; const ENDPOINT_TAG: &str = "Endpoint:"; const ENDPOINT_FN_NEW_TAG: &str = "new():"; /// -/// [`Endpoint`] is defined as a combination of [`UAuthority`][up_rust::UAuthority] and -/// [`Arc>>`][up_rust::UTransport] as endpoints are at the [`UAuthority`][up_rust::UAuthority] level. +/// [`Endpoint`] is defined as a combination of `authority_name` and +/// [`Arc>>`][up_rust::UTransport] as endpoints are at the authority level. /// /// # Examples /// /// ``` /// use std::sync::Arc; /// use async_std::sync::Mutex; -/// use up_rust::{Number, UAuthority, UTransport}; +/// use up_rust::UTransport; /// use up_streamer::Endpoint; /// /// # pub mod up_client_foo { @@ -42,22 +42,32 @@ const ENDPOINT_FN_NEW_TAG: &str = "new():"; /// # todo!() /// # } /// # -/// # async fn receive(&self, _topic: UUri) -> Result { +/// # async fn receive( +/// # &self, +/// # _source_filter: &UUri, +/// # _sink_filter: Option<&UUri>, +/// # ) -> Result { /// # todo!() /// # } /// # /// # async fn register_listener( -/// # &self, -/// # topic: UUri, -/// # _listener: Arc, +/// # &self, +/// # source_filter: &UUri, +/// # sink_filter: Option<&UUri>, +/// # listener: Arc, /// # ) -> Result<(), UStatus> { -/// # println!("UPClientFoo: registering topic: {:?}", topic); +/// # println!("UPClientFoo: registering source_filter: {:?}", source_filter); /// # Ok(()) /// # } /// # -/// # async fn unregister_listener(&self, topic: UUri, _listener: Arc) -> Result<(), UStatus> { +/// # async fn unregister_listener( +/// # &self, +/// # source_filter: &UUri, +/// # sink_filter: Option<&UUri>, +/// # listener: Arc, +/// # ) -> Result<(), UStatus> { /// # println!( -/// # "UPClientFoo: unregistering topic: {topic:?}" +/// # "UPClientFoo: unregistering source_filter: {source_filter:?}" /// # ); /// # Ok(()) /// # } @@ -72,23 +82,19 @@ const ENDPOINT_FN_NEW_TAG: &str = "new():"; /// /// let local_transport: Arc = Arc::new(up_client_foo::UPClientFoo::new()); /// -/// let authority_foo = UAuthority { -/// name: Some("foo_name".to_string()).into(), -/// number: Some(Number::Ip(vec![192, 168, 1, 100])), -/// ..Default::default() -/// }; +/// let authority_foo = "foo_authority"; /// /// let local_endpoint = Endpoint::new("local_endpoint", authority_foo, local_transport); /// ``` #[derive(Clone)] pub struct Endpoint { pub(crate) name: String, - pub(crate) authority: UAuthority, + pub(crate) authority: String, pub(crate) transport: Arc, } impl Endpoint { - pub fn new(name: &str, authority: UAuthority, transport: Arc) -> Self { + pub fn new(name: &str, authority: &str, transport: Arc) -> Self { // Try to initiate logging. // Required in case of dynamic lib, otherwise no logs. // But cannot be done twice in case of static link. @@ -99,7 +105,7 @@ impl Endpoint { ); Self { name: name.to_string(), - authority, + authority: authority.to_string(), transport, } } diff --git a/up-streamer/src/ustreamer.rs b/up-streamer/src/ustreamer.rs index e1d228d0..0655a33f 100644 --- a/up-streamer/src/ustreamer.rs +++ b/up-streamer/src/ustreamer.rs @@ -21,30 +21,36 @@ use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::ops::Deref; use std::thread; -use up_rust::{UAuthority, UCode, UListener, UMessage, UStatus, UTransport, UUIDBuilder, UUri}; +use up_rust::{UCode, UListener, UMessage, UStatus, UTransport, UUIDBuilder, UUri}; const USTREAMER_TAG: &str = "UStreamer:"; const USTREAMER_FN_NEW_TAG: &str = "new():"; const USTREAMER_FN_ADD_FORWARDING_RULE_TAG: &str = "add_forwarding_rule():"; const USTREAMER_FN_DELETE_FORWARDING_RULE_TAG: &str = "delete_forwarding_rule():"; -fn uauthority_to_uuri(authority: UAuthority) -> UUri { +fn uauthority_to_uuri(authority_name: &str) -> UUri { UUri { - authority: Some(authority).into(), + authority_name: authority_name.to_string(), + ue_id: 0x0000_FFFF, // any instance, any service + ue_version_major: 0xFF, // any + resource_id: 0xFFFF, // any + ..Default::default() + } +} + +fn any_uuri() -> UUri { + UUri { + authority_name: "*".to_string(), + ue_id: 0x0000_FFFF, // any instance, any service + ue_version_major: 0xFF, // any + resource_id: 0xFFFF, // any ..Default::default() } } // the 'gatekeeper' which will prevent us from erroneously being able to add duplicate // forwarding rules or delete those rules which don't exist -type ForwardingRules = Mutex< - HashSet<( - UAuthority, - UAuthority, - ComparableTransport, - ComparableTransport, - )>, ->; +type ForwardingRules = Mutex>; const TRANSPORT_FORWARDERS_TAG: &str = "TransportForwarders:"; const TRANSPORT_FORWARDERS_FN_INSERT_TAG: &str = "insert:"; @@ -119,7 +125,7 @@ const FORWARDING_LISTENERS_FN_INSERT_TAG: &str = "insert:"; const FORWARDING_LISTENERS_FN_REMOVE_TAG: &str = "remove:"; type ForwardingListenersContainer = - Mutex)>>; + Mutex)>>; // we must have only a single listener per in UTransport and out UAuthority struct ForwardingListeners { @@ -136,7 +142,7 @@ impl ForwardingListeners { pub async fn insert( &self, in_transport: Arc, - out_authority: UAuthority, + out_authority: &str, forwarding_id: &str, out_sender: Sender>, ) -> Option> { @@ -145,12 +151,12 @@ impl ForwardingListeners { let mut forwarding_listeners = self.listeners.lock().await; let (active, forwarding_listener) = forwarding_listeners - .entry((in_comparable_transport.clone(), out_authority.clone())) + .entry((in_comparable_transport.clone(), out_authority.to_string())) .or_insert_with(|| { let forwarding_listener = Arc::new(ForwardingListener::new(forwarding_id, out_sender)); let reg_res = task::block_on(in_transport - .register_listener(uauthority_to_uuri(out_authority), forwarding_listener.clone())); + .register_listener(&any_uuri(), Some(&uauthority_to_uuri(out_authority)), forwarding_listener.clone())); if let Err(err) = reg_res { warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}"); @@ -172,14 +178,14 @@ impl ForwardingListeners { } } - pub async fn remove(&self, in_transport: Arc, out_authority: UAuthority) { + pub async fn remove(&self, in_transport: Arc, out_authority: &str) { let in_comparable_transport = ComparableTransport::new(in_transport.clone()); let mut forwarding_listeners = self.listeners.lock().await; let active_num = { let Some((active, _)) = forwarding_listeners - .get_mut(&(in_comparable_transport.clone(), out_authority.clone())) + .get_mut(&(in_comparable_transport.clone(), out_authority.to_string())) else { warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_REMOVE_TAG} no such out_comparable_transport, out_authority: {out_authority:?}"); return; @@ -190,12 +196,13 @@ impl ForwardingListeners { if active_num == 0 { let removed = - forwarding_listeners.remove(&(in_comparable_transport, out_authority.clone())); + forwarding_listeners.remove(&(in_comparable_transport, out_authority.to_string())); warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_REMOVE_TAG} removing ForwardingListener, out_authority: {out_authority:?}"); if let Some((_, forwarding_listener)) = removed { warn!("ForwardingListeners::remove: ForwardingListener found we can remove, out_authority: {out_authority:?}"); let unreg_res = task::block_on(in_transport.unregister_listener( - uauthority_to_uuri(out_authority.clone()), + &uauthority_to_uuri(out_authority), + Some(&any_uuri()), forwarding_listener, )); @@ -223,7 +230,7 @@ impl ForwardingListeners { /// ``` /// use std::sync::Arc; /// use async_std::sync::Mutex; -/// use up_rust::{Number, UAuthority, UListener, UTransport}; +/// use up_rust::{UListener, UTransport}; /// use up_streamer::{Endpoint, UStreamer}; /// # pub mod up_client_foo { /// # use std::sync::Arc; @@ -239,22 +246,32 @@ impl ForwardingListeners { /// # todo!() /// # } /// # -/// # async fn receive(&self, _topic: UUri) -> Result { +/// # async fn receive( +/// # &self, +/// # _source_filter: &UUri, +/// # _sink_filter: Option<&UUri>, +/// # ) -> Result { /// # todo!() /// # } /// # /// # async fn register_listener( -/// # &self, -/// # topic: UUri, -/// # _listener: Arc, +/// # &self, +/// # source_filter: &UUri, +/// # sink_filter: Option<&UUri>, +/// # listener: Arc, /// # ) -> Result<(), UStatus> { -/// # println!("UPClientFoo: registering topic: {:?}", topic); +/// # println!("UPClientFoo: registering source_filter: {:?}", source_filter); /// # Ok(()) /// # } /// # -/// # async fn unregister_listener(&self, topic: UUri, _listener: Arc) -> Result<(), UStatus> { +/// # async fn unregister_listener( +/// # &self, +/// # source_filter: &UUri, +/// # sink_filter: Option<&UUri>, +/// # listener: Arc, +/// # ) -> Result<(), UStatus> { /// # println!( -/// # "UPClientFoo: unregistering topic: {topic:?}" +/// # "UPClientFoo: unregistering source_filter: {source_filter:?}" /// # ); /// # Ok(()) /// # } @@ -279,22 +296,32 @@ impl ForwardingListeners { /// # todo!() /// # } /// # -/// # async fn receive(&self, _topic: UUri) -> Result { +/// # async fn receive( +/// # &self, +/// # _source_filter: &UUri, +/// # _sink_filter: Option<&UUri>, +/// # ) -> Result { /// # todo!() /// # } /// # /// # async fn register_listener( -/// # &self, -/// # topic: UUri, -/// # _listener: Arc, +/// # &self, +/// # source_filter: &UUri, +/// # sink_filter: Option<&UUri>, +/// # listener: Arc, /// # ) -> Result<(), UStatus> { -/// # println!("UPClientBar: registering topic: {:?}", topic); +/// # println!("UPClientBar: registering source_filter: {:?}", source_filter); /// # Ok(()) /// # } /// # -/// # async fn unregister_listener(&self, topic: UUri, _listener: Arc) -> Result<(), UStatus> { +/// # async fn unregister_listener( +/// # &self, +/// # source_filter: &UUri, +/// # sink_filter: Option<&UUri>, +/// # listener: Arc, +/// # ) -> Result<(), UStatus> { /// # println!( -/// # "UPClientFoo: unregistering topic: {topic:?}" +/// # "UPClientBar: unregistering source_filter: {source_filter:?}" /// # ); /// # Ok(()) /// # } @@ -316,20 +343,12 @@ impl ForwardingListeners { /// let remote_transport: Arc = Arc::new(up_client_bar::UPClientBar::new()); /// /// // Local endpoint -/// let local_authority = UAuthority { -/// name: Some("local".to_string()), -/// number: Some(Number::Ip(vec![192, 168, 1, 100])), -/// ..Default::default() -/// }; -/// let local_endpoint = Endpoint::new("local_endpoint", local_authority, local_transport.clone()); +/// let local_authority = "local"; +/// let local_endpoint = Endpoint::new("local_endpoint", local_authority, local_transport); /// /// // A remote endpoint -/// let remote_authority = UAuthority { -/// name: Some("remote".to_string()), -/// number: Some(Number::Ip(vec![192, 168, 1, 200])), -/// ..Default::default() -/// }; -/// let remote_endpoint = Endpoint::new("remote_endpoint", remote_authority, remote_transport.clone()); +/// let remote_authority = "remote"; +/// let remote_endpoint = Endpoint::new("remote_endpoint", remote_authority, remote_transport); /// /// let mut streamer = UStreamer::new("hoge", 100); /// @@ -502,7 +521,7 @@ impl UStreamer { self.forwarding_listeners .insert( r#in.transport.clone(), - out.authority.clone(), + &out.authority, &Self::forwarding_id(&r#in, &out), out_sender, ) @@ -578,7 +597,7 @@ impl UStreamer { .remove(out.transport.clone()) .await; self.forwarding_listeners - .remove(r#in.transport.clone(), out.authority.clone()) + .remove(r#in.transport.clone(), &out.authority) .await; Ok(()) } @@ -714,7 +733,7 @@ mod tests { use crate::{Endpoint, UStreamer}; use async_trait::async_trait; use std::sync::Arc; - use up_rust::{Number, UAuthority, UListener, UMessage, UStatus, UTransport, UUri}; + use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; pub struct UPClientFoo; @@ -724,25 +743,37 @@ mod tests { todo!() } - async fn receive(&self, _topic: UUri) -> Result { + async fn receive( + &self, + _source_filter: &UUri, + _sink_filter: Option<&UUri>, + ) -> Result { todo!() } async fn register_listener( &self, - topic: UUri, + source_filter: &UUri, + _sink_filter: Option<&UUri>, _listener: Arc, ) -> Result<(), UStatus> { - println!("UPClientFoo: registering topic: {:?}", topic); + println!( + "UPClientFoo: registering source_filter: {:?}", + source_filter + ); Ok(()) } async fn unregister_listener( &self, - topic: UUri, + source_filter: &UUri, + _sink_filter: Option<&UUri>, _listener: Arc, ) -> Result<(), UStatus> { - println!("UPClientFoo: unregistering topic: {topic:?}"); + println!( + "UPClientFoo: unregistering source_filter: {:?}", + source_filter + ); Ok(()) } } @@ -755,25 +786,37 @@ mod tests { todo!() } - async fn receive(&self, _topic: UUri) -> Result { + async fn receive( + &self, + _source_filter: &UUri, + _sink_filter: Option<&UUri>, + ) -> Result { todo!() } async fn register_listener( &self, - topic: UUri, + source_filter: &UUri, + _sink_filter: Option<&UUri>, _listener: Arc, ) -> Result<(), UStatus> { - println!("UPClientBar: registering topic: {:?}", topic); + println!( + "UPClientBar: registering source_filter: {:?}", + source_filter + ); Ok(()) } async fn unregister_listener( &self, - topic: UUri, + source_filter: &UUri, + _sink_filter: Option<&UUri>, _listener: Arc, ) -> Result<(), UStatus> { - println!("UPClientBar: unregistering topic: {topic:?}"); + println!( + "UPClientBar: unregistering source_filter: {:?}", + source_filter + ); Ok(()) } } @@ -781,28 +824,17 @@ mod tests { #[async_std::test] async fn test_simple_with_a_single_input_and_output_endpoint() { // Local endpoint - let local_authority = UAuthority { - name: Some("local".to_string()), - number: Some(Number::Ip(vec![192, 168, 1, 100])), - ..Default::default() - }; + let local_authority = "local"; let local_transport: Arc = Arc::new(UPClientFoo); - let local_endpoint = Endpoint::new( - "local_endpoint", - local_authority.clone(), - local_transport.clone(), - ); + let local_endpoint = + Endpoint::new("local_endpoint", local_authority, local_transport.clone()); // A remote endpoint - let remote_authority = UAuthority { - name: Some("remote".to_string()), - number: Some(Number::Ip(vec![192, 168, 1, 200])), - ..Default::default() - }; + let remote_authority = "remote"; let remote_transport: Arc = Arc::new(UPClientBar); let remote_endpoint = Endpoint::new( "remote_endpoint", - remote_authority.clone(), + remote_authority, remote_transport.clone(), ); @@ -855,41 +887,26 @@ mod tests { #[async_std::test] async fn test_advanced_where_there_is_a_local_endpoint_and_two_remote_endpoints() { // Local endpoint - let local_authority = UAuthority { - name: Some("local".to_string()), - number: Some(Number::Ip(vec![192, 168, 1, 100])), - ..Default::default() - }; + let local_authority = "local"; let local_transport: Arc = Arc::new(UPClientFoo); - let local_endpoint = Endpoint::new( - "local_endpoint", - local_authority.clone(), - local_transport.clone(), - ); + let local_endpoint = + Endpoint::new("local_endpoint", local_authority, local_transport.clone()); // Remote endpoint - A - let remote_authority_a = UAuthority { - name: Some("remote_a".to_string()), - number: Some(Number::Ip(vec![192, 168, 1, 200])), - ..Default::default() - }; + let remote_authority_a = "remote_a"; let remote_transport_a: Arc = Arc::new(UPClientBar); let remote_endpoint_a = Endpoint::new( "remote_endpoint_a", - remote_authority_a.clone(), + remote_authority_a, remote_transport_a.clone(), ); // Remote endpoint - B - let remote_authority_b = UAuthority { - name: Some("remote_b".to_string()), - number: Some(Number::Ip(vec![192, 168, 1, 201])), - ..Default::default() - }; + let remote_authority_b = "remote_b"; let remote_transport_b: Arc = Arc::new(UPClientBar); let remote_endpoint_b = Endpoint::new( "remote_endpoint_b", - remote_authority_b.clone(), + remote_authority_b, remote_transport_b.clone(), ); @@ -930,41 +947,26 @@ mod tests { async fn test_advanced_where_there_is_a_local_endpoint_and_two_remote_endpoints_but_the_remote_endpoints_have_the_same_instance_of_utransport( ) { // Local endpoint - let local_authority = UAuthority { - name: Some("local".to_string()), - number: Some(Number::Ip(vec![192, 168, 1, 100])), - ..Default::default() - }; + let local_authority = "local"; let local_transport: Arc = Arc::new(UPClientFoo); - let local_endpoint = Endpoint::new( - "local_endpoint", - local_authority.clone(), - local_transport.clone(), - ); + let local_endpoint = + Endpoint::new("local_endpoint", local_authority, local_transport.clone()); let remote_transport: Arc = Arc::new(UPClientBar); // Remote endpoint - A - let remote_authority_a = UAuthority { - name: Some("remote_a".to_string()), - number: Some(Number::Ip(vec![192, 168, 1, 200])), - ..Default::default() - }; + let remote_authority_a = "remote_a"; let remote_endpoint_a = Endpoint::new( "remote_endpoint_a", - remote_authority_a.clone(), + remote_authority_a, remote_transport.clone(), ); // Remote endpoint - B - let remote_authority_b = UAuthority { - name: Some("remote_b".to_string()), - number: Some(Number::Ip(vec![192, 168, 1, 201])), - ..Default::default() - }; + let remote_authority_b = "remote_b"; let remote_endpoint_b = Endpoint::new( "remote_endpoint_b", - remote_authority_b.clone(), + remote_authority_b, remote_transport.clone(), ); diff --git a/up-streamer/tests/single_local_single_remote.rs b/up-streamer/tests/single_local_single_remote.rs index 8d6b3e34..5cd3af6f 100644 --- a/up-streamer/tests/single_local_single_remote.rs +++ b/up-streamer/tests/single_local_single_remote.rs @@ -50,8 +50,8 @@ async fn single_local_single_remote() { let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000); // setting up endpoints between authorities and protocols - let local_endpoint = Endpoint::new("local_endpoint", local_authority(), utransport_foo); - let remote_endpoint = Endpoint::new("remote_endpoint", remote_authority_a(), utransport_bar); + let local_endpoint = Endpoint::new("local_endpoint", &local_authority(), utransport_foo); + let remote_endpoint = Endpoint::new("remote_endpoint", &remote_authority_a(), utransport_bar); // adding local to remote routing let add_forwarding_rule_res = ustreamer @@ -125,15 +125,15 @@ async fn single_local_single_remote() { }, ClientMessages { notification_msgs: vec![notification_from_remote_client_for_local_client( - remote_client_uuri(remote_authority_a(), 20), + remote_client_uuri(remote_authority_a(), 200), 10, )], request_msgs: vec![request_from_remote_client_for_local_client( - remote_client_uuri(remote_authority_a(), 20), + remote_client_uuri(remote_authority_a(), 200), 10, )], response_msgs: vec![response_from_remote_client_for_local_client( - remote_client_uuri(remote_authority_a(), 20), + remote_client_uuri(remote_authority_a(), 200), 10, )], }, diff --git a/up-streamer/tests/single_local_two_remote_add_remove_rules.rs b/up-streamer/tests/single_local_two_remote_add_remove_rules.rs index c235cb85..8a04c703 100644 --- a/up-streamer/tests/single_local_two_remote_add_remove_rules.rs +++ b/up-streamer/tests/single_local_two_remote_add_remove_rules.rs @@ -54,11 +54,11 @@ async fn single_local_two_remote_add_remove_rules() { let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000); // setting up endpoints between authorities and protocols - let local_endpoint = Endpoint::new("local_endpoint", local_authority(), utransport_foo); + let local_endpoint = Endpoint::new("local_endpoint", &local_authority(), utransport_foo); let remote_endpoint_a = - Endpoint::new("remote_endpoint_a", remote_authority_a(), utransport_bar_1); + Endpoint::new("remote_endpoint_a", &remote_authority_a(), utransport_bar_1); let remote_endpoint_b = - Endpoint::new("remote_endpoint_b", remote_authority_b(), utransport_bar_2); + Endpoint::new("remote_endpoint_b", &remote_authority_b(), utransport_bar_2); // adding local to remote_a routing let add_forwarding_rule_res = ustreamer diff --git a/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs b/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs index 18a17bf5..5a28abaa 100644 --- a/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs +++ b/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs @@ -54,11 +54,11 @@ async fn single_local_two_remote_authorities_different_remote_transport() { let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000); // setting up endpoints between authorities and protocols - let local_endpoint = Endpoint::new("local_endpoint", local_authority(), utransport_foo); + let local_endpoint = Endpoint::new("local_endpoint", &local_authority(), utransport_foo); let remote_endpoint_a = - Endpoint::new("remote_endpoint_a", remote_authority_a(), utransport_bar_1); + Endpoint::new("remote_endpoint_a", &remote_authority_a(), utransport_bar_1); let remote_endpoint_b = - Endpoint::new("remote_endpoint_b", remote_authority_b(), utransport_bar_2); + Endpoint::new("remote_endpoint_b", &remote_authority_b(), utransport_bar_2); // adding local to remote_a routing let add_forwarding_rule_res = ustreamer diff --git a/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs b/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs index 7dbb96a0..24acab59 100644 --- a/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs +++ b/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs @@ -33,7 +33,7 @@ use std::time::Duration; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; -const DURATION_TO_RUN_CLIENTS: u128 = 1000; +const DURATION_TO_RUN_CLIENTS: u128 = 1_000; const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000; #[async_std::test] @@ -51,14 +51,14 @@ async fn single_local_two_remote_authorities_same_remote_transport() { let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000); // setting up endpoints between authorities and protocols - let local_endpoint = Endpoint::new("local_endpoint", local_authority(), utransport_foo); + let local_endpoint = Endpoint::new("local_endpoint", &local_authority(), utransport_foo); let remote_endpoint_a = Endpoint::new( "remote_endpoint_a", - remote_authority_a(), + &remote_authority_a(), utransport_bar.clone(), ); let remote_endpoint_b = - Endpoint::new("remote_endpoint_b", remote_authority_b(), utransport_bar); + Endpoint::new("remote_endpoint_b", &remote_authority_b(), utransport_bar); // adding local to remote_a routing let add_forwarding_rule_res = ustreamer