Skip to content

Commit

Permalink
PR feedback fixes:
Browse files Browse the repository at this point in the history
* put message_forwarding_loop onto its own thread to not steal a thread
permanently from the executor
* moved creation of channel communicating to TransportForwarder behind
condition
* remove extraneous .clone() when creating ForwardingListener
* updated documentation of creating routes in README.md
  • Loading branch information
PLeVasseur committed Apr 29, 2024
1 parent a53b62d commit ec17b99
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ which will open your browser to view the docs.

### Usage

After following along with the [cargo docs](#generating-cargo-docs-locally) generated to add all your forwarding endpoints, you'll then need to keep the instantiated `UStreamer` around and then pause the main thread, so it will not exit, while the routing happens in the background threads spun up.
After following along with the [cargo docs](#generating-cargo-docs-locally) generated to add all your forwarding rules, you'll then need to keep the instantiated `UStreamer` around and then pause the main thread, so it will not exit, while the routing happens in the background threads spun up.

## Implementation Status

Expand Down
29 changes: 15 additions & 14 deletions up-streamer/src/ustreamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use log::*;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::thread;
use up_rust::{UAuthority, UCode, UListener, UMessage, UStatus, UTransport, UUIDBuilder, UUri};

const USTREAMER_TAG: &str = "UStreamer:";
Expand Down Expand Up @@ -337,15 +338,16 @@ impl UStreamer {
let sender = {
let mut transport_forwarders = self.transport_forwarders.lock().await;

let (tx, rx) = channel::bounded(self.message_queue_size);
let (_, sender) = transport_forwarders
.entry(out_comparable_transport.clone())
.or_insert((TransportForwarder::new(out.transport.clone(), rx).await, tx));
.or_insert_with(|| {
let (tx, rx) = channel::bounded(self.message_queue_size);
(TransportForwarder::new(out.transport.clone(), rx), tx)
});
sender.clone()
};
let forwarding_listener: Arc<dyn UListener> = Arc::new(
ForwardingListener::new(&Self::forwarding_id(&r#in, &out), sender.clone()).await,
);
let forwarding_listener: Arc<dyn UListener> =
Arc::new(ForwardingListener::new(&Self::forwarding_id(&r#in, &out), sender).await);

let insertion_result = {
let mut registered_forwarding_rules = self.registered_forwarding_rules.lock().await;
Expand Down Expand Up @@ -597,17 +599,16 @@ const TRANSPORT_FORWARDER_FN_MESSAGE_FORWARDING_LOOP_TAG: &str = "message_forwar
pub(crate) struct TransportForwarder {}

impl TransportForwarder {
async fn new(
out_transport: Arc<dyn UTransport>,
message_receiver: Receiver<Arc<UMessage>>,
) -> Self {
fn new(out_transport: Arc<dyn UTransport>, message_receiver: Receiver<Arc<UMessage>>) -> Self {
let out_transport_clone = out_transport.clone();
let message_receiver_clone = message_receiver.clone();
task::spawn(Self::message_forwarding_loop(
UUIDBuilder::build().to_hyphenated_string(),
out_transport_clone,
message_receiver_clone,
));
thread::spawn(|| {
task::block_on(Self::message_forwarding_loop(
UUIDBuilder::build().to_hyphenated_string(),
out_transport_clone,
message_receiver_clone,
))
});

Self {}
}
Expand Down

0 comments on commit ec17b99

Please sign in to comment.