Skip to content

Commit

Permalink
refactor: use a version number for routes computation
Browse files Browse the repository at this point in the history
Using a version number allows resetting all computed routes just by
incrementing it.
Each time a route is computed, it associated with the current version
number. When the already computed route is accessed, its associated
version number is compared, and no route is returned if it doesn't match.
  • Loading branch information
wyfo committed Jan 26, 2025
1 parent eb3a7a4 commit b37b26e
Show file tree
Hide file tree
Showing 18 changed files with 63 additions and 67 deletions.
3 changes: 1 addition & 2 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,7 @@ impl Primitives for Face {
&mut |p, m| declares.push((p.clone(), m)),
);

disable_all_data_routes(&mut wtables);
disable_all_query_routes(&mut wtables);
wtables.disable_all_routes();

drop(wtables);
drop(ctrl_lock);
Expand Down
21 changes: 7 additions & 14 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,6 @@ pub(crate) fn undeclare_subscription(
}
}

pub(crate) fn disable_all_data_routes(tables: &mut Tables) {
pub(crate) fn disable_all_data_routes_rec(res: &mut Arc<Resource>) {
let res = get_mut_unchecked(res);
if let Some(ctx) = &mut res.context {
ctx.disable_data_routes();
}
for child in res.children.values_mut() {
disable_all_data_routes_rec(child);
}
}
disable_all_data_routes_rec(&mut tables.root_res)
}

