Skip to content

Commit

Permalink
Update up-client-zenoh-rust with up-spec 1.5.7 (#21)
Browse files Browse the repository at this point in the history
* Port to up-rust latest version (up-spec 1.5.7)

Signed-off-by: ChenYing Kuo <[email protected]>

* Use better way to create uAttributes.

Signed-off-by: ChenYing Kuo <[email protected]>

* Update how to log the errors.

Signed-off-by: ChenYing Kuo <[email protected]>

* Fix running test errors.

Signed-off-by: ChenYing Kuo <[email protected]>

* Add document to the contstructor.

Signed-off-by: ChenYing Kuo <[email protected]>

* Add some comments to the code for easy understanding.

Signed-off-by: ChenYing Kuo <[email protected]>

* Reorganize the test files.

Signed-off-by: ChenYing Kuo <[email protected]>

* Fix clippy warning in tests.

Signed-off-by: ChenYing Kuo <[email protected]>

* Update notification test.

Signed-off-by: ChenYing Kuo <[email protected]>

* Update the comments in tests.

Signed-off-by: ChenYing Kuo <[email protected]>

* gse correct source address from the constructor.

Signed-off-by: ChenYing Kuo <[email protected]>

* Add request and response test.

Signed-off-by: ChenYing Kuo <[email protected]>

* Use API to simplify creating UMessage.

Signed-off-by: ChenYing Kuo <[email protected]>

* Fix the comments.

Signed-off-by: ChenYing Kuo <[email protected]>

* Remove some wrong TODO items.

Signed-off-by: ChenYing Kuo <[email protected]>

* Show the minimum UAttributes version.

Signed-off-by: ChenYing Kuo <[email protected]>

* ghange the publish & notification tests.

Signed-off-by: ChenYing Kuo <[email protected]>

* Update the notification test.

Signed-off-by: ChenYing Kuo <[email protected]>

* Comment some code to pass the test for the time being.

Signed-off-by: ChenYing Kuo <[email protected]>

* Rename register_publish_listener to register_publish_notification_listener

Signed-off-by: ChenYing Kuo <[email protected]>

* Rename uuri to source_uuri.

Signed-off-by: ChenYing Kuo <[email protected]>

* Use UAuthority and UEntity instead of UUri.

Signed-off-by: ChenYing Kuo <[email protected]>

* Update the constructors.

Signed-off-by: ChenYing Kuo <[email protected]>

* Remove Zenoh key in send_response to avoid the misunderstanding.

Signed-off-by: ChenYing Kuo <[email protected]>

* Bump up-rust to the latest version with notification validator patch.

Signed-off-by: ChenYing Kuo <[email protected]>

* Add validation of UAuthority and UEntity.

Signed-off-by: ChenYing Kuo <[email protected]>

* Rename send function and also remove the TODO.

Signed-off-by: ChenYing Kuo <[email protected]>

---------

Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary authored Apr 4, 2024
1 parent 7e2cc34 commit 69122c2
Show file tree
Hide file tree
Showing 11 changed files with 1,078 additions and 822 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@ log = "0.4.17"
prost = "0.12"
prost-types = "0.12"
protobuf = { version = "3.3" }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "68c8a1d94f0006daf4ba135c9cbbfddcd793108d" }
rand = "0.8.5"
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "a30d3655ab13f8d97815280d718f4891f693ed2d" }
zenoh = { version = "0.10.1-rc", features = ["unstable"]}

[dev-dependencies]
test-case = { version = "3.3" }
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Rust UPClient implementation for the Zenoh transport

# Build
## Build

```shell
# Check clippy
Expand All @@ -13,6 +13,6 @@ cargo build
cargo test
```

# Note
## Note

