From ce0d849932707e6e757f20341c1cf38bf5c60605 Mon Sep 17 00:00:00 2001 From: Johannes Barthel Date: Fri, 12 Jul 2024 11:23:23 +0200 Subject: [PATCH 1/6] workin params --- .cargo/config.toml | 2 + Cargo.toml | 6 +- README.md | 9 ++ src/client_api.rs | 8 +- src/core.rs | 293 ++++++++++++++++++++++++++++++++++----------- src/lib.rs | 2 + src/param_tree.rs | 172 ++++++++++++++++++++++++++ 7 files changed, 416 insertions(+), 76 deletions(-) create mode 100644 .cargo/config.toml create mode 100644 src/param_tree.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..3c32d25 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[target.aarch64-unknown-linux-gnu] +linker = "aarch64-linux-gnu-gcc" diff --git a/Cargo.toml b/Cargo.toml index 2eb6bc9..b5bd7d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,9 @@ keywords = ["ros", "rosrust", "roscore", "robotics"] categories = ["science::robotics"] [dependencies] -dxr = { version = "0.5.3", features = ["server", "server-axum"] } +dxr = { path = "../dxr/dxr" } +dxr_server = { path = "../dxr/dxr_server", features = ["axum", "multicall"] } +dxr_client = { path = "../dxr/dxr_client", features = ["reqwest"] } anyhow = "1.0.69" log = "0.4.17" env_logger = "0.10.0" @@ -20,6 +22,8 @@ chrono = "0.4.24" paste = "1.0.12" tokio = { version = "1", features = ["macros", "rt-multi-thread"]} url = "2.3.1" +maplit = "1.0.2" +futures = "0.3.30" [dev-dependencies] rosrust = "0.9" diff --git a/README.md b/README.md index 34b27a6..d2798de 100644 --- a/README.md +++ b/README.md @@ -71,3 +71,12 @@ necessary to use this script to use the standalone implementation on its own. We welcome contributions to this project! If you find a bug or have a feature request, please create an issue on the GitHub repository. If you want to contribute code, feel free to submit a pull request. + + +## Cross-compilation to arm64 +```bash +apt install libssl-dev:arm64 +export AARCH64_UNKNOWN_LINUX_GNU_OPENSSL_LIB_DIR=/usr/lib/aarch64-linux-gnu/ +export AARCH64_UNKNOWN_LINUX_GNU_OPENSSL_INCLUDE_DIR=/usr/include/aarch64-linux-gnu/ +cargo build --target aarch64-unknown-linux-gnu --release +``` \ No newline at end of file diff --git a/src/client_api.rs b/src/client_api.rs index 5991cf7..2ca5646 100644 --- a/src/client_api.rs +++ b/src/client_api.rs @@ -1,4 +1,4 @@ -use dxr::client::{Call, Client, ClientBuilder, Url}; +use dxr_client::{Call, Client, ClientBuilder, Url}; use dxr::Value; pub struct ClientApi { @@ -42,8 +42,8 @@ impl ClientApi { publisher_apis: &Vec, ) -> anyhow::Result<()> { let request = Call::new("publisherUpdate", (caller_id, topic, publisher_apis)); - let result = self.client.call(request).await; - result + let result = self.client.call::<_, ()>(request).await; + Ok(result?) } /// Sends a "paramUpdate" request to the ROS node. @@ -65,6 +65,6 @@ impl ClientApi { ) -> anyhow::Result<()> { let request = Call::new("paramUpdate", (caller_id, key, value)); let result = self.client.call(request).await; - result + Ok(result?) } } diff --git a/src/core.rs b/src/core.rs index efb1ecf..773d968 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,13 +1,17 @@ extern crate dxr; -use dxr::client::{Call, Client, ClientBuilder, Url}; +use dxr_client::{Call, Client, ClientBuilder, Url}; +use futures::stream::FuturesOrdered; +use maplit::hashmap; use paste::paste; +use tokio::task::JoinSet; use std::collections::{HashMap, HashSet}; +use std::future::Future; +use futures::StreamExt; use std::sync::{Arc, RwLock}; -use dxr::server::{async_trait, Handler, HandlerResult}; -use dxr::server_axum::axum; -use dxr::server_axum::Server; -use dxr::server_axum::{axum::http::HeaderMap, RouteBuilder}; +use dxr_server::{async_trait, Handler, HandlerResult}; +use dxr_server::{axum::{self, http::HeaderMap}, Server, RouteBuilder}; + use dxr::{TryFromParams, TryFromValue, TryToValue, Value}; use crate::client_api::ClientApi; @@ -17,8 +21,7 @@ pub type Nodes = HashMap; pub type Topics = HashMap; pub type Subscriptions = HashMap>; pub type Publishers = HashMap>; -pub type Parameters = HashMap; -pub type ParameterSubscriptions = HashMap>; +pub type Parameters = crate::param_tree::ParamValue; /// An enum that represents the different types of endpoints that can be accessed in the ROS Master API. /// @@ -68,6 +71,7 @@ enum MasterEndpoints { HasParam, GetParamNames, SystemMultiCall, + GetPid, Default, } @@ -95,11 +99,19 @@ impl MasterEndpoints { MasterEndpoints::HasParam => "hasParam", MasterEndpoints::GetParamNames => "getParamNames", MasterEndpoints::SystemMultiCall => "system.multicall", + MasterEndpoints::GetPid => "getPid", MasterEndpoints::Default => "", } } } +#[derive(Debug)] +struct ParamSubscription { + node_id : String, + param : String, + api_uri : String +} + /// Struct containing information about ROS data. pub struct RosData { // RwLocks to allow for concurrent read/write access to data @@ -109,7 +121,7 @@ pub struct RosData { subscriptions: RwLock, // stores information about topic subscriptions publications: RwLock, // stores information about topic publishers parameters: RwLock, // stores information about ROS parameters - parameter_subscriptions: RwLock, // stores information about parameter subscriptions + parameter_subscriptions: RwLock>, // stores information about parameter subscriptions uri: std::net::SocketAddr, // the address of the ROS network } @@ -399,7 +411,16 @@ impl Handler for RegisterPublisherHandler { .unwrap_or_default(); // Inform all subscribers of the new publisher. - let publisher_apis = publishers.into_iter().collect::>(); + let publisher_nodes = publishers.into_iter().collect::>(); + let publisher_apis = self + .data + .nodes + .read() // Note: This should not be a race condition, because for every publisher, the node has to be there first, and we're reading "nodes" after "publishers". + .unwrap() + .iter() + .filter(|node| publisher_nodes.contains(node.0)) + .map(|node| node.1.clone()) + .collect::>(); for client_api_url in subscribers_api_urls.clone() { let client_api = ClientApi::new(client_api_url.as_str()); log::debug!("Call {}", client_api_url); @@ -441,7 +462,7 @@ impl Handler for UnRegisterPublisherHandler { log::debug!("UnRegisterPublisherHandler {:?} ", params); type Request = (String, String, String); let (caller_id, topic, caller_api) = Request::try_from_params(params)?; - println!("Called {caller_id} with {topic} {caller_api}"); + log::debug!("Called {caller_id} with {topic} {caller_api}"); if self .data @@ -678,6 +699,35 @@ impl Handler for GetUriHandler { } } +/// Handler for getting the PID of the master. +/// +/// # Parameters +/// +/// - `caller_id` - ROS caller ID (string) +/// +/// # Returns +/// +/// A tuple of integers and a string representing the response: +/// +/// - `code` - response code (integer) +/// - `statusMessage` - status message (string) +/// - `pid` - PID of the ROS master (integer) +struct GetPidHandler { + #[allow(unused)] + data: Arc, +} +type GetPidResponse = (i32, String, i32); +#[async_trait] +impl Handler for GetPidHandler { + async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { + log::debug!("GetPidHandler {:?} ", params); + type Request = String; + let _caller_id = Request::try_from_params(params)?; + let result = std::process::id() as i32; // max pid on linux is 2^22, so the typecast should have no unintended side effects + return Ok((1, "", (result,)).try_to_value()?); + } +} + /// Handler for looking up all providers of a particular service. /// /// # Parameters @@ -760,11 +810,31 @@ impl Handler for DeleteParamHandler { log::debug!("DeleteParamHandler {:?} ", params); type Request = (String, String); let (_caller_id, key) = Request::try_from_params(params)?; - self.data.parameters.write().unwrap().remove(&key); + let key = key.strip_prefix("/").unwrap_or(&key).split("/"); + self.data.parameters.write().unwrap().remove(key); return Ok((1, "", 0).try_to_value()?); } } +fn one_is_prefix_of_the_other(a : &str, b: &str) -> bool { + let len = a.len().min(b.len()); + a[..len] == b[..len] +} + +async fn update_client_with_new_param_value( + client_api_url : String, + updating_node_id : String, + subscribing_node_id : String, + param_name : String, + new_value : Value +) -> Result<(), anyhow::Error> + { + let client_api = ClientApi::new(&client_api_url); + let request = client_api + .param_update(&updating_node_id, ¶m_name, &new_value); + request.await +} + /// Handler for setting a ROS parameter. /// /// # Parameters @@ -794,46 +864,94 @@ impl Handler for SetParamHandler { log::debug!("SetParamHandler {:?} ", params); type Request = (String, String, Value); let (caller_id, key, value) = Request::try_from_params(params)?; - self.data - .parameters - .write() - .unwrap() - .insert(key.clone(), value.clone()); + + let mut update_futures = JoinSet::new(); - // TODO(patwie): handle case where value is not a single value. - let all_key_values; - all_key_values = vec![(key.clone(), value.clone())]; - - for (cur_key, cur_value) in all_key_values { - // Update the parameter value - self.data - .parameters - .write() - .unwrap() - .insert(cur_key.clone(), cur_value.clone()); - // Notify any parameter subscribers about this new value - let subscribers = self - .data - .parameter_subscriptions - .read() - .unwrap() - .get(&cur_key) - .cloned(); - - if let Some(subscribers) = subscribers { - for client_api_url in subscribers.values() { - let client_api = ClientApi::new(client_api_url.as_str()); - log::debug!("Call {}", client_api_url); - let r = client_api - .param_update(&caller_id.as_str(), &cur_key.as_str(), &cur_value) - .await; - if let Err(r) = r { - log::warn!("paramUpdate call to {} failed: {}", client_api_url, r); - } + { + let key = key.clone(); + let mut params = self.data.parameters.write().unwrap(); + let key_split = key.strip_prefix("/").unwrap_or(&key).split("/"); + params.update_inner(key_split, value); + + let param_subscriptions = self.data.parameter_subscriptions.read().unwrap(); + log::info!("updating param {}", &key); + for subscription in param_subscriptions.iter() { + log::debug!("subscriber {:?} has subscription? {}", &subscription, one_is_prefix_of_the_other(&key, &subscription.param)); + if one_is_prefix_of_the_other(&key, &subscription.param) { + let subscribed_key_spit = subscription.param.strip_prefix("/").unwrap_or(&subscription.param).split("/"); + let new_value = params.get(subscribed_key_spit).unwrap(); + update_futures.spawn( + update_client_with_new_param_value( + subscription.api_uri.clone(), + caller_id.clone(), + subscription.node_id.clone(), + subscription.param.clone(), + new_value + ) + ); } } } + while let Some(res) = update_futures.join_next().await { + match res { + Ok(Ok(())) => { + log::debug!("a subscriber has been updated"); + }, + Ok(Err(err)) => { + log::warn!("Error updating a subscriber of changed param {}:\n{:#?}", &key, err); + }, + Err(err) => { + log::warn!("Error updating a subscriber of changed param {}:\n{:#?}", &key, err); + }, + } + } + + log::info!("done updating subscribers"); + + // TODO: send update notifications + + // } + // self.data + // .parameters + // .write() + // .unwrap() + // .insert(key.clone(), value.clone()); + + // TODO(patwie): handle case where value is not a single value. + // let all_key_values; + // all_key_values = vec![(key.clone(), value.clone())]; + + // for (cur_key, cur_value) in all_key_values { + // // Update the parameter value + // // self.data + // // .parameters + // // .write() + // // .unwrap() + // // .insert(cur_key.clone(), cur_value.clone()); + // // Notify any parameter subscribers about this new value + // let subscribers = self + // .data + // .parameter_subscriptions + // .read() + // .unwrap() + // .get(&cur_key) + // .cloned(); + + // if let Some(subscribers) = subscribers { + // for client_api_url in subscribers.values() { + // let client_api = ClientApi::new(client_api_url.as_str()); + // log::debug!("Call {}", client_api_url); + // let r = client_api + // .param_update(&caller_id.as_str(), &cur_key.as_str(), &cur_value) + // .await; + // if let Err(r) = r { + // log::warn!("paramUpdate call to {} failed: {}", client_api_url, r); + // } + // } + // } + // } + Ok((1, "", 0).try_to_value()?) } } @@ -864,9 +982,14 @@ impl Handler for GetParamHandler { log::debug!("GetParamHandler {:?} ", params); type Request = (String, String); let (_caller_id, key) = Request::try_from_params(params)?; - Ok(match self.data.parameters.read().unwrap().get(&key) { + let params = self.data.parameters.read().unwrap(); + let key = key.strip_prefix("/").unwrap_or(&key).split("/"); + + Ok(match params.get(key) { Some(value) => (1, "", value.to_owned()), - None => (0, "", Value::string("")), + None => { + (0, "", Value::string("".to_owned())) + }, } .try_to_value()?) } @@ -898,22 +1021,39 @@ impl Handler for SubscribeParamHandler { log::debug!("SubscribeParamHandler {:?} ", params); type Request = (String, String, String); let (caller_id, caller_api, key) = Request::try_from_params(params)?; - let param_subscriptions = &mut self.data.parameter_subscriptions.write().unwrap(); - if !param_subscriptions.contains_key(&key) { - param_subscriptions.insert(key.clone(), HashMap::new()); + + let mut new_subscription = Some(ParamSubscription { + node_id: caller_id.clone(), + param: key.clone(), + api_uri: caller_api + }); + + { // RwLock scope + let param_subscriptions = &mut self.data.parameter_subscriptions.write().unwrap(); + + // replace old entry if subscribing node has restarted + for subscription in param_subscriptions.iter_mut() { + if &subscription.node_id == &caller_id && &subscription.param == &key { + let _ = std::mem::replace(subscription, new_subscription.take().unwrap()); + break; + } + } + + // add a new entry if it's a new node id + if let Some(new_subscription) = new_subscription { + param_subscriptions.push(new_subscription) + } } - let subscriptions = param_subscriptions.get_mut(&key).unwrap(); - subscriptions.insert(caller_id.clone(), caller_api.clone()); + let key_split = key.strip_prefix("/").unwrap_or(&key).split("/"); let value = self .data .parameters .read() .unwrap() - .get(&key) - .cloned() - .unwrap_or(Value::string("")); + .get(key_split) + .unwrap_or(Value::string("".to_owned())); Ok((1, "", value).try_to_value()?) } } @@ -946,11 +1086,15 @@ impl Handler for UnSubscribeParamHandler { let (caller_id, _caller_api, key) = Request::try_from_params(params)?; let mut parameter_subscriptions = self.data.parameter_subscriptions.write().unwrap(); - let subscribers = parameter_subscriptions.entry(key.clone()).or_default(); - let removed = subscribers.remove(&caller_id).is_some(); - if subscribers.is_empty() { - parameter_subscriptions.remove(&key); - } + let mut removed = false; + parameter_subscriptions.retain(|subscription| { + if subscription.node_id == caller_id && subscription.param == key { + removed = true; + false + } else { + true + } + }); Ok((1, "", if removed { 1 } else { 0 }).try_to_value()?) } } @@ -977,9 +1121,11 @@ type HasParamResponse = (i32, String, bool); impl Handler for HasParamHandler { async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { log::debug!("HasParamHandler {:?} ", params); + type Request = (String, String); let (_caller_id, key) = Request::try_from_params(params)?; - let has = self.data.parameters.read().unwrap().contains_key(&key); + + let has = self.data.parameters.read().unwrap().get_keys().contains(&key); Ok((1, "", has).try_to_value()?) } } @@ -1005,16 +1151,19 @@ type GetParamNamesResponse = (i32, String, Vec); impl Handler for GetParamNamesHandler { async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { log::debug!("GetParamNamesHandler {:?} ", params); - type Request = (String, String); - let _caller_id = Request::try_from_params(params)?; + let a = <(String, String)>::try_from_params(params); + let b = <(String,)>::try_from_params(params); + + if a.is_err() && b.is_err() { + a?; + } + let keys: Vec = self .data .parameters .read() .unwrap() - .keys() - .cloned() - .collect(); + .get_keys(); Ok((1, "", keys).try_to_value()?) } } @@ -1065,8 +1214,8 @@ impl Master { topics: RwLock::new(Topics::new()), subscriptions: RwLock::new(Subscriptions::new()), publications: RwLock::new(Publishers::new()), - parameters: RwLock::new(Parameters::new()), - parameter_subscriptions: RwLock::new(ParameterSubscriptions::new()), + parameters: RwLock::new(Parameters::HashMap(hashmap! {})), + parameter_subscriptions: RwLock::new(Vec::new()), uri: url.to_owned(), }), } @@ -1096,6 +1245,7 @@ impl Master { MasterEndpoints::HasParam => HasParamHandler, MasterEndpoints::GetParamNames => GetParamNamesHandler, MasterEndpoints::SystemMultiCall => DebugOutputHandler, + MasterEndpoints::GetPid => GetPidHandler, MasterEndpoints::Default => DebugOutputHandler ); router @@ -1127,8 +1277,8 @@ impl Master { .nest("/", self.create_router()) .nest("/RPC2", self.create_router()); log::info!("roscore-rs is listening on {}", self.data.uri); - let server = Server::from_route(self.data.uri, router); - server.serve().await + let server = Server::from_route(router); + Ok(server.serve(self.data.uri.try_into()?).await?) } } @@ -1195,6 +1345,7 @@ impl MasterClient { GetTopicTypes(caller_id: &str) -> GetTopicTypesResponse, GetSystemState(caller_id: &str) -> GetSystemStateResponse, GetUri(caller_id: &str) -> GetUriResponse, + GetPid(caller_id: &str) -> GetPidResponse, LookupService(caller_id: &str, service: &str) -> LookupServiceResponse, DeleteParam(caller_id: &str, key: &str) -> DeleteParamResponse, // TODO(): correct args diff --git a/src/lib.rs b/src/lib.rs index f2e7fd7..9cf608c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,8 @@ pub mod core; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use url::Url; +mod param_tree; + pub fn url_to_socket_addr(url: &Url) -> anyhow::Result { let ip_addr = match url.host() { Some(url::Host::Domain(domain)) if domain == "localhost" => IpAddr::V4(Ipv4Addr::LOCALHOST), diff --git a/src/param_tree.rs b/src/param_tree.rs new file mode 100644 index 0000000..1f451bd --- /dev/null +++ b/src/param_tree.rs @@ -0,0 +1,172 @@ +use std::{any::Any, collections::HashMap, iter::Peekable, mem}; + +use dxr::{TryFromParams, TryFromValue, TryToValue, Value}; + +#[derive(Debug, PartialEq)] +pub(crate) enum ParamValue { + HashMap(HashMap), + Array(Vec), + Value(Value), +} + +impl From<&Value> for ParamValue { + fn from(value: &Value) -> Self { + if let (Ok(hm)) = HashMap::::try_from_value(value) { + let mut rv = HashMap::with_capacity(hm.len()); + for (k, v) in hm.into_iter() { + rv.insert(k, ParamValue::from(&v)); + } + return Self::HashMap(rv); + } + if let (Ok(vec)) = Vec::::try_from_value(value) { + let mut rv = Vec::with_capacity(vec.len()); + for e in vec.into_iter() { + rv.push(ParamValue::from(&e)) + } + return Self::Array(rv); + } + Self::Value(value.clone()) + } +} + +impl TryToValue for ParamValue { + fn try_to_value(&self) -> Result { + match self { + ParamValue::Value(v) => Ok(v.clone()), + ParamValue::Array(arr) => arr.try_to_value(), + ParamValue::HashMap(hm) => hm.try_to_value(), + } + } +} + +impl ParamValue { + pub(crate) fn get_keys(&self) -> Vec { + match self { + ParamValue::HashMap(hm) => { + let mut keys = Vec::new(); + for (k, v) in hm.iter() { + keys.push(format!("/{k}")); + for suffix in v.get_keys() { + keys.push(format!("/{k}{suffix}")); + } + } + keys + } + _ => Vec::new(), + } + } + pub(crate) fn get(&self, key: I) -> Option + where + I: IntoIterator, + T: AsRef, + { + let mut hm = self; + for e in key.into_iter() { + let e = e.as_ref(); + if e == "" { + continue; + } + match hm { + ParamValue::HashMap(inner) => { + if let Some(inner_value) = inner.get(e) { + hm = inner_value; + } else { + return None; + } + } + _ => return None, + } + } + Some(hm.try_to_value().unwrap()) + } + + pub(crate) fn remove(&mut self, key: I) + where + I: IntoIterator, + T: AsRef, + { + let mut peekable = key.into_iter().peekable(); + match self { + ParamValue::HashMap(inner) => { + let mut hm = inner; + loop { + let current_key = peekable.next(); + let next_key = peekable.peek(); + match (current_key, next_key) { + (Some(current_key), None) => { + hm.remove(current_key.as_ref()); + return; + } + (None, None) => { + let _ = mem::replace(self, ParamValue::HashMap(hashmap! {})); + return; + } + (None, Some(_)) => unreachable!(), + (Some(current_key), Some(_)) => match hm.get_mut(current_key.as_ref()) { + Some(ParamValue::HashMap(new_hm)) => hm = new_hm, + _ => return, + }, + } + } + } + _ => (), + } + } + + pub(crate) fn update_inner(&mut self, mut key: I, value: Value) + where + I: Iterator, + T: AsRef, + { + match key.next() { + None => { + let _ = mem::replace(self, ParamValue::from(&value)); + } + Some(next_key) => match self { + ParamValue::HashMap(hm) => match hm.get_mut(next_key.as_ref()) { + Some(inner) => inner.update_inner(key, value), + None => { + hm.insert(next_key.as_ref().to_string(), { + let mut inner = ParamValue::HashMap(HashMap::new()); + inner.update_inner(key, value); + inner + }); + } + }, + _ => { + let mut inner = ParamValue::HashMap(hashmap! {}); + inner.update_inner(key, value); + let outer = ParamValue::HashMap(hashmap! { + next_key.as_ref().to_string() => inner + }); + let _ = mem::replace(self, outer); + } + }, + } + } +} + +use maplit::hashmap; + +#[test] +fn test_param_tree() { + let mut tree = ParamValue::HashMap(hashmap! { + "run_id".to_owned() => ParamValue::Value(Value::string("asdf-jkl0".to_owned())), + "robot_id".to_owned() => ParamValue::Value(Value::i4(42)), + "robot_configs".to_owned() => ParamValue::Array(vec![ + ParamValue::HashMap(hashmap! { + "robot_speed".to_owned() => ParamValue::Value(Value::double(3.0)), + "robot_id".to_owned() => ParamValue::Value(Value::i4(24)) + }) + ]), + "arms".to_owned() => ParamValue::HashMap(hashmap! { + "arm_left".to_owned() => ParamValue::HashMap(hashmap! { + "length".to_owned() => ParamValue::Value(Value::double(-0.45)) + }) + }) + }); + + tree.update_inner(["robot_configs"].iter(), Value::i4(23)); + let res = tree.get(["robot_configs"]).unwrap(); + assert_eq!(res, Value::i4(23)); +} From 07fb1575854a91ed37234ef81190d348eed6dab7 Mon Sep 17 00:00:00 2001 From: Johannes Barthel Date: Fri, 12 Jul 2024 13:29:09 +0200 Subject: [PATCH 2/6] switch to rustls for better cross-compilation support --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b5bd7d9..34d9934 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ categories = ["science::robotics"] [dependencies] dxr = { path = "../dxr/dxr" } dxr_server = { path = "../dxr/dxr_server", features = ["axum", "multicall"] } -dxr_client = { path = "../dxr/dxr_client", features = ["reqwest"] } +dxr_client = { path = "../dxr/dxr_client", default-features = false, features = ["reqwest", "rustls-tls"] } anyhow = "1.0.69" log = "0.4.17" env_logger = "0.10.0" From 57956d9eff2f76ab8ffdabe62fa27e705e3f5745 Mon Sep 17 00:00:00 2001 From: Johannes Barthel Date: Fri, 12 Jul 2024 13:29:27 +0200 Subject: [PATCH 3/6] implement name resolution --- src/core.rs | 45 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/src/core.rs b/src/core.rs index 773d968..5777de8 100644 --- a/src/core.rs +++ b/src/core.rs @@ -155,6 +155,9 @@ impl Handler for RegisterServiceHandler { log::debug!("RegisterServiceHandler {:?} ", params); type Request = (String, String, String, String); let (caller_id, service, service_api, caller_api) = Request::try_from_params(params)?; + + let service = resolve(&caller_id, &service); + self.data .service_list .write() @@ -202,6 +205,8 @@ impl Handler for UnRegisterServiceHandler { type Request = (String, String, String); let (caller_id, service, _service_api) = Request::try_from_params(params)?; + let service = resolve(&caller_id, &service); + let mut service_list = self.data.service_list.write().unwrap(); let removed = if let Some(providers) = service_list.get_mut(&service) { @@ -246,6 +251,8 @@ impl Handler for RegisterSubscriberHandler { type Request = (String, String, String, String); let (caller_id, topic, topic_type, caller_api) = Request::try_from_params(params)?; + let topic = resolve(&caller_id, &topic); + if let Some(known_topic_type) = self.data.topics.read().unwrap().get(&topic.clone()) { if known_topic_type != &topic_type { let err_msg = format!( @@ -315,6 +322,8 @@ impl Handler for UnRegisterSubscriberHandler { type Request = (String, String, String); let (caller_id, topic, _caller_api) = Request::try_from_params(params)?; + let topic = resolve(&caller_id, &topic); + let removed = self .data .subscriptions @@ -361,6 +370,8 @@ impl Handler for RegisterPublisherHandler { type Request = (String, String, String, String); let (caller_id, topic, topic_type, caller_api) = Request::try_from_params(params)?; + let topic = resolve(&caller_id, &topic); + if let Some(v) = self.data.topics.read().unwrap().get(&topic.clone()) { if v != &topic_type { let err_msg = format!("{} for topic {} does not match {}", topic_type, topic, v); @@ -462,6 +473,9 @@ impl Handler for UnRegisterPublisherHandler { log::debug!("UnRegisterPublisherHandler {:?} ", params); type Request = (String, String, String); let (caller_id, topic, caller_api) = Request::try_from_params(params)?; + + let topic = resolve(&caller_id, &topic); + log::debug!("Called {caller_id} with {topic} {caller_api}"); if self @@ -752,7 +766,9 @@ impl Handler for LookupServiceHandler { async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { log::debug!("LookupServiceHandler {:?} ", params); type Request = (String, String); - let (_caller_id, service) = Request::try_from_params(params)?; + let (caller_id, service) = Request::try_from_params(params)?; + + let service = resolve(&caller_id, &service); let services = self .data @@ -809,7 +825,8 @@ impl Handler for DeleteParamHandler { async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { log::debug!("DeleteParamHandler {:?} ", params); type Request = (String, String); - let (_caller_id, key) = Request::try_from_params(params)?; + let (caller_id, key) = Request::try_from_params(params)?; + let key = resolve(&caller_id, &key); let key = key.strip_prefix("/").unwrap_or(&key).split("/"); self.data.parameters.write().unwrap().remove(key); return Ok((1, "", 0).try_to_value()?); @@ -864,6 +881,7 @@ impl Handler for SetParamHandler { log::debug!("SetParamHandler {:?} ", params); type Request = (String, String, Value); let (caller_id, key, value) = Request::try_from_params(params)?; + let key = resolve(&caller_id, &key); let mut update_futures = JoinSet::new(); @@ -981,7 +999,8 @@ impl Handler for GetParamHandler { async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { log::debug!("GetParamHandler {:?} ", params); type Request = (String, String); - let (_caller_id, key) = Request::try_from_params(params)?; + let (caller_id, key) = Request::try_from_params(params)?; + let key = resolve(&caller_id, &key); let params = self.data.parameters.read().unwrap(); let key = key.strip_prefix("/").unwrap_or(&key).split("/"); @@ -1021,6 +1040,7 @@ impl Handler for SubscribeParamHandler { log::debug!("SubscribeParamHandler {:?} ", params); type Request = (String, String, String); let (caller_id, caller_api, key) = Request::try_from_params(params)?; + let key = resolve(&caller_id, &key); let mut new_subscription = Some(ParamSubscription { node_id: caller_id.clone(), @@ -1084,6 +1104,7 @@ impl Handler for UnSubscribeParamHandler { log::debug!("UnSubscribeParamHandler {:?} ", params); type Request = (String, String, String); let (caller_id, _caller_api, key) = Request::try_from_params(params)?; + let key = resolve(&caller_id, &key); let mut parameter_subscriptions = self.data.parameter_subscriptions.write().unwrap(); let mut removed = false; @@ -1099,6 +1120,20 @@ impl Handler for UnSubscribeParamHandler { } } +fn resolve(caller_id : &str, key : &str) -> String { + match key.chars().next() { + None => "".to_owned(), + Some('/') => key.to_owned(), + Some('~') => format!("{}/{}", caller_id, &key[1..]), + Some(_) => { + match caller_id.rsplit_once("/") { + Some((namespace, _node_name)) => format!("{}/{}", namespace, key), + None => key.to_owned() + } + } + } +} + /// Handler for checking if a parameter is stored on the server. /// /// # Parameters @@ -1123,8 +1158,8 @@ impl Handler for HasParamHandler { log::debug!("HasParamHandler {:?} ", params); type Request = (String, String); - let (_caller_id, key) = Request::try_from_params(params)?; - + let (caller_id, key) = Request::try_from_params(params)?; + let key = resolve(&caller_id, &key); let has = self.data.parameters.read().unwrap().get_keys().contains(&key); Ok((1, "", has).try_to_value()?) } From d99ec502f3eb558faaa00ca21b5c474062415e2c Mon Sep 17 00:00:00 2001 From: Johannes Barthel Date: Thu, 18 Jul 2024 14:21:28 +0200 Subject: [PATCH 4/6] add run_id --- Cargo.toml | 1 + src/core.rs | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 34d9934..0f3698c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"]} url = "2.3.1" maplit = "1.0.2" futures = "0.3.30" +uuid = { version = "1.10.0", features = ["v1", "rng"] } [dev-dependencies] rosrust = "0.9" diff --git a/src/core.rs b/src/core.rs index 5777de8..d6a0780 100644 --- a/src/core.rs +++ b/src/core.rs @@ -4,6 +4,7 @@ use futures::stream::FuturesOrdered; use maplit::hashmap; use paste::paste; use tokio::task::JoinSet; +use uuid::{Context, NoContext}; use std::collections::{HashMap, HashSet}; use std::future::Future; use futures::StreamExt; @@ -15,6 +16,7 @@ use dxr_server::{axum::{self, http::HeaderMap}, Server, RouteBuilder}; use dxr::{TryFromParams, TryFromValue, TryToValue, Value}; use crate::client_api::ClientApi; +use crate::param_tree::ParamValue; pub type Services = HashMap>; pub type Nodes = HashMap; @@ -1240,8 +1242,41 @@ macro_rules! make_handlers { }}; } +fn get_node_id() -> Option<[u8; 6]> { + let ip_link = std::process::Command::new("ip").arg("link").output().ok()?.stdout; + let ip_link = String::from_utf8_lossy(&ip_link); + let mut next_is_mac = false; + let mut mac = None; + for element in ip_link.split_whitespace() { + if next_is_mac { + mac = Some(element); + break; + } + if element == "link/ether" { + next_is_mac = true; + } + }; + let mac = mac?; + let mut all_ok = true; + let mac : Vec = mac + .split(':') + .filter_map(|hex| { + let res = u8::from_str_radix(hex, 16); + all_ok &= res.is_ok(); + res.ok() + }) + .collect(); + if !all_ok { + return None + } + let mac : [u8; 6] = mac.try_into().ok()?; + Some(mac) +} + impl Master { pub fn new(url: &std::net::SocketAddr) -> Master { + + let run_id = ParamValue::Value(Value::string(uuid::Uuid::new_v1(uuid::Timestamp::now(Context::new_random()), &get_node_id().unwrap_or_default()).to_string())); Master { data: Arc::new(RosData { service_list: RwLock::new(Services::new()), @@ -1249,7 +1284,9 @@ impl Master { topics: RwLock::new(Topics::new()), subscriptions: RwLock::new(Subscriptions::new()), publications: RwLock::new(Publishers::new()), - parameters: RwLock::new(Parameters::HashMap(hashmap! {})), + parameters: RwLock::new(Parameters::HashMap(hashmap! { + "run_id".to_owned() => run_id + })), parameter_subscriptions: RwLock::new(Vec::new()), uri: url.to_owned(), }), From e3cce16f15f446783275a3f144c2630493e5ac96 Mon Sep 17 00:00:00 2001 From: Johannes Barthel Date: Thu, 18 Jul 2024 18:47:04 +0200 Subject: [PATCH 5/6] implement search_param (needed for robot_state_publisher) --- src/core.rs | 47 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/src/core.rs b/src/core.rs index d6a0780..bf47b6a 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1016,6 +1016,49 @@ impl Handler for GetParamHandler { } } +struct SearchParamHandler { + data: Arc, +} +type SearchParamResponse = (i32, String, Value); +#[async_trait] +impl Handler for SearchParamHandler { + async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { + log::debug!("GetParamHandler {:?} ", params); + type Request = (String, String); + let (caller_id, key) = Request::try_from_params(params)?; + + let mut param_name = String::with_capacity(caller_id.len() + key.len()); + + let params = self.data.parameters.read().unwrap().get_keys(); + let key = key.strip_prefix("/").unwrap_or(&key); + let key_first_element = key.split("/").next().unwrap_or(""); + let namespace = caller_id.strip_prefix("/").unwrap_or(&caller_id).split("/").collect::>(); + + let range = (0usize..namespace.len()).rev(); + + for up_to in range { + param_name.clear(); + param_name.push('/'); + for idx in 0..up_to { + param_name.push_str(namespace[idx]); + param_name.push('/'); + } + param_name.push_str(key_first_element); + if params.contains(¶m_name) { + break; + } + } + + for path in key.split("/").skip(1) { + param_name.push('/'); + param_name.push_str(path); + } + + Ok((1, "", param_name) + .try_to_value()?) + } +} + /// Handler for subscribing to a parameter value and updates. /// /// # Parameters @@ -1311,7 +1354,7 @@ impl Master { MasterEndpoints::DeleteParam => DeleteParamHandler, MasterEndpoints::SetParam => SetParamHandler, MasterEndpoints::GetParam => GetParamHandler, - MasterEndpoints::SearchParam => GetParamHandler, + MasterEndpoints::SearchParam => SearchParamHandler, MasterEndpoints::SubscribeParam => SubscribeParamHandler, MasterEndpoints::UnsubscribeParam => UnSubscribeParamHandler, MasterEndpoints::HasParam => HasParamHandler, @@ -1423,7 +1466,7 @@ impl MasterClient { // TODO(): correct args SetParam(caller_id: &str, key: &str, value: &Value) -> SetParamResponse, GetParam(caller_id: &str, key: &str) -> GetParamResponse, - SearchParam(caller_id: &str, key: &str) -> GetParamResponse, + SearchParam(caller_id: &str, key: &str) -> SearchParamResponse, // TODO(): correct args SubscribeParam(caller_id: &str, caller_api: &str, keys: &str) -> SubscribeParamResponse, UnsubscribeParam(caller_id: &str, caller_api: &str, key: &str) -> UnSubscribeParamResponse, From c4960d2865bc7f2bb2e14c13a0bc4d387c3dfe72 Mon Sep 17 00:00:00 2001 From: Johannes Barthel Date: Fri, 19 Jul 2024 11:15:14 +0200 Subject: [PATCH 6/6] switch dependencies back to crates.io, format code, fix 'cargo check' warnings --- Cargo.toml | 9 +- src/client_api.rs | 2 +- src/core.rs | 244 +++++++++++++++++++++++----------------------- src/param_tree.rs | 10 +- 4 files changed, 132 insertions(+), 133 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0f3698c..cdf4411 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,12 @@ keywords = ["ros", "rosrust", "roscore", "robotics"] categories = ["science::robotics"] [dependencies] -dxr = { path = "../dxr/dxr" } -dxr_server = { path = "../dxr/dxr_server", features = ["axum", "multicall"] } -dxr_client = { path = "../dxr/dxr_client", default-features = false, features = ["reqwest", "rustls-tls"] } +dxr = { version = "0.6.3" } +dxr_server = { version = "0.6.3", features = ["axum", "multicall"] } +dxr_client = { version = "0.6.3", default-features = false, features = [ + "reqwest", + "rustls-tls" # Use rustls instead of openssl for easier cross-compilation +] } anyhow = "1.0.69" log = "0.4.17" env_logger = "0.10.0" diff --git a/src/client_api.rs b/src/client_api.rs index 2ca5646..3c32c1b 100644 --- a/src/client_api.rs +++ b/src/client_api.rs @@ -1,5 +1,5 @@ -use dxr_client::{Call, Client, ClientBuilder, Url}; use dxr::Value; +use dxr_client::{Call, Client, ClientBuilder, Url}; pub struct ClientApi { client: Client, diff --git a/src/core.rs b/src/core.rs index bf47b6a..1f032cb 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,17 +1,17 @@ extern crate dxr; use dxr_client::{Call, Client, ClientBuilder, Url}; -use futures::stream::FuturesOrdered; use maplit::hashmap; use paste::paste; -use tokio::task::JoinSet; -use uuid::{Context, NoContext}; use std::collections::{HashMap, HashSet}; -use std::future::Future; -use futures::StreamExt; use std::sync::{Arc, RwLock}; +use tokio::task::JoinSet; +use uuid::Context; use dxr_server::{async_trait, Handler, HandlerResult}; -use dxr_server::{axum::{self, http::HeaderMap}, Server, RouteBuilder}; +use dxr_server::{ + axum::{self, http::HeaderMap}, + RouteBuilder, Server, +}; use dxr::{TryFromParams, TryFromValue, TryToValue, Value}; @@ -109,9 +109,9 @@ impl MasterEndpoints { #[derive(Debug)] struct ParamSubscription { - node_id : String, - param : String, - api_uri : String + node_id: String, + param: String, + api_uri: String, } /// Struct containing information about ROS data. @@ -829,29 +829,42 @@ impl Handler for DeleteParamHandler { type Request = (String, String); let (caller_id, key) = Request::try_from_params(params)?; let key = resolve(&caller_id, &key); - let key = key.strip_prefix("/").unwrap_or(&key).split("/"); + let key = key.strip_prefix('/').unwrap_or(&key).split('/'); self.data.parameters.write().unwrap().remove(key); return Ok((1, "", 0).try_to_value()?); } } -fn one_is_prefix_of_the_other(a : &str, b: &str) -> bool { +fn one_is_prefix_of_the_other(a: &str, b: &str) -> bool { let len = a.len().min(b.len()); a[..len] == b[..len] } async fn update_client_with_new_param_value( - client_api_url : String, - updating_node_id : String, - subscribing_node_id : String, - param_name : String, - new_value : Value -) -> Result<(), anyhow::Error> - { + client_api_url: String, + updating_node_id: String, + subscribing_node_id: String, + param_name: String, + new_value: Value, +) -> Result<(), anyhow::Error> { let client_api = ClientApi::new(&client_api_url); - let request = client_api - .param_update(&updating_node_id, ¶m_name, &new_value); - request.await + let request = client_api.param_update(&updating_node_id, ¶m_name, &new_value); + let res = request.await; + match res { + Ok(()) => log::debug!( + "Sent new value for param '{}' to node '{}'.", + param_name, + subscribing_node_id + ), + Err(ref e) => log::debug!( + "Error sending new value for param '{}' to node '{}': {:?}", + param_name, + subscribing_node_id, + e + ), + } + + res } /// Handler for setting a ROS parameter. @@ -884,31 +897,37 @@ impl Handler for SetParamHandler { type Request = (String, String, Value); let (caller_id, key, value) = Request::try_from_params(params)?; let key = resolve(&caller_id, &key); - + let mut update_futures = JoinSet::new(); { let key = key.clone(); let mut params = self.data.parameters.write().unwrap(); - let key_split = key.strip_prefix("/").unwrap_or(&key).split("/"); + let key_split = key.strip_prefix('/').unwrap_or(&key).split('/'); params.update_inner(key_split, value); let param_subscriptions = self.data.parameter_subscriptions.read().unwrap(); log::info!("updating param {}", &key); for subscription in param_subscriptions.iter() { - log::debug!("subscriber {:?} has subscription? {}", &subscription, one_is_prefix_of_the_other(&key, &subscription.param)); + log::debug!( + "subscriber {:?} has subscription? {}", + &subscription, + one_is_prefix_of_the_other(&key, &subscription.param) + ); if one_is_prefix_of_the_other(&key, &subscription.param) { - let subscribed_key_spit = subscription.param.strip_prefix("/").unwrap_or(&subscription.param).split("/"); + let subscribed_key_spit = subscription + .param + .strip_prefix('/') + .unwrap_or(&subscription.param) + .split('/'); let new_value = params.get(subscribed_key_spit).unwrap(); - update_futures.spawn( - update_client_with_new_param_value( - subscription.api_uri.clone(), - caller_id.clone(), - subscription.node_id.clone(), - subscription.param.clone(), - new_value - ) - ); + update_futures.spawn(update_client_with_new_param_value( + subscription.api_uri.clone(), + caller_id.clone(), + subscription.node_id.clone(), + subscription.param.clone(), + new_value, + )); } } } @@ -917,61 +936,26 @@ impl Handler for SetParamHandler { match res { Ok(Ok(())) => { log::debug!("a subscriber has been updated"); - }, + } Ok(Err(err)) => { - log::warn!("Error updating a subscriber of changed param {}:\n{:#?}", &key, err); - }, + log::warn!( + "Error updating a subscriber of changed param {}:\n{:#?}", + &key, + err + ); + } Err(err) => { - log::warn!("Error updating a subscriber of changed param {}:\n{:#?}", &key, err); - }, + log::warn!( + "Error updating a subscriber of changed param {}:\n{:#?}", + &key, + err + ); + } } } log::info!("done updating subscribers"); - // TODO: send update notifications - - // } - // self.data - // .parameters - // .write() - // .unwrap() - // .insert(key.clone(), value.clone()); - - // TODO(patwie): handle case where value is not a single value. - // let all_key_values; - // all_key_values = vec![(key.clone(), value.clone())]; - - // for (cur_key, cur_value) in all_key_values { - // // Update the parameter value - // // self.data - // // .parameters - // // .write() - // // .unwrap() - // // .insert(cur_key.clone(), cur_value.clone()); - // // Notify any parameter subscribers about this new value - // let subscribers = self - // .data - // .parameter_subscriptions - // .read() - // .unwrap() - // .get(&cur_key) - // .cloned(); - - // if let Some(subscribers) = subscribers { - // for client_api_url in subscribers.values() { - // let client_api = ClientApi::new(client_api_url.as_str()); - // log::debug!("Call {}", client_api_url); - // let r = client_api - // .param_update(&caller_id.as_str(), &cur_key.as_str(), &cur_value) - // .await; - // if let Err(r) = r { - // log::warn!("paramUpdate call to {} failed: {}", client_api_url, r); - // } - // } - // } - // } - Ok((1, "", 0).try_to_value()?) } } @@ -1004,13 +988,11 @@ impl Handler for GetParamHandler { let (caller_id, key) = Request::try_from_params(params)?; let key = resolve(&caller_id, &key); let params = self.data.parameters.read().unwrap(); - let key = key.strip_prefix("/").unwrap_or(&key).split("/"); + let key = key.strip_prefix('/').unwrap_or(&key).split('/'); Ok(match params.get(key) { Some(value) => (1, "", value.to_owned()), - None => { - (0, "", Value::string("".to_owned())) - }, + None => (0, "", Value::string("".to_owned())), } .try_to_value()?) } @@ -1023,19 +1005,25 @@ type SearchParamResponse = (i32, String, Value); #[async_trait] impl Handler for SearchParamHandler { async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { - log::debug!("GetParamHandler {:?} ", params); + log::debug!("SearchParamHandler {:?} ", params); type Request = (String, String); let (caller_id, key) = Request::try_from_params(params)?; let mut param_name = String::with_capacity(caller_id.len() + key.len()); - + + // For an explanation of what the search algorithm does, see the comment in the original code: + // https://github.com/ros/ros_comm/blob/9ae132c/tools/rosmaster/src/rosmaster/paramserver.py#L82 let params = self.data.parameters.read().unwrap().get_keys(); - let key = key.strip_prefix("/").unwrap_or(&key); - let key_first_element = key.split("/").next().unwrap_or(""); - let namespace = caller_id.strip_prefix("/").unwrap_or(&caller_id).split("/").collect::>(); + let key = key.strip_prefix('/').unwrap_or(&key); + let key_first_element = key.split('/').next().unwrap_or(""); + let namespace = caller_id + .strip_prefix('/') + .unwrap_or(&caller_id) + .split('/') + .collect::>(); let range = (0usize..namespace.len()).rev(); - + for up_to in range { param_name.clear(); param_name.push('/'); @@ -1049,13 +1037,12 @@ impl Handler for SearchParamHandler { } } - for path in key.split("/").skip(1) { + for path in key.split('/').skip(1) { param_name.push('/'); param_name.push_str(path); } - Ok((1, "", param_name) - .try_to_value()?) + Ok((1, "", param_name).try_to_value()?) } } @@ -1086,16 +1073,17 @@ impl Handler for SubscribeParamHandler { type Request = (String, String, String); let (caller_id, caller_api, key) = Request::try_from_params(params)?; let key = resolve(&caller_id, &key); - + let mut new_subscription = Some(ParamSubscription { node_id: caller_id.clone(), param: key.clone(), - api_uri: caller_api + api_uri: caller_api, }); - - { // RwLock scope + + { + // RwLock scope let param_subscriptions = &mut self.data.parameter_subscriptions.write().unwrap(); - + // replace old entry if subscribing node has restarted for subscription in param_subscriptions.iter_mut() { if &subscription.node_id == &caller_id && &subscription.param == &key { @@ -1110,7 +1098,7 @@ impl Handler for SubscribeParamHandler { } } - let key_split = key.strip_prefix("/").unwrap_or(&key).split("/"); + let key_split = key.strip_prefix('/').unwrap_or(&key).split('/'); let value = self .data @@ -1165,17 +1153,15 @@ impl Handler for UnSubscribeParamHandler { } } -fn resolve(caller_id : &str, key : &str) -> String { +fn resolve(caller_id: &str, key: &str) -> String { match key.chars().next() { None => "".to_owned(), Some('/') => key.to_owned(), Some('~') => format!("{}/{}", caller_id, &key[1..]), - Some(_) => { - match caller_id.rsplit_once("/") { - Some((namespace, _node_name)) => format!("{}/{}", namespace, key), - None => key.to_owned() - } - } + Some(_) => match caller_id.rsplit_once('/') { + Some((namespace, _node_name)) => format!("{}/{}", namespace, key), + None => key.to_owned(), + }, } } @@ -1201,11 +1187,17 @@ type HasParamResponse = (i32, String, bool); impl Handler for HasParamHandler { async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult { log::debug!("HasParamHandler {:?} ", params); - + type Request = (String, String); let (caller_id, key) = Request::try_from_params(params)?; let key = resolve(&caller_id, &key); - let has = self.data.parameters.read().unwrap().get_keys().contains(&key); + let has = self + .data + .parameters + .read() + .unwrap() + .get_keys() + .contains(&key); Ok((1, "", has).try_to_value()?) } } @@ -1233,17 +1225,12 @@ impl Handler for GetParamNamesHandler { log::debug!("GetParamNamesHandler {:?} ", params); let a = <(String, String)>::try_from_params(params); let b = <(String,)>::try_from_params(params); - + if a.is_err() && b.is_err() { a?; } - let keys: Vec = self - .data - .parameters - .read() - .unwrap() - .get_keys(); + let keys: Vec = self.data.parameters.read().unwrap().get_keys(); Ok((1, "", keys).try_to_value()?) } } @@ -1286,7 +1273,11 @@ macro_rules! make_handlers { } fn get_node_id() -> Option<[u8; 6]> { - let ip_link = std::process::Command::new("ip").arg("link").output().ok()?.stdout; + let ip_link = std::process::Command::new("ip") + .arg("link") + .output() + .ok()? + .stdout; let ip_link = String::from_utf8_lossy(&ip_link); let mut next_is_mac = false; let mut mac = None; @@ -1298,10 +1289,10 @@ fn get_node_id() -> Option<[u8; 6]> { if element == "link/ether" { next_is_mac = true; } - }; + } let mac = mac?; let mut all_ok = true; - let mac : Vec = mac + let mac: Vec = mac .split(':') .filter_map(|hex| { let res = u8::from_str_radix(hex, 16); @@ -1310,16 +1301,21 @@ fn get_node_id() -> Option<[u8; 6]> { }) .collect(); if !all_ok { - return None + return None; } - let mac : [u8; 6] = mac.try_into().ok()?; + let mac: [u8; 6] = mac.try_into().ok()?; Some(mac) } impl Master { pub fn new(url: &std::net::SocketAddr) -> Master { - - let run_id = ParamValue::Value(Value::string(uuid::Uuid::new_v1(uuid::Timestamp::now(Context::new_random()), &get_node_id().unwrap_or_default()).to_string())); + let run_id = ParamValue::Value(Value::string( + uuid::Uuid::new_v1( + uuid::Timestamp::now(Context::new_random()), + &get_node_id().unwrap_or_default(), + ) + .to_string(), + )); Master { data: Arc::new(RosData { service_list: RwLock::new(Services::new()), diff --git a/src/param_tree.rs b/src/param_tree.rs index 1f451bd..4e529e6 100644 --- a/src/param_tree.rs +++ b/src/param_tree.rs @@ -1,9 +1,9 @@ -use std::{any::Any, collections::HashMap, iter::Peekable, mem}; +use std::{collections::HashMap, mem}; -use dxr::{TryFromParams, TryFromValue, TryToValue, Value}; +use dxr::{TryFromValue, TryToValue, Value}; #[derive(Debug, PartialEq)] -pub(crate) enum ParamValue { +pub enum ParamValue { HashMap(HashMap), Array(Vec), Value(Value), @@ -11,14 +11,14 @@ pub(crate) enum ParamValue { impl From<&Value> for ParamValue { fn from(value: &Value) -> Self { - if let (Ok(hm)) = HashMap::::try_from_value(value) { + if let Ok(hm) = HashMap::::try_from_value(value) { let mut rv = HashMap::with_capacity(hm.len()); for (k, v) in hm.into_iter() { rv.insert(k, ParamValue::from(&v)); } return Self::HashMap(rv); } - if let (Ok(vec)) = Vec::::try_from_value(value) { + if let Ok(vec) = Vec::::try_from_value(value) { let mut rv = Vec::with_capacity(vec.len()); for e in vec.into_iter() { rv.push(ParamValue::from(&e))