pub(crate) fn disable_matches_data_routes(_tables: &mut Tables, res: &mut Arc<Resource>) {
if res.context.is_some() {
get_mut_unchecked(res).context_mut().disable_data_routes();
Expand Down Expand Up @@ -235,7 +222,13 @@ fn get_data_route(
.and_then(|res| res.context.as_ref())
.map(|ctx| &ctx.data_routes)
{
return get_or_set_route(data_routes, face.whatami, local_context, compute_route);
return get_or_set_route(
data_routes,
tables.routes_version,
face.whatami,
local_context,
compute_route,
);
}
compute_route()
}
Expand Down
21 changes: 7 additions & 14 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,6 @@ impl Timed for QueryCleanup {
}
}

pub(crate) fn disable_all_query_routes(tables: &mut Tables) {
pub(crate) fn disable_all_query_routes_rec(res: &mut Arc<Resource>) {
let res = get_mut_unchecked(res);
if let Some(ctx) = &mut res.context {
ctx.disable_query_routes();
}
for child in res.children.values_mut() {
disable_all_query_routes_rec(child);
}
}
disable_all_query_routes_rec(&mut tables.root_res)
}

pub(crate) fn disable_matches_query_routes(_tables: &mut Tables, res: &mut Arc<Resource>) {
if res.context.is_some() {
get_mut_unchecked(res).context_mut().disable_query_routes();
Expand Down Expand Up @@ -378,7 +365,13 @@ fn get_query_route(
.and_then(|res| res.context.as_ref())
.map(|ctx| &ctx.query_routes)
{
return get_or_set_route(query_routes, face.whatami, local_context, compute_route);
return get_or_set_route(
query_routes,
tables.routes_version,
face.whatami,
local_context,
compute_route,
);
}
compute_route()
}
Expand Down
35 changes: 30 additions & 5 deletions zenoh/src/net/routing/dispatcher/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,15 @@ impl SessionContext {
}
}

/// Global version number for route computation.
/// Use 64bit to not care about rollover.
pub type RoutesVersion = u64;

pub(crate) struct Routes<T> {
routers: Vec<Option<T>>,
peers: Vec<Option<T>>,
clients: Vec<Option<T>>,
version: u64,
}

impl<T> Default for Routes<T> {
Expand All @@ -92,6 +97,7 @@ impl<T> Default for Routes<T> {
routers: Vec::new(),
peers: Vec::new(),
clients: Vec::new(),
version: 0,
}
}
}
Expand All @@ -104,7 +110,15 @@ impl<T> Routes<T> {
}

#[inline]
pub(crate) fn get_route(&self, whatami: WhatAmI, context: NodeId) -> Option<&T> {
pub(crate) fn get_route(
&self,
version: RoutesVersion,
whatami: WhatAmI,
context: NodeId,
) -> Option<&T> {
if version != self.version {
return None;
}
let routes = match whatami {
WhatAmI::Router => &self.routers,
WhatAmI::Peer => &self.peers,
Expand All @@ -114,7 +128,17 @@ impl<T> Routes<T> {
}

#[inline]
pub(crate) fn set_route(&mut self, whatami: WhatAmI, context: NodeId, route: T) {
pub(crate) fn set_route(
&mut self,
version: RoutesVersion,
whatami: WhatAmI,
context: NodeId,
route: T,
) {
if self.version != version {
self.clear();
self.version = version;
}
let routes = match whatami {
WhatAmI::Router => &mut self.routers,
WhatAmI::Peer => &mut self.peers,
Expand All @@ -127,19 +151,20 @@ impl<T> Routes<T> {

pub(crate) fn get_or_set_route<T: Clone>(
routes: &RwLock<Routes<T>>,
version: RoutesVersion,
whatami: WhatAmI,
context: NodeId,
compute_route: impl FnOnce() -> T,
) -> T {
if let Some(route) = routes.read().unwrap().get_route(whatami, context) {
if let Some(route) = routes.read().unwrap().get_route(version, whatami, context) {
return route.clone();
}
let mut routes = routes.write().unwrap();
if let Some(route) = routes.get_route(whatami, context) {
if let Some(route) = routes.get_route(version, whatami, context) {
return route.clone();
}
let route = compute_route();
routes.set_route(whatami, context, route.clone());
routes.set_route(version, whatami, context, route.clone());
route
}

Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub struct Tables {
pub(crate) interceptors: Vec<InterceptorFactory>,
pub(crate) hat: Box<dyn Any + Send + Sync>,
pub(crate) hat_code: Arc<dyn HatTrait + Send + Sync>, // @TODO make this a Box
pub(crate) routes_version: RoutesVersion,
}

impl Tables {
Expand Down Expand Up @@ -112,6 +113,7 @@ impl Tables {
interceptors: interceptor_factories(config)?,
hat: hat_code.new_tables(router_peers_failover_brokering),
hat_code: hat_code.into(),
routes_version: 0,
})
}

Expand Down Expand Up @@ -155,6 +157,10 @@ impl Tables {
pub(crate) fn get_face(&self, zid: &ZenohIdProto) -> Option<&Arc<FaceState>> {
self.faces.values().find(|face| face.zid == *zid)
}

pub(crate) fn disable_all_routes(&mut self) {
self.routes_version = self.routes_version.saturating_add(1);
}
}

pub struct TablesLock {
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl HatBaseTrait for HatCode {
pubsub_new_face(tables, &mut face.state, send_declare);
queries_new_face(tables, &mut face.state, send_declare);
token_new_face(tables, &mut face.state, send_declare);
tables.disable_all_routes();
Ok(())
}

Expand All @@ -123,6 +124,7 @@ impl HatBaseTrait for HatCode {
pubsub_new_face(tables, &mut face.state, send_declare);
queries_new_face(tables, &mut face.state, send_declare);
token_new_face(tables, &mut face.state, send_declare);
tables.disable_all_routes();
Ok(())
}

Expand Down
4 changes: 0 additions & 4 deletions zenoh/src/net/routing/hat/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::{
tables::{Route, RoutingExpr, Tables},
},
hat::{HatPubSubTrait, SendDeclare, Sources},
router::disable_all_data_routes,
RoutingContext,
},
};
Expand Down Expand Up @@ -259,9 +258,6 @@ pub(super) fn pubsub_new_face(
);
}
}

// disable routes
disable_all_data_routes(tables);
}

impl HatPubSubTrait for HatCode {
Expand Down
4 changes: 0 additions & 4 deletions zenoh/src/net/routing/hat/client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use crate::{
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{HatQueriesTrait, SendDeclare, Sources},
router::disable_all_query_routes,
RoutingContext,
},
};
Expand Down Expand Up @@ -275,9 +274,6 @@ pub(super) fn queries_new_face(
propagate_simple_queryable(tables, qabl, Some(&mut face.clone()), send_declare);
}
}

// disable routes
disable_all_query_routes(tables);
}

lazy_static::lazy_static! {
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl TreesComputationWorker {
pubsub::pubsub_tree_change(&mut tables, &new_children);
queries::queries_tree_change(&mut tables, &new_children);
token::token_tree_change(&mut tables, &new_children);
tables.disable_all_routes();
drop(tables);
}
}
Expand Down
5 changes: 1 addition & 4 deletions zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::net::routing::{
tables::{Route, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources},
router::{disable_all_data_routes, disable_matches_data_routes},
router::disable_matches_data_routes,
RoutingContext,
};

Expand Down Expand Up @@ -671,9 +671,6 @@ pub(super) fn pubsub_tree_change(tables: &mut Tables, new_children: &[Vec<NodeIn
}
}
}

// disable routes
disable_all_data_routes(tables);
}

#[inline]
Expand Down
5 changes: 1 addition & 4 deletions zenoh/src/net/routing/hat/linkstate_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::net::routing::{
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
router::{disable_all_query_routes, disable_matches_query_routes},
router::disable_matches_query_routes,
RoutingContext,
};

Expand Down Expand Up @@ -690,9 +690,6 @@ pub(super) fn queries_tree_change(tables: &mut Tables, new_children: &[Vec<NodeI
}
}
}