The implementation follows the spec defined in [up-l1/zenoh](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l1/zenoh.adoc).
205 changes: 166 additions & 39 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc, Mutex},
};
use up_rust::uprotocol::{UAttributes, UCode, UMessage, UPayloadFormat, UPriority, UStatus, UUri};
use up_rust::{
UAttributes, UAuthority, UCode, UEntity, UMessage, UPayloadFormat, UPriority, UResourceBuilder,
UStatus, UUri,
};
use zenoh::{
config::Config,
prelude::r#async::*,
Expand All @@ -43,18 +46,70 @@ pub struct UPClientZenoh {
query_map: Arc<Mutex<HashMap<String, Query>>>,
// Save the callback for RPC response
rpc_callback_map: Arc<Mutex<HashMap<String, Arc<UtransportListener>>>>,
// Used to identify different callback
callback_counter: AtomicU64,
// Source UUri in RPC
source_uuri: UUri,
}

impl UPClientZenoh {
/// Create `UPClientZenoh` by applying the Zenoh configuration, `UAuthority`, and `UEntity`.
///
/// # Arguments
///
/// * `config` - Zenoh configuration. You can refer to [here](https://github.com/eclipse-zenoh/zenoh/blob/0.10.1-rc/DEFAULT_CONFIG.json5) for more configuration details.
/// * `uauthority` - The `UAuthority` which is put into source address while generating messages.
/// * `uentity` - The `UEntity` which is put into source address while generating messages.
///
/// # Errors
/// Will return `Err` if unable to create Zenoh session
pub async fn new(config: Config) -> Result<UPClientZenoh, UStatus> {
/// Will return `Err` if unable to create `UPClientZenoh`
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// use up_client_zenoh::UPClientZenoh;
/// use up_rust::{Number, UAuthority, UEntity, UUri};
/// use zenoh::config::Config;
/// let uauthority = UAuthority {
/// name: Some("MyAuthName".to_string()),
/// number: Some(Number::Id(vec![1, 2, 3, 4])),
/// ..Default::default()
/// };
/// let uentity = UEntity {
/// name: "default.entity".to_string(),
/// id: Some(u32::from(rand::random::<u16>())),
/// version_major: Some(1),
/// version_minor: None,
/// ..Default::default()
/// };
/// let upclient = UPClientZenoh::new(Config::default(), uauthority, uentity).await.unwrap();
/// # });
/// ```
pub async fn new(
config: Config,
uauthority: UAuthority,
uentity: UEntity,
) -> Result<UPClientZenoh, UStatus> {
uauthority.validate_micro_form().map_err(|e| {
let msg = format!("UAuthority is invalid: {e:?}");
log::error!("{msg}");
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)
})?;
uentity.validate_micro_form().map_err(|e| {
let msg = format!("UEntity is invalid: {e:?}");
log::error!("{msg}");
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)
})?;
let Ok(session) = zenoh::open(config).res().await else {
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Unable to open Zenoh session",
));
let msg = "Unable to open Zenoh session".to_string();
log::error!("{msg}");
return Err(UStatus::fail_with_code(UCode::INTERNAL, msg));
};
let source_uuri = UUri {
authority: Some(uauthority).into(),
entity: Some(uentity).into(),
..Default::default()
};
Ok(UPClientZenoh {
session: Arc::new(session),
Expand All @@ -63,25 +118,58 @@ impl UPClientZenoh {
query_map: Arc::new(Mutex::new(HashMap::new())),
rpc_callback_map: Arc::new(Mutex::new(HashMap::new())),
callback_counter: AtomicU64::new(0),
source_uuri,
})
}

/// Get the `UUri` of `UPClientZenoh` in for RPC response
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// use up_client_zenoh::UPClientZenoh;
/// use up_rust::{Number, UAuthority, UEntity, UriValidator, UUri};
/// use zenoh::config::Config;
/// let uauthority = UAuthority {
/// name: Some("MyAuthName".to_string()),
/// number: Some(Number::Id(vec![1, 2, 3, 4])),
/// ..Default::default()
/// };
/// let uentity = UEntity {
/// name: "default.entity".to_string(),
/// id: Some(u32::from(rand::random::<u16>())),
/// version_major: Some(1),
/// version_minor: None,
/// ..Default::default()
/// };
/// let upclient = UPClientZenoh::new(Config::default(), uauthority, uentity).await.unwrap();
/// let uuri = upclient.get_response_uuri();
/// assert!(UriValidator::is_rpc_response(&uuri));
/// assert_eq!(uuri.authority.unwrap().name.unwrap(), "MyAuthName");
/// assert_eq!(uuri.entity.unwrap().name, "default.entity");
/// # });
/// ```
pub fn get_response_uuri(&self) -> UUri {
let mut source = self.source_uuri.clone();
source.resource = Some(UResourceBuilder::for_rpc_response()).into();
source
}

