Skip to content

Commit

Permalink
Perf improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jan 5, 2024
1 parent 43fcb78 commit 7cd513a
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 74 deletions.
23 changes: 19 additions & 4 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,30 @@ use std::any::Any;
use zenoh_link::Link;
use zenoh_protocol::network::{NetworkBody, NetworkMessage};
use zenoh_result::ZResult;
use zenoh_transport::TransportPeerEventHandler;
use zenoh_transport::{TransportPeerEventHandler, TransportUnicast};

pub struct DeMux {
face: Face,
pub(crate) transport: Option<TransportUnicast>,
pub(crate) intercept: IngressIntercept,
}

impl DeMux {
pub(crate) fn new(face: Face, intercept: IngressIntercept) -> Self {
Self { face, intercept }
pub(crate) fn new(
face: Face,
transport: Option<TransportUnicast>,
intercept: IngressIntercept,
) -> Self {
Self {
face,
transport,
intercept,
}
}
}

impl TransportPeerEventHandler for DeMux {
#[inline]
fn handle_message(&self, msg: NetworkMessage) -> ZResult<()> {
let ctx = RoutingContext::with_face(msg, self.face.clone());
let ctx = match self.intercept.intercept(ctx) {
Expand All @@ -39,8 +49,8 @@ impl TransportPeerEventHandler for DeMux {
};

match ctx.msg.body {
NetworkBody::Declare(m) => self.face.send_declare(m),
NetworkBody::Push(m) => self.face.send_push(m),
NetworkBody::Declare(m) => self.face.send_declare(m),
NetworkBody::Request(m) => self.face.send_request(m),
NetworkBody::Response(m) => self.face.send_response(m),
NetworkBody::ResponseFinal(m) => self.face.send_response_final(m),
Expand All @@ -65,6 +75,11 @@ impl TransportPeerEventHandler for DeMux {

fn closing(&self) {
self.face.send_close();
if let Some(transport) = self.transport.as_ref() {
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport);
}
}

fn closed(&self) {}
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl Primitives for Face {
drop(ctrl_lock);
}

#[inline]
fn send_push(&self, msg: Push) {
full_reentrant_route_data(
&self.tables.tables,
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ impl HatBaseTrait for HatCode {
Ok(())
}

#[inline]
fn map_routing_context(
&self,
_tables: &Tables,
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl HatBaseTrait for HatCode {
Ok(())
}

#[inline]
fn map_routing_context(
&self,
tables: &Tables,
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ impl HatBaseTrait for HatCode {
Ok(())
}

#[inline]
fn map_routing_context(
&self,
_tables: &Tables,
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ impl HatBaseTrait for HatCode {
Ok(())
}

#[inline]
fn map_routing_context(
&self,
tables: &Tables,
Expand Down
72 changes: 4 additions & 68 deletions zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,13 @@ use crate::net::primitives::EPrimitives;
use crate::net::primitives::McastMux;
use crate::net::primitives::Mux;
use crate::net::routing::interceptor::IngressIntercept;
use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Mutex, RwLock};
use uhlc::HLC;
use zenoh_config::Config;
use zenoh_link::Link;
use zenoh_protocol::core::{WhatAmI, ZenohId};
use zenoh_protocol::network::{NetworkBody, NetworkMessage};
use zenoh_transport::{
TransportMulticast, TransportPeer, TransportPeerEventHandler, TransportUnicast,
};
use zenoh_transport::{TransportMulticast, TransportPeer, TransportUnicast};
// use zenoh_collections::Timer;
use zenoh_result::ZResult;

Expand Down Expand Up @@ -106,10 +101,7 @@ impl Router {
Arc::new(face)
}

pub fn new_transport_unicast(
&self,
transport: TransportUnicast,
) -> ZResult<Arc<LinkStateInterceptor>> {
pub fn new_transport_unicast(&self, transport: TransportUnicast) -> ZResult<Arc<DeMux>> {
let ctrl_lock = zlock!(self.tables.ctrl_lock);
let mut tables = zwrite!(self.tables.tables);

Expand Down Expand Up @@ -162,12 +154,7 @@ impl Router {

ctrl_lock.new_transport_unicast_face(&mut tables, &self.tables, &mut face, &transport)?;

Ok(Arc::new(LinkStateInterceptor::new(
transport.clone(),
self.tables.clone(),
face,
ingress,
)))
Ok(Arc::new(DeMux::new(face, Some(transport), ingress)))
}

pub fn new_transport_multicast(&self, transport: TransportMulticast) -> ZResult<()> {
Expand Down Expand Up @@ -240,59 +227,8 @@ impl Router {
tables: self.tables.clone(),
state: face_state,
},
None,
intercept,
)))
}
}

pub struct LinkStateInterceptor {
pub(crate) transport: TransportUnicast,
pub(crate) tables: Arc<TablesLock>,
pub(crate) demux: DeMux,
}

impl LinkStateInterceptor {
fn new(
transport: TransportUnicast,
tables: Arc<TablesLock>,
face: Face,
ingress: IngressIntercept,
) -> Self {
LinkStateInterceptor {
transport,
tables,
demux: DeMux::new(face, ingress),
}
}
}

impl TransportPeerEventHandler for LinkStateInterceptor {
fn handle_message(&self, msg: NetworkMessage) -> ZResult<()> {
log::trace!("Recv {:?}", msg);
match msg.body {
NetworkBody::OAM(oam) => {
let ctrl_lock = zlock!(self.tables.ctrl_lock);
let mut tables = zwrite!(self.tables.tables);
ctrl_lock.handle_oam(&mut tables, &self.tables, oam, &self.transport)
}
_ => self.demux.handle_message(msg),
}
}

fn new_link(&self, _link: Link) {}

fn del_link(&self, _link: Link) {}

fn closing(&self) {
self.demux.closing();
let ctrl_lock = zlock!(self.tables.ctrl_lock);
let mut tables = zwrite!(self.tables.tables);
let _ = ctrl_lock.closing(&mut tables, &self.tables, &self.transport);
}

fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
self
}
}
4 changes: 2 additions & 2 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod orchestrator;

use super::primitives::DeMux;
use super::routing;
use super::routing::router::{LinkStateInterceptor, Router};
use super::routing::router::Router;
use crate::config::{unwrap_or_default, Config, ModeDependent, Notifier};
use crate::GIT_VERSION;
pub use adminspace::AdminSpace;
Expand Down Expand Up @@ -236,7 +236,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler {
pub(super) struct RuntimeSession {
pub(super) runtime: Runtime,
pub(super) endpoint: std::sync::RwLock<Option<EndPoint>>,
pub(super) main_handler: Arc<LinkStateInterceptor>,
pub(super) main_handler: Arc<DeMux>,
pub(super) slave_handlers: Vec<Arc<dyn TransportPeerEventHandler>>,
}

Expand Down

0 comments on commit 7cd513a

Please sign in to comment.