Skip to content

Commit

Permalink
Match the 1.5.8 with up-rust and up-client-zenoh-rust (#8)
Browse files Browse the repository at this point in the history
* Match the 1.5.8 with up-rust and up-client-zenoh-rust

Signed-off-by: ChenYing Kuo <[email protected]>

* Use up-transport-zenoh instead.

Signed-off-by: ChenYing Kuo <[email protected]>

---------

Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary authored Jun 15, 2024
1 parent 633e889 commit 87217c3
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 205 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ pedantic = "deny"
#nursery = "deny"

[dependencies]
async-std = "1.12.0"
async-trait = "0.1"
chrono = "0.4.31"
env_logger = "0.10.0"
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "1bb08ba7a3666e58c316489fbcf3da3e29dee611" }
up-client-zenoh = { git = "https://github.com/eclipse-uprotocol/up-client-zenoh-rust", rev = "cb592dc5abbaf9dcd204f592733404ea7c61e999" }
zenoh = { version = "0.10.1-rc", features = ["unstable"]}
tokio = { version = "1.35.1", default-features = false }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "d736fdf35ff4728effa7f36b720f0fc1605d5ba0" }
up-transport-zenoh = { git = "https://github.com/eclipse-uprotocol/up-transport-zenoh-rust", rev = "b8925c643465959f402372f796c8856f906dcd05" }
zenoh = { version = "0.11.0-rc.3", features = ["unstable"]}

[[bin]]
name = "publisher"
Expand Down
29 changes: 29 additions & 0 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
use zenoh::config::Config;

#[allow(clippy::must_use_candidate, clippy::missing_panics_doc)]
pub fn get_zenoh_config() -> Config {
// Load the config from file path
// Config Examples: https://github.com/eclipse-zenoh/zenoh/blob/0.10.1-rc/DEFAULT_CONFIG.json5
// let mut zenoh_cfg = Config::from_file("./DEFAULT_CONFIG.json5").unwrap();

// Loat the default config struct
let mut zenoh_cfg = Config::default();
// You can choose from Router, Peer, Client
zenoh_cfg
.set_mode(Some(zenoh::config::WhatAmI::Peer))
.unwrap();

zenoh_cfg
}
78 changes: 0 additions & 78 deletions src/common_uuri.rs

This file was deleted.

37 changes: 12 additions & 25 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,35 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
pub mod common_uuri;
pub mod common;

use async_std::task;
use common_uuri::ExampleType;
use std::time;
use up_client_zenoh::UPClientZenoh;
use up_rust::{UMessageBuilder, UPayloadFormat, UTransport, UUIDBuilder, UUri};
use std::str::FromStr;
use tokio::time::{sleep, Duration};
use up_rust::{UMessageBuilder, UPayloadFormat, UTransport, UUri};
use up_transport_zenoh::UPClientZenoh;

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
env_logger::init();

println!("uProtocol publisher example");
let publisher = UPClientZenoh::new(
common_uuri::get_zenoh_config(),
common_uuri::authority(),
common_uuri::entity(&ExampleType::Publisher),
)
.await
.unwrap();
let publisher = UPClientZenoh::new(common::get_zenoh_config(), String::from("publisher"))
.await
.unwrap();

// create uuri
let uuri = UUri {
entity: Some(common_uuri::entity(&ExampleType::Publisher)).into(),
resource: Some(common_uuri::pub_resource()).into(),
..Default::default()
};
let uuri = UUri::from_str("//publisher/1/1/8001").unwrap();

let mut cnt: u64 = 0;
loop {
let data = format!("{cnt}");
let umessage = UMessageBuilder::publish(uuri.clone())
.with_message_id(UUIDBuilder::build())
.build_with_payload(
data.as_bytes().to_vec().into(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.build_with_payload(data.clone(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.unwrap();
println!("Sending {data} to {uuri}...");
publisher.send(umessage).await.unwrap();
task::sleep(time::Duration::from_millis(1000)).await;
sleep(Duration::from_millis(1000)).await;
cnt += 1;
}
}
58 changes: 18 additions & 40 deletions src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,60 +10,38 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
pub mod common_uuri;
pub mod common;

use common_uuri::ExampleType;
use up_client_zenoh::UPClientZenoh;
use up_rust::{CallOptions, Data, RpcClient, UPayload, UPayloadFormat, UUri};
use std::str::FromStr;
use up_rust::{RpcClient, UMessageBuilder, UPayloadFormat, UUri};
use up_transport_zenoh::UPClientZenoh;

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
env_logger::init();

println!("uProtocol RPC client example");
let rpc_client = UPClientZenoh::new(
common_uuri::get_zenoh_config(),
common_uuri::authority(),
common_uuri::entity(&ExampleType::RpcClient),
)
.await
.unwrap();
let rpc_client = UPClientZenoh::new(common::get_zenoh_config(), String::from("rpc_client"))
.await
.unwrap();

// create uuri
let uuri = UUri {
entity: Some(common_uuri::entity(&ExampleType::RpcServer)).into(),
resource: Some(common_uuri::rpc_resource()).into(),
..Default::default()
};
let src_uuri = UUri::from_str("//rpc_client/1/1/0").unwrap();
let sink_uuri = UUri::from_str("//rpc_server/1/1/1").unwrap();

// create uPayload
let data = String::from("GetCurrentTime");
let payload = UPayload {
length: Some(0),
format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(),
data: Some(Data::Value(data.as_bytes().to_vec())),
..Default::default()
};
let umsg = UMessageBuilder::request(sink_uuri.clone(), src_uuri.clone(), 1000)
.build_with_payload(data, UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.unwrap();

// invoke RPC method
println!("Send request to {uuri}");
let result = rpc_client
.invoke_method(
uuri,
payload,
CallOptions {
ttl: 1000,
..Default::default()
},
)
.await;
println!("Send request from {src_uuri} to {sink_uuri}");
let result = rpc_client.invoke_method(sink_uuri, umsg).await;

// process the result
if let Data::Value(v) = result.unwrap().payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
println!("Receive {value}");
} else {
println!("Failed to get result from invoke_method.");
}
let payload = result.unwrap().payload.unwrap();
let value = payload.into_iter().map(|c| c as char).collect::<String>();
println!("Receive {value}");
}
68 changes: 36 additions & 32 deletions src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
pub mod common_uuri;
pub mod common;

use async_std::task::{self, block_on};
use async_trait::async_trait;
use chrono::Utc;
use common_uuri::ExampleType;
use std::{sync::Arc, time};
use up_client_zenoh::UPClientZenoh;
use up_rust::{
Data, UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUIDBuilder,
UUri,
use std::{str::FromStr, sync::Arc};
use tokio::{
runtime::Handle,
task,
time::{sleep, Duration},
};
use up_rust::{UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri};
use up_transport_zenoh::UPClientZenoh;

struct RpcListener {
up_client: Arc<UPClientZenoh>,
Expand All @@ -39,59 +39,63 @@ impl UListener for RpcListener {
payload,
..
} = msg;

// Build the payload to send back
if let Data::Value(v) = payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
let source = attributes.clone().unwrap().source.unwrap();
let sink = attributes.clone().unwrap().sink.unwrap();
println!("Receive {value} from {source} to {sink}");
}
let value = payload
.unwrap()
.into_iter()
.map(|c| c as char)
.collect::<String>();
let source = attributes.clone().unwrap().source.unwrap();
let sink = attributes.clone().unwrap().sink.unwrap();
println!("Receive {value} from {source} to {sink}");

// Send back result
let umessage = UMessageBuilder::response_for_request(&attributes)
.with_message_id(UUIDBuilder::build())
.build_with_payload(
// Get current time
format!("{}", Utc::now()).as_bytes().to_vec().into(),
format!("{}", Utc::now()),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.unwrap();
block_on(self.up_client.send(umessage)).unwrap();
task::block_in_place(|| {
Handle::current()
.block_on(self.up_client.send(umessage))
.unwrap();
});
}
async fn on_error(&self, err: UStatus) {
panic!("Internal Error: {err:?}");
}
}

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
env_logger::init();

println!("uProtocol RPC server example");
let rpc_server = Arc::new(
UPClientZenoh::new(
common_uuri::get_zenoh_config(),
common_uuri::authority(),
common_uuri::entity(&ExampleType::RpcServer),
)
.await
.unwrap(),
UPClientZenoh::new(common::get_zenoh_config(), String::from("rpc_server"))
.await
.unwrap(),
);

// create uuri
let uuri = UUri {
entity: Some(common_uuri::entity(&ExampleType::RpcServer)).into(),
resource: Some(common_uuri::rpc_resource()).into(),
..Default::default()
};
let src_uuri = UUri::from_str("//*/FFFF/FF/FFFF").unwrap();
let sink_uuri = UUri::from_str("//rpc_server/1/1/1").unwrap();

println!("Register the listener...");
rpc_server
.register_listener(uuri, Arc::new(RpcListener::new(rpc_server.clone())))
.register_listener(
&src_uuri,
Some(&sink_uuri),
Arc::new(RpcListener::new(rpc_server.clone())),
)
.await
.unwrap();

loop {
task::sleep(time::Duration::from_millis(1000)).await;
sleep(Duration::from_millis(1000)).await;
}
}
Loading

0 comments on commit 87217c3

Please sign in to comment.