From 78188411349f7729aaa323ca32e3445d07a9c193 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Wed, 10 Apr 2024 11:46:16 +0800 Subject: [PATCH 1/3] Upgrade up-rust. Signed-off-by: ChenYing Kuo --- Cargo.toml | 2 +- src/lib.rs | 24 +++-- src/rpc.rs | 2 +- src/utransport.rs | 200 ++++++++++++++++++++---------------------- tests/publish.rs | 81 +++++++++-------- tests/register.rs | 50 ++++++----- tests/rpc.rs | 165 +++++++++++++++++++++------------- tests/special_uuri.rs | 140 ++++++++++++++++++----------- tests/test_lib.rs | 4 +- 9 files changed, 373 insertions(+), 295 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 441c88a..c22d884 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ prost = "0.12" prost-types = "0.12" protobuf = { version = "3.3" } rand = "0.8.5" -up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "a30d3655ab13f8d97815280d718f4891f693ed2d" } +up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "c705ac97602ad6917a93d23651e8a504ec7bb718" } zenoh = { version = "0.10.1-rc", features = ["unstable"]} [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 3366035..bf323dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,11 +17,11 @@ pub mod utransport; use protobuf::{Enum, Message}; use std::{ collections::HashMap, - sync::{atomic::AtomicU64, Arc, Mutex}, + sync::{Arc, Mutex}, }; use up_rust::{ - UAttributes, UAuthority, UCode, UEntity, UMessage, UPayloadFormat, UPriority, UResourceBuilder, - UStatus, UUri, + ComparableListener, UAttributes, UAuthority, UCode, UEntity, UListener, UPayloadFormat, + UPriority, UResourceBuilder, UStatus, UUri, }; use zenoh::{ config::Config, @@ -31,23 +31,22 @@ use zenoh::{ subscriber::Subscriber, }; -pub type UtransportListener = Box) + Send + Sync + 'static>; - const UATTRIBUTE_VERSION: u8 = 1; -pub struct ZenohListener {} +type SubscriberMap = Arc>>>; +type QueryableMap = Arc>>>; +type QueryMap = Arc>>; +type RpcCallbackMap = Arc>>>; pub struct UPClientZenoh { session: Arc, // Able to unregister Subscriber - subscriber_map: Arc>>>, + subscriber_map: SubscriberMap, // Able to unregister Queryable - queryable_map: Arc>>>, + queryable_map: QueryableMap, // Save the reqid to be able to send back response - query_map: Arc>>, + query_map: QueryMap, // Save the callback for RPC response - rpc_callback_map: Arc>>>, - // Used to identify different callback - callback_counter: AtomicU64, + rpc_callback_map: RpcCallbackMap, // Source UUri in RPC source_uuri: UUri, } @@ -117,7 +116,6 @@ impl UPClientZenoh { queryable_map: Arc::new(Mutex::new(HashMap::new())), query_map: Arc::new(Mutex::new(HashMap::new())), rpc_callback_map: Arc::new(Mutex::new(HashMap::new())), - callback_counter: AtomicU64::new(0), source_uuri, }) } diff --git a/src/rpc.rs b/src/rpc.rs index 03f3544..8361093 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -44,7 +44,7 @@ impl RpcClient for UPClientZenoh { // Create UAttributes and put into Zenoh user attachment let uattributes = UAttributes::request( - UUIDBuilder::new().build(), + UUIDBuilder::build(), topic, self.get_response_uuri(), options.clone(), diff --git a/src/utransport.rs b/src/utransport.rs index ba081ff..0269857 100644 --- a/src/utransport.rs +++ b/src/utransport.rs @@ -11,15 +11,13 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::{UPClientZenoh, UtransportListener}; +use crate::UPClientZenoh; +use async_std::task::block_on; use async_trait::async_trait; -use std::{ - sync::{atomic::Ordering, Arc}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use up_rust::{ - Data, UAttributes, UAttributesValidators, UCode, UMessage, UMessageType, UPayload, - UPayloadFormat, UStatus, UTransport, UUri, UriValidator, + ComparableListener, Data, UAttributes, UAttributesValidators, UCode, UListener, UMessage, + UMessageType, UPayload, UPayloadFormat, UStatus, UTransport, UUri, UriValidator, }; use zenoh::{ prelude::{r#async::*, Sample}, @@ -75,6 +73,8 @@ impl UPClientZenoh { Ok(()) } + // TODO: Might need some refactors + #[allow(clippy::too_many_lines)] async fn send_request( &self, zenoh_key: &str, @@ -102,12 +102,11 @@ impl UPClientZenoh { log::error!("{msg}"); UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) })?; - let hashmap_key = UPClientZenoh::to_zenoh_key_string(&source_uuri)?; let resp_callback = self .rpc_callback_map .lock() .unwrap() - .get(&hashmap_key) + .get(&source_uuri) .ok_or_else(|| { let msg = "Unable to get callback".to_string(); log::error!("{msg}"); @@ -115,20 +114,24 @@ impl UPClientZenoh { })? .clone(); let zenoh_callback = move |reply: Reply| { - let msg = match reply.sample { + match reply.sample { Ok(sample) => { // Get the encoding of UPayload let Some(encoding) = UPClientZenoh::to_upayload_format(&sample.encoding) else { let msg = "Unable to get the encoding".to_string(); log::error!("{msg}"); - resp_callback(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + resp_callback.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; }; // Get UAttribute from the attachment let Some(attachment) = sample.attachment() else { let msg = "Unable to get the attachment".to_string(); log::error!("{msg}"); - resp_callback(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + resp_callback.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; }; let u_attribute = match UPClientZenoh::attachment_to_uattributes(attachment) { @@ -137,30 +140,34 @@ impl UPClientZenoh { let msg = format!("Unable to transform attachment to UAttributes: {e:?}"); log::error!("{msg}"); - resp_callback(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + resp_callback + .on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; } }; // Create UMessage - Ok(UMessage { - attributes: Some(u_attribute).into(), - payload: Some(UPayload { - length: Some(0), - format: encoding.into(), - data: Some(Data::Value(sample.payload.contiguous().to_vec())), + block_on( + resp_callback.on_receive(UMessage { + attributes: Some(u_attribute).into(), + payload: Some(UPayload { + length: Some(0), + format: encoding.into(), + data: Some(Data::Value(sample.payload.contiguous().to_vec())), + ..Default::default() + }) + .into(), ..Default::default() - }) - .into(), - ..Default::default() - }) + }), + ); } Err(e) => { let msg = format!("Error while parsing Zenoh reply: {e:?}"); log::error!("{msg}"); - Err(UStatus::fail_with_code(UCode::INTERNAL, msg)) + block_on(resp_callback.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg))); } - }; - resp_callback(msg); + } }; // Send query @@ -249,25 +256,19 @@ impl UPClientZenoh { async fn register_publish_notification_listener( &self, topic: &UUri, - listener: Arc, - ) -> Result { + listener: Arc, + ) -> Result<(), UStatus> { // Get Zenoh key let zenoh_key = UPClientZenoh::to_zenoh_key_string(topic)?; - // Generate listener string for users to delete - let hashmap_key = format!( - "{}_{:X}", - zenoh_key, - self.callback_counter.fetch_add(1, Ordering::SeqCst) - ); - // Setup callback + let listener_cloned = listener.clone(); let callback = move |sample: Sample| { // Get the UAttribute from Zenoh user attachment let Some(attachment) = sample.attachment() else { let msg = "Unable to get attachment"; log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on(listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg))); return; }; let u_attribute = match UPClientZenoh::attachment_to_uattributes(attachment) { @@ -275,7 +276,9 @@ impl UPClientZenoh { Err(e) => { let msg = format!("Unable to transform attachment to UAttributes: {e:?}"); log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; } }; @@ -283,7 +286,7 @@ impl UPClientZenoh { let Some(encoding) = UPClientZenoh::to_upayload_format(&sample.encoding) else { let msg = "Unable to get payload encoding"; log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on(listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg))); return; }; let u_payload = UPayload { @@ -298,7 +301,7 @@ impl UPClientZenoh { payload: Some(u_payload).into(), ..Default::default() }; - listener(Ok(msg)); + block_on(listener_cloned.on_receive(msg)); }; // Create Zenoh subscriber @@ -309,42 +312,36 @@ impl UPClientZenoh { .res() .await { - self.subscriber_map - .lock() - .unwrap() - .insert(hashmap_key.clone(), subscriber); + self.subscriber_map.lock().unwrap().insert( + (topic.clone(), ComparableListener::new(listener)), + subscriber, + ); } else { let msg = "Unable to register callback with Zenoh"; log::error!("{msg}"); return Err(UStatus::fail_with_code(UCode::INTERNAL, msg)); } - Ok(hashmap_key) + Ok(()) } async fn register_request_listener( &self, topic: &UUri, - listener: Arc, - ) -> Result { + listener: Arc, + ) -> Result<(), UStatus> { // Get Zenoh key let zenoh_key = UPClientZenoh::to_zenoh_key_string(topic)?; - // Generate listener string for users to delete - let hashmap_key = format!( - "{}_{:X}", - zenoh_key, - self.callback_counter.fetch_add(1, Ordering::SeqCst) - ); - // Setup callback + let listener_cloned = listener.clone(); let query_map = self.query_map.clone(); let callback = move |query: Query| { // Create UAttribute from Zenoh user attachment let Some(attachment) = query.attachment() else { let msg = "Unable to get attachment".to_string(); log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on(listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg))); return; }; let u_attribute = match UPClientZenoh::attachment_to_uattributes(attachment) { @@ -352,7 +349,9 @@ impl UPClientZenoh { Err(e) => { let msg = format!("Unable to transform user attachment to UAttributes: {e:?}"); log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; } }; @@ -362,7 +361,9 @@ impl UPClientZenoh { let Some(encoding) = UPClientZenoh::to_upayload_format(&value.encoding) else { let msg = "Unable to get payload encoding".to_string(); log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; }; UPayload { @@ -389,7 +390,7 @@ impl UPClientZenoh { .lock() .unwrap() .insert(u_attribute.id.to_string(), query); - listener(Ok(msg)); + block_on(listener_cloned.on_receive(msg)); }; // Create Zenoh queryable @@ -400,34 +401,25 @@ impl UPClientZenoh { .res() .await { - self.queryable_map - .lock() - .unwrap() - .insert(hashmap_key.clone(), queryable); + self.queryable_map.lock().unwrap().insert( + (topic.clone(), ComparableListener::new(listener)), + queryable, + ); } else { let msg = "Unable to register callback with Zenoh".to_string(); log::error!("{msg}"); return Err(UStatus::fail_with_code(UCode::INTERNAL, msg)); } - Ok(hashmap_key) + Ok(()) } - fn register_response_listener( - &self, - topic: &UUri, - listener: Arc, - ) -> Result { - // Get Zenoh key - let zenoh_key = UPClientZenoh::to_zenoh_key_string(topic)?; - + fn register_response_listener(&self, topic: &UUri, listener: Arc) { // Store the response callback (Will be used in send_request) self.rpc_callback_map .lock() .unwrap() - .insert(zenoh_key.clone(), listener); - - Ok(zenoh_key) + .insert(topic.clone(), listener); } } @@ -527,24 +519,19 @@ impl UTransport for UPClientZenoh { async fn register_listener( &self, topic: UUri, - listener: Box) + Send + Sync + 'static>, - ) -> Result { - let listener = Arc::new(listener); + listener: Arc, + ) -> Result<(), UStatus> { if topic.authority.is_some() && topic.entity.is_none() && topic.resource.is_none() { // This is special UUri which means we need to register for all of Publish, Notification, Request, and Response // RPC response - let mut listener_str = self.register_response_listener(&topic, listener.clone())?; + self.register_response_listener(&topic, listener.clone()); // RPC request - listener_str += "&"; - listener_str += &self - .register_request_listener(&topic, listener.clone()) + self.register_request_listener(&topic, listener.clone()) .await?; // Publish & Notification - listener_str += "&"; - listener_str += &self - .register_publish_notification_listener(&topic, listener.clone()) + self.register_publish_notification_listener(&topic, listener.clone()) .await?; - Ok(listener_str) + Ok(()) } else { // Do the validation UriValidator::validate(&topic) @@ -552,7 +539,8 @@ impl UTransport for UPClientZenoh { if UriValidator::is_rpc_response(&topic) { // RPC response - self.register_response_listener(&topic, listener.clone()) + self.register_response_listener(&topic, listener.clone()); + Ok(()) } else if UriValidator::is_rpc_method(&topic) { // RPC request self.register_request_listener(&topic, listener.clone()) @@ -565,22 +553,20 @@ impl UTransport for UPClientZenoh { } } - async fn unregister_listener(&self, topic: UUri, listener: &str) -> Result<(), UStatus> { - let mut pub_listener_str: Option<&str> = None; - let mut req_listener_str: Option<&str> = None; - let mut resp_listener_str: Option<&str> = None; + async fn unregister_listener( + &self, + topic: UUri, + listener: Arc, + ) -> Result<(), UStatus> { + let mut remove_pub_listener = false; + let mut remove_req_listener = false; + let mut remove_resp_listener = false; if topic.authority.is_some() && topic.entity.is_none() && topic.resource.is_none() { // This is special UUri which means we need to unregister all listeners - let listener_vec = listener.split('&').collect::>(); - if listener_vec.len() != 3 { - let msg = "Invalid listener string".to_string(); - log::error!("{msg}"); - return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); - } - resp_listener_str = Some(listener_vec[0]); - req_listener_str = Some(listener_vec[1]); - pub_listener_str = Some(listener_vec[2]); + remove_pub_listener = true; + remove_req_listener = true; + remove_resp_listener = true; } else { // Do the validation UriValidator::validate(&topic).map_err(|_| { @@ -590,20 +576,20 @@ impl UTransport for UPClientZenoh { })?; if UriValidator::is_rpc_response(&topic) { - resp_listener_str = Some(listener); + remove_resp_listener = true; } else if UriValidator::is_rpc_method(&topic) { - req_listener_str = Some(listener); + remove_req_listener = true; } else { - pub_listener_str = Some(listener); + remove_pub_listener = true; } } - if let Some(listener) = resp_listener_str { + if remove_resp_listener { // RPC response if self .rpc_callback_map .lock() .unwrap() - .remove(listener) + .remove(&topic) .is_none() { let msg = "RPC response callback doesn't exist".to_string(); @@ -611,13 +597,13 @@ impl UTransport for UPClientZenoh { return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); } } - if let Some(listener) = req_listener_str { + if remove_req_listener { // RPC request if self .queryable_map .lock() .unwrap() - .remove(listener) + .remove(&(topic.clone(), ComparableListener::new(listener.clone()))) .is_none() { let msg = "RPC request listener doesn't exist".to_string(); @@ -625,13 +611,13 @@ impl UTransport for UPClientZenoh { return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); } } - if let Some(listener) = pub_listener_str { + if remove_pub_listener { // Normal publish if self .subscriber_map .lock() .unwrap() - .remove(listener) + .remove(&(topic.clone(), ComparableListener::new(listener.clone()))) .is_none() { let msg = "Publish listener doesn't exist".to_string(); diff --git a/tests/publish.rs b/tests/publish.rs index b002913..4469d89 100644 --- a/tests/publish.rs +++ b/tests/publish.rs @@ -14,11 +14,42 @@ pub mod test_lib; use async_std::task; +use async_trait::async_trait; use std::{ sync::{Arc, Mutex}, time, }; -use up_rust::{Data, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUIDBuilder}; +use up_rust::{ + Data, UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUIDBuilder, +}; + +struct PublishNotificationListener { + recv_data: Arc>, +} +impl PublishNotificationListener { + fn new() -> Self { + PublishNotificationListener { + recv_data: Arc::new(Mutex::new(String::new())), + } + } + fn get_recv_data(&self) -> String { + self.recv_data.lock().unwrap().clone() + } +} +#[async_trait] +impl UListener for PublishNotificationListener { + async fn on_receive(&self, msg: UMessage) { + if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { + let value = v.into_iter().map(|c| c as char).collect::(); + *self.recv_data.lock().unwrap() = value; + } else { + panic!("The message should be Data::Value type."); + } + } + async fn on_error(&self, err: UStatus) { + panic!("Internal Error: {err:?}"); + } +} #[async_std::test] async fn test_publish_and_subscribe() { @@ -28,29 +59,17 @@ async fn test_publish_and_subscribe() { let target_data = String::from("Hello World!"); let upclient = test_lib::create_up_client_zenoh().await.unwrap(); let uuri = test_lib::create_utransport_uuri(0); - let verified_data = Arc::new(Mutex::new(String::new())); // Register the listener - let verified_data_cloned = verified_data.clone(); - let listener = move |result: Result| match result { - Ok(msg) => { - if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - *verified_data_cloned.lock().unwrap() = value; - } else { - panic!("The message should be Data::Value type."); - } - } - Err(ustatus) => panic!("Internal Error: {ustatus:?}"), - }; - let listener_string = upclient - .register_listener(uuri.clone(), Box::new(listener)) + let pub_listener = Arc::new(PublishNotificationListener::new()); + upclient + .register_listener(uuri.clone(), pub_listener.clone()) .await .unwrap(); // Send UMessage let umessage = UMessageBuilder::publish(uuri.clone()) - .with_message_id(UUIDBuilder::new().build()) + .with_message_id(UUIDBuilder::build()) .build_with_payload( target_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, @@ -62,11 +81,11 @@ async fn test_publish_and_subscribe() { task::sleep(time::Duration::from_millis(1000)).await; // Compare the result - assert_eq!(*verified_data.lock().unwrap(), target_data); + assert_eq!(pub_listener.get_recv_data(), target_data); // Cleanup upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), pub_listener) .await .unwrap(); } @@ -80,29 +99,17 @@ async fn test_notification_and_subscribe() { let upclient = test_lib::create_up_client_zenoh().await.unwrap(); let src_uuri = test_lib::create_utransport_uuri(0); let dst_uuri = test_lib::create_utransport_uuri(1); - let verified_data = Arc::new(Mutex::new(String::new())); // Register the listener - let verified_data_cloned = verified_data.clone(); - let listener = move |result: Result| match result { - Ok(msg) => { - if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - *verified_data_cloned.lock().unwrap() = value; - } else { - panic!("The message should be Data::Value type."); - } - } - Err(ustatus) => panic!("Internal Error: {ustatus:?}"), - }; - let listener_string = upclient - .register_listener(dst_uuri.clone(), Box::new(listener)) + let notification_listener = Arc::new(PublishNotificationListener::new()); + upclient + .register_listener(dst_uuri.clone(), notification_listener.clone()) .await .unwrap(); // Send UMessage let umessage = UMessageBuilder::notification(src_uuri.clone(), dst_uuri.clone()) - .with_message_id(UUIDBuilder::new().build()) + .with_message_id(UUIDBuilder::build()) .build_with_payload( target_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, @@ -114,11 +121,11 @@ async fn test_notification_and_subscribe() { task::sleep(time::Duration::from_millis(1000)).await; // Compare the result - assert_eq!(*verified_data.lock().unwrap(), target_data); + assert_eq!(notification_listener.get_recv_data(), target_data); // Cleanup upclient - .unregister_listener(dst_uuri.clone(), &listener_string) + .unregister_listener(dst_uuri.clone(), notification_listener) .await .unwrap(); } diff --git a/tests/register.rs b/tests/register.rs index b3f44a4..3b06ec2 100644 --- a/tests/register.rs +++ b/tests/register.rs @@ -13,8 +13,17 @@ // pub mod test_lib; -use up_rust::{UCode, UStatus, UTransport}; +use std::sync::Arc; +use async_trait::async_trait; +use up_rust::{UCode, UListener, UMessage, UStatus, UTransport}; + +struct FooListener; +#[async_trait] +impl UListener for FooListener { + async fn on_receive(&self, _msg: UMessage) {} + async fn on_error(&self, _err: UStatus) {} +} #[async_std::test] async fn test_utransport_register_and_unregister() { test_lib::before_test(); @@ -22,23 +31,23 @@ async fn test_utransport_register_and_unregister() { // Initialization let upclient = test_lib::create_up_client_zenoh().await.unwrap(); let uuri = test_lib::create_utransport_uuri(0); + let foo_listener = Arc::new(FooListener); - // Compare the return string - let listener_string = upclient - .register_listener(uuri.clone(), Box::new(|_| {})) + // Register the listener + upclient + .register_listener(uuri.clone(), foo_listener.clone()) .await .unwrap(); - assert_eq!(listener_string, "upl/0100162e04d20100_0"); // Able to ungister upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), foo_listener.clone()) .await .unwrap(); // Unable to ungister let result = upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), foo_listener.clone()) .await; assert_eq!( result, @@ -56,23 +65,23 @@ async fn test_rpcserver_register_and_unregister() { // Initialization let upclient = test_lib::create_up_client_zenoh().await.unwrap(); let uuri = test_lib::create_rpcserver_uuri(); + let foo_listener = Arc::new(FooListener); - // Compare the return string - let listener_string = upclient - .register_listener(uuri.clone(), Box::new(|_| {})) + // Register the listener + upclient + .register_listener(uuri.clone(), foo_listener.clone()) .await .unwrap(); - assert_eq!(listener_string, "upl/0100162e04d20100_0"); // Able to ungister upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), foo_listener.clone()) .await .unwrap(); // Unable to ungister let result = upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), foo_listener.clone()) .await; assert_eq!( result, @@ -90,26 +99,23 @@ async fn test_utransport_special_uuri_register_and_unregister() { // Initialization let upclient = test_lib::create_up_client_zenoh().await.unwrap(); let uuri = test_lib::create_special_uuri(); + let foo_listener = Arc::new(FooListener); - // Compare the return string - let listener_string = upclient - .register_listener(uuri.clone(), Box::new(|_| {})) + // Register the listener + upclient + .register_listener(uuri.clone(), foo_listener.clone()) .await .unwrap(); - assert_eq!( - listener_string, - "upr/060102030a0b0c/**&upr/060102030a0b0c/**_0&upr/060102030a0b0c/**_1" - ); // Able to ungister upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), foo_listener.clone()) .await .unwrap(); // Unable to ungister let result = upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), foo_listener.clone()) .await; assert_eq!( result, diff --git a/tests/rpc.rs b/tests/rpc.rs index ddb665d..6485b44 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -14,12 +14,92 @@ pub mod test_lib; use async_std::task::{self, block_on}; -use std::{sync::Arc, time}; +use async_trait::async_trait; +use std::{ + sync::{Arc, Mutex}, + time, +}; +use up_client_zenoh::UPClientZenoh; use up_rust::{ - CallOptions, Data, RpcClient, UMessage, UMessageBuilder, UPayload, UPayloadFormat, UStatus, - UTransport, UUIDBuilder, + CallOptions, Data, RpcClient, UListener, UMessage, UMessageBuilder, UPayload, UPayloadFormat, + UStatus, UTransport, UUIDBuilder, }; +struct RequestListener { + up_client: Arc, + request_data: String, + response_data: String, +} +impl RequestListener { + fn new(up_client: Arc, request_data: String, response_data: String) -> Self { + RequestListener { + up_client, + request_data, + response_data, + } + } +} +#[async_trait] +impl UListener for RequestListener { + async fn on_receive(&self, msg: UMessage) { + let UMessage { + attributes, + payload, + .. + } = msg; + // Check the payload of request + if let Data::Value(v) = payload.unwrap().data.unwrap() { + let value = v.into_iter().map(|c| c as char).collect::(); + assert_eq!(self.request_data, value); + } else { + panic!("The message should be Data::Value type."); + } + // Send back result + let umessage = UMessageBuilder::response_for_request(&attributes) + .with_message_id(UUIDBuilder::build()) + .build_with_payload( + self.response_data.as_bytes().to_vec().into(), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT, + ) + .unwrap(); + block_on(self.up_client.send(umessage)).unwrap(); + } + async fn on_error(&self, err: UStatus) { + panic!("Internal Error: {err:?}"); + } +} + +struct ResponseListener { + response_data: Arc>, +} +impl ResponseListener { + fn new() -> Self { + ResponseListener { + response_data: Arc::new(Mutex::new(String::new())), + } + } + fn get_response_data(&self) -> String { + self.response_data.lock().unwrap().clone() + } +} +#[async_trait] +impl UListener for ResponseListener { + async fn on_receive(&self, msg: UMessage) { + let UMessage { payload, .. } = msg; + // Check the response data + if let Data::Value(v) = payload.unwrap().data.unwrap() { + let value = v.into_iter().map(|c| c as char).collect::(); + *self.response_data.lock().unwrap() = value; + } else { + panic!("The message should be Data::Value type."); + } + } + async fn on_error(&self, _err: UStatus) { + // TODO: Check why we also receive timeout after receiving correct data + //panic!("Internal Error: {err:?}"); + } +} + #[async_std::test] async fn test_rpc_server_client() { test_lib::before_test(); @@ -32,41 +112,13 @@ async fn test_rpc_server_client() { let dst_uuri = test_lib::create_rpcserver_uuri(); // Setup RpcServer callback - let upclient_server_cloned = upclient_server.clone(); - let response_data_cloned = response_data.clone(); - let request_data_cloned = request_data.clone(); - let callback = move |result: Result| { - match result { - Ok(msg) => { - let UMessage { - attributes, - payload, - .. - } = msg; - // Check the payload of request - if let Data::Value(v) = payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - assert_eq!(request_data_cloned, value); - } else { - panic!("The message should be Data::Value type."); - } - // Send back result - let umessage = UMessageBuilder::response_for_request(&attributes) - .with_message_id(UUIDBuilder::new().build()) - .build_with_payload( - response_data_cloned.as_bytes().to_vec().into(), - UPayloadFormat::UPAYLOAD_FORMAT_TEXT, - ) - .unwrap(); - block_on(upclient_server_cloned.send(umessage)).unwrap(); - } - Err(ustatus) => { - panic!("Internal Error: {ustatus:?}"); - } - } - }; - let listener_string = upclient_server - .register_listener(dst_uuri.clone(), Box::new(callback)) + let request_listener = Arc::new(RequestListener::new( + upclient_server.clone(), + request_data.clone(), + response_data.clone(), + )); + upclient_server + .register_listener(dst_uuri.clone(), request_listener.clone()) .await .unwrap(); @@ -103,32 +155,16 @@ async fn test_rpc_server_client() { // Send Request with send { // Register Response callback - let callback = move |result: Result| { - match result { - Ok(msg) => { - let UMessage { payload, .. } = msg; - // Check the response data - if let Data::Value(v) = payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - assert_eq!(response_data, value); - } else { - panic!("The message should be Data::Value type."); - } - } - Err(ustatus) => { - panic!("Internal Error: {ustatus:?}"); - } - } - }; let response_uuri = upclient_client.get_response_uuri(); + let response_listener = Arc::new(ResponseListener::new()); upclient_client - .register_listener(response_uuri.clone(), Box::new(callback)) + .register_listener(response_uuri.clone(), response_listener.clone()) .await .unwrap(); // Send request - let umessage = UMessageBuilder::request(dst_uuri.clone(), response_uuri, 3000) - .with_message_id(UUIDBuilder::new().build()) + let umessage = UMessageBuilder::request(dst_uuri.clone(), response_uuri.clone(), 2000) + .with_message_id(UUIDBuilder::build()) .build_with_payload( request_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, @@ -137,12 +173,21 @@ async fn test_rpc_server_client() { upclient_client.send(umessage).await.unwrap(); // Waiting for the callback to process data - task::sleep(time::Duration::from_millis(1000)).await; + task::sleep(time::Duration::from_millis(5000)).await; + + // Compare the result + assert_eq!(response_listener.get_response_data(), response_data); + + // Cleanup + upclient_client + .unregister_listener(response_uuri.clone(), response_listener.clone()) + .await + .unwrap(); } // Cleanup upclient_server - .unregister_listener(dst_uuri.clone(), &listener_string) + .unregister_listener(dst_uuri.clone(), request_listener.clone()) .await .unwrap(); } diff --git a/tests/special_uuri.rs b/tests/special_uuri.rs index b1e6678..ebf2fc0 100644 --- a/tests/special_uuri.rs +++ b/tests/special_uuri.rs @@ -14,72 +14,104 @@ pub mod test_lib; use async_std::task::{self, block_on}; -use std::{sync::Arc, time}; +use async_trait::async_trait; +use std::{ + sync::{Arc, Mutex}, + time, +}; +use up_client_zenoh::UPClientZenoh; use up_rust::{ - CallOptions, Data, RpcClient, UMessage, UMessageBuilder, UMessageType, UPayload, + CallOptions, Data, RpcClient, UListener, UMessage, UMessageBuilder, UMessageType, UPayload, UPayloadFormat, UStatus, UTransport, UUIDBuilder, }; +struct SpecialListener { + up_client: Arc, + recv_data: Arc>, + request_data: String, + response_data: String, +} +impl SpecialListener { + fn new(up_client: Arc, request_data: String, response_data: String) -> Self { + SpecialListener { + up_client, + recv_data: Arc::new(Mutex::new(String::new())), + request_data, + response_data, + } + } + // TODO: Should be removed later + #[allow(dead_code)] + fn get_recv_data(&self) -> String { + self.recv_data.lock().unwrap().clone() + } +} + +#[async_trait] +impl UListener for SpecialListener { + async fn on_receive(&self, msg: UMessage) { + let UMessage { + attributes, + payload, + .. + } = msg; + let value = if let Data::Value(v) = payload.clone().unwrap().data.unwrap() { + v.into_iter().map(|c| c as char).collect::() + } else { + panic!("The message should be Data::Value type."); + }; + match attributes.type_.enum_value().unwrap() { + UMessageType::UMESSAGE_TYPE_PUBLISH => { + *self.recv_data.lock().unwrap() = value; + } + UMessageType::UMESSAGE_TYPE_NOTIFICATION => { + panic!("Notification type"); + } + UMessageType::UMESSAGE_TYPE_REQUEST => { + assert_eq!(self.request_data, value); + // Send back result + let umessage = UMessageBuilder::response_for_request(&attributes) + .with_message_id(UUIDBuilder::build()) + .build_with_payload( + self.response_data.as_bytes().to_vec().into(), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT, + ) + .unwrap(); + block_on(self.up_client.send(umessage)).unwrap(); + } + UMessageType::UMESSAGE_TYPE_RESPONSE => { + panic!("Response type"); + } + UMessageType::UMESSAGE_TYPE_UNSPECIFIED => { + panic!("Unknown type"); + } + } + } + async fn on_error(&self, err: UStatus) { + panic!("Internal Error: {err:?}"); + } +} + #[async_std::test] async fn test_register_listener_with_special_uuri() { test_lib::before_test(); // Initialization let upclient1 = Arc::new(test_lib::create_up_client_zenoh().await.unwrap()); - let upclient1_clone = upclient1.clone(); let upclient2 = test_lib::create_up_client_zenoh().await.unwrap(); let publish_data = String::from("Hello World!"); let request_data = String::from("This is the request data"); - let response_data = String::from("This is the request data"); + let response_data = String::from("This is the response data"); // Register the listener - let publish_data_cloned = publish_data.clone(); - let request_data_cloned = request_data.clone(); - let response_data_cloned = response_data.clone(); let listener_uuri = test_lib::create_special_uuri(); - let listener = move |result: Result| match result { - Ok(msg) => { - let UMessage { - attributes, - payload, - .. - } = msg; - let value = if let Data::Value(v) = payload.clone().unwrap().data.unwrap() { - v.into_iter().map(|c| c as char).collect::() - } else { - panic!("The message should be Data::Value type."); - }; - match attributes.type_.enum_value().unwrap() { - UMessageType::UMESSAGE_TYPE_PUBLISH => { - assert_eq!(publish_data_cloned, value); - } - UMessageType::UMESSAGE_TYPE_NOTIFICATION => { - panic!("Notification type"); - } - UMessageType::UMESSAGE_TYPE_REQUEST => { - assert_eq!(request_data_cloned, value); - // Send back result - let umessage = UMessageBuilder::response_for_request(&attributes) - .with_message_id(UUIDBuilder::new().build()) - .build_with_payload( - response_data_cloned.as_bytes().to_vec().into(), - UPayloadFormat::UPAYLOAD_FORMAT_TEXT, - ) - .unwrap(); - block_on(upclient1_clone.send(umessage)).unwrap(); - } - UMessageType::UMESSAGE_TYPE_RESPONSE => { - panic!("Response type"); - } - UMessageType::UMESSAGE_TYPE_UNSPECIFIED => { - panic!("Unknown type"); - } - } - } - Err(ustatus) => panic!("Internal Error: {ustatus:?}"), - }; - let listener_string = upclient1 - .register_listener(listener_uuri.clone(), Box::new(listener)) + let special_listener = Arc::new(SpecialListener::new( + upclient1.clone(), + request_data.clone(), + response_data.clone(), + )); + upclient1 + .register_listener(listener_uuri.clone(), special_listener.clone()) .await .unwrap(); @@ -91,7 +123,7 @@ async fn test_register_listener_with_special_uuri() { // Send Publish data let umessage = UMessageBuilder::publish(publish_uuri) - .with_message_id(UUIDBuilder::new().build()) + .with_message_id(UUIDBuilder::build()) .build_with_payload( publish_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, @@ -101,6 +133,10 @@ async fn test_register_listener_with_special_uuri() { // Waiting for the subscriber to receive data task::sleep(time::Duration::from_millis(1000)).await; + + // Compare the result + //TODO: Update the UAuthority + //assert_eq!(special_listener.get_recv_data(), response_data); } // Send Request @@ -137,7 +173,7 @@ async fn test_register_listener_with_special_uuri() { // Cleanup upclient1 - .unregister_listener(listener_uuri, &listener_string) + .unregister_listener(listener_uuri, special_listener.clone()) .await .unwrap(); } diff --git a/tests/test_lib.rs b/tests/test_lib.rs index 95a6342..5422a56 100644 --- a/tests/test_lib.rs +++ b/tests/test_lib.rs @@ -105,8 +105,8 @@ pub fn create_rpcserver_uuri() -> UUri { #[allow(clippy::must_use_candidate)] pub fn create_authority() -> UAuthority { UAuthority { - name: Some("UAuthName".to_string()), - number: Some(Number::Id(vec![1, 2, 3, 10, 11, 12])), + name: Some("MyAuthName".to_string()), + number: Some(Number::Id(vec![1, 2, 3, 4])), ..Default::default() } } From 680dc87ea334ae54e07a361a8d2138f7aac70d89 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Thu, 11 Apr 2024 14:31:00 +0800 Subject: [PATCH 2/3] Reorganize the test code. Signed-off-by: ChenYing Kuo --- src/utransport.rs | 5 +- tests/publish.rs | 47 ++++++----- tests/register.rs | 86 ++------------------ tests/rpc.rs | 20 ++--- tests/special_uuri.rs | 179 ------------------------------------------ tests/test_lib.rs | 132 +++++++++++++++---------------- 6 files changed, 110 insertions(+), 359 deletions(-) delete mode 100644 tests/special_uuri.rs diff --git a/src/utransport.rs b/src/utransport.rs index 0269857..ff050f8 100644 --- a/src/utransport.rs +++ b/src/utransport.rs @@ -73,8 +73,6 @@ impl UPClientZenoh { Ok(()) } - // TODO: Might need some refactors - #[allow(clippy::too_many_lines)] async fn send_request( &self, zenoh_key: &str, @@ -137,8 +135,7 @@ impl UPClientZenoh { let u_attribute = match UPClientZenoh::attachment_to_uattributes(attachment) { Ok(uattr) => uattr, Err(e) => { - let msg = - format!("Unable to transform attachment to UAttributes: {e:?}"); + let msg = format!("Transform attachment to UAttributes failed: {e:?}"); log::error!("{msg}"); block_on( resp_callback diff --git a/tests/publish.rs b/tests/publish.rs index 4469d89..ea8ecf8 100644 --- a/tests/publish.rs +++ b/tests/publish.rs @@ -19,8 +19,10 @@ use std::{ sync::{Arc, Mutex}, time, }; +use test_case::test_case; use up_rust::{ Data, UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUIDBuilder, + UUri, }; struct PublishNotificationListener { @@ -51,31 +53,35 @@ impl UListener for PublishNotificationListener { } } +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(0), 0, 0); "Normal UUri")] +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_special_uuri(0); "Special UUri")] #[async_std::test] -async fn test_publish_and_subscribe() { +async fn test_publish_and_subscribe(origin_uuri: UUri, dst_uuri: UUri) { test_lib::before_test(); // Initialization let target_data = String::from("Hello World!"); - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let uuri = test_lib::create_utransport_uuri(0); + let upclient_send = test_lib::create_up_client_zenoh(0, 0).await.unwrap(); + let upclient_recv = test_lib::create_up_client_zenoh(1, 1).await.unwrap(); // Register the listener let pub_listener = Arc::new(PublishNotificationListener::new()); - upclient - .register_listener(uuri.clone(), pub_listener.clone()) + upclient_recv + .register_listener(dst_uuri.clone(), pub_listener.clone()) .await .unwrap(); + // Waiting for listener to take effect + task::sleep(time::Duration::from_millis(1000)).await; // Send UMessage - let umessage = UMessageBuilder::publish(uuri.clone()) + let umessage = UMessageBuilder::publish(origin_uuri.clone()) .with_message_id(UUIDBuilder::build()) .build_with_payload( target_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, ) .unwrap(); - upclient.send(umessage).await.unwrap(); + upclient_send.send(umessage).await.unwrap(); // Waiting for the subscriber to receive data task::sleep(time::Duration::from_millis(1000)).await; @@ -84,38 +90,41 @@ async fn test_publish_and_subscribe() { assert_eq!(pub_listener.get_recv_data(), target_data); // Cleanup - upclient - .unregister_listener(uuri.clone(), pub_listener) + upclient_recv + .unregister_listener(dst_uuri.clone(), pub_listener) .await .unwrap(); } +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(1), 1, 1), test_lib::create_utransport_uuri(Some(1), 1, 1); "Normal UUri")] +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(1), 1, 1), test_lib::create_special_uuri(1); "Special UUri")] #[async_std::test] -async fn test_notification_and_subscribe() { +async fn test_notification_and_subscribe(origin_uuri: UUri, dst_uuri: UUri, listen_uuri: UUri) { test_lib::before_test(); // Initialization let target_data = String::from("Hello World!"); - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let src_uuri = test_lib::create_utransport_uuri(0); - let dst_uuri = test_lib::create_utransport_uuri(1); + let upclient_notify = test_lib::create_up_client_zenoh(0, 0).await.unwrap(); + let upclient_recv = test_lib::create_up_client_zenoh(1, 1).await.unwrap(); // Register the listener let notification_listener = Arc::new(PublishNotificationListener::new()); - upclient - .register_listener(dst_uuri.clone(), notification_listener.clone()) + upclient_recv + .register_listener(listen_uuri.clone(), notification_listener.clone()) .await .unwrap(); + // Waiting for listener to take effect + task::sleep(time::Duration::from_millis(1000)).await; // Send UMessage - let umessage = UMessageBuilder::notification(src_uuri.clone(), dst_uuri.clone()) + let umessage = UMessageBuilder::notification(origin_uuri.clone(), dst_uuri.clone()) .with_message_id(UUIDBuilder::build()) .build_with_payload( target_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, ) .unwrap(); - upclient.send(umessage).await.unwrap(); + upclient_notify.send(umessage).await.unwrap(); // Waiting for the subscriber to receive data task::sleep(time::Duration::from_millis(1000)).await; @@ -124,8 +133,8 @@ async fn test_notification_and_subscribe() { assert_eq!(notification_listener.get_recv_data(), target_data); // Cleanup - upclient - .unregister_listener(dst_uuri.clone(), notification_listener) + upclient_recv + .unregister_listener(listen_uuri.clone(), notification_listener) .await .unwrap(); } diff --git a/tests/register.rs b/tests/register.rs index 3b06ec2..e48406e 100644 --- a/tests/register.rs +++ b/tests/register.rs @@ -16,7 +16,8 @@ pub mod test_lib; use std::sync::Arc; use async_trait::async_trait; -use up_rust::{UCode, UListener, UMessage, UStatus, UTransport}; +use test_case::test_case; +use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; struct FooListener; #[async_trait] @@ -24,81 +25,16 @@ impl UListener for FooListener { async fn on_receive(&self, _msg: UMessage) {} async fn on_error(&self, _err: UStatus) {} } -#[async_std::test] -async fn test_utransport_register_and_unregister() { - test_lib::before_test(); - - // Initialization - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let uuri = test_lib::create_utransport_uuri(0); - let foo_listener = Arc::new(FooListener); - - // Register the listener - upclient - .register_listener(uuri.clone(), foo_listener.clone()) - .await - .unwrap(); - - // Able to ungister - upclient - .unregister_listener(uuri.clone(), foo_listener.clone()) - .await - .unwrap(); - - // Unable to ungister - let result = upclient - .unregister_listener(uuri.clone(), foo_listener.clone()) - .await; - assert_eq!( - result, - Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Publish listener doesn't exist" - )) - ); -} - -#[async_std::test] -async fn test_rpcserver_register_and_unregister() { - test_lib::before_test(); - - // Initialization - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let uuri = test_lib::create_rpcserver_uuri(); - let foo_listener = Arc::new(FooListener); - - // Register the listener - upclient - .register_listener(uuri.clone(), foo_listener.clone()) - .await - .unwrap(); - - // Able to ungister - upclient - .unregister_listener(uuri.clone(), foo_listener.clone()) - .await - .unwrap(); - - // Unable to ungister - let result = upclient - .unregister_listener(uuri.clone(), foo_listener.clone()) - .await; - assert_eq!( - result, - Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "RPC request listener doesn't exist" - )) - ); -} +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0); "Publish / Notification register_listener")] +#[test_case(test_lib::create_rpcserver_uuri(Some(0), 0); "RPC register_listener")] +#[test_case(test_lib::create_special_uuri(0); "Special UUri register_listener")] #[async_std::test] -async fn test_utransport_special_uuri_register_and_unregister() { +async fn test_register_and_unregister(uuri: UUri) { test_lib::before_test(); // Initialization - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let uuri = test_lib::create_special_uuri(); + let upclient = test_lib::create_up_client_zenoh(0, 0).await.unwrap(); let foo_listener = Arc::new(FooListener); // Register the listener @@ -117,11 +53,5 @@ async fn test_utransport_special_uuri_register_and_unregister() { let result = upclient .unregister_listener(uuri.clone(), foo_listener.clone()) .await; - assert_eq!( - result, - Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "RPC response callback doesn't exist" - )) - ); + assert!(result.is_err()); } diff --git a/tests/rpc.rs b/tests/rpc.rs index 6485b44..27cdd36 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -19,10 +19,11 @@ use std::{ sync::{Arc, Mutex}, time, }; +use test_case::test_case; use up_client_zenoh::UPClientZenoh; use up_rust::{ CallOptions, Data, RpcClient, UListener, UMessage, UMessageBuilder, UPayload, UPayloadFormat, - UStatus, UTransport, UUIDBuilder, + UStatus, UTransport, UUIDBuilder, UUri, }; struct RequestListener { @@ -95,21 +96,21 @@ impl UListener for ResponseListener { } } async fn on_error(&self, _err: UStatus) { - // TODO: Check why we also receive timeout after receiving correct data //panic!("Internal Error: {err:?}"); } } +#[test_case(test_lib::create_rpcserver_uuri(Some(1), 1), test_lib::create_rpcserver_uuri(Some(1), 1); "Normal RPC UUri")] +#[test_case(test_lib::create_rpcserver_uuri(Some(1), 1), test_lib::create_special_uuri(1); "Special listen UUri")] #[async_std::test] -async fn test_rpc_server_client() { +async fn test_rpc_server_client(dst_uuri: UUri, listen_uuri: UUri) { test_lib::before_test(); // Initialization - let upclient_client = test_lib::create_up_client_zenoh().await.unwrap(); - let upclient_server = Arc::new(test_lib::create_up_client_zenoh().await.unwrap()); + let upclient_client = test_lib::create_up_client_zenoh(0, 0).await.unwrap(); + let upclient_server = Arc::new(test_lib::create_up_client_zenoh(1, 1).await.unwrap()); let request_data = String::from("This is the request data"); let response_data = String::from("This is the response data"); - let dst_uuri = test_lib::create_rpcserver_uuri(); // Setup RpcServer callback let request_listener = Arc::new(RequestListener::new( @@ -118,10 +119,9 @@ async fn test_rpc_server_client() { response_data.clone(), )); upclient_server - .register_listener(dst_uuri.clone(), request_listener.clone()) + .register_listener(listen_uuri.clone(), request_listener.clone()) .await .unwrap(); - // Need some time for queryable to run task::sleep(time::Duration::from_millis(1000)).await; @@ -163,7 +163,7 @@ async fn test_rpc_server_client() { .unwrap(); // Send request - let umessage = UMessageBuilder::request(dst_uuri.clone(), response_uuri.clone(), 2000) + let umessage = UMessageBuilder::request(dst_uuri.clone(), response_uuri.clone(), 1000) .with_message_id(UUIDBuilder::build()) .build_with_payload( request_data.as_bytes().to_vec().into(), @@ -187,7 +187,7 @@ async fn test_rpc_server_client() { // Cleanup upclient_server - .unregister_listener(dst_uuri.clone(), request_listener.clone()) + .unregister_listener(listen_uuri.clone(), request_listener.clone()) .await .unwrap(); } diff --git a/tests/special_uuri.rs b/tests/special_uuri.rs deleted file mode 100644 index ebf2fc0..0000000 --- a/tests/special_uuri.rs +++ /dev/null @@ -1,179 +0,0 @@ -// -// Copyright (c) 2024 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -pub mod test_lib; - -use async_std::task::{self, block_on}; -use async_trait::async_trait; -use std::{ - sync::{Arc, Mutex}, - time, -}; -use up_client_zenoh::UPClientZenoh; -use up_rust::{ - CallOptions, Data, RpcClient, UListener, UMessage, UMessageBuilder, UMessageType, UPayload, - UPayloadFormat, UStatus, UTransport, UUIDBuilder, -}; - -struct SpecialListener { - up_client: Arc, - recv_data: Arc>, - request_data: String, - response_data: String, -} -impl SpecialListener { - fn new(up_client: Arc, request_data: String, response_data: String) -> Self { - SpecialListener { - up_client, - recv_data: Arc::new(Mutex::new(String::new())), - request_data, - response_data, - } - } - // TODO: Should be removed later - #[allow(dead_code)] - fn get_recv_data(&self) -> String { - self.recv_data.lock().unwrap().clone() - } -} - -#[async_trait] -impl UListener for SpecialListener { - async fn on_receive(&self, msg: UMessage) { - let UMessage { - attributes, - payload, - .. - } = msg; - let value = if let Data::Value(v) = payload.clone().unwrap().data.unwrap() { - v.into_iter().map(|c| c as char).collect::() - } else { - panic!("The message should be Data::Value type."); - }; - match attributes.type_.enum_value().unwrap() { - UMessageType::UMESSAGE_TYPE_PUBLISH => { - *self.recv_data.lock().unwrap() = value; - } - UMessageType::UMESSAGE_TYPE_NOTIFICATION => { - panic!("Notification type"); - } - UMessageType::UMESSAGE_TYPE_REQUEST => { - assert_eq!(self.request_data, value); - // Send back result - let umessage = UMessageBuilder::response_for_request(&attributes) - .with_message_id(UUIDBuilder::build()) - .build_with_payload( - self.response_data.as_bytes().to_vec().into(), - UPayloadFormat::UPAYLOAD_FORMAT_TEXT, - ) - .unwrap(); - block_on(self.up_client.send(umessage)).unwrap(); - } - UMessageType::UMESSAGE_TYPE_RESPONSE => { - panic!("Response type"); - } - UMessageType::UMESSAGE_TYPE_UNSPECIFIED => { - panic!("Unknown type"); - } - } - } - async fn on_error(&self, err: UStatus) { - panic!("Internal Error: {err:?}"); - } -} - -#[async_std::test] -async fn test_register_listener_with_special_uuri() { - test_lib::before_test(); - - // Initialization - let upclient1 = Arc::new(test_lib::create_up_client_zenoh().await.unwrap()); - let upclient2 = test_lib::create_up_client_zenoh().await.unwrap(); - let publish_data = String::from("Hello World!"); - let request_data = String::from("This is the request data"); - let response_data = String::from("This is the response data"); - - // Register the listener - let listener_uuri = test_lib::create_special_uuri(); - let special_listener = Arc::new(SpecialListener::new( - upclient1.clone(), - request_data.clone(), - response_data.clone(), - )); - upclient1 - .register_listener(listener_uuri.clone(), special_listener.clone()) - .await - .unwrap(); - - // Send Publish - { - // Initialization - let mut publish_uuri = test_lib::create_utransport_uuri(0); - publish_uuri.authority = Some(test_lib::create_authority()).into(); - - // Send Publish data - let umessage = UMessageBuilder::publish(publish_uuri) - .with_message_id(UUIDBuilder::build()) - .build_with_payload( - publish_data.as_bytes().to_vec().into(), - UPayloadFormat::UPAYLOAD_FORMAT_TEXT, - ) - .unwrap(); - upclient2.send(umessage).await.unwrap(); - - // Waiting for the subscriber to receive data - task::sleep(time::Duration::from_millis(1000)).await; - - // Compare the result - //TODO: Update the UAuthority - //assert_eq!(special_listener.get_recv_data(), response_data); - } - - // Send Request - { - // Initialization - let mut request_uuri = test_lib::create_rpcserver_uuri(); - request_uuri.authority = Some(test_lib::create_authority()).into(); - - // RpcClient: Send Request data - let payload = UPayload { - format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(), - data: Some(Data::Value(request_data.as_bytes().to_vec())), - ..Default::default() - }; - let result = upclient2 - .invoke_method( - request_uuri, - payload, - CallOptions { - ttl: 1000, - ..Default::default() - }, - ) - .await; - - // Process the result - if let Data::Value(v) = result.unwrap().payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - assert_eq!(response_data, value); - } else { - panic!("Failed to get result from invoke_method."); - } - } - - // Cleanup - upclient1 - .unregister_listener(listener_uuri, special_listener.clone()) - .await - .unwrap(); -} diff --git a/tests/test_lib.rs b/tests/test_lib.rs index 5422a56..7bf056f 100644 --- a/tests/test_lib.rs +++ b/tests/test_lib.rs @@ -17,82 +17,85 @@ use up_rust::{Number, UAuthority, UEntity, UResource, UResourceBuilder, UStatus, use zenoh::config::Config; static INIT: Once = Once::new(); +static AUTH_NAME: &str = "auth_name"; +static ENTITY_NAME: &str = "entity_name"; +static RESOURCE_NAME: &str = "resource_name"; +static INSTANCE_NAME: &str = "instance_name"; +static MESSAGE_NAME: &str = "message_name"; pub fn before_test() { INIT.call_once(env_logger::init); } -/// # Errors -/// Will return `Err` if unable to create `UPClientZenoh` -pub async fn create_up_client_zenoh() -> Result { - let uauthority = UAuthority { - name: Some("MyAuthName".to_string()), - number: Some(Number::Id(vec![1, 2, 3, 4])), +#[allow(clippy::must_use_candidate)] +pub fn create_authority(idx: u8) -> UAuthority { + UAuthority { + name: Some(format!("{AUTH_NAME}{idx}")), + number: Some(Number::Id(vec![1, 2, 3, 4 + idx])), ..Default::default() - }; - let uentity = UEntity { - name: "default.entity".to_string(), - id: Some(u32::from(rand::random::())), + } +} + +#[allow(clippy::must_use_candidate)] +pub fn create_entity(idx: u32) -> UEntity { + UEntity { + name: format!("{ENTITY_NAME}{idx}"), + id: Some(idx), version_major: Some(1), version_minor: None, ..Default::default() - }; + } +} + +#[allow(clippy::must_use_candidate)] +pub fn create_resource(idx: u32) -> UResource { + UResource { + name: format!("{RESOURCE_NAME}{idx}"), + instance: Some(format!("{INSTANCE_NAME}{idx}")), + message: Some(format!("{MESSAGE_NAME}{idx}")), + id: Some(idx), + ..Default::default() + } +} + +/// # Errors +/// Will return `Err` if unable to create `UPClientZenoh` +pub async fn create_up_client_zenoh( + auth_idx: u8, + entity_idx: u32, +) -> Result { + let uauthority = create_authority(auth_idx); + let uentity = create_entity(entity_idx); UPClientZenoh::new(Config::default(), uauthority, uentity).await } #[allow(clippy::must_use_candidate)] -pub fn create_utransport_uuri(index: u8) -> UUri { - if index == 1 { - UUri { - entity: Some(UEntity { - name: "entity1".to_string(), - version_major: Some(1), - id: Some(1111), - ..Default::default() - }) - .into(), - resource: Some(UResource { - name: "name1".to_string(), - instance: Some("instance1".to_string()), - message: Some("message1".to_string()), - id: Some(1111), - ..Default::default() - }) - .into(), - ..Default::default() - } - } else { - UUri { - entity: Some(UEntity { - name: "body.access".to_string(), - version_major: Some(1), - id: Some(1234), - ..Default::default() - }) - .into(), - resource: Some(UResource { - name: "door".to_string(), - instance: Some("front_left".to_string()), - message: Some("Door".to_string()), - id: Some(5678), - ..Default::default() - }) - .into(), - ..Default::default() - } +pub fn create_utransport_uuri(auth_idx: Option, entity_idx: u32, resource_idx: u32) -> UUri { + UUri { + authority: if let Some(idx) = auth_idx { + // Remote UUri + Some(create_authority(idx)).into() + } else { + // Local UUri + None.into() + }, + entity: Some(create_entity(entity_idx)).into(), + resource: Some(create_resource(resource_idx)).into(), + ..Default::default() } } #[allow(clippy::must_use_candidate)] -pub fn create_rpcserver_uuri() -> UUri { +pub fn create_rpcserver_uuri(auth_idx: Option, entity_idx: u32) -> UUri { UUri { - entity: Some(UEntity { - name: "test_rpc.app".to_string(), - version_major: Some(1), - id: Some(1234), - ..Default::default() - }) - .into(), + authority: if let Some(idx) = auth_idx { + // Remote UUri + Some(create_authority(idx)).into() + } else { + // Local UUri + None.into() + }, + entity: Some(create_entity(entity_idx)).into(), resource: Some(UResourceBuilder::for_rpc_request( Some("SimpleTest".to_string()), Some(5678), @@ -103,18 +106,9 @@ pub fn create_rpcserver_uuri() -> UUri { } #[allow(clippy::must_use_candidate)] -pub fn create_authority() -> UAuthority { - UAuthority { - name: Some("MyAuthName".to_string()), - number: Some(Number::Id(vec![1, 2, 3, 4])), - ..Default::default() - } -} - -#[allow(clippy::must_use_candidate)] -pub fn create_special_uuri() -> UUri { +pub fn create_special_uuri(idx: u8) -> UUri { UUri { - authority: Some(create_authority()).into(), + authority: Some(create_authority(idx)).into(), ..Default::default() } } From f7bd9226cd094afbfb8eb6b7b340c8a113c70ce8 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Wed, 17 Apr 2024 09:51:50 +0800 Subject: [PATCH 3/3] Update test function in Publish. Signed-off-by: ChenYing Kuo --- tests/publish.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/publish.rs b/tests/publish.rs index ea8ecf8..f2b970c 100644 --- a/tests/publish.rs +++ b/tests/publish.rs @@ -56,7 +56,7 @@ impl UListener for PublishNotificationListener { #[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(0), 0, 0); "Normal UUri")] #[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_special_uuri(0); "Special UUri")] #[async_std::test] -async fn test_publish_and_subscribe(origin_uuri: UUri, dst_uuri: UUri) { +async fn test_publish_and_subscribe(publish_uuri: UUri, listen_uuri: UUri) { test_lib::before_test(); // Initialization @@ -67,14 +67,14 @@ async fn test_publish_and_subscribe(origin_uuri: UUri, dst_uuri: UUri) { // Register the listener let pub_listener = Arc::new(PublishNotificationListener::new()); upclient_recv - .register_listener(dst_uuri.clone(), pub_listener.clone()) + .register_listener(listen_uuri.clone(), pub_listener.clone()) .await .unwrap(); // Waiting for listener to take effect task::sleep(time::Duration::from_millis(1000)).await; // Send UMessage - let umessage = UMessageBuilder::publish(origin_uuri.clone()) + let umessage = UMessageBuilder::publish(publish_uuri.clone()) .with_message_id(UUIDBuilder::build()) .build_with_payload( target_data.as_bytes().to_vec().into(), @@ -91,7 +91,7 @@ async fn test_publish_and_subscribe(origin_uuri: UUri, dst_uuri: UUri) { // Cleanup upclient_recv - .unregister_listener(dst_uuri.clone(), pub_listener) + .unregister_listener(listen_uuri.clone(), pub_listener) .await .unwrap(); }