Skip to content

Commit

Permalink
Update examples with new UListener interface and singleton UUIDBuilder.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary committed Apr 17, 2024
1 parent 97b250d commit 0e6f348
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 75 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ pedantic = "deny"

[dependencies]
async-std = "1.12.0"
async-trait = "0.1"
chrono = "0.4.31"
env_logger = "0.10.0"
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "68c8a1d94f0006daf4ba135c9cbbfddcd793108d" }
up-client-zenoh = { git = "https://github.com/eclipse-uprotocol/up-client-zenoh-rust", rev = "8855b9abd4bd27228d30d9061522194f330fa547" }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "c705ac97602ad6917a93d23651e8a504ec7bb718" }
# TODO: Should be removed after up-client-zenoh is updated
up-client-zenoh = { git = "https://github.com/ZettaScaleLabs/up-client-zenoh-rust.git", branch = "update_up_rust" }
#up-client-zenoh = { git = "https://github.com/eclipse-uprotocol/up-client-zenoh-rust", rev = "8855b9abd4bd27228d30d9061522194f330fa547" }
zenoh = { version = "0.10.1-rc", features = ["unstable"]}

[[bin]]
Expand Down
27 changes: 21 additions & 6 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ use async_std::task;
use std::time;
use up_client_zenoh::UPClientZenoh;
use up_rust::{
transport::{builder::UMessageBuilder, datamodel::UTransport},
uprotocol::{UEntity, UPayloadFormat, UResource, UUri},
uuid::builder::UUIDBuilder,
Number, UAuthority, UEntity, UMessageBuilder, UPayloadFormat, UResource, UTransport,
UUIDBuilder, UUri,
};
use zenoh::config::Config;

