From ec17b999a227c4e6a9568b58794c83fe53b1334e Mon Sep 17 00:00:00 2001 From: Peter LeVasseur Date: Mon, 29 Apr 2024 17:02:29 -0400 Subject: [PATCH] PR feedback fixes: * put message_forwarding_loop onto its own thread to not steal a thread permanently from the executor * moved creation of channel communicating to TransportForwarder behind condition * remove extraneous .clone() when creating ForwardingListener * updated documentation of creating routes in README.md --- README.md | 2 +- up-streamer/src/ustreamer.rs | 29 +++++++++++++++-------------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index d35c5ea9..6d0903bd 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ which will open your browser to view the docs. ### Usage -After following along with the [cargo docs](#generating-cargo-docs-locally) generated to add all your forwarding endpoints, you'll then need to keep the instantiated `UStreamer` around and then pause the main thread, so it will not exit, while the routing happens in the background threads spun up. +After following along with the [cargo docs](#generating-cargo-docs-locally) generated to add all your forwarding rules, you'll then need to keep the instantiated `UStreamer` around and then pause the main thread, so it will not exit, while the routing happens in the background threads spun up. ## Implementation Status diff --git a/up-streamer/src/ustreamer.rs b/up-streamer/src/ustreamer.rs index ce0175cc..6ebcd381 100644 --- a/up-streamer/src/ustreamer.rs +++ b/up-streamer/src/ustreamer.rs @@ -20,6 +20,7 @@ use log::*; use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::ops::Deref; +use std::thread; use up_rust::{UAuthority, UCode, UListener, UMessage, UStatus, UTransport, UUIDBuilder, UUri}; const USTREAMER_TAG: &str = "UStreamer:"; @@ -337,15 +338,16 @@ impl UStreamer { let sender = { let mut transport_forwarders = self.transport_forwarders.lock().await; - let (tx, rx) = channel::bounded(self.message_queue_size); let (_, sender) = transport_forwarders .entry(out_comparable_transport.clone()) - .or_insert((TransportForwarder::new(out.transport.clone(), rx).await, tx)); + .or_insert_with(|| { + let (tx, rx) = channel::bounded(self.message_queue_size); + (TransportForwarder::new(out.transport.clone(), rx), tx) + }); sender.clone() }; - let forwarding_listener: Arc = Arc::new( - ForwardingListener::new(&Self::forwarding_id(&r#in, &out), sender.clone()).await, - ); + let forwarding_listener: Arc = + Arc::new(ForwardingListener::new(&Self::forwarding_id(&r#in, &out), sender).await); let insertion_result = { let mut registered_forwarding_rules = self.registered_forwarding_rules.lock().await; @@ -597,17 +599,16 @@ const TRANSPORT_FORWARDER_FN_MESSAGE_FORWARDING_LOOP_TAG: &str = "message_forwar pub(crate) struct TransportForwarder {} impl TransportForwarder { - async fn new( - out_transport: Arc, - message_receiver: Receiver>, - ) -> Self { + fn new(out_transport: Arc, message_receiver: Receiver>) -> Self { let out_transport_clone = out_transport.clone(); let message_receiver_clone = message_receiver.clone(); - task::spawn(Self::message_forwarding_loop( - UUIDBuilder::build().to_hyphenated_string(), - out_transport_clone, - message_receiver_clone, - )); + thread::spawn(|| { + task::block_on(Self::message_forwarding_loop( + UUIDBuilder::build().to_hyphenated_string(), + out_transport_clone, + message_receiver_clone, + )) + }); Self {} }