From b37b26ef7a9563420794bb5df8d754b847296407 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Sun, 26 Jan 2025 15:00:25 +0100 Subject: [PATCH] refactor: use a version number for routes computation Using a version number allows resetting all computed routes just by incrementing it. Each time a route is computed, it associated with the current version number. When the already computed route is accessed, its associated version number is compared, and no route is returned if it doesn't match. --- zenoh/src/net/routing/dispatcher/face.rs | 3 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 21 ++++------- zenoh/src/net/routing/dispatcher/queries.rs | 21 ++++------- zenoh/src/net/routing/dispatcher/resource.rs | 35 ++++++++++++++++--- zenoh/src/net/routing/dispatcher/tables.rs | 6 ++++ zenoh/src/net/routing/hat/client/mod.rs | 2 ++ zenoh/src/net/routing/hat/client/pubsub.rs | 4 --- zenoh/src/net/routing/hat/client/queries.rs | 4 --- .../src/net/routing/hat/linkstate_peer/mod.rs | 1 + .../net/routing/hat/linkstate_peer/pubsub.rs | 5 +-- .../net/routing/hat/linkstate_peer/queries.rs | 5 +-- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 2 ++ zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 3 -- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 3 -- zenoh/src/net/routing/hat/router/mod.rs | 1 + zenoh/src/net/routing/hat/router/pubsub.rs | 5 +-- zenoh/src/net/routing/hat/router/queries.rs | 5 +-- zenoh/src/net/routing/router.rs | 4 +-- 18 files changed, 63 insertions(+), 67 deletions(-) diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index dcc5b3917..41d9d1427 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -390,8 +390,7 @@ impl Primitives for Face { &mut |p, m| declares.push((p.clone(), m)), ); - disable_all_data_routes(&mut wtables); - disable_all_query_routes(&mut wtables); + wtables.disable_all_routes(); drop(wtables); drop(ctrl_lock); diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 45b484ec8..d19373528 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -156,19 +156,6 @@ pub(crate) fn undeclare_subscription( } } -pub(crate) fn disable_all_data_routes(tables: &mut Tables) { - pub(crate) fn disable_all_data_routes_rec(res: &mut Arc) { - let res = get_mut_unchecked(res); - if let Some(ctx) = &mut res.context { - ctx.disable_data_routes(); - } - for child in res.children.values_mut() { - disable_all_data_routes_rec(child); - } - } - disable_all_data_routes_rec(&mut tables.root_res) -} - pub(crate) fn disable_matches_data_routes(_tables: &mut Tables, res: &mut Arc) { if res.context.is_some() { get_mut_unchecked(res).context_mut().disable_data_routes(); @@ -235,7 +222,13 @@ fn get_data_route( .and_then(|res| res.context.as_ref()) .map(|ctx| &ctx.data_routes) { - return get_or_set_route(data_routes, face.whatami, local_context, compute_route); + return get_or_set_route( + data_routes, + tables.routes_version, + face.whatami, + local_context, + compute_route, + ); } compute_route() } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 7bcff21d1..68dad601b 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -335,19 +335,6 @@ impl Timed for QueryCleanup { } } -pub(crate) fn disable_all_query_routes(tables: &mut Tables) { - pub(crate) fn disable_all_query_routes_rec(res: &mut Arc) { - let res = get_mut_unchecked(res); - if let Some(ctx) = &mut res.context { - ctx.disable_query_routes(); - } - for child in res.children.values_mut() { - disable_all_query_routes_rec(child); - } - } - disable_all_query_routes_rec(&mut tables.root_res) -} - pub(crate) fn disable_matches_query_routes(_tables: &mut Tables, res: &mut Arc) { if res.context.is_some() { get_mut_unchecked(res).context_mut().disable_query_routes(); @@ -378,7 +365,13 @@ fn get_query_route( .and_then(|res| res.context.as_ref()) .map(|ctx| &ctx.query_routes) { - return get_or_set_route(query_routes, face.whatami, local_context, compute_route); + return get_or_set_route( + query_routes, + tables.routes_version, + face.whatami, + local_context, + compute_route, + ); } compute_route() } diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 29534392d..010495f84 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -80,10 +80,15 @@ impl SessionContext { } } +/// Global version number for route computation. +/// Use 64bit to not care about rollover. +pub type RoutesVersion = u64; + pub(crate) struct Routes { routers: Vec>, peers: Vec>, clients: Vec>, + version: u64, } impl Default for Routes { @@ -92,6 +97,7 @@ impl Default for Routes { routers: Vec::new(), peers: Vec::new(), clients: Vec::new(), + version: 0, } } } @@ -104,7 +110,15 @@ impl Routes { } #[inline] - pub(crate) fn get_route(&self, whatami: WhatAmI, context: NodeId) -> Option<&T> { + pub(crate) fn get_route( + &self, + version: RoutesVersion, + whatami: WhatAmI, + context: NodeId, + ) -> Option<&T> { + if version != self.version { + return None; + } let routes = match whatami { WhatAmI::Router => &self.routers, WhatAmI::Peer => &self.peers, @@ -114,7 +128,17 @@ impl Routes { } #[inline] - pub(crate) fn set_route(&mut self, whatami: WhatAmI, context: NodeId, route: T) { + pub(crate) fn set_route( + &mut self, + version: RoutesVersion, + whatami: WhatAmI, + context: NodeId, + route: T, + ) { + if self.version != version { + self.clear(); + self.version = version; + } let routes = match whatami { WhatAmI::Router => &mut self.routers, WhatAmI::Peer => &mut self.peers, @@ -127,19 +151,20 @@ impl Routes { pub(crate) fn get_or_set_route( routes: &RwLock>, + version: RoutesVersion, whatami: WhatAmI, context: NodeId, compute_route: impl FnOnce() -> T, ) -> T { - if let Some(route) = routes.read().unwrap().get_route(whatami, context) { + if let Some(route) = routes.read().unwrap().get_route(version, whatami, context) { return route.clone(); } let mut routes = routes.write().unwrap(); - if let Some(route) = routes.get_route(whatami, context) { + if let Some(route) = routes.get_route(version, whatami, context) { return route.clone(); } let route = compute_route(); - routes.set_route(whatami, context, route.clone()); + routes.set_route(version, whatami, context, route.clone()); route } diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index f4d916ae5..d0f79985e 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -78,6 +78,7 @@ pub struct Tables { pub(crate) interceptors: Vec, pub(crate) hat: Box, pub(crate) hat_code: Arc, // @TODO make this a Box + pub(crate) routes_version: RoutesVersion, } impl Tables { @@ -112,6 +113,7 @@ impl Tables { interceptors: interceptor_factories(config)?, hat: hat_code.new_tables(router_peers_failover_brokering), hat_code: hat_code.into(), + routes_version: 0, }) } @@ -155,6 +157,10 @@ impl Tables { pub(crate) fn get_face(&self, zid: &ZenohIdProto) -> Option<&Arc> { self.faces.values().find(|face| face.zid == *zid) } + + pub(crate) fn disable_all_routes(&mut self) { + self.routes_version = self.routes_version.saturating_add(1); + } } pub struct TablesLock { diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index c407f1b2d..cf25f6a68 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -108,6 +108,7 @@ impl HatBaseTrait for HatCode { pubsub_new_face(tables, &mut face.state, send_declare); queries_new_face(tables, &mut face.state, send_declare); token_new_face(tables, &mut face.state, send_declare); + tables.disable_all_routes(); Ok(()) } @@ -123,6 +124,7 @@ impl HatBaseTrait for HatCode { pubsub_new_face(tables, &mut face.state, send_declare); queries_new_face(tables, &mut face.state, send_declare); token_new_face(tables, &mut face.state, send_declare); + tables.disable_all_routes(); Ok(()) } diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 562ac83d0..8e1a6a43a 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -37,7 +37,6 @@ use crate::{ tables::{Route, RoutingExpr, Tables}, }, hat::{HatPubSubTrait, SendDeclare, Sources}, - router::disable_all_data_routes, RoutingContext, }, }; @@ -259,9 +258,6 @@ pub(super) fn pubsub_new_face( ); } } - - // disable routes - disable_all_data_routes(tables); } impl HatPubSubTrait for HatCode { diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 6d905c286..855207cd5 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -42,7 +42,6 @@ use crate::{ tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, hat::{HatQueriesTrait, SendDeclare, Sources}, - router::disable_all_query_routes, RoutingContext, }, }; @@ -275,9 +274,6 @@ pub(super) fn queries_new_face( propagate_simple_queryable(tables, qabl, Some(&mut face.clone()), send_declare); } } - - // disable routes - disable_all_query_routes(tables); } lazy_static::lazy_static! { diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index e9dbf9363..4909ed3fd 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -142,6 +142,7 @@ impl TreesComputationWorker { pubsub::pubsub_tree_change(&mut tables, &new_children); queries::queries_tree_change(&mut tables, &new_children); token::token_tree_change(&mut tables, &new_children); + tables.disable_all_routes(); drop(tables); } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index e647bd32d..ce73ac340 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -45,7 +45,7 @@ use crate::net::routing::{ tables::{Route, RoutingExpr, Tables}, }, hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources}, - router::{disable_all_data_routes, disable_matches_data_routes}, + router::disable_matches_data_routes, RoutingContext, }; @@ -671,9 +671,6 @@ pub(super) fn pubsub_tree_change(tables: &mut Tables, new_children: &[Vec