Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#79] Do not use Zenoh Queryables for RPC #83

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pedantic = "deny"
[dependencies]
anyhow = "1.0.75"
async-trait = "0.1"
bitmask-enum = "2.2.4"
bytes = "1.6.1"
lazy_static = "1.4.0"
protobuf = { version = "3.3" }
Expand Down
9 changes: 6 additions & 3 deletions examples/l2_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ mod common;

use std::{str::FromStr, sync::Arc};
use up_rust::{
communication::{CallOptions, RpcClient, UPayload},
communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload},
LocalUriProvider, UPayloadFormat, UPriority, UUri, UUID,
};
use up_transport_zenoh::{UPTransportZenoh, ZenohRpcClient};
use up_transport_zenoh::UPTransportZenoh;

#[tokio::main]
async fn main() {
Expand All @@ -30,7 +30,10 @@ async fn main() {
.await
.unwrap(),
);
let rpc_client = Arc::new(ZenohRpcClient::new(zenoh_transport.clone()));
let rpc_client = InMemoryRpcClient::new(zenoh_transport.clone(), zenoh_transport.clone())
.await
.map(Arc::new)
.expect("failed to create RpcClient for Zenoh transport");

let sink_uuri = UUri::from_str("//rpc_server/1/1/1").unwrap();

Expand Down
121 changes: 2 additions & 119 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
pub mod rpc;
pub mod uri_provider;
pub mod utransport;

pub use rpc::ZenohRpcClient;

use bitmask_enum::bitmask;
use protobuf::Message;
use std::{
collections::HashMap,
Expand All @@ -25,18 +21,11 @@ use std::{
};
use tokio::runtime::Runtime;
use tracing::error;
use up_rust::{
ComparableListener, LocalUriProvider, UAttributes, UCode, UListener, UPriority, UStatus, UUri,
};
use up_rust::{ComparableListener, LocalUriProvider, UAttributes, UCode, UPriority, UStatus, UUri};
// Re-export Zenoh config
pub use zenoh::config as zenoh_config;
use zenoh::{
bytes::ZBytes,
internal::runtime::Runtime as ZRuntime,
key_expr::OwnedKeyExpr,
pubsub::Subscriber,
qos::Priority,
query::{Query, Queryable},
bytes::ZBytes, internal::runtime::Runtime as ZRuntime, pubsub::Subscriber, qos::Priority,
Session,
};

Expand All @@ -52,28 +41,11 @@ lazy_static::lazy_static! {
.expect("Unable to create callback runtime");
}

#[bitmask(u8)]
enum MessageFlag {
Publish,
Notification,
Request,
Response,
}

type SubscriberMap = Arc<Mutex<HashMap<(String, ComparableListener), Subscriber<()>>>>;
type QueryableMap = Arc<Mutex<HashMap<(String, ComparableListener), Queryable<()>>>>;
type QueryMap = Arc<Mutex<HashMap<String, Query>>>;
type RpcCallbackMap = Arc<Mutex<HashMap<OwnedKeyExpr, Arc<dyn UListener>>>>;
pub struct UPTransportZenoh {
session: Arc<Session>,
// Able to unregister Subscriber
subscriber_map: SubscriberMap,
// Able to unregister Queryable
queryable_map: QueryableMap,
// Save the reqid to be able to send back response
query_map: QueryMap,
// Save the callback for RPC response
rpc_callback_map: RpcCallbackMap,
// URI
uri: UUri,
}
Expand Down Expand Up @@ -161,9 +133,6 @@ impl UPTransportZenoh {
Ok(UPTransportZenoh {
session: Arc::new(session),
subscriber_map: Arc::new(Mutex::new(HashMap::new())),
queryable_map: Arc::new(Mutex::new(HashMap::new())),
query_map: Arc::new(Mutex::new(HashMap::new())),
rpc_callback_map: Arc::new(Mutex::new(HashMap::new())),
uri,
})
}
Expand Down Expand Up @@ -276,59 +245,6 @@ impl UPTransportZenoh {
};
Ok(uattributes)
}

// You can take a look at the table in up-spec for more detail
// https://github.com/eclipse-uprotocol/up-spec/blob/ca8172a8cf17d70e4f095e6c0d57fe2ebc68c58d/up-l1/README.adoc#23-registerlistener
#[allow(clippy::nonminimal_bool)] // Don't simplify the boolean expression for better understanding
fn get_listener_message_type(
source_uuri: &UUri,
sink_uuri: Option<&UUri>,
) -> Result<MessageFlag, UStatus> {
let mut flag = MessageFlag::none();
let rpc_range = 1..0x7FFF_u32;
let nonrpc_range = 0x8000..0xFFFE_u32;

let src_resource = source_uuri.resource_id;
// Notification / Request / Response
if let Some(dst_uuri) = sink_uuri {
let dst_resource = dst_uuri.resource_id;

if (nonrpc_range.contains(&src_resource) && dst_resource == 0)
|| (src_resource == 0xFFFF && dst_resource == 0)
|| (src_resource == 0xFFFF && dst_resource == 0xFFFF)
{
flag |= MessageFlag::Notification;
}
if (src_resource == 0 && rpc_range.contains(&dst_resource))
|| (src_resource == 0xFFFF && rpc_range.contains(&dst_resource))
|| (src_resource == 0xFFFF && dst_resource == 0xFFFF)
{
flag |= MessageFlag::Request;
}
if (rpc_range.contains(&src_resource) && dst_resource == 0)
|| (src_resource == 0xFFFF && dst_resource == 0)
|| (src_resource == 0xFFFF && dst_resource == 0xFFFF)
{
flag |= MessageFlag::Response;
}
} else if nonrpc_range.contains(&src_resource) {
flag |= MessageFlag::Publish;
}
if flag.is_none() {
let src_resource = format!("{:X}", source_uuri.resource_id);
let dst_resource = if let Some(dst_uuri) = sink_uuri {
format!("{:X}", dst_uuri.resource_id)
} else {
String::from("None")
};
Err(UStatus::fail_with_code(
UCode::INTERNAL,
format!("Wrong combination of resource ID in source UUri ({src_resource}) and sink UUri ({dst_resource}). Please check up-spec for more details."),
))
} else {
Ok(flag)
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -376,37 +292,4 @@ mod tests {
);
}
}

#[test_case("//192.168.1.100/10AB/3/80CD", None, Ok(MessageFlag::Publish); "Publish Message")]
#[test_case("//192.168.1.100/10AB/3/80CD", Some("//192.168.1.101/20EF/4/0"), Ok(MessageFlag::Notification); "Notification Message")]
#[test_case("//192.168.1.100/10AB/3/0", Some("//192.168.1.101/20EF/4/B"), Ok(MessageFlag::Request); "Request Message")]
#[test_case("//192.168.1.101/20EF/4/B", Some("//192.168.1.100/10AB/3/0"), Ok(MessageFlag::Response); "Response Message")]
#[test_case("//*/FFFF/FF/FFFF", Some("//192.168.1.101/20EF/4/B"), Ok(MessageFlag::Request); "Listen to all Request Messages")]
#[test_case("//*/FFFF/FF/FFFF", Some("//192.168.1.100/10AB/3/0"), Ok(MessageFlag::Notification | MessageFlag::Response); "Listen to Notification and Response Messages")]
#[test_case("//*/FFFF/FF/FFFF", Some("//[::1]/FFFF/FF/FFFF"), Ok(MessageFlag::Notification | MessageFlag::Request | MessageFlag::Response); "Listen to all messages to a device")]
#[test_case("//*/FFFF/FF/FFFF", None, Err(UCode::INTERNAL); "Impossible scenario: Listen to all Publish Messages")]
#[test_case("//192.168.1.100/10AB/3/0", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Request Message")]
#[test_case("//192.168.1.101/20EF/4/B", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Response Message")]
#[test_case("//192.168.1.100/10AB/3/80CD", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Notification Message")]
#[tokio::test(flavor = "multi_thread")]
async fn test_get_listener_message_type(
src_uri: &str,
sink_uri: Option<&str>,
result: Result<MessageFlag, UCode>,
) {
let src = UUri::from_str(src_uri).unwrap();
if let Some(uri) = sink_uri {
let dst = UUri::from_str(uri).unwrap();
assert_eq!(
UPTransportZenoh::get_listener_message_type(&src, Some(&dst))
.map_err(|e| e.get_code()),
result
);
} else {
assert_eq!(
UPTransportZenoh::get_listener_message_type(&src, None).map_err(|e| e.get_code()),
result
);
}
}
}
144 changes: 0 additions & 144 deletions src/rpc.rs

This file was deleted.

Loading