From 0b977192da0a2cb83b2dd1a963a0331f8410f681 Mon Sep 17 00:00:00 2001 From: Muhamad Azmy Date: Fri, 3 Jun 2022 12:33:26 +0200 Subject: [PATCH] Add support for tagging messages (#49) --- src/http_api/entry.rs | 3 + src/http_workers/work_runner.rs | 97 ++++++++++++++++++--------------- src/storage/redis.rs | 1 + src/types/mod.rs | 3 + 4 files changed, 59 insertions(+), 45 deletions(-) diff --git a/src/http_api/entry.rs b/src/http_api/entry.rs index e70d099..72b9889 100644 --- a/src/http_api/entry.rs +++ b/src/http_api/entry.rs @@ -187,7 +187,10 @@ async fn rmb_reply_handler( } }; + // reset the values back to original + // as per the source. message.reply = source.reply; + message.tag = source.tag; data.storage .reply(&message) diff --git a/src/http_workers/work_runner.rs b/src/http_workers/work_runner.rs index a21fb73..c865d43 100644 --- a/src/http_workers/work_runner.rs +++ b/src/http_workers/work_runner.rs @@ -175,62 +175,69 @@ where async fn run(&self, job: Self::Input) { //identify uri and extract msg log::debug!("http worker received a job"); - let (queue, msg) = match job { + let (queue, mut msg) = match job { TransitMessage::Request(msg) => (Queue::Request, msg), TransitMessage::Reply(msg) => (Queue::Reply, msg), }; log::debug!("received a message for forwarding '{}'", queue.as_ref()); let retry = Self::retries(msg.retry); - for id in &msg.destination { - log::debug!( - "forwarding message to destination '{}' '{}'", - id, - queue.as_ref() - ); - let mut msg = msg.clone(); - //let uri_path = uri_path.clone(); - // getting twin object - let twin = match self.get_twin(*id, retry).await { - Some(twin) => twin, - None if queue == Queue::Request => { - self.handle_delivery_err( - *id, - msg, - SendError::Terminal(format!("twin with id {} not found", *id)), - ) - .await; - - continue; - } - None => continue, - }; - - // encrypt dat - msg.data = match Self::encrypt_dat(msg.data, &twin) { - Ok(dat) => dat, - Err(_err) => { - todo!() - } - }; + assert_eq!( + msg.destination.len(), + 1, + "expecting only one destination in worker" + ); + let id = msg.destination[0]; - // signing the message - msg.sign(&self.identity); + log::debug!( + "forwarding message to destination '{}' '{}'", + id, + queue.as_ref() + ); + // getting twin object + let twin = match self.get_twin(id, retry).await { + Some(twin) => twin, + None if queue == Queue::Request => { + self.handle_delivery_err( + id, + msg, + SendError::Terminal(format!("twin with id {} not found", id)), + ) + .await; + + return; + } + None => return, + }; - // posting the message to the remote twin - let mut result = Ok(()); - for _ in 0..retry { - result = self.send_msg(&twin, &queue, &msg).await; - if let Err(SendError::Error(_)) = &result { - continue; - } - break; + // encrypt dat + msg.data = match Self::encrypt_dat(msg.data, &twin) { + Ok(dat) => dat, + Err(_err) => { + todo!() } + }; - if result.is_err() && queue == Queue::Request { - self.handle_delivery_err(twin.id, msg, result.err().unwrap()) - .await; + // we always reset the tag to none before + // sending to remote + msg.tag = None; + + // signing the message + msg.sign(&self.identity); + + // posting the message to the remote twin + let mut result = Ok(()); + for _ in 0..retry { + result = self.send_msg(&twin, &queue, &msg).await; + if let Err(SendError::Error(_)) = &result { + continue; } + break; + } + + if result.is_err() && queue == Queue::Request { + self.handle_delivery_err(twin.id, msg, result.err().unwrap()) + .await; } } } diff --git a/src/storage/redis.rs b/src/storage/redis.rs index d4acb25..6c47812 100644 --- a/src/storage/redis.rs +++ b/src/storage/redis.rs @@ -352,6 +352,7 @@ mod tests { command: String::from("test.get"), expiration: 300, data: String::from(""), + tag: None, source: 1, destination: vec![4], reply: String::from("de31075e-9af4-4933-b107-c36887d0c0f0"), diff --git a/src/types/mod.rs b/src/types/mod.rs index 0601b0b..30b09fc 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -26,6 +26,8 @@ pub struct Message { pub retry: usize, #[serde(rename = "dat")] pub data: String, + #[serde(rename = "tag")] + pub tag: Option, #[serde(rename = "src")] pub source: u32, #[serde(rename = "dst")] @@ -51,6 +53,7 @@ impl Default for Message { expiration: Default::default(), retry: Default::default(), data: Default::default(), + tag: None, source: Default::default(), destination: Default::default(), reply: Default::default(),