diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index f9694a16f7..003d50d8bd 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -17,20 +17,30 @@ use std::any::Any; use zenoh_link::Link; use zenoh_protocol::network::{NetworkBody, NetworkMessage}; use zenoh_result::ZResult; -use zenoh_transport::TransportPeerEventHandler; +use zenoh_transport::{TransportPeerEventHandler, TransportUnicast}; pub struct DeMux { face: Face, + pub(crate) transport: Option, pub(crate) intercept: IngressIntercept, } impl DeMux { - pub(crate) fn new(face: Face, intercept: IngressIntercept) -> Self { - Self { face, intercept } + pub(crate) fn new( + face: Face, + transport: Option, + intercept: IngressIntercept, + ) -> Self { + Self { + face, + transport, + intercept, + } } } impl TransportPeerEventHandler for DeMux { + #[inline] fn handle_message(&self, msg: NetworkMessage) -> ZResult<()> { let ctx = RoutingContext::with_face(msg, self.face.clone()); let ctx = match self.intercept.intercept(ctx) { @@ -39,8 +49,8 @@ impl TransportPeerEventHandler for DeMux { }; match ctx.msg.body { - NetworkBody::Declare(m) => self.face.send_declare(m), NetworkBody::Push(m) => self.face.send_push(m), + NetworkBody::Declare(m) => self.face.send_declare(m), NetworkBody::Request(m) => self.face.send_request(m), NetworkBody::Response(m) => self.face.send_response(m), NetworkBody::ResponseFinal(m) => self.face.send_response_final(m), @@ -65,6 +75,11 @@ impl TransportPeerEventHandler for DeMux { fn closing(&self) { self.face.send_close(); + if let Some(transport) = self.transport.as_ref() { + let ctrl_lock = zlock!(self.face.tables.ctrl_lock); + let mut tables = zwrite!(self.face.tables.tables); + let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport); + } } fn closed(&self) {} diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 4a8bfe64bf..b9562ca74a 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -156,6 +156,7 @@ impl Primitives for Face { drop(ctrl_lock); } + #[inline] fn send_push(&self, msg: Push) { full_reentrant_route_data( &self.tables.tables, diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index f8f24ad86c..b6480aa863 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -231,6 +231,7 @@ impl HatBaseTrait for HatCode { Ok(()) } + #[inline] fn map_routing_context( &self, _tables: &Tables, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index fad6b98384..2f1e293237 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -373,6 +373,7 @@ impl HatBaseTrait for HatCode { Ok(()) } + #[inline] fn map_routing_context( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 2e761acf39..98f591059a 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -294,6 +294,7 @@ impl HatBaseTrait for HatCode { Ok(()) } + #[inline] fn map_routing_context( &self, _tables: &Tables, diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index b6adabe2a7..656a9217bf 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -604,6 +604,7 @@ impl HatBaseTrait for HatCode { Ok(()) } + #[inline] fn map_routing_context( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index 12c420ec03..60fcfccfa9 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -27,18 +27,13 @@ use crate::net::primitives::EPrimitives; use crate::net::primitives::McastMux; use crate::net::primitives::Mux; use crate::net::routing::interceptor::IngressIntercept; -use std::any::Any; use std::str::FromStr; use std::sync::Arc; use std::sync::{Mutex, RwLock}; use uhlc::HLC; use zenoh_config::Config; -use zenoh_link::Link; use zenoh_protocol::core::{WhatAmI, ZenohId}; -use zenoh_protocol::network::{NetworkBody, NetworkMessage}; -use zenoh_transport::{ - TransportMulticast, TransportPeer, TransportPeerEventHandler, TransportUnicast, -}; +use zenoh_transport::{TransportMulticast, TransportPeer, TransportUnicast}; // use zenoh_collections::Timer; use zenoh_result::ZResult; @@ -106,10 +101,7 @@ impl Router { Arc::new(face) } - pub fn new_transport_unicast( - &self, - transport: TransportUnicast, - ) -> ZResult> { + pub fn new_transport_unicast(&self, transport: TransportUnicast) -> ZResult> { let ctrl_lock = zlock!(self.tables.ctrl_lock); let mut tables = zwrite!(self.tables.tables); @@ -162,12 +154,7 @@ impl Router { ctrl_lock.new_transport_unicast_face(&mut tables, &self.tables, &mut face, &transport)?; - Ok(Arc::new(LinkStateInterceptor::new( - transport.clone(), - self.tables.clone(), - face, - ingress, - ))) + Ok(Arc::new(DeMux::new(face, Some(transport), ingress))) } pub fn new_transport_multicast(&self, transport: TransportMulticast) -> ZResult<()> { @@ -240,59 +227,8 @@ impl Router { tables: self.tables.clone(), state: face_state, }, + None, intercept, ))) } } - -pub struct LinkStateInterceptor { - pub(crate) transport: TransportUnicast, - pub(crate) tables: Arc, - pub(crate) demux: DeMux, -} - -impl LinkStateInterceptor { - fn new( - transport: TransportUnicast, - tables: Arc, - face: Face, - ingress: IngressIntercept, - ) -> Self { - LinkStateInterceptor { - transport, - tables, - demux: DeMux::new(face, ingress), - } - } -} - -impl TransportPeerEventHandler for LinkStateInterceptor { - fn handle_message(&self, msg: NetworkMessage) -> ZResult<()> { - log::trace!("Recv {:?}", msg); - match msg.body { - NetworkBody::OAM(oam) => { - let ctrl_lock = zlock!(self.tables.ctrl_lock); - let mut tables = zwrite!(self.tables.tables); - ctrl_lock.handle_oam(&mut tables, &self.tables, oam, &self.transport) - } - _ => self.demux.handle_message(msg), - } - } - - fn new_link(&self, _link: Link) {} - - fn del_link(&self, _link: Link) {} - - fn closing(&self) { - self.demux.closing(); - let ctrl_lock = zlock!(self.tables.ctrl_lock); - let mut tables = zwrite!(self.tables.tables); - let _ = ctrl_lock.closing(&mut tables, &self.tables, &self.transport); - } - - fn closed(&self) {} - - fn as_any(&self) -> &dyn Any { - self - } -} diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index a3574914ea..3573b093d3 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -22,7 +22,7 @@ pub mod orchestrator; use super::primitives::DeMux; use super::routing; -use super::routing::router::{LinkStateInterceptor, Router}; +use super::routing::router::Router; use crate::config::{unwrap_or_default, Config, ModeDependent, Notifier}; use crate::GIT_VERSION; pub use adminspace::AdminSpace; @@ -236,7 +236,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler { pub(super) struct RuntimeSession { pub(super) runtime: Runtime, pub(super) endpoint: std::sync::RwLock>, - pub(super) main_handler: Arc, + pub(super) main_handler: Arc, pub(super) slave_handlers: Vec>, }