fn get_uauth_from_uuri(uri: &UUri) -> Result<String, UStatus> {
if let Some(authority) = uri.authority.as_ref() {
let buf: Vec<u8> = authority.try_into().map_err(|_| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to transform UAuthority into micro form",
)
let msg = "Unable to transform UAuthority into micro form".to_string();
log::error!("{msg}");
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)
})?;
Ok(buf
.iter()
.fold(String::new(), |s, c| s + &format!("{c:02x}")))
} else {
Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Empty UAuthority",
))
let msg = "UAuthority is empty".to_string();
log::error!("{msg}");
Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg))
}
}

Expand All @@ -90,11 +178,10 @@ impl UPClientZenoh {
if uri.authority.is_some() && uri.entity.is_none() && uri.resource.is_none() {
Ok(String::from("upr/") + &UPClientZenoh::get_uauth_from_uuri(uri)? + "/**")
} else {
let micro_uuri: Vec<u8> = uri.try_into().map_err(|_| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to serialize into micro format",
)
let micro_uuri: Vec<u8> = uri.try_into().map_err(|e| {
let msg = format!("Unable to serialize into micro format: {e}");
log::error!("{msg}");
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)
})?;
// If the UUri is larger than 8 bytes, then it should be remote UUri with UAuthority
// We should prepend it to the Zenoh key.
Expand Down Expand Up @@ -148,30 +235,28 @@ impl UPClientZenoh {
fn attachment_to_uattributes(attachment: &Attachment) -> anyhow::Result<UAttributes> {
let mut attachment_iter = attachment.iter();
if let Some((_, value)) = attachment_iter.next() {
let version = *value.as_slice().first().ok_or(UStatus::fail_with_code(
UCode::INTERNAL,
"uAttributes version is empty",
))?;
if version != 1 {
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"uAttributes version should be 1",
)
.into());
let version = *value.as_slice().first().ok_or_else(|| {
let msg = format!("UAttributes version is empty (should be {UATTRIBUTE_VERSION})");
log::error!("{msg}");
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)
})?;
if version != UATTRIBUTE_VERSION {
let msg =
format!("UAttributes version is {version} (should be {UATTRIBUTE_VERSION})");
log::error!("{msg}");
return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg).into());
}
} else {
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Unable to get the uAttributes version",
)
.into());
let msg = "Unable to get the UAttributes version".to_string();
log::error!("{msg}");
return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg).into());
}
let uattributes = if let Some((_, value)) = attachment_iter.next() {
UAttributes::parse_from_bytes(value.as_slice())?
} else {
return Err(
UStatus::fail_with_code(UCode::INTERNAL, "Unable to get the uAttributes").into(),
);
let msg = "Unable to get the UAttributes".to_string();
log::error!("{msg}");
return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg).into());
};
Ok(uattributes)
}
Expand All @@ -180,7 +265,24 @@ impl UPClientZenoh {
#[cfg(test)]
mod tests {
use super::*;
use up_rust::uprotocol::{uri::uauthority::Number, UAuthority, UEntity, UResource, UUri};
use async_std::task::block_on;
use test_case::test_case;
use up_rust::{Number, UAuthority, UEntity, UResource, UUri};

fn invalid_entity() -> UEntity {
UEntity {
name: "default.entity".to_string(),
..Default::default()
}
}

#[test_case(valid_authority(), valid_entity(), true; "succeeds with both valid authority and entity")]
#[test_case(invalid_authority(), valid_entity(), false; "fails for invalid authority")]
#[test_case(valid_authority(), invalid_entity(), false; "fails for invalid entity")]
fn test_new_up_client_zenoh(authority: UAuthority, entity: UEntity, expected_result: bool) {
let up_client_zenoh = block_on(UPClientZenoh::new(Config::default(), authority, entity));
assert_eq!(up_client_zenoh.is_ok(), expected_result);
}

#[test]
fn test_to_zenoh_key_string() {
Expand Down Expand Up @@ -222,4 +324,29 @@ mod tests {
String::from("upr/060102030a0b0c/**")
);
}

fn valid_authority() -> UAuthority {
UAuthority {
name: Some("UAuthName".to_string()),
number: Some(Number::Id(vec![1, 2, 3, 10, 11, 12])),
..Default::default()
}
}

fn invalid_authority() -> UAuthority {
UAuthority {
name: Some("UAuthName".to_string()),
..Default::default()
}
}

fn valid_entity() -> UEntity {
UEntity {
name: "default.entity".to_string(),
id: Some(u32::from(rand::random::<u16>())),
version_major: Some(1),
version_minor: None,
..Default::default()
}
}
}
Loading

0 comments on commit 69122c2

Please sign in to comment.