// disable routes
disable_all_query_routes(tables);
}

#[inline]
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl HatBaseTrait for HatCode {
pubsub_new_face(tables, &mut face.state, send_declare);
queries_new_face(tables, &mut face.state, send_declare);
token_new_face(tables, &mut face.state, send_declare);
tables.disable_all_routes();
Ok(())
}

Expand Down Expand Up @@ -198,6 +199,7 @@ impl HatBaseTrait for HatCode {
pubsub_new_face(tables, &mut face.state, send_declare);
queries_new_face(tables, &mut face.state, send_declare);
token_new_face(tables, &mut face.state, send_declare);
tables.disable_all_routes();

if face.state.whatami == WhatAmI::Peer {
send_declare(
Expand Down
3 changes: 0 additions & 3 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use crate::{
hat::{
p2p_peer::initial_interest, CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources,
},
router::disable_all_data_routes,
RoutingContext,
},
};
Expand Down Expand Up @@ -403,8 +402,6 @@ pub(super) fn pubsub_new_face(
}
}
}
// disable routes
disable_all_data_routes(tables);
}

#[inline]
Expand Down
3 changes: 0 additions & 3 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use crate::{
hat::{
p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources,
},
router::disable_all_query_routes,
RoutingContext,
},
};
Expand Down Expand Up @@ -375,8 +374,6 @@ pub(super) fn queries_new_face(
}
}
}
// disable routes
disable_all_query_routes(tables);
}

lazy_static::lazy_static! {
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl TreesComputationWorker {
pubsub::pubsub_tree_change(&mut tables, &new_children, net_type);
queries::queries_tree_change(&mut tables, &new_children, net_type);
token::token_tree_change(&mut tables, &new_children, net_type);
tables.disable_all_routes();
drop(tables);
}
}
Expand Down
5 changes: 1 addition & 4 deletions zenoh/src/net/routing/hat/router/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::net::routing::{
tables::{Route, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources},
router::{disable_all_data_routes, disable_matches_data_routes},
router::disable_matches_data_routes,
RoutingContext,
};

Expand Down Expand Up @@ -836,9 +836,6 @@ pub(super) fn pubsub_tree_change(
}
}
}

// disable routes
disable_all_data_routes(tables);
}

pub(super) fn pubsub_linkstate_change(
Expand Down
5 changes: 1 addition & 4 deletions zenoh/src/net/routing/hat/router/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::net::routing::{
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
router::{disable_all_query_routes, disable_matches_query_routes},
router::disable_matches_query_routes,
RoutingContext,
};

Expand Down Expand Up @@ -1074,9 +1074,6 @@ pub(super) fn queries_tree_change(
}
}
}

// disable routes
disable_all_query_routes(tables);
}

#[inline]
Expand Down
Loading

0 comments on commit b37b26e

Please sign in to comment.