Skip to content

Commit

Permalink
georganize the test code.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary committed Apr 11, 2024
1 parent 7818841 commit 553023f
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 359 deletions.
5 changes: 1 addition & 4 deletions src/utransport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ impl UPClientZenoh {
Ok(())
}

// TODO: Might need some refactors
#[allow(clippy::too_many_lines)]
async fn send_request(
&self,
zenoh_key: &str,
Expand Down Expand Up @@ -137,8 +135,7 @@ impl UPClientZenoh {
let u_attribute = match UPClientZenoh::attachment_to_uattributes(attachment) {
Ok(uattr) => uattr,
Err(e) => {
let msg =
format!("Unable to transform attachment to UAttributes: {e:?}");
let msg = format!("Transform attachment to UAttributes failed: {e:?}");
log::error!("{msg}");
block_on(
resp_callback
Expand Down
47 changes: 28 additions & 19 deletions tests/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ use std::{
sync::{Arc, Mutex},
time,
};
use test_case::test_case;
use up_rust::{
Data, UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUIDBuilder,
UUri,
};

struct PublishNotificationListener {
Expand Down Expand Up @@ -51,31 +53,35 @@ impl UListener for PublishNotificationListener {
}
}

#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(0), 0, 0); "Normal UUri")]
#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_special_uuri(0); "Special UUri")]
#[async_std::test]
async fn test_publish_and_subscribe() {
async fn test_publish_and_subscribe(origin_uuri: UUri, dst_uuri: UUri) {
test_lib::before_test();

// Initialization
let target_data = String::from("Hello World!");
let upclient = test_lib::create_up_client_zenoh().await.unwrap();
let uuri = test_lib::create_utransport_uuri(0);
let upclient_send = test_lib::create_up_client_zenoh(0, 0).await.unwrap();
let upclient_recv = test_lib::create_up_client_zenoh(1, 1).await.unwrap();

// Register the listener
let pub_listener = Arc::new(PublishNotificationListener::new());
upclient
.register_listener(uuri.clone(), pub_listener.clone())
upclient_recv
.register_listener(dst_uuri.clone(), pub_listener.clone())
.await
.unwrap();
// Waiting for listener to take effect
task::sleep(time::Duration::from_millis(1000)).await;

// Send UMessage
let umessage = UMessageBuilder::publish(uuri.clone())
let umessage = UMessageBuilder::publish(origin_uuri.clone())
.with_message_id(UUIDBuilder::build())
.build_with_payload(
target_data.as_bytes().to_vec().into(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.unwrap();
upclient.send(umessage).await.unwrap();
upclient_send.send(umessage).await.unwrap();

// Waiting for the subscriber to receive data
task::sleep(time::Duration::from_millis(1000)).await;
Expand All @@ -84,38 +90,41 @@ async fn test_publish_and_subscribe() {
assert_eq!(pub_listener.get_recv_data(), target_data);

// Cleanup
upclient
.unregister_listener(uuri.clone(), pub_listener)
upclient_recv
.unregister_listener(dst_uuri.clone(), pub_listener)
.await
.unwrap();
}

#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(1), 1, 1), test_lib::create_utransport_uuri(Some(1), 1, 1); "Normal UUri")]
#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(1), 1, 1), test_lib::create_special_uuri(1); "Special UUri")]
#[async_std::test]
async fn test_notification_and_subscribe() {
async fn test_notification_and_subscribe(origin_uuri: UUri, dst_uuri: UUri, listen_uuri: UUri) {
test_lib::before_test();

// Initialization
let target_data = String::from("Hello World!");
let upclient = test_lib::create_up_client_zenoh().await.unwrap();
let src_uuri = test_lib::create_utransport_uuri(0);
let dst_uuri = test_lib::create_utransport_uuri(1);
let upclient_notify = test_lib::create_up_client_zenoh(0, 0).await.unwrap();
let upclient_recv = test_lib::create_up_client_zenoh(1, 1).await.unwrap();

// Register the listener
let notification_listener = Arc::new(PublishNotificationListener::new());
upclient
.register_listener(dst_uuri.clone(), notification_listener.clone())
upclient_recv
.register_listener(listen_uuri.clone(), notification_listener.clone())
.await
.unwrap();
// Waiting for listener to take effect
task::sleep(time::Duration::from_millis(1000)).await;

// Send UMessage
let umessage = UMessageBuilder::notification(src_uuri.clone(), dst_uuri.clone())
let umessage = UMessageBuilder::notification(origin_uuri.clone(), dst_uuri.clone())
.with_message_id(UUIDBuilder::build())
.build_with_payload(
target_data.as_bytes().to_vec().into(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.unwrap();
upclient.send(umessage).await.unwrap();
upclient_notify.send(umessage).await.unwrap();

// Waiting for the subscriber to receive data
task::sleep(time::Duration::from_millis(1000)).await;
Expand All @@ -124,8 +133,8 @@ async fn test_notification_and_subscribe() {
assert_eq!(notification_listener.get_recv_data(), target_data);

// Cleanup
upclient
.unregister_listener(dst_uuri.clone(), notification_listener)
upclient_recv
.unregister_listener(listen_uuri.clone(), notification_listener)
.await
.unwrap();
}
86 changes: 8 additions & 78 deletions tests/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,89 +16,25 @@ pub mod test_lib;
use std::sync::Arc;

use async_trait::async_trait;
use up_rust::{UCode, UListener, UMessage, UStatus, UTransport};
use test_case::test_case;
use up_rust::{UListener, UMessage, UStatus, UTransport, UUri};

struct FooListener;
#[async_trait]
impl UListener for FooListener {
async fn on_receive(&self, _msg: UMessage) {}
async fn on_error(&self, _err: UStatus) {}
}
#[async_std::test]
async fn test_utransport_register_and_unregister() {
test_lib::before_test();

// Initialization
let upclient = test_lib::create_up_client_zenoh().await.unwrap();
let uuri = test_lib::create_utransport_uuri(0);
let foo_listener = Arc::new(FooListener);

// Register the listener
upclient
.register_listener(uuri.clone(), foo_listener.clone())
.await
.unwrap();

// Able to ungister
upclient
.unregister_listener(uuri.clone(), foo_listener.clone())
.await
.unwrap();

// Unable to ungister
let result = upclient
.unregister_listener(uuri.clone(), foo_listener.clone())
.await;
assert_eq!(
result,
Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Publish listener doesn't exist"
))
);
}

#[async_std::test]
async fn test_rpcserver_register_and_unregister() {
test_lib::before_test();

// Initialization
let upclient = test_lib::create_up_client_zenoh().await.unwrap();
let uuri = test_lib::create_rpcserver_uuri();
let foo_listener = Arc::new(FooListener);

// Register the listener
upclient
.register_listener(uuri.clone(), foo_listener.clone())
.await
.unwrap();

// Able to ungister
upclient
.unregister_listener(uuri.clone(), foo_listener.clone())
.await
.unwrap();

// Unable to ungister
let result = upclient
.unregister_listener(uuri.clone(), foo_listener.clone())
.await;
assert_eq!(
result,
Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"RPC request listener doesn't exist"
))
);
}

#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0); "Publish / Notification register_listener")]
#[test_case(test_lib::create_rpcserver_uuri(Some(0), 0); "RPC register_listener")]
#[test_case(test_lib::create_special_uuri(0); "Special UUri register_listener")]
#[async_std::test]
async fn test_utransport_special_uuri_register_and_unregister() {
async fn test_register_and_unregister(uuri: UUri) {
test_lib::before_test();

// Initialization
let upclient = test_lib::create_up_client_zenoh().await.unwrap();
let uuri = test_lib::create_special_uuri();
let upclient = test_lib::create_up_client_zenoh(0, 0).await.unwrap();
let foo_listener = Arc::new(FooListener);

// Register the listener
Expand All @@ -117,11 +53,5 @@ async fn test_utransport_special_uuri_register_and_unregister() {
let result = upclient
.unregister_listener(uuri.clone(), foo_listener.clone())
.await;
assert_eq!(
result,
Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"RPC response callback doesn't exist"
))
);
assert!(result.is_err());
}
20 changes: 10 additions & 10 deletions tests/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use std::{
sync::{Arc, Mutex},
time,
};
use test_case::test_case;
use up_client_zenoh::UPClientZenoh;
use up_rust::{
CallOptions, Data, RpcClient, UListener, UMessage, UMessageBuilder, UPayload, UPayloadFormat,
UStatus, UTransport, UUIDBuilder,
UStatus, UTransport, UUIDBuilder, UUri,
};

struct RequestListener {
Expand Down Expand Up @@ -95,21 +96,21 @@ impl UListener for ResponseListener {
}
}
async fn on_error(&self, _err: UStatus) {
// TODO: Check why we also receive timeout after receiving correct data
//panic!("Internal Error: {err:?}");
}
}

#[test_case(test_lib::create_rpcserver_uuri(Some(1), 1), test_lib::create_rpcserver_uuri(Some(1), 1); "Normal RPC UUri")]
#[test_case(test_lib::create_rpcserver_uuri(Some(1), 1), test_lib::create_special_uuri(1); "Special listen UUri")]
#[async_std::test]
async fn test_rpc_server_client() {
async fn test_rpc_server_client(dst_uuri: UUri, listen_uuri: UUri) {
test_lib::before_test();

// Initialization
let upclient_client = test_lib::create_up_client_zenoh().await.unwrap();
let upclient_server = Arc::new(test_lib::create_up_client_zenoh().await.unwrap());
let upclient_client = test_lib::create_up_client_zenoh(0, 0).await.unwrap();
let upclient_server = Arc::new(test_lib::create_up_client_zenoh(1, 1).await.unwrap());
let request_data = String::from("This is the request data");
let response_data = String::from("This is the response data");
let dst_uuri = test_lib::create_rpcserver_uuri();

// Setup RpcServer callback
let request_listener = Arc::new(RequestListener::new(
Expand All @@ -118,10 +119,9 @@ async fn test_rpc_server_client() {
response_data.clone(),
));
upclient_server
.register_listener(dst_uuri.clone(), request_listener.clone())
.register_listener(listen_uuri.clone(), request_listener.clone())
.await
.unwrap();

// Need some time for queryable to run
task::sleep(time::Duration::from_millis(1000)).await;

Expand Down Expand Up @@ -163,7 +163,7 @@ async fn test_rpc_server_client() {
.unwrap();

// Send request
let umessage = UMessageBuilder::request(dst_uuri.clone(), response_uuri.clone(), 2000)
let umessage = UMessageBuilder::request(dst_uuri.clone(), response_uuri.clone(), 1000)
.with_message_id(UUIDBuilder::build())
.build_with_payload(
request_data.as_bytes().to_vec().into(),
Expand All @@ -187,7 +187,7 @@ async fn test_rpc_server_client() {

// Cleanup
upclient_server
.unregister_listener(dst_uuri.clone(), request_listener.clone())
.unregister_listener(listen_uuri.clone(), request_listener.clone())
.await
.unwrap();
}
Loading

0 comments on commit 553023f

Please sign in to comment.