From 87217c314c69f04b1913ba5b43a14eff1f011c38 Mon Sep 17 00:00:00 2001 From: "ChenYing Kuo (CY)" Date: Sat, 15 Jun 2024 09:35:32 +0800 Subject: [PATCH] Match the 1.5.8 with up-rust and up-client-zenoh-rust (#8) * Match the 1.5.8 with up-rust and up-client-zenoh-rust Signed-off-by: ChenYing Kuo * Use up-transport-zenoh instead. Signed-off-by: ChenYing Kuo --------- Signed-off-by: ChenYing Kuo --- Cargo.toml | 8 ++--- src/common.rs | 29 +++++++++++++++++ src/common_uuri.rs | 78 ---------------------------------------------- src/publisher.rs | 37 +++++++--------------- src/rpc_client.rs | 58 +++++++++++----------------------- src/rpc_server.rs | 68 +++++++++++++++++++++------------------- src/subscriber.rs | 42 ++++++++++--------------- 7 files changed, 115 insertions(+), 205 deletions(-) create mode 100644 src/common.rs delete mode 100644 src/common_uuri.rs diff --git a/Cargo.toml b/Cargo.toml index 69db4c3..b261676 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,13 +13,13 @@ pedantic = "deny" #nursery = "deny" [dependencies] -async-std = "1.12.0" async-trait = "0.1" chrono = "0.4.31" env_logger = "0.10.0" -up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "1bb08ba7a3666e58c316489fbcf3da3e29dee611" } -up-client-zenoh = { git = "https://github.com/eclipse-uprotocol/up-client-zenoh-rust", rev = "cb592dc5abbaf9dcd204f592733404ea7c61e999" } -zenoh = { version = "0.10.1-rc", features = ["unstable"]} +tokio = { version = "1.35.1", default-features = false } +up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "d736fdf35ff4728effa7f36b720f0fc1605d5ba0" } +up-transport-zenoh = { git = "https://github.com/eclipse-uprotocol/up-transport-zenoh-rust", rev = "b8925c643465959f402372f796c8856f906dcd05" } +zenoh = { version = "0.11.0-rc.3", features = ["unstable"]} [[bin]] name = "publisher" diff --git a/src/common.rs b/src/common.rs new file mode 100644 index 0000000..545352b --- /dev/null +++ b/src/common.rs @@ -0,0 +1,29 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +use zenoh::config::Config; + +#[allow(clippy::must_use_candidate, clippy::missing_panics_doc)] +pub fn get_zenoh_config() -> Config { + // Load the config from file path + // Config Examples: https://github.com/eclipse-zenoh/zenoh/blob/0.10.1-rc/DEFAULT_CONFIG.json5 + // let mut zenoh_cfg = Config::from_file("./DEFAULT_CONFIG.json5").unwrap(); + + // Loat the default config struct + let mut zenoh_cfg = Config::default(); + // You can choose from Router, Peer, Client + zenoh_cfg + .set_mode(Some(zenoh::config::WhatAmI::Peer)) + .unwrap(); + + zenoh_cfg +} diff --git a/src/common_uuri.rs b/src/common_uuri.rs deleted file mode 100644 index 5167fb6..0000000 --- a/src/common_uuri.rs +++ /dev/null @@ -1,78 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2024 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ -use up_rust::{Number, UAuthority, UEntity, UResource, UResourceBuilder}; -use zenoh::config::Config; - -pub enum ExampleType { - Publisher, - Subscriber, - RpcServer, - RpcClient, -} - -#[allow(clippy::must_use_candidate)] -pub fn authority() -> UAuthority { - UAuthority { - name: Some("auth_name".to_string()), - number: Some(Number::Id(vec![1, 2, 3, 4])), - ..Default::default() - } -} - -#[allow(clippy::must_use_candidate)] -pub fn entity(example_type: &ExampleType) -> UEntity { - let (name, id) = match example_type { - ExampleType::Publisher => ("publisher", 1), - ExampleType::Subscriber => ("subscriber", 2), - ExampleType::RpcServer => ("rpc_server", 3), - ExampleType::RpcClient => ("rpc_client", 4), - }; - UEntity { - name: name.to_string(), - id: Some(1), - version_major: Some(id), - ..Default::default() - } -} - -#[allow(clippy::must_use_candidate)] -pub fn pub_resource() -> UResource { - UResource { - name: "door".to_string(), - instance: Some("front_left".to_string()), - message: Some("Door".to_string()), - id: Some(5678), - ..Default::default() - } -} - -#[allow(clippy::must_use_candidate)] -pub fn rpc_resource() -> UResource { - UResourceBuilder::for_rpc_request(Some("getTime".to_string()), Some(5678)) -} - -#[allow(clippy::must_use_candidate, clippy::missing_panics_doc)] -pub fn get_zenoh_config() -> Config { - // Load the config from file path - // Config Examples: https://github.com/eclipse-zenoh/zenoh/blob/0.10.1-rc/DEFAULT_CONFIG.json5 - // let mut zenoh_cfg = Config::from_file("./DEFAULT_CONFIG.json5").unwrap(); - - // Loat the default config struct - let mut zenoh_cfg = Config::default(); - // You can choose from Router, Peer, Client - zenoh_cfg - .set_mode(Some(zenoh::config::WhatAmI::Peer)) - .unwrap(); - - zenoh_cfg -} diff --git a/src/publisher.rs b/src/publisher.rs index 6295303..02bafda 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -10,48 +10,35 @@ * * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -pub mod common_uuri; +pub mod common; -use async_std::task; -use common_uuri::ExampleType; -use std::time; -use up_client_zenoh::UPClientZenoh; -use up_rust::{UMessageBuilder, UPayloadFormat, UTransport, UUIDBuilder, UUri}; +use std::str::FromStr; +use tokio::time::{sleep, Duration}; +use up_rust::{UMessageBuilder, UPayloadFormat, UTransport, UUri}; +use up_transport_zenoh::UPClientZenoh; -#[async_std::main] +#[tokio::main] async fn main() { // initiate logging env_logger::init(); println!("uProtocol publisher example"); - let publisher = UPClientZenoh::new( - common_uuri::get_zenoh_config(), - common_uuri::authority(), - common_uuri::entity(&ExampleType::Publisher), - ) - .await - .unwrap(); + let publisher = UPClientZenoh::new(common::get_zenoh_config(), String::from("publisher")) + .await + .unwrap(); // create uuri - let uuri = UUri { - entity: Some(common_uuri::entity(&ExampleType::Publisher)).into(), - resource: Some(common_uuri::pub_resource()).into(), - ..Default::default() - }; + let uuri = UUri::from_str("//publisher/1/1/8001").unwrap(); let mut cnt: u64 = 0; loop { let data = format!("{cnt}"); let umessage = UMessageBuilder::publish(uuri.clone()) - .with_message_id(UUIDBuilder::build()) - .build_with_payload( - data.as_bytes().to_vec().into(), - UPayloadFormat::UPAYLOAD_FORMAT_TEXT, - ) + .build_with_payload(data.clone(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .unwrap(); println!("Sending {data} to {uuri}..."); publisher.send(umessage).await.unwrap(); - task::sleep(time::Duration::from_millis(1000)).await; + sleep(Duration::from_millis(1000)).await; cnt += 1; } } diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 3afbd8d..31ed7f1 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -10,60 +10,38 @@ * * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -pub mod common_uuri; +pub mod common; -use common_uuri::ExampleType; -use up_client_zenoh::UPClientZenoh; -use up_rust::{CallOptions, Data, RpcClient, UPayload, UPayloadFormat, UUri}; +use std::str::FromStr; +use up_rust::{RpcClient, UMessageBuilder, UPayloadFormat, UUri}; +use up_transport_zenoh::UPClientZenoh; -#[async_std::main] +#[tokio::main] async fn main() { // initiate logging env_logger::init(); println!("uProtocol RPC client example"); - let rpc_client = UPClientZenoh::new( - common_uuri::get_zenoh_config(), - common_uuri::authority(), - common_uuri::entity(&ExampleType::RpcClient), - ) - .await - .unwrap(); + let rpc_client = UPClientZenoh::new(common::get_zenoh_config(), String::from("rpc_client")) + .await + .unwrap(); // create uuri - let uuri = UUri { - entity: Some(common_uuri::entity(&ExampleType::RpcServer)).into(), - resource: Some(common_uuri::rpc_resource()).into(), - ..Default::default() - }; + let src_uuri = UUri::from_str("//rpc_client/1/1/0").unwrap(); + let sink_uuri = UUri::from_str("//rpc_server/1/1/1").unwrap(); // create uPayload let data = String::from("GetCurrentTime"); - let payload = UPayload { - length: Some(0), - format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(), - data: Some(Data::Value(data.as_bytes().to_vec())), - ..Default::default() - }; + let umsg = UMessageBuilder::request(sink_uuri.clone(), src_uuri.clone(), 1000) + .build_with_payload(data, UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .unwrap(); // invoke RPC method - println!("Send request to {uuri}"); - let result = rpc_client - .invoke_method( - uuri, - payload, - CallOptions { - ttl: 1000, - ..Default::default() - }, - ) - .await; + println!("Send request from {src_uuri} to {sink_uuri}"); + let result = rpc_client.invoke_method(sink_uuri, umsg).await; // process the result - if let Data::Value(v) = result.unwrap().payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - println!("Receive {value}"); - } else { - println!("Failed to get result from invoke_method."); - } + let payload = result.unwrap().payload.unwrap(); + let value = payload.into_iter().map(|c| c as char).collect::(); + println!("Receive {value}"); } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 364f7ee..80cc8d1 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -10,18 +10,18 @@ * * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -pub mod common_uuri; +pub mod common; -use async_std::task::{self, block_on}; use async_trait::async_trait; use chrono::Utc; -use common_uuri::ExampleType; -use std::{sync::Arc, time}; -use up_client_zenoh::UPClientZenoh; -use up_rust::{ - Data, UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUIDBuilder, - UUri, +use std::{str::FromStr, sync::Arc}; +use tokio::{ + runtime::Handle, + task, + time::{sleep, Duration}, }; +use up_rust::{UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri}; +use up_transport_zenoh::UPClientZenoh; struct RpcListener { up_client: Arc, @@ -39,59 +39,63 @@ impl UListener for RpcListener { payload, .. } = msg; + // Build the payload to send back - if let Data::Value(v) = payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - let source = attributes.clone().unwrap().source.unwrap(); - let sink = attributes.clone().unwrap().sink.unwrap(); - println!("Receive {value} from {source} to {sink}"); - } + let value = payload + .unwrap() + .into_iter() + .map(|c| c as char) + .collect::(); + let source = attributes.clone().unwrap().source.unwrap(); + let sink = attributes.clone().unwrap().sink.unwrap(); + println!("Receive {value} from {source} to {sink}"); + // Send back result let umessage = UMessageBuilder::response_for_request(&attributes) - .with_message_id(UUIDBuilder::build()) .build_with_payload( // Get current time - format!("{}", Utc::now()).as_bytes().to_vec().into(), + format!("{}", Utc::now()), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, ) .unwrap(); - block_on(self.up_client.send(umessage)).unwrap(); + task::block_in_place(|| { + Handle::current() + .block_on(self.up_client.send(umessage)) + .unwrap(); + }); } async fn on_error(&self, err: UStatus) { panic!("Internal Error: {err:?}"); } } -#[async_std::main] +#[tokio::main] async fn main() { // initiate logging env_logger::init(); println!("uProtocol RPC server example"); let rpc_server = Arc::new( - UPClientZenoh::new( - common_uuri::get_zenoh_config(), - common_uuri::authority(), - common_uuri::entity(&ExampleType::RpcServer), - ) - .await - .unwrap(), + UPClientZenoh::new(common::get_zenoh_config(), String::from("rpc_server")) + .await + .unwrap(), ); // create uuri - let uuri = UUri { - entity: Some(common_uuri::entity(&ExampleType::RpcServer)).into(), - resource: Some(common_uuri::rpc_resource()).into(), - ..Default::default() - }; + let src_uuri = UUri::from_str("//*/FFFF/FF/FFFF").unwrap(); + let sink_uuri = UUri::from_str("//rpc_server/1/1/1").unwrap(); println!("Register the listener..."); rpc_server - .register_listener(uuri, Arc::new(RpcListener::new(rpc_server.clone()))) + .register_listener( + &src_uuri, + Some(&sink_uuri), + Arc::new(RpcListener::new(rpc_server.clone())), + ) .await .unwrap(); loop { - task::sleep(time::Duration::from_millis(1000)).await; + sleep(Duration::from_millis(1000)).await; } } diff --git a/src/subscriber.rs b/src/subscriber.rs index 8a136c0..fb473fa 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -10,58 +10,48 @@ * * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -pub mod common_uuri; +pub mod common; -use async_std::task; use async_trait::async_trait; -use common_uuri::ExampleType; -use std::{sync::Arc, time}; -use up_client_zenoh::UPClientZenoh; -use up_rust::{Data, UListener, UMessage, UStatus, UTransport, UUri}; +use std::{str::FromStr, sync::Arc}; +use tokio::time::{sleep, Duration}; +use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; +use up_transport_zenoh::UPClientZenoh; struct SubscriberListener; #[async_trait] impl UListener for SubscriberListener { async fn on_receive(&self, msg: UMessage) { - if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - let uri = msg.attributes.unwrap().source.unwrap().to_string(); - println!("Receiving {value} from {uri}"); - } + let payload = msg.payload.unwrap(); + let value = payload.into_iter().map(|c| c as char).collect::(); + let uri = msg.attributes.unwrap().source.unwrap().to_string(); + println!("Receiving {value} from {uri}"); } async fn on_error(&self, err: UStatus) { panic!("Internal Error: {err:?}"); } } -#[async_std::main] +#[tokio::main] async fn main() { // initiate logging env_logger::init(); println!("uProtocol subscriber example"); - let subscriber = UPClientZenoh::new( - common_uuri::get_zenoh_config(), - common_uuri::authority(), - common_uuri::entity(&ExampleType::Subscriber), - ) - .await - .unwrap(); + let subscriber = UPClientZenoh::new(common::get_zenoh_config(), String::from("subscriber")) + .await + .unwrap(); // create uuri - let uuri = UUri { - entity: Some(common_uuri::entity(&ExampleType::Publisher)).into(), - resource: Some(common_uuri::pub_resource()).into(), - ..Default::default() - }; + let uuri = UUri::from_str("//publisher/1/1/8001").unwrap(); println!("Register the listener..."); subscriber - .register_listener(uuri, Arc::new(SubscriberListener {})) + .register_listener(&uuri, None, Arc::new(SubscriberListener {})) .await .unwrap(); loop { - task::sleep(time::Duration::from_millis(1000)).await; + sleep(Duration::from_millis(1000)).await; } }