Expand All @@ -27,7 +26,23 @@ async fn main() {
env_logger::init();

println!("uProtocol publisher example");
let publisher = UPClientZenoh::new(Config::default()).await.unwrap();
let publisher = UPClientZenoh::new(
Config::default(),
UAuthority {
name: Some("auth_name".to_string()),
number: Some(Number::Id(vec![1, 2, 3, 4])),
..Default::default()
},
UEntity {
name: "entity_pub".to_string(),
id: Some(1),
version_major: Some(1),
version_minor: None,
..Default::default()
},
)
.await
.unwrap();

// create uuri
let uuri = UUri {
Expand All @@ -52,9 +67,9 @@ async fn main() {
let mut cnt: u64 = 0;
loop {
let data = format!("{cnt}");
let umessage = UMessageBuilder::publish(&uuri)
let umessage = UMessageBuilder::publish(uuri.clone())
.with_message_id(UUIDBuilder::build())
.build_with_payload(
&UUIDBuilder::new(),
data.as_bytes().to_vec().into(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
Expand Down
32 changes: 27 additions & 5 deletions src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
//
use up_client_zenoh::UPClientZenoh;
use up_rust::{
rpc::{CallOptionsBuilder, RpcClient},
uprotocol::{Data, UEntity, UPayload, UPayloadFormat, UUri},
uri::builder::resourcebuilder::UResourceBuilder,
CallOptions, Data, Number, RpcClient, UAuthority, UEntity, UPayload, UPayloadFormat,
UResourceBuilder, UUri,
};
use zenoh::config::Config;

Expand All @@ -25,7 +24,23 @@ async fn main() {
env_logger::init();

println!("uProtocol RPC client example");
let rpc_client = UPClientZenoh::new(Config::default()).await.unwrap();
let rpc_client = UPClientZenoh::new(
Config::default(),
UAuthority {
name: Some("auth_name".to_string()),
number: Some(Number::Id(vec![1, 2, 3, 4])),
..Default::default()
},
UEntity {
name: "entity_rpc_client".to_string(),
id: Some(4),
version_major: Some(1),
version_minor: None,
..Default::default()
},
)
.await
.unwrap();

// create uuri
let uuri = UUri {
Expand Down Expand Up @@ -56,7 +71,14 @@ async fn main() {
// invoke RPC method
println!("Send request to {uuri}");
let result = rpc_client
.invoke_method(uuri, payload, CallOptionsBuilder::default().build())
.invoke_method(
uuri,
payload,
CallOptions {
ttl: 1000,
..Default::default()
},
)
.await;

// process the result
Expand Down
111 changes: 62 additions & 49 deletions src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,80 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task::{self, block_on};
use async_trait::async_trait;
use chrono::Utc;
use std::{sync::Arc, time};
use up_client_zenoh::UPClientZenoh;
use up_rust::{
rpc::RpcServer,
transport::datamodel::UTransport,
uprotocol::{Data, UEntity, UMessage, UMessageType, UPayload, UPayloadFormat, UStatus, UUri},
uri::builder::resourcebuilder::UResourceBuilder,
Data, Number, UAuthority, UEntity, UListener, UMessage, UMessageBuilder, UPayloadFormat,
UResourceBuilder, UStatus, UTransport, UUIDBuilder, UUri,
};
use zenoh::config::Config;

struct RpcListener {
up_client: Arc<UPClientZenoh>,
}
impl RpcListener {
fn new(up_client: Arc<UPClientZenoh>) -> Self {
RpcListener { up_client }
}
}
#[async_trait]
impl UListener for RpcListener {
async fn on_receive(&self, msg: UMessage) {
let UMessage {
attributes,
payload,
..
} = msg;
// Build the payload to send back
if let Data::Value(v) = payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
let source = attributes.clone().unwrap().source.unwrap();
let sink = attributes.clone().unwrap().sink.unwrap();
println!("Receive {value} from {source} to {sink}");
}
// Send back result
let umessage = UMessageBuilder::response_for_request(&attributes)
.with_message_id(UUIDBuilder::build())
.build_with_payload(
// Get current time
format!("{}", Utc::now()).as_bytes().to_vec().into(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.unwrap();
block_on(self.up_client.send(umessage)).unwrap();
}
async fn on_error(&self, err: UStatus) {
panic!("Internal Error: {err:?}");
}
}

#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();

println!("uProtocol RPC server example");
let rpc_server = Arc::new(UPClientZenoh::new(Config::default()).await.unwrap());
let rpc_server = Arc::new(
UPClientZenoh::new(
Config::default(),
UAuthority {
name: Some("auth_name".to_string()),
number: Some(Number::Id(vec![1, 2, 3, 4])),
..Default::default()
},
UEntity {
name: "entity_rpc_server".to_string(),
id: Some(3),
version_major: Some(1),
version_minor: None,
..Default::default()
},
)
.await
.unwrap(),
);

// create uuri
let uuri = UUri {
Expand All @@ -48,52 +104,9 @@ async fn main() {
..Default::default()
};

let rpc_server_cloned = rpc_server.clone();
let callback = move |result: Result<UMessage, UStatus>| {
match result {
Ok(msg) => {
let UMessage {
attributes,
payload,
..
} = msg;
// Get the UUri
let source = attributes.clone().unwrap().source.unwrap();
let sink = attributes.clone().unwrap().sink.unwrap();
// Build the payload to send back
if let Data::Value(v) = payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
println!("Receive {value} from {source} to {sink}");
}
// Get current time
let upayload = UPayload {
length: Some(0),
format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(),
data: Some(Data::Value(format!("{}", Utc::now()).as_bytes().to_vec())),
..Default::default()
};
// Set the attributes type to Response
let mut uattributes = attributes.unwrap();
uattributes.type_ = UMessageType::UMESSAGE_TYPE_RESPONSE.into();
uattributes.sink = Some(source.clone()).into();
uattributes.source = Some(sink.clone()).into();
// Send back result
block_on(rpc_server_cloned.send(UMessage {
attributes: Some(uattributes).into(),
payload: Some(upayload).into(),
..Default::default()
}))
.unwrap();
}
Err(ustatus) => {
println!("Internal Error: {ustatus:?}");
}
}
};

println!("Register the listener...");
rpc_server
.register_rpc_listener(uuri, Box::new(callback))
.register_listener(uuri, Arc::new(RpcListener::new(rpc_server.clone())))
.await
.unwrap();

Expand Down
44 changes: 31 additions & 13 deletions src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,26 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task;
use std::time;
use async_trait::async_trait;
use std::{sync::Arc, time};
use up_client_zenoh::UPClientZenoh;
use up_rust::{
transport::datamodel::UTransport,
uprotocol::{Data, UEntity, UMessage, UResource, UStatus, UUri},
Data, Number, UAuthority, UEntity, UListener, UMessage, UResource, UStatus, UTransport, UUri,
};
use zenoh::config::Config;

fn callback(result: Result<UMessage, UStatus>) {
match result {
Ok(msg) => {
struct SubscriberListener;
#[async_trait]
impl UListener for SubscriberListener {
async fn on_receive(&self, msg: UMessage) {
if let Data::Value(v) = msg.payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
let uri = msg.attributes.unwrap().source.unwrap().to_string();
if let Data::Value(v) = msg.payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
println!("Receiving {value} from {uri}");
}
println!("Receiving {value} from {uri}");
}
Err(ustatus) => println!("Internal Error: {ustatus:?}"),
}
async fn on_error(&self, err: UStatus) {
panic!("Internal Error: {err:?}");
}
}

Expand All @@ -39,7 +41,23 @@ async fn main() {
env_logger::init();

println!("uProtocol subscriber example");
let subscriber = UPClientZenoh::new(Config::default()).await.unwrap();
let subscriber = UPClientZenoh::new(
Config::default(),
UAuthority {
name: Some("auth_name".to_string()),
number: Some(Number::Id(vec![1, 2, 3, 4])),
..Default::default()
},
UEntity {
name: "entity_sub".to_string(),
id: Some(2),
version_major: Some(1),
version_minor: None,
..Default::default()
},
)
.await
.unwrap();

// create uuri
let uuri = UUri {
Expand All @@ -63,7 +81,7 @@ async fn main() {

println!("Register the listener...");
subscriber
.register_listener(uuri, Box::new(callback))
.register_listener(uuri, Arc::new(SubscriberListener {}))
.await
.unwrap();

Expand Down

0 comments on commit 0e6f348

Please sign in to comment.