From cfb86a81bef94c6867d03d710424f3ec8256abd2 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 13 May 2024 18:21:48 +0200 Subject: [PATCH] Enhance subscribers, queryables and liveliness tokens propagation to improve scalability (#814) * Router implements interests protocol for clients * Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients for pico * Fix WireExprExt M flag encoding/decoding * Fix decl_key * Clients send all samples and queries to routers and peers * Avoid self declaration loop on interest * Fix query/replies copy/paste bugs * Peers implement interests protocol for clients * Don't send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients * Add client writer-side filtering (#863) * Add client writer-side filtering * Reimplement liveliness with interests * Fix writer-side filtering before receiving FinalInterest * Fix pubsub interest based routing after router failover * Declare message can be Push/Request/RequestContinuous/Response * Address review comments * Remove F: Future flag from DeclareInterest * cargo fmt --all * Remove unused Interest flags field * Update doc * Remove unneeded interest_id field * Update commons/zenoh-protocol/src/network/declare.rs * Remove unused UndeclareInterest * Implement proper Declare Request/Response id correlation * Add new Interest network message * Update doc * Update codec * Fix stable build * Fix test_acl * Fix writer side filtering * Add separate functions to compute matching status * Fix unstable imports * Remove useless checks --------- Co-authored-by: Luca Cominardi --- commons/zenoh-codec/src/network/declare.rs | 6 +- commons/zenoh-codec/src/network/interest.rs | 4 +- commons/zenoh-protocol/src/network/declare.rs | 13 + .../zenoh-protocol/src/network/interest.rs | 2 +- zenoh/src/api/builders/publication.rs | 29 +- zenoh/src/api/publication.rs | 37 +- zenoh/src/api/session.rs | 276 ++++++---- zenoh/src/api/subscriber.rs | 4 +- zenoh/src/net/primitives/demux.rs | 2 +- zenoh/src/net/primitives/mod.rs | 4 + zenoh/src/net/primitives/mux.rs | 50 ++ zenoh/src/net/routing/dispatcher/face.rs | 102 +++- zenoh/src/net/routing/dispatcher/pubsub.rs | 98 +++- zenoh/src/net/routing/dispatcher/queries.rs | 82 +++ zenoh/src/net/routing/dispatcher/resource.rs | 158 ++++-- zenoh/src/net/routing/hat/client/mod.rs | 3 + zenoh/src/net/routing/hat/client/pubsub.rs | 285 ++++++++-- zenoh/src/net/routing/hat/client/queries.rs | 57 +- .../src/net/routing/hat/linkstate_peer/mod.rs | 5 + .../net/routing/hat/linkstate_peer/pubsub.rs | 473 +++++++++++++---- .../net/routing/hat/linkstate_peer/queries.rs | 264 ++++++++-- zenoh/src/net/routing/hat/mod.rs | 60 ++- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 5 + zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 390 +++++++++++--- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 340 +++++++++--- zenoh/src/net/routing/hat/router/mod.rs | 5 + zenoh/src/net/routing/hat/router/pubsub.rs | 488 ++++++++++++++---- zenoh/src/net/routing/hat/router/queries.rs | 271 ++++++++-- zenoh/src/net/runtime/adminspace.rs | 5 + zenoh/src/net/tests/tables.rs | 2 + 30 files changed, 2856 insertions(+), 664 deletions(-) diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index faffb04952..7c3b797d5d 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -958,7 +958,7 @@ where if x.wire_expr.has_suffix() { flags |= 1; } - if let Mapping::Receiver = wire_expr.mapping { + if let Mapping::Sender = wire_expr.mapping { flags |= 1 << 1; } codec.write(&mut zriter, flags)?; @@ -998,9 +998,9 @@ where String::new() }; let mapping = if imsg::has_flag(flags, 1 << 1) { - Mapping::Receiver - } else { Mapping::Sender + } else { + Mapping::Receiver }; Ok(( diff --git a/commons/zenoh-codec/src/network/interest.rs b/commons/zenoh-codec/src/network/interest.rs index 2deda7748a..5ebdc91f71 100644 --- a/commons/zenoh-codec/src/network/interest.rs +++ b/commons/zenoh-codec/src/network/interest.rs @@ -23,8 +23,8 @@ use zenoh_protocol::{ core::WireExpr, network::{ declare, id, - interest::{self, InterestMode, InterestOptions}, - Interest, Mapping, + interest::{self, Interest, InterestMode, InterestOptions}, + Mapping, }, }; diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index a5373cd5f4..d8c66559ce 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -178,6 +178,19 @@ pub mod common { pub mod ext { use super::*; + /// Flags: + /// - N: Named If N==1 then the key expr has name/suffix + /// - M: Mapping if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver + /// + /// 7 6 5 4 3 2 1 0 + /// +-+-+-+-+-+-+-+-+ + /// |X|X|X|X|X|X|M|N| + /// +-+-+-+---------+ + /// ~ key_scope:z16 ~ + /// +---------------+ + /// ~ key_suffix ~ if N==1 -- + /// +---------------+ + /// pub type WireExprExt = zextzbuf!(0x0f, true); #[derive(Debug, Clone, PartialEq, Eq)] pub struct WireExprType { diff --git a/commons/zenoh-protocol/src/network/interest.rs b/commons/zenoh-protocol/src/network/interest.rs index 46797b72ee..b36080be28 100644 --- a/commons/zenoh-protocol/src/network/interest.rs +++ b/commons/zenoh-protocol/src/network/interest.rs @@ -121,7 +121,7 @@ pub mod flag { pub type DeclareRequestId = u32; pub type AtomicDeclareRequestId = AtomicU32; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum InterestMode { Final, Current, diff --git a/zenoh/src/api/builders/publication.rs b/zenoh/src/api/builders/publication.rs index d4dc1b54d2..0b7bb01eae 100644 --- a/zenoh/src/api/builders/publication.rs +++ b/zenoh/src/api/builders/publication.rs @@ -314,8 +314,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { fn create_one_shot_publisher(self) -> ZResult> { Ok(Publisher { session: self.session, - #[cfg(feature = "unstable")] - eid: 0, // This is a one shot Publisher + id: 0, // This is a one shot Publisher key_expr: self.key_expr?, congestion_control: self.congestion_control, priority: self.priority, @@ -363,22 +362,16 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { } } self.session - .declare_publication_intent(key_expr.clone()) - .wait()?; - #[cfg(feature = "unstable")] - let eid = self.session.runtime.next_id(); - let publisher = Publisher { - session: self.session, - #[cfg(feature = "unstable")] - eid, - key_expr, - congestion_control: self.congestion_control, - priority: self.priority, - is_express: self.is_express, - destination: self.destination, - }; - tracing::trace!("publish({:?})", publisher.key_expr); - Ok(publisher) + .declare_publisher_inner(key_expr.clone(), self.destination) + .map(|id| Publisher { + session: self.session, + id, + key_expr, + congestion_control: self.congestion_control, + priority: self.priority, + is_express: self.is_express, + destination: self.destination, + }) } } diff --git a/zenoh/src/api/publication.rs b/zenoh/src/api/publication.rs index 553170e76a..d72f18739d 100644 --- a/zenoh/src/api/publication.rs +++ b/zenoh/src/api/publication.rs @@ -14,6 +14,7 @@ use std::{ convert::TryFrom, + fmt, future::{IntoFuture, Ready}, pin::Pin, task::{Context, Poll}, @@ -32,9 +33,7 @@ use zenoh_result::{Error, ZResult}; use { crate::api::handlers::{Callback, DefaultHandler, IntoHandler}, crate::api::sample::SourceInfo, - crate::api::Id, zenoh_protocol::core::EntityGlobalId, - zenoh_protocol::core::EntityId, }; use super::{ @@ -48,7 +47,23 @@ use super::{ sample::{DataInfo, Locality, QoS, Sample, SampleFields, SampleKind}, session::{SessionRef, Undeclarable}, }; -use crate::net::primitives::Primitives; +use crate::{api::Id, net::primitives::Primitives}; + +pub(crate) struct PublisherState { + pub(crate) id: Id, + pub(crate) remote_id: Id, + pub(crate) key_expr: KeyExpr<'static>, + pub(crate) destination: Locality, +} + +impl fmt::Debug for PublisherState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Publisher") + .field("id", &self.id) + .field("key_expr", &self.key_expr) + .finish() + } +} #[zenoh_macros::unstable] #[derive(Clone)] @@ -113,8 +128,7 @@ impl std::fmt::Debug for PublisherRef<'_> { #[derive(Debug, Clone)] pub struct Publisher<'a> { pub(crate) session: SessionRef<'a>, - #[cfg(feature = "unstable")] - pub(crate) eid: EntityId, + pub(crate) id: Id, pub(crate) key_expr: KeyExpr<'a>, pub(crate) congestion_control: CongestionControl, pub(crate) priority: Priority, @@ -142,7 +156,7 @@ impl<'a> Publisher<'a> { pub fn id(&self) -> EntityGlobalId { EntityGlobalId { zid: self.session.zid(), - eid: self.eid, + eid: self.id, } } @@ -459,11 +473,9 @@ impl Resolvable for PublisherUndeclaration<'_> { impl Wait for PublisherUndeclaration<'_> { fn wait(mut self) -> ::To { let Publisher { - session, key_expr, .. + session, id: eid, .. } = &self.publisher; - session - .undeclare_publication_intent(key_expr.clone()) - .wait()?; + session.undeclare_publisher_inner(*eid)?; self.publisher.key_expr = unsafe { keyexpr::from_str_unchecked("") }.into(); Ok(()) } @@ -481,10 +493,7 @@ impl IntoFuture for PublisherUndeclaration<'_> { impl Drop for Publisher<'_> { fn drop(&mut self) { if !self.key_expr.is_empty() { - let _ = self - .session - .undeclare_publication_intent(self.key_expr.clone()) - .wait(); + let _ = self.session.undeclare_publisher_inner(self.id); } } } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 018a3a085e..e5087e693b 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -35,16 +35,19 @@ use zenoh_protocol::network::{declare::SubscriberId, ext}; use zenoh_protocol::{ core::{ key_expr::{keyexpr, OwnedKeyExpr}, - AtomicExprId, CongestionControl, ExprId, Reliability, WireExpr, ZenohId, EMPTY_EXPR_ID, + AtomicExprId, CongestionControl, EntityId, ExprId, Reliability, WireExpr, ZenohId, + EMPTY_EXPR_ID, }, network::{ + self, declare::{ self, common::ext::WireExprType, queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber, UndeclareQueryable, UndeclareSubscriber, }, + interest::{InterestMode, InterestOptions}, request::{self, ext::TargetType, Request}, - AtomicRequestId, Mapping, Push, RequestId, Response, ResponseFinal, + AtomicRequestId, Interest, Mapping, Push, RequestId, Response, ResponseFinal, }, zenoh::{ query::{self, ext::QueryBodyType, Consolidation}, @@ -68,7 +71,7 @@ use super::{ handlers::{Callback, DefaultHandler}, info::SessionInfo, key_expr::{KeyExpr, KeyExprInner}, - publication::Priority, + publication::{Priority, PublisherState}, query::{ConsolidationMode, GetBuilder, QueryConsolidation, QueryState, QueryTarget, Reply}, queryable::{Query, QueryInner, QueryableBuilder, QueryableState}, sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind}, @@ -107,7 +110,7 @@ pub(crate) struct SessionState { pub(crate) remote_resources: HashMap, #[cfg(feature = "unstable")] pub(crate) remote_subscribers: HashMap>, - //pub(crate) publications: Vec, + pub(crate) publishers: HashMap, pub(crate) subscribers: HashMap>, pub(crate) queryables: HashMap>, #[cfg(feature = "unstable")] @@ -116,13 +119,13 @@ pub(crate) struct SessionState { pub(crate) matching_listeners: HashMap>, pub(crate) queries: HashMap, pub(crate) aggregated_subscribers: Vec, - //pub(crate) aggregated_publishers: Vec, + pub(crate) aggregated_publishers: Vec, } impl SessionState { pub(crate) fn new( aggregated_subscribers: Vec, - _aggregated_publishers: Vec, + aggregated_publishers: Vec, ) -> SessionState { SessionState { primitives: None, @@ -132,7 +135,7 @@ impl SessionState { remote_resources: HashMap::new(), #[cfg(feature = "unstable")] remote_subscribers: HashMap::new(), - //publications: Vec::new(), + publishers: HashMap::new(), subscribers: HashMap::new(), queryables: HashMap::new(), #[cfg(feature = "unstable")] @@ -141,7 +144,7 @@ impl SessionState { matching_listeners: HashMap::new(), queries: HashMap::new(), aggregated_subscribers, - //aggregated_publishers, + aggregated_publishers, } } } @@ -916,84 +919,99 @@ impl Session { }) } - /// Declare a publication for the given key expression. - /// - /// Puts that match the given key expression will only be sent on the network - /// if matching subscribers exist in the system. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression to publish - pub(crate) fn declare_publication_intent<'a>( - &'a self, - _key_expr: KeyExpr<'a>, - ) -> impl Resolve> + 'a { - ResolveClosure::new(move || { - // tracing::trace!("declare_publication({:?})", key_expr); - // let mut state = zwrite!(self.state); - // if !state.publications.iter().any(|p| **p == **key_expr) { - // let declared_pub = if let Some(join_pub) = state - // .aggregated_publishers - // .iter() - // .find(|s| s.includes(&key_expr)) - // { - // let joined_pub = state.publications.iter().any(|p| join_pub.includes(p)); - // (!joined_pub).then(|| join_pub.clone().into()) - // } else { - // Some(key_expr.clone()) - // }; - // state.publications.push(key_expr.into()); - - // if let Some(res) = declared_pub { - // let primitives = state.primitives.as_ref().unwrap().clone(); - // drop(state); - // primitives.decl_publisher(&res.to_wire(self), None); - // } - // } - Ok(()) - }) + pub(crate) fn declare_publisher_inner( + &self, + key_expr: KeyExpr, + destination: Locality, + ) -> ZResult { + let mut state = zwrite!(self.state); + tracing::trace!("declare_publisher({:?})", key_expr); + let id = self.runtime.next_id(); + + let mut pub_state = PublisherState { + id, + remote_id: id, + key_expr: key_expr.clone().into_owned(), + destination, + }; + + let declared_pub = (destination != Locality::SessionLocal) + .then(|| { + match state + .aggregated_publishers + .iter() + .find(|s| s.includes(&key_expr)) + { + Some(join_pub) => { + if let Some(joined_pub) = state.publishers.values().find(|p| { + p.destination != Locality::SessionLocal + && join_pub.includes(&p.key_expr) + }) { + pub_state.remote_id = joined_pub.remote_id; + None + } else { + Some(join_pub.clone().into()) + } + } + None => { + if let Some(twin_pub) = state.publishers.values().find(|p| { + p.destination != Locality::SessionLocal && p.key_expr == key_expr + }) { + pub_state.remote_id = twin_pub.remote_id; + None + } else { + Some(key_expr.clone()) + } + } + } + }) + .flatten(); + + state.publishers.insert(id, pub_state); + + if let Some(res) = declared_pub { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + primitives.send_interest(Interest { + id, + mode: InterestMode::CurrentFuture, + options: InterestOptions::KEYEXPRS + InterestOptions::SUBSCRIBERS, + wire_expr: Some(res.to_wire(self).to_owned()), + ext_qos: network::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: network::ext::NodeIdType::DEFAULT, + }); + } + Ok(id) } - /// Undeclare a publication previously declared - /// with [`declare_publication`](Session::declare_publication). - /// - /// # Arguments - /// - /// * `key_expr` - The key expression of the publication to undeclarte - pub(crate) fn undeclare_publication_intent<'a>( - &'a self, - _key_expr: KeyExpr<'a>, - ) -> impl Resolve> + 'a { - ResolveClosure::new(move || { - // let mut state = zwrite!(self.state); - // if let Some(idx) = state.publications.iter().position(|p| **p == *key_expr) { - // trace!("undeclare_publication({:?})", key_expr); - // state.publications.remove(idx); - // match state - // .aggregated_publishers - // .iter() - // .find(|s| s.includes(&key_expr)) - // { - // Some(join_pub) => { - // let joined_pub = state.publications.iter().any(|p| join_pub.includes(p)); - // if !joined_pub { - // let primitives = state.primitives.as_ref().unwrap().clone(); - // let key_expr = WireExpr::from(join_pub).to_owned(); - // drop(state); - // primitives.forget_publisher(&key_expr, None); - // } - // } - // None => { - // let primitives = state.primitives.as_ref().unwrap().clone(); - // drop(state); - // primitives.forget_publisher(&key_expr.to_wire(self), None); - // } - // }; - // } else { - // bail!("Unable to find publication") - // } + pub(crate) fn undeclare_publisher_inner(&self, pid: Id) -> ZResult<()> { + let mut state = zwrite!(self.state); + if let Some(pub_state) = state.publishers.remove(&pid) { + trace!("undeclare_publisher({:?})", pub_state); + if pub_state.destination != Locality::SessionLocal { + // Note: there might be several publishers on the same KeyExpr. + // Before calling forget_publishers(key_expr), check if this was the last one. + if !state.publishers.values().any(|p| { + p.destination != Locality::SessionLocal && p.remote_id == pub_state.remote_id + }) { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + primitives.send_interest(Interest { + id: pub_state.remote_id, + mode: InterestMode::Final, + options: InterestOptions::empty(), + wire_expr: None, + ext_qos: declare::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + }); + } + } Ok(()) - }) + } else { + Err(zerror!("Unable to find publisher").into()) + } } pub(crate) fn declare_subscriber_inner( @@ -1005,7 +1023,7 @@ impl Session { info: &SubscriberInfo, ) -> ZResult> { let mut state = zwrite!(self.state); - tracing::trace!("subscribe({:?})", key_expr); + tracing::trace!("declare_subscriber({:?})", key_expr); let id = self.runtime.next_id(); let key_expr = match scope { Some(scope) => scope / key_expr, @@ -1126,15 +1144,34 @@ impl Session { let state = zread!(self.state); self.update_status_up(&state, &key_expr) } + } else { + #[cfg(feature = "unstable")] + if key_expr + .as_str() + .starts_with(crate::api::liveliness::PREFIX_LIVELINESS) + { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + + primitives.send_interest(Interest { + id, + mode: InterestMode::CurrentFuture, + options: InterestOptions::KEYEXPRS + InterestOptions::SUBSCRIBERS, + wire_expr: Some(key_expr.to_wire(self).to_owned()), + ext_qos: network::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: network::ext::NodeIdType::DEFAULT, + }); + } } Ok(sub_state) } - pub(crate) fn unsubscribe(&self, sid: Id) -> ZResult<()> { + pub(crate) fn undeclare_subscriber_inner(&self, sid: Id) -> ZResult<()> { let mut state = zwrite!(self.state); if let Some(sub_state) = state.subscribers.remove(&sid) { - trace!("unsubscribe({:?})", sub_state); + trace!("undeclare_subscriber({:?})", sub_state); for res in state .local_resources .values_mut() @@ -1184,6 +1221,26 @@ impl Session { self.update_status_down(&state, &sub_state.key_expr) } } + } else { + #[cfg(feature = "unstable")] + if sub_state + .key_expr + .as_str() + .starts_with(crate::api::liveliness::PREFIX_LIVELINESS) + { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + + primitives.send_interest(Interest { + id: sub_state.id, + mode: InterestMode::Final, + options: InterestOptions::empty(), + wire_expr: None, + ext_qos: declare::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + }); + } } Ok(()) } else { @@ -1199,7 +1256,7 @@ impl Session { callback: Callback<'static, Query>, ) -> ZResult> { let mut state = zwrite!(self.state); - tracing::trace!("queryable({:?})", key_expr); + tracing::trace!("declare_queryable({:?})", key_expr); let id = self.runtime.next_id(); let qable_state = Arc::new(QueryableState { id, @@ -1236,7 +1293,7 @@ impl Session { pub(crate) fn close_queryable(&self, qid: Id) -> ZResult<()> { let mut state = zwrite!(self.state); if let Some(qable_state) = state.queryables.remove(&qid) { - trace!("close_queryable({:?})", qable_state); + trace!("undeclare_queryable({:?})", qable_state); if qable_state.origin != Locality::SessionLocal { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); @@ -1358,33 +1415,29 @@ impl Session { key_expr: &KeyExpr, destination: Locality, ) -> ZResult { - use crate::net::routing::dispatcher::tables::RoutingExpr; let router = self.runtime.router(); let tables = zread!(router.tables.tables); - let res = crate::net::routing::dispatcher::resource::Resource::get_resource( - &tables.root_res, - key_expr.as_str(), - ); - let route = crate::net::routing::dispatcher::pubsub::get_local_data_route( - &tables, - &res, - &mut RoutingExpr::new(&tables.root_res, key_expr.as_str()), - ); + let matching_subscriptions = + crate::net::routing::dispatcher::pubsub::get_matching_subscriptions(&tables, key_expr); drop(tables); let matching = match destination { - Locality::Any => !route.is_empty(), + Locality::Any => !matching_subscriptions.is_empty(), Locality::Remote => { if let Some(face) = zread!(self.state).primitives.as_ref() { - route.values().any(|dir| !Arc::ptr_eq(&dir.0, &face.state)) + matching_subscriptions + .values() + .any(|dir| !Arc::ptr_eq(dir, &face.state)) } else { - !route.is_empty() + !matching_subscriptions.is_empty() } } Locality::SessionLocal => { if let Some(face) = zread!(self.state).primitives.as_ref() { - route.values().any(|dir| Arc::ptr_eq(&dir.0, &face.state)) + matching_subscriptions + .values() + .any(|dir| Arc::ptr_eq(dir, &face.state)) } else { false } @@ -2070,7 +2123,7 @@ impl Primitives for Session { }; self.handle_data( false, - &m.ext_wire_expr.wire_expr, + &expr.to_wire(self), Some(data_info), ZBuf::default(), #[cfg(feature = "unstable")] @@ -2088,9 +2141,15 @@ impl Primitives for Session { zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => { trace!("recv UndeclareQueryable {:?}", m.id); } - DeclareBody::DeclareToken(_) => todo!(), - DeclareBody::UndeclareToken(_) => todo!(), - DeclareBody::DeclareFinal(_) => todo!(), + DeclareBody::DeclareToken(m) => { + trace!("recv DeclareToken {:?}", m.id); + } + DeclareBody::UndeclareToken(m) => { + trace!("recv UndeclareToken {:?}", m.id); + } + DeclareBody::DeclareFinal(_) => { + trace!("recv DeclareFinal {:?}", msg.interest_id); + } } } @@ -2585,6 +2644,11 @@ pub trait SessionDeclarations<'s, 'a> { } impl crate::net::primitives::EPrimitives for Session { + #[inline] + fn send_interest(&self, ctx: crate::net::routing::RoutingContext) { + (self as &dyn Primitives).send_interest(ctx.msg) + } + #[inline] fn send_declare(&self, ctx: crate::net::routing::RoutingContext) { (self as &dyn Primitives).send_declare(ctx.msg) diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index ba345f5116..a0cfd51811 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -145,7 +145,7 @@ impl Wait for SubscriberUndeclaration<'_> { self.subscriber.alive = false; self.subscriber .session - .unsubscribe(self.subscriber.state.id) + .undeclare_subscriber_inner(self.subscriber.state.id) } } @@ -161,7 +161,7 @@ impl IntoFuture for SubscriberUndeclaration<'_> { impl Drop for SubscriberInner<'_> { fn drop(&mut self) { if self.alive { - let _ = self.session.unsubscribe(self.state.id); + let _ = self.session.undeclare_subscriber_inner(self.state.id); } } } diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index b400d1a254..56bbbe4570 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -68,7 +68,7 @@ impl TransportPeerEventHandler for DeMux { match msg.body { NetworkBody::Push(m) => self.face.send_push(m), NetworkBody::Declare(m) => self.face.send_declare(m), - NetworkBody::Interest(_) => todo!(), + NetworkBody::Interest(m) => self.face.send_interest(m), NetworkBody::Request(m) => self.face.send_request(m), NetworkBody::Response(m) => self.face.send_response(m), NetworkBody::ResponseFinal(m) => self.face.send_response_final(m), diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index d3aa8097ca..dbdcdd26f8 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -43,6 +43,8 @@ pub trait Primitives: Send + Sync { pub(crate) trait EPrimitives: Send + Sync { fn as_any(&self) -> &dyn Any; + fn send_interest(&self, ctx: RoutingContext); + fn send_declare(&self, ctx: RoutingContext); fn send_push(&self, msg: Push); @@ -76,6 +78,8 @@ impl Primitives for DummyPrimitives { } impl EPrimitives for DummyPrimitives { + fn send_interest(&self, _ctx: RoutingContext) {} + fn send_declare(&self, _ctx: RoutingContext) {} fn send_push(&self, _msg: Push) {} diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index df292b4315..f58b4550d0 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -197,6 +197,31 @@ impl Primitives for Mux { } impl EPrimitives for Mux { + fn send_interest(&self, ctx: RoutingContext) { + let ctx = RoutingContext { + msg: NetworkMessage { + body: NetworkBody::Interest(ctx.msg), + #[cfg(feature = "stats")] + size: None, + }, + inface: ctx.inface, + outface: ctx.outface, + prefix: ctx.prefix, + full_expr: ctx.full_expr, + }; + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); + } + } + fn send_declare(&self, ctx: RoutingContext) { let ctx = RoutingContext { msg: NetworkMessage { @@ -497,6 +522,31 @@ impl Primitives for McastMux { } impl EPrimitives for McastMux { + fn send_interest(&self, ctx: RoutingContext) { + let ctx = RoutingContext { + msg: NetworkMessage { + body: NetworkBody::Interest(ctx.msg), + #[cfg(feature = "stats")] + size: None, + }, + inface: ctx.inface, + outface: ctx.outface, + prefix: ctx.prefix, + full_expr: ctx.full_expr, + }; + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); + } + } + fn send_declare(&self, ctx: RoutingContext) { let ctx = RoutingContext { msg: NetworkMessage { diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index c5129f76e2..4669433145 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -21,7 +21,12 @@ use std::{ use tokio_util::sync::CancellationToken; use zenoh_protocol::{ core::{ExprId, WhatAmI, ZenohId}, - network::{Mapping, Push, Request, RequestId, Response, ResponseFinal}, + network::{ + declare::ext, + interest::{InterestId, InterestMode, InterestOptions}, + Declare, DeclareBody, DeclareFinal, Mapping, Push, Request, RequestId, Response, + ResponseFinal, + }, zenoh::RequestBody, }; use zenoh_sync::get_mut_unchecked; @@ -35,10 +40,19 @@ use crate::{ api::key_expr::KeyExpr, net::{ primitives::{McastMux, Mux, Primitives}, - routing::interceptor::{InterceptorTrait, InterceptorsChain}, + routing::{ + interceptor::{InterceptorTrait, InterceptorsChain}, + RoutingContext, + }, }, }; +pub(crate) struct InterestState { + pub(crate) options: InterestOptions, + pub(crate) res: Option>, + pub(crate) finalized: bool, +} + pub struct FaceState { pub(crate) id: usize, pub(crate) zid: ZenohId, @@ -46,6 +60,8 @@ pub struct FaceState { #[cfg(feature = "stats")] pub(crate) stats: Option>, pub(crate) primitives: Arc, + pub(crate) local_interests: HashMap, + pub(crate) remote_key_interests: HashMap>>, pub(crate) local_mappings: HashMap>, pub(crate) remote_mappings: HashMap>, pub(crate) next_qid: RequestId, @@ -75,6 +91,8 @@ impl FaceState { #[cfg(feature = "stats")] stats, primitives, + local_interests: HashMap::new(), + remote_key_interests: HashMap::new(), local_mappings: HashMap::new(), remote_mappings: HashMap::new(), next_qid: 0, @@ -191,8 +209,67 @@ impl Face { } impl Primitives for Face { - fn send_interest(&self, _msg: zenoh_protocol::network::Interest) { - todo!() + fn send_interest(&self, msg: zenoh_protocol::network::Interest) { + let ctrl_lock = zlock!(self.tables.ctrl_lock); + if msg.mode != InterestMode::Final { + if msg.options.keyexprs() && msg.mode != InterestMode::Current { + register_expr_interest( + &self.tables, + &mut self.state.clone(), + msg.id, + msg.wire_expr.as_ref(), + ); + } + if msg.options.subscribers() { + declare_sub_interest( + ctrl_lock.as_ref(), + &self.tables, + &mut self.state.clone(), + msg.id, + msg.wire_expr.as_ref(), + msg.mode, + msg.options.aggregate(), + ); + } + if msg.options.queryables() { + declare_qabl_interest( + ctrl_lock.as_ref(), + &self.tables, + &mut self.state.clone(), + msg.id, + msg.wire_expr.as_ref(), + msg.mode, + msg.options.aggregate(), + ); + } + if msg.mode != InterestMode::Future { + self.state.primitives.send_declare(RoutingContext::new_out( + Declare { + interest_id: Some(msg.id), + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareFinal(DeclareFinal), + }, + self.clone(), + )); + } + } else { + unregister_expr_interest(&self.tables, &mut self.state.clone(), msg.id); + undeclare_sub_interest( + ctrl_lock.as_ref(), + &self.tables, + &mut self.state.clone(), + msg.id, + ); + undeclare_qabl_interest( + ctrl_lock.as_ref(), + &self.tables, + &mut self.state.clone(), + msg.id, + ); + } + drop(ctrl_lock); } fn send_declare(&self, msg: zenoh_protocol::network::Declare) { @@ -246,9 +323,20 @@ impl Primitives for Face { msg.ext_nodeid.node_id, ); } - zenoh_protocol::network::DeclareBody::DeclareToken(_m) => todo!(), - zenoh_protocol::network::DeclareBody::UndeclareToken(_m) => todo!(), - zenoh_protocol::network::DeclareBody::DeclareFinal(_) => todo!(), + zenoh_protocol::network::DeclareBody::DeclareToken(m) => { + tracing::warn!("Received unsupported {m:?}") + } + zenoh_protocol::network::DeclareBody::UndeclareToken(m) => { + tracing::warn!("Received unsupported {m:?}") + } + zenoh_protocol::network::DeclareBody::DeclareFinal(_) => { + if let Some(id) = msg.interest_id { + get_mut_unchecked(&mut self.state.clone()) + .local_interests + .entry(id) + .and_modify(|interest| interest.finalized = true); + } + } } drop(ctrl_lock); } diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 94c6f7b1a6..4e69e45dc3 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -18,6 +18,7 @@ use zenoh_protocol::{ core::{key_expr::keyexpr, WhatAmI, WireExpr}, network::{ declare::{ext, subscriber::ext::SubscriberInfo, SubscriberId}, + interest::{InterestId, InterestMode}, Push, }, zenoh::PushBody, @@ -29,8 +30,90 @@ use super::{ resource::{DataRoutes, Direction, Resource}, tables::{NodeId, Route, RoutingExpr, Tables, TablesLock}, }; +#[zenoh_macros::unstable] +use crate::key_expr::KeyExpr; use crate::net::routing::hat::HatTrait; +pub(crate) fn declare_sub_interest( + hat_code: &(dyn HatTrait + Send + Sync), + tables: &TablesLock, + face: &mut Arc, + id: InterestId, + expr: Option<&WireExpr>, + mode: InterestMode, + aggregate: bool, +) { + if let Some(expr) = expr { + let rtables = zread!(tables.tables); + match rtables + .get_mapping(face, &expr.scope, expr.mapping) + .cloned() + { + Some(mut prefix) => { + tracing::debug!( + "{} Declare sub interest {} ({}{})", + face, + id, + prefix.expr(), + expr.suffix + ); + let res = Resource::get_resource(&prefix, &expr.suffix); + let (mut res, mut wtables) = if res + .as_ref() + .map(|r| r.context.is_some()) + .unwrap_or(false) + { + drop(rtables); + let wtables = zwrite!(tables.tables); + (res.unwrap(), wtables) + } else { + let mut fullexpr = prefix.expr(); + fullexpr.push_str(expr.suffix.as_ref()); + let mut matches = keyexpr::new(fullexpr.as_str()) + .map(|ke| Resource::get_matches(&rtables, ke)) + .unwrap_or_default(); + drop(rtables); + let mut wtables = zwrite!(tables.tables); + let mut res = + Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); + matches.push(Arc::downgrade(&res)); + Resource::match_resource(&wtables, &mut res, matches); + (res, wtables) + }; + + hat_code.declare_sub_interest( + &mut wtables, + face, + id, + Some(&mut res), + mode, + aggregate, + ); + } + None => tracing::error!( + "{} Declare sub interest {} for unknown scope {}!", + face, + id, + expr.scope + ), + } + } else { + let mut wtables = zwrite!(tables.tables); + hat_code.declare_sub_interest(&mut wtables, face, id, None, mode, aggregate); + } +} + +pub(crate) fn undeclare_sub_interest( + hat_code: &(dyn HatTrait + Send + Sync), + tables: &TablesLock, + face: &mut Arc, + id: InterestId, +) { + tracing::debug!("{} Undeclare sub interest {}", face, id,); + let mut wtables = zwrite!(tables.tables); + hat_code.undeclare_sub_interest(&mut wtables, face, id); +} + pub(crate) fn declare_subscription( hat_code: &(dyn HatTrait + Send + Sync), tables: &TablesLock, @@ -329,18 +412,11 @@ fn get_data_route( #[zenoh_macros::unstable] #[inline] -pub(crate) fn get_local_data_route( +pub(crate) fn get_matching_subscriptions( tables: &Tables, - res: &Option>, - expr: &mut RoutingExpr, -) -> Arc { - res.as_ref() - .and_then(|res| res.data_route(WhatAmI::Client, 0)) - .unwrap_or_else(|| { - tables - .hat_code - .compute_data_route(tables, expr, 0, WhatAmI::Client) - }) + key_expr: &KeyExpr<'_>, +) -> HashMap> { + tables.hat_code.get_matching_subscriptions(tables, key_expr) } #[cfg(feature = "stats")] diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 2bbc924e0b..23e405c3c4 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -24,6 +24,7 @@ use zenoh_protocol::{ core::{key_expr::keyexpr, Encoding, WireExpr}, network::{ declare::{ext, queryable::ext::QueryableInfoType, QueryableId}, + interest::{InterestId, InterestMode}, request::{ext::TargetType, Request, RequestId}, response::{self, ext::ResponderIdType, Response, ResponseFinal}, }, @@ -39,6 +40,87 @@ use super::{ }; use crate::net::routing::{hat::HatTrait, RoutingContext}; +#[allow(clippy::too_many_arguments)] // TODO refactor +pub(crate) fn declare_qabl_interest( + hat_code: &(dyn HatTrait + Send + Sync), + tables: &TablesLock, + face: &mut Arc, + id: InterestId, + expr: Option<&WireExpr>, + mode: InterestMode, + aggregate: bool, +) { + if let Some(expr) = expr { + let rtables = zread!(tables.tables); + match rtables + .get_mapping(face, &expr.scope, expr.mapping) + .cloned() + { + Some(mut prefix) => { + tracing::debug!( + "{} Declare qabl interest {} ({}{})", + face, + id, + prefix.expr(), + expr.suffix + ); + let res = Resource::get_resource(&prefix, &expr.suffix); + let (mut res, mut wtables) = if res + .as_ref() + .map(|r| r.context.is_some()) + .unwrap_or(false) + { + drop(rtables); + let wtables = zwrite!(tables.tables); + (res.unwrap(), wtables) + } else { + let mut fullexpr = prefix.expr(); + fullexpr.push_str(expr.suffix.as_ref()); + let mut matches = keyexpr::new(fullexpr.as_str()) + .map(|ke| Resource::get_matches(&rtables, ke)) + .unwrap_or_default(); + drop(rtables); + let mut wtables = zwrite!(tables.tables); + let mut res = + Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); + matches.push(Arc::downgrade(&res)); + Resource::match_resource(&wtables, &mut res, matches); + (res, wtables) + }; + + hat_code.declare_qabl_interest( + &mut wtables, + face, + id, + Some(&mut res), + mode, + aggregate, + ); + } + None => tracing::error!( + "{} Declare qabl interest {} for unknown scope {}!", + face, + id, + expr.scope + ), + } + } else { + let mut wtables = zwrite!(tables.tables); + hat_code.declare_qabl_interest(&mut wtables, face, id, None, mode, aggregate); + } +} + +pub(crate) fn undeclare_qabl_interest( + hat_code: &(dyn HatTrait + Send + Sync), + tables: &TablesLock, + face: &mut Arc, + id: InterestId, +) { + tracing::debug!("{} Undeclare qabl interest {}", face, id,); + let mut wtables = zwrite!(tables.tables); + hat_code.undeclare_qabl_interest(&mut wtables, face, id); +} + pub(crate) struct Query { src_face: Arc, src_qid: RequestId, diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index d8765e16ae..e6b13dc2c8 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -27,6 +27,7 @@ use zenoh_protocol::{ ext, queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, }, + interest::InterestId, Mapping, RequestId, }, }; @@ -60,6 +61,20 @@ pub(crate) struct SessionContext { pub(crate) e_interceptor_cache: Option>, } +impl SessionContext { + pub(crate) fn new(face: Arc) -> Self { + Self { + face, + local_expr_id: None, + remote_expr_id: None, + subs: None, + qabl: None, + in_interceptor_cache: None, + e_interceptor_cache: None, + } + } +} + #[derive(Default)] pub(crate) struct RoutesIndexes { pub(crate) routers: Vec, @@ -217,6 +232,16 @@ impl Resource { self.context.as_mut().unwrap() } + #[inline(always)] + pub(crate) fn matches(&self, other: &Arc) -> bool { + self.context + .as_ref() + .unwrap() + .matches + .iter() + .any(|m| m.upgrade().is_some_and(|m| &m == other)) + } + pub fn nonwild_prefix(res: &Arc) -> (Option>, String) { match &res.nonwild_prefix { None => (Some(res.clone()), "".to_string()), @@ -434,34 +459,34 @@ impl Resource { let (nonwild_prefix, wildsuffix) = Resource::nonwild_prefix(res); match nonwild_prefix { Some(mut nonwild_prefix) => { - let ctx = get_mut_unchecked(&mut nonwild_prefix) + if let Some(ctx) = get_mut_unchecked(&mut nonwild_prefix) .session_ctxs - .entry(face.id) - .or_insert_with(|| { - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: None, - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }) - }); - - if let Some(expr_id) = ctx.remote_expr_id { - WireExpr { - scope: expr_id, - suffix: wildsuffix.into(), - mapping: Mapping::Receiver, + .get(&face.id) + { + if let Some(expr_id) = ctx.remote_expr_id { + return WireExpr { + scope: expr_id, + suffix: wildsuffix.into(), + mapping: Mapping::Receiver, + }; } - } else if let Some(expr_id) = ctx.local_expr_id { - WireExpr { - scope: expr_id, - suffix: wildsuffix.into(), - mapping: Mapping::Sender, + if let Some(expr_id) = ctx.local_expr_id { + return WireExpr { + scope: expr_id, + suffix: wildsuffix.into(), + mapping: Mapping::Sender, + }; } - } else { + } + if face.remote_key_interests.values().any(|res| { + res.as_ref() + .map(|res| res.matches(&nonwild_prefix)) + .unwrap_or(true) + }) { + let ctx = get_mut_unchecked(&mut nonwild_prefix) + .session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))); let expr_id = face.get_next_local_id(); get_mut_unchecked(ctx).local_expr_id = Some(expr_id); get_mut_unchecked(face) @@ -486,6 +511,8 @@ impl Resource { suffix: wildsuffix.into(), mapping: Mapping::Sender, } + } else { + res.expr().into() } } None => wildsuffix.into(), @@ -650,7 +677,7 @@ impl Resource { } } -pub fn register_expr( +pub(crate) fn register_expr( tables: &TablesLock, face: &mut Arc, expr_id: ExprId, @@ -697,20 +724,12 @@ pub fn register_expr( Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; - get_mut_unchecked(&mut res) + let ctx = get_mut_unchecked(&mut res) .session_ctxs .entry(face.id) - .or_insert_with(|| { - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: Some(expr_id), - subs: None, - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }) - }); + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))); + + get_mut_unchecked(ctx).remote_expr_id = Some(expr_id); get_mut_unchecked(face) .remote_mappings @@ -728,7 +747,7 @@ pub fn register_expr( } } -pub fn unregister_expr(tables: &TablesLock, face: &mut Arc, expr_id: ExprId) { +pub(crate) fn unregister_expr(tables: &TablesLock, face: &mut Arc, expr_id: ExprId) { let wtables = zwrite!(tables.tables); match get_mut_unchecked(face).remote_mappings.remove(&expr_id) { Some(mut res) => Resource::clean(&mut res), @@ -736,3 +755,64 @@ pub fn unregister_expr(tables: &TablesLock, face: &mut Arc, expr_id: } drop(wtables); } + +pub(crate) fn register_expr_interest( + tables: &TablesLock, + face: &mut Arc, + id: InterestId, + expr: Option<&WireExpr>, +) { + if let Some(expr) = expr { + let rtables = zread!(tables.tables); + match rtables + .get_mapping(face, &expr.scope, expr.mapping) + .cloned() + { + Some(mut prefix) => { + let res = Resource::get_resource(&prefix, &expr.suffix); + let (res, wtables) = if res.as_ref().map(|r| r.context.is_some()).unwrap_or(false) { + drop(rtables); + let wtables = zwrite!(tables.tables); + (res.unwrap(), wtables) + } else { + let mut fullexpr = prefix.expr(); + fullexpr.push_str(expr.suffix.as_ref()); + let mut matches = keyexpr::new(fullexpr.as_str()) + .map(|ke| Resource::get_matches(&rtables, ke)) + .unwrap_or_default(); + drop(rtables); + let mut wtables = zwrite!(tables.tables); + let mut res = + Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); + matches.push(Arc::downgrade(&res)); + Resource::match_resource(&wtables, &mut res, matches); + (res, wtables) + }; + get_mut_unchecked(face) + .remote_key_interests + .insert(id, Some(res)); + drop(wtables); + } + None => tracing::error!( + "Declare keyexpr interest with unknown scope {}!", + expr.scope + ), + } + } else { + let wtables = zwrite!(tables.tables); + get_mut_unchecked(face) + .remote_key_interests + .insert(id, None); + drop(wtables); + } +} + +pub(crate) fn unregister_expr_interest( + tables: &TablesLock, + face: &mut Arc, + id: InterestId, +) { + let wtables = zwrite!(tables.tables); + get_mut_unchecked(face).remote_key_interests.remove(&id); + drop(wtables); +} diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 3b4e7c7103..921dc7554c 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -26,6 +26,7 @@ use std::{ use zenoh_config::WhatAmI; use zenoh_protocol::network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId}, + interest::InterestId, Oam, }; use zenoh_result::ZResult; @@ -285,6 +286,7 @@ impl HatContext { struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness + remote_sub_interests: HashMap>>, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, local_qabls: HashMap, (QueryableId, QueryableInfoType)>, @@ -295,6 +297,7 @@ impl HatFace { fn new() -> Self { Self { next_id: AtomicU32::new(0), + remote_sub_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), local_qabls: HashMap::new(), diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 3334fbfb14..a87a4e7f1e 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -19,23 +19,30 @@ use std::{ use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI}, - network::declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + network::{ + declare::{ + common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, + DeclareSubscriber, SubscriberId, UndeclareSubscriber, + }, + interest::{InterestId, InterestMode, InterestOptions}, + Interest, }, }; use zenoh_sync::get_mut_unchecked; use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; -use crate::net::routing::{ - dispatcher::{ - face::FaceState, - resource::{NodeId, Resource, SessionContext}, - tables::{Route, RoutingExpr, Tables}, +use crate::{ + key_expr::KeyExpr, + net::routing::{ + dispatcher::{ + face::{FaceState, InterestState}, + resource::{NodeId, Resource, SessionContext}, + tables::{Route, RoutingExpr, Tables}, + }, + hat::{HatPubSubTrait, Sources}, + router::{update_data_routes_from, RoutesIndexes}, + RoutingContext, PREFIX_LIVELINESS, }, - hat::{HatPubSubTrait, Sources}, - router::RoutesIndexes, - RoutingContext, PREFIX_LIVELINESS, }; #[inline] @@ -104,18 +111,11 @@ fn register_client_subscription( } } None => { - res.session_ctxs.insert( - face.id, - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: Some(*sub_info), - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }), - ); + let ctx = res + .session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))); + get_mut_unchecked(ctx).subs = Some(*sub_info); } } } @@ -243,7 +243,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers }; - for src_face in tables + for mut src_face in tables .faces .values() .cloned() @@ -252,10 +252,134 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { for sub in face_hat!(src_face).remote_subs.values() { propagate_simple_subscription_to(tables, face, sub, &sub_info, &mut src_face.clone()); } + if face.whatami != WhatAmI::Client { + for res in face_hat_mut!(&mut src_face).remote_sub_interests.values() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + let options = InterestOptions::KEYEXPRS + InterestOptions::SUBSCRIBERS; + get_mut_unchecked(face).local_interests.insert( + id, + InterestState { + options, + res: res.as_ref().map(|res| (*res).clone()), + finalized: false, + }, + ); + let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, face)); + face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode: InterestMode::CurrentFuture, + options, + wire_expr, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + }, + res.as_ref().map(|res| res.expr()).unwrap_or_default(), + )); + } + } } + // recompute routes + update_data_routes_from(tables, &mut tables.root_res.clone()); } impl HatPubSubTrait for HatCode { + fn declare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + _aggregate: bool, + ) { + face_hat_mut!(face) + .remote_sub_interests + .insert(id, res.as_ref().map(|res| (*res).clone())); + for dst_face in tables + .faces + .values_mut() + .filter(|f| f.whatami != WhatAmI::Client) + { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + let options = InterestOptions::KEYEXPRS + InterestOptions::SUBSCRIBERS; + get_mut_unchecked(dst_face).local_interests.insert( + id, + InterestState { + options, + res: res.as_ref().map(|res| (*res).clone()), + finalized: mode == InterestMode::Future, + }, + ); + let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, dst_face)); + dst_face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode, + options, + wire_expr, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + }, + res.as_ref().map(|res| res.expr()).unwrap_or_default(), + )); + } + } + + fn undeclare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + if let Some(interest) = face_hat_mut!(face).remote_sub_interests.remove(&id) { + if !tables.faces.values().any(|f| { + f.whatami == WhatAmI::Client + && face_hat!(f) + .remote_sub_interests + .values() + .any(|i| *i == interest) + }) { + for dst_face in tables + .faces + .values_mut() + .filter(|f| f.whatami != WhatAmI::Client) + { + for id in dst_face + .local_interests + .keys() + .cloned() + .collect::>() + { + let local_interest = dst_face.local_interests.get(&id).unwrap(); + if local_interest.options.subscribers() && (local_interest.res == interest) + { + dst_face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode: InterestMode::Final, + options: InterestOptions::empty(), + wire_expr: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + }, + local_interest + .res + .as_ref() + .map(|res| res.expr()) + .unwrap_or_default(), + )); + get_mut_unchecked(dst_face).local_interests.remove(&id); + } + } + } + } + } + } + fn declare_subscription( &self, tables: &mut Tables, @@ -322,6 +446,51 @@ impl HatPubSubTrait for HatCode { return Arc::new(route); } }; + + for face in tables + .faces + .values() + .filter(|f| f.whatami != WhatAmI::Client) + { + if face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.subscribers() + && interest + .res + .as_ref() + .map(|res| { + KeyExpr::try_from(res.expr()) + .and_then(|intres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| intres.includes(&putres)) + }) + .unwrap_or(false) + }) + .unwrap_or(true) + }) { + if face_hat!(face).remote_subs.values().any(|sub| { + KeyExpr::try_from(sub.expr()) + .and_then(|subres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| subres.intersects(&putres)) + }) + .unwrap_or(false) + }) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } else { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } + let res = Resource::get_resource(expr.prefix, expr.suffix); let matches = res .as_ref() @@ -333,15 +502,7 @@ impl HatPubSubTrait for HatCode { let mres = mres.upgrade().unwrap(); for (sid, context) in &mres.session_ctxs { - if context.subs.is_some() - && match tables.whatami { - WhatAmI::Router => context.face.whatami != WhatAmI::Router, - _ => { - source_type == WhatAmI::Client - || context.face.whatami == WhatAmI::Client - } - } - { + if context.subs.is_some() && context.face.whatami == WhatAmI::Client { route.entry(*sid).or_insert_with(|| { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); (context.face.clone(), key_expr.to_owned(), NodeId::default()) @@ -365,4 +526,62 @@ impl HatPubSubTrait for HatCode { fn get_data_routes_entries(&self, _tables: &Tables) -> RoutesIndexes { get_routes_entries() } + + fn get_matching_subscriptions( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + ) -> HashMap> { + let mut matching_subscriptions = HashMap::new(); + if key_expr.ends_with('/') { + return matching_subscriptions; + } + tracing::trace!("get_matching_subscriptions({})", key_expr,); + + for face in tables + .faces + .values() + .filter(|f| f.whatami != WhatAmI::Client) + { + if face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.subscribers() + && interest + .res + .as_ref() + .map(|res| { + KeyExpr::try_from(res.expr()) + .map(|intres| intres.includes(key_expr)) + .unwrap_or(false) + }) + .unwrap_or(true) + }) && face_hat!(face).remote_subs.values().any(|sub| { + KeyExpr::try_from(sub.expr()) + .map(|subres| subres.intersects(key_expr)) + .unwrap_or(false) + }) { + matching_subscriptions.insert(face.id, face.clone()); + } + } + + let res = Resource::get_resource(&tables.root_res, key_expr); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + + for (sid, context) in &mres.session_ctxs { + if context.subs.is_some() && context.face.whatami == WhatAmI::Client { + matching_subscriptions + .entry(*sid) + .or_insert_with(|| context.face.clone()); + } + } + } + matching_subscriptions + } } diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index c915d788a9..749c03d5f8 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -27,9 +27,12 @@ use zenoh_protocol::{ }, WhatAmI, WireExpr, }, - network::declare::{ - common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareQueryable, QueryableId, UndeclareQueryable, + network::{ + declare::{ + common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, + DeclareBody, DeclareQueryable, QueryableId, UndeclareQueryable, + }, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -99,6 +102,7 @@ fn propagate_simple_queryable( .local_qabls .insert(res.clone(), (id, info)); let key_expr = Resource::decl_key(res, &mut dst_face); + println!("Decled key = {key_expr:?}"); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { interest_id: None, @@ -127,17 +131,11 @@ fn register_client_queryable( // Register queryable { let res = get_mut_unchecked(res); - get_mut_unchecked(res.session_ctxs.entry(face.id).or_insert_with(|| { - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: None, - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }) - })) + get_mut_unchecked( + res.session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))), + ) .qabl = Some(*qabl_info); } face_hat_mut!(face).remote_qabls.insert(id, res.clone()); @@ -260,6 +258,27 @@ lazy_static::lazy_static! { } impl HatQueriesTrait for HatCode { + fn declare_qabl_interest( + &self, + _tables: &mut Tables, + _face: &mut Arc, + _id: InterestId, + _res: Option<&mut Arc>, + _mode: InterestMode, + _aggregate: bool, + ) { + // ignore + } + + fn undeclare_qabl_interest( + &self, + _tables: &mut Tables, + _face: &mut Arc, + _id: InterestId, + ) { + // ignore + } + fn declare_queryable( &self, tables: &mut Tables, @@ -326,6 +345,16 @@ impl HatQueriesTrait for HatCode { return EMPTY_ROUTE.clone(); } }; + + if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + complete: 0, + distance: f64::MAX, + }); + } + let res = Resource::get_resource(expr.prefix, expr.suffix); let matches = res .as_ref() diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index e76f53a0dd..bb5aec4db1 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -29,6 +29,7 @@ use zenoh_protocol::{ common::ZExtBody, network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId}, + interest::InterestId, oam::id::OAM_LINKSTATE, Oam, }, @@ -480,8 +481,10 @@ impl HatContext { struct HatFace { link_id: usize, next_id: AtomicU32, // @TODO: manage rollover and uniqueness + remote_sub_interests: HashMap>, bool)>, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, + remote_qabl_interests: HashMap>>, local_qabls: HashMap, (QueryableId, QueryableInfoType)>, remote_qabls: HashMap>, } @@ -491,8 +494,10 @@ impl HatFace { Self { link_id: 0, next_id: AtomicU32::new(0), + remote_sub_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), + remote_qabl_interests: HashMap::new(), local_qabls: HashMap::new(), remote_qabls: HashMap::new(), } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index e5f7da81f7..135f899656 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -20,9 +20,12 @@ use std::{ use petgraph::graph::NodeIndex; use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI, ZenohId}, - network::declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + network::{ + declare::{ + common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, + DeclareSubscriber, SubscriberId, UndeclareSubscriber, + }, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -31,16 +34,19 @@ use super::{ face_hat, face_hat_mut, get_peer, get_routes_entries, hat, hat_mut, network::Network, res_hat, res_hat_mut, HatCode, HatContext, HatFace, HatTables, }; -use crate::net::routing::{ - dispatcher::{ - face::FaceState, - pubsub::*, - resource::{NodeId, Resource, SessionContext}, - tables::{Route, RoutingExpr, Tables}, +use crate::{ + key_expr::KeyExpr, + net::routing::{ + dispatcher::{ + face::FaceState, + pubsub::*, + resource::{NodeId, Resource, SessionContext}, + tables::{Route, RoutingExpr, Tables}, + }, + hat::{CurrentFutureTrait, HatPubSubTrait, Sources}, + router::RoutesIndexes, + RoutingContext, PREFIX_LIVELINESS, }, - hat::{HatPubSubTrait, Sources}, - router::RoutesIndexes, - RoutingContext, PREFIX_LIVELINESS, }; #[inline] @@ -96,23 +102,59 @@ fn propagate_simple_subscription_to( && !face_hat!(dst_face).local_subs.contains_key(res) && dst_face.whatami == WhatAmI::Client { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); + if dst_face.whatami != WhatAmI::Client { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + )); + } else { + let matching_interests = face_hat!(dst_face) + .remote_sub_interests + .values() + .filter(|si| si.0.as_ref().map(|si| si.matches(res)).unwrap_or(true)) + .cloned() + .collect::>, bool)>>(); + + for (int_res, aggregate) in matching_interests { + let res = if aggregate { + int_res.as_ref().unwrap_or(res) + } else { + res + }; + if !face_hat!(dst_face).local_subs.contains_key(res) { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + )); + } + } + } } } @@ -220,18 +262,11 @@ fn register_client_subscription( } } None => { - res.session_ctxs.insert( - face.id, - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: Some(*sub_info), - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }), - ); + let ctx = res + .session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))); + get_mut_unchecked(ctx).subs = Some(*sub_info); } } } @@ -273,6 +308,13 @@ fn client_subs(res: &Arc) -> Vec> { .collect() } +#[inline] +fn remote_client_subs(res: &Arc, face: &Arc) -> bool { + res.session_ctxs + .values() + .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) +} + #[inline] fn send_forget_sourced_subscription_to_net_childs( tables: &Tables, @@ -313,8 +355,8 @@ fn send_forget_sourced_subscription_to_net_childs( } fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc) { - for face in tables.faces.values_mut() { - if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { + for mut face in tables.faces.values().cloned() { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { interest_id: None, @@ -329,6 +371,35 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc res.expr(), )); } + for res in face_hat!(face) + .local_subs + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade().is_some_and(|m| { + m.context.is_some() + && (remote_client_subs(&m, &face) || remote_peer_subs(tables, &m)) + }) + }) { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } @@ -417,8 +488,9 @@ pub(super) fn undeclare_client_subscription( if client_subs.is_empty() { undeclare_peer_subscription(tables, None, res, &tables.zid.clone()); } + if client_subs.len() == 1 && !peer_subs { - let face = &mut client_subs[0]; + let mut face = &mut client_subs[0]; if !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) { if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( @@ -435,6 +507,35 @@ pub(super) fn undeclare_client_subscription( res.expr(), )); } + for res in face_hat!(face) + .local_subs + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade().is_some_and(|m| { + m.context.is_some() + && (remote_client_subs(&m, face) || remote_peer_subs(tables, &m)) + }) + }) { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } } @@ -453,32 +554,8 @@ fn forget_client_subscription( } } -pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; - - if face.whatami == WhatAmI::Client { - for sub in &hat!(tables).peer_subs { - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert(sub.clone(), id); - let key_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: sub_info, - }), - }, - sub.expr(), - )); - } - } +pub(super) fn pubsub_new_face(_tables: &mut Tables, _face: &mut Arc) { + // Nothing to do } pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId) { @@ -534,40 +611,129 @@ pub(super) fn pubsub_tree_change(tables: &mut Tables, new_childs: &[Vec, -) { - if net.trees.len() > source as usize { - for sub in subs { - if let Some(sub_idx) = net.get_idx(sub) { - if net.trees[source as usize].directions.len() > sub_idx.index() { - if let Some(direction) = net.trees[source as usize].directions[sub_idx.index()] - { - if net.graph.contains_node(direction) { - if let Some(face) = tables.get_face(&net.graph[direction].zid) { - route.entry(face.id).or_insert_with(|| { - let key_expr = - Resource::get_best_key(expr.prefix, expr.suffix, face.id); - (face.clone(), key_expr.to_owned(), source) - }); - } +impl HatPubSubTrait for HatCode { + fn declare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + aggregate: bool, + ) { + if mode.current() && face.whatami == WhatAmI::Client { + let interest_id = mode.future().then_some(id); + let sub_info = SubscriberInfo { + reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers + }; + if let Some(res) = res.as_ref() { + if aggregate { + if hat!(tables).peer_subs.iter().any(|sub| { + sub.context.is_some() + && sub.matches(res) + && (remote_client_subs(sub, face) || remote_peer_subs(tables, sub)) + }) { + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert((*res).clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(res, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }), + }, + res.expr(), + )); + } + } else { + for sub in &hat!(tables).peer_subs { + if sub.context.is_some() + && sub.matches(res) + && (remote_client_subs(sub, face) || remote_peer_subs(tables, sub)) + { + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert(sub.clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(sub, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }), + }, + sub.expr(), + )); } } } + } else { + for sub in &hat!(tables).peer_subs { + if sub.context.is_some() + && (remote_client_subs(sub, face) || remote_peer_subs(tables, sub)) + { + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert(sub.clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(sub, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }), + }, + sub.expr(), + )); + } + } } } - } else { - tracing::trace!("Tree for node sid:{} not yet ready", source); + if mode.future() { + face_hat_mut!(face) + .remote_sub_interests + .insert(id, (res.cloned(), aggregate)); + } + } + + fn undeclare_sub_interest( + &self, + _tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + face_hat_mut!(face).remote_sub_interests.remove(&id); } -} -impl HatPubSubTrait for HatCode { fn declare_subscription( &self, tables: &mut Tables, @@ -644,6 +810,43 @@ impl HatPubSubTrait for HatCode { source: NodeId, source_type: WhatAmI, ) -> Arc { + #[inline] + fn insert_faces_for_subs( + route: &mut Route, + expr: &RoutingExpr, + tables: &Tables, + net: &Network, + source: NodeId, + subs: &HashSet, + ) { + if net.trees.len() > source as usize { + for sub in subs { + if let Some(sub_idx) = net.get_idx(sub) { + if net.trees[source as usize].directions.len() > sub_idx.index() { + if let Some(direction) = + net.trees[source as usize].directions[sub_idx.index()] + { + if net.graph.contains_node(direction) { + if let Some(face) = tables.get_face(&net.graph[direction].zid) { + route.entry(face.id).or_insert_with(|| { + let key_expr = Resource::get_best_key( + expr.prefix, + expr.suffix, + face.id, + ); + (face.clone(), key_expr.to_owned(), source) + }); + } + } + } + } + } + } + } else { + tracing::trace!("Tree for node sid:{} not yet ready", source); + } + } + let mut route = HashMap::new(); let key_expr = expr.full_expr(); if key_expr.ends_with('/') { @@ -688,13 +891,7 @@ impl HatPubSubTrait for HatCode { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() - && match tables.whatami { - WhatAmI::Router => context.face.whatami != WhatAmI::Router, - _ => { - source_type == WhatAmI::Client - || context.face.whatami == WhatAmI::Client - } - } + && (source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client) { route.entry(*sid).or_insert_with(|| { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); @@ -719,4 +916,72 @@ impl HatPubSubTrait for HatCode { fn get_data_routes_entries(&self, tables: &Tables) -> RoutesIndexes { get_routes_entries(tables) } + + fn get_matching_subscriptions( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + ) -> HashMap> { + #[inline] + fn insert_faces_for_subs( + route: &mut HashMap>, + tables: &Tables, + net: &Network, + source: usize, + subs: &HashSet, + ) { + if net.trees.len() > source { + for sub in subs { + if let Some(sub_idx) = net.get_idx(sub) { + if net.trees[source].directions.len() > sub_idx.index() { + if let Some(direction) = net.trees[source].directions[sub_idx.index()] { + if net.graph.contains_node(direction) { + if let Some(face) = tables.get_face(&net.graph[direction].zid) { + route.entry(face.id).or_insert_with(|| face.clone()); + } + } + } + } + } + } + } else { + tracing::trace!("Tree for node sid:{} not yet ready", source); + } + } + + let mut matching_subscriptions = HashMap::new(); + if key_expr.ends_with('/') { + return matching_subscriptions; + } + tracing::trace!("get_matching_subscriptions({})", key_expr,); + + let res = Resource::get_resource(&tables.root_res, key_expr); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + + let net = hat!(tables).peers_net.as_ref().unwrap(); + insert_faces_for_subs( + &mut matching_subscriptions, + tables, + net, + net.idx.index(), + &res_hat!(mres).peer_subs, + ); + + for (sid, context) in &mres.session_ctxs { + if context.subs.is_some() { + matching_subscriptions + .entry(*sid) + .or_insert_with(|| context.face.clone()); + } + } + } + matching_subscriptions + } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index bed683f717..3d9babbd5d 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -28,9 +28,12 @@ use zenoh_protocol::{ }, WhatAmI, WireExpr, ZenohId, }, - network::declare::{ - common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareQueryable, QueryableId, UndeclareQueryable, + network::{ + declare::{ + common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, + DeclareBody, DeclareQueryable, QueryableId, UndeclareQueryable, + }, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -46,7 +49,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{HatQueriesTrait, Sources}, + hat::{CurrentFutureTrait, HatQueriesTrait, Sources}, router::RoutesIndexes, RoutingContext, PREFIX_LIVELINESS, }; @@ -170,6 +173,10 @@ fn propagate_simple_queryable( if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) && (current.is_none() || current.unwrap().1 != info) && dst_face.whatami == WhatAmI::Client + && face_hat!(dst_face) + .remote_qabl_interests + .values() + .any(|si| si.as_ref().map(|si| si.matches(res)).unwrap_or(true)) { let id = current .map(|c| c.0) @@ -279,17 +286,11 @@ fn register_client_queryable( // Register queryable { let res = get_mut_unchecked(res); - get_mut_unchecked(res.session_ctxs.entry(face.id).or_insert_with(|| { - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: None, - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }) - })) + get_mut_unchecked( + res.session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))), + ) .qabl = Some(*qabl_info); } face_hat_mut!(face).remote_qabls.insert(id, res.clone()); @@ -331,6 +332,13 @@ fn client_qabls(res: &Arc) -> Vec> { .collect() } +#[inline] +fn remote_client_qabls(res: &Arc, face: &Arc) -> bool { + res.session_ctxs + .values() + .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) +} + #[inline] fn send_forget_sourced_queryable_to_net_childs( tables: &Tables, @@ -371,8 +379,8 @@ fn send_forget_sourced_queryable_to_net_childs( } fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { - for face in tables.faces.values_mut() { - if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { + for mut face in tables.faces.values().cloned() { + if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { interest_id: None, @@ -387,6 +395,35 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade().is_some_and(|m| { + m.context.is_some() + && (remote_client_qabls(&m, &face) || remote_peer_qabls(tables, &m)) + }) + }) { + if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } @@ -485,7 +522,7 @@ pub(super) fn undeclare_client_queryable( } if client_qabls.len() == 1 && !peer_qabls { - let face = &mut client_qabls[0]; + let mut face = &mut client_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -501,6 +538,35 @@ pub(super) fn undeclare_client_queryable( res.expr(), )); } + for res in face_hat!(face) + .local_qabls + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade().is_some_and(|m| { + m.context.is_some() + && (remote_client_qabls(&m, face) || remote_peer_qabls(tables, &m)) + }) + }) { + if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } } @@ -518,33 +584,8 @@ fn forget_client_queryable( } } -pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { - if face.whatami == WhatAmI::Client { - for qabl in &hat!(tables).peer_qabls { - if qabl.context.is_some() { - let info = local_qabl_info(tables, qabl, face); - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face) - .local_qabls - .insert(qabl.clone(), (id, info)); - let key_expr = Resource::decl_key(qabl, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id, - wire_expr: key_expr, - ext_info: info, - }), - }, - qabl.expr(), - )); - } - } - } +pub(super) fn queries_new_face(_tables: &mut Tables, _face: &mut Arc) { + // Nothing to do } pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId) { @@ -644,6 +685,134 @@ lazy_static::lazy_static! { } impl HatQueriesTrait for HatCode { + fn declare_qabl_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + aggregate: bool, + ) { + if mode.current() && face.whatami == WhatAmI::Client { + let interest_id = mode.future().then_some(id); + if let Some(res) = res.as_ref() { + if aggregate { + if hat!(tables).peer_qabls.iter().any(|qabl| { + qabl.context.is_some() + && qabl.matches(res) + && (remote_client_qabls(qabl, face) || remote_peer_qabls(tables, qabl)) + }) { + let info = local_qabl_info(tables, res, face); + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert((*res).clone(), (id, info)); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(res, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr, + ext_info: info, + }), + }, + res.expr(), + )); + } + } else { + for qabl in hat!(tables).peer_qabls.iter() { + if qabl.context.is_some() + && qabl.matches(res) + && (remote_client_qabls(qabl, face) || remote_peer_qabls(tables, qabl)) + { + let info = local_qabl_info(tables, qabl, face); + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); + id + } else { + 0 + }; + let key_expr = Resource::decl_key(qabl, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + )); + } + } + } + } else { + for qabl in hat!(tables).peer_qabls.iter() { + if qabl.context.is_some() + && (remote_client_qabls(qabl, face) || remote_peer_qabls(tables, qabl)) + { + let info = local_qabl_info(tables, qabl, face); + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); + id + } else { + 0 + }; + let key_expr = Resource::decl_key(qabl, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + )); + } + } + } + } + if mode.future() { + face_hat_mut!(face) + .remote_qabl_interests + .insert(id, res.cloned()); + } + } + + fn undeclare_qabl_interest( + &self, + _tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + face_hat_mut!(face).remote_qabl_interests.remove(&id); + } + fn declare_queryable( &self, tables: &mut Tables, @@ -765,10 +934,7 @@ impl HatQueriesTrait for HatCode { ); for (sid, context) in &mres.session_ctxs { - if match tables.whatami { - WhatAmI::Router => context.face.whatami != WhatAmI::Router, - _ => source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client, - } { + if source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); if let Some(qabl_info) = context.qabl.as_ref() { route.push(QueryTargetQabl { diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 5eb812df71..3a7844ea44 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -17,7 +17,7 @@ //! This module is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -use std::{any::Any, sync::Arc}; +use std::{any::Any, collections::HashMap, sync::Arc}; use zenoh_buffers::ZBuf; use zenoh_config::{unwrap_or_default, Config, WhatAmI, ZenohId}; @@ -28,6 +28,7 @@ use zenoh_protocol::{ queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, QueryableId, SubscriberId, }, + interest::{InterestId, InterestMode}, Oam, }, }; @@ -41,7 +42,7 @@ use super::{ }, router::RoutesIndexes, }; -use crate::net::runtime::Runtime; +use crate::{key_expr::KeyExpr, runtime::Runtime}; mod client; mod linkstate_peer; @@ -135,6 +136,22 @@ pub(crate) trait HatBaseTrait { } pub(crate) trait HatPubSubTrait { + #[allow(clippy::too_many_arguments)] // TODO refactor + fn declare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + aggregate: bool, + ); + fn undeclare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ); fn declare_subscription( &self, tables: &mut Tables, @@ -164,9 +181,31 @@ pub(crate) trait HatPubSubTrait { ) -> Arc; fn get_data_routes_entries(&self, tables: &Tables) -> RoutesIndexes; + + fn get_matching_subscriptions( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + ) -> HashMap>; } pub(crate) trait HatQueriesTrait { + #[allow(clippy::too_many_arguments)] // TODO refactor + fn declare_qabl_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + aggregate: bool, + ); + fn undeclare_qabl_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ); fn declare_queryable( &self, tables: &mut Tables, @@ -219,3 +258,20 @@ pub(crate) fn new_hat(whatami: WhatAmI, config: &Config) -> Box Box::new(router::HatCode {}), } } + +trait CurrentFutureTrait { + fn future(&self) -> bool; + fn current(&self) -> bool; +} + +impl CurrentFutureTrait for InterestMode { + #[inline] + fn future(&self) -> bool { + self == &InterestMode::Future || self == &InterestMode::CurrentFuture + } + + #[inline] + fn current(&self) -> bool { + self == &InterestMode::Current || self == &InterestMode::CurrentFuture + } +} diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 530c181335..5ac77a3135 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -28,6 +28,7 @@ use zenoh_protocol::{ common::ZExtBody, network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId}, + interest::InterestId, oam::id::OAM_LINKSTATE, Oam, }, @@ -357,8 +358,10 @@ impl HatContext { struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness + remote_sub_interests: HashMap>, bool)>, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, + remote_qabl_interests: HashMap>>, local_qabls: HashMap, (QueryableId, QueryableInfoType)>, remote_qabls: HashMap>, } @@ -367,8 +370,10 @@ impl HatFace { fn new() -> Self { Self { next_id: AtomicU32::new(0), + remote_sub_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), + remote_qabl_interests: HashMap::new(), local_qabls: HashMap::new(), remote_qabls: HashMap::new(), } diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index e7cf0c5e5d..31172e2804 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -19,23 +19,29 @@ use std::{ use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI}, - network::declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + network::{ + declare::{ + common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, + DeclareSubscriber, SubscriberId, UndeclareSubscriber, + }, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; -use crate::net::routing::{ - dispatcher::{ - face::FaceState, - resource::{NodeId, Resource, SessionContext}, - tables::{Route, RoutingExpr, Tables}, +use crate::{ + key_expr::KeyExpr, + net::routing::{ + dispatcher::{ + face::FaceState, + resource::{NodeId, Resource, SessionContext}, + tables::{Route, RoutingExpr, Tables}, + }, + hat::{CurrentFutureTrait, HatPubSubTrait, Sources}, + router::RoutesIndexes, + RoutingContext, PREFIX_LIVELINESS, }, - hat::{HatPubSubTrait, Sources}, - router::RoutesIndexes, - RoutingContext, PREFIX_LIVELINESS, }; #[inline] @@ -51,23 +57,59 @@ fn propagate_simple_subscription_to( && !face_hat!(dst_face).local_subs.contains_key(res) && (src_face.whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client) { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); + if dst_face.whatami != WhatAmI::Client { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + )); + } else { + let matching_interests = face_hat!(dst_face) + .remote_sub_interests + .values() + .filter(|si| si.0.as_ref().map(|si| si.matches(res)).unwrap_or(true)) + .cloned() + .collect::>, bool)>>(); + + for (int_res, aggregate) in matching_interests { + let res = if aggregate { + int_res.as_ref().unwrap_or(res) + } else { + res + }; + if !face_hat!(dst_face).local_subs.contains_key(res) { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + )); + } + } + } } } @@ -104,18 +146,11 @@ fn register_client_subscription( } } None => { - res.session_ctxs.insert( - face.id, - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: Some(*sub_info), - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }), - ); + let ctx = res + .session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))); + get_mut_unchecked(ctx).subs = Some(*sub_info); } } } @@ -169,9 +204,16 @@ fn client_subs(res: &Arc) -> Vec> { .collect() } +#[inline] +fn remote_client_subs(res: &Arc, face: &Arc) -> bool { + res.session_ctxs + .values() + .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) +} + fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc) { - for face in tables.faces.values_mut() { - if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { + for mut face in tables.faces.values().cloned() { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { interest_id: None, @@ -186,6 +228,33 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc res.expr(), )); } + for res in face_hat!(face) + .local_subs + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade() + .is_some_and(|m| m.context.is_some() && remote_client_subs(&m, &face)) + }) { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } @@ -203,8 +272,9 @@ pub(super) fn undeclare_client_subscription( if client_subs.is_empty() { propagate_forget_simple_subscription(tables, res); } + if client_subs.len() == 1 { - let face = &mut client_subs[0]; + let mut face = &mut client_subs[0]; if !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) { if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( @@ -221,6 +291,33 @@ pub(super) fn undeclare_client_subscription( res.expr(), )); } + for res in face_hat!(face) + .local_subs + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade() + .is_some_and(|m| m.context.is_some() && remote_client_subs(&m, face)) + }) { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } } @@ -240,22 +337,168 @@ fn forget_client_subscription( } pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; - for src_face in tables - .faces - .values() - .cloned() - .collect::>>() - { - for sub in face_hat!(src_face).remote_subs.values() { - propagate_simple_subscription_to(tables, face, sub, &sub_info, &mut src_face.clone()); + if face.whatami != WhatAmI::Client { + let sub_info = SubscriberInfo { + reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers + }; + for src_face in tables + .faces + .values() + .cloned() + .collect::>>() + { + for sub in face_hat!(src_face).remote_subs.values() { + propagate_simple_subscription_to( + tables, + face, + sub, + &sub_info, + &mut src_face.clone(), + ); + } } } } impl HatPubSubTrait for HatCode { + fn declare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + aggregate: bool, + ) { + if mode.current() && face.whatami == WhatAmI::Client { + let interest_id = mode.future().then_some(id); + let sub_info = SubscriberInfo { + reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers + }; + if let Some(res) = res.as_ref() { + if aggregate { + if tables.faces.values().any(|src_face| { + src_face.id != face.id + && face_hat!(src_face) + .remote_subs + .values() + .any(|sub| sub.context.is_some() && sub.matches(res)) + }) { + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert((*res).clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(res, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }), + }, + res.expr(), + )); + } + } else { + for src_face in tables + .faces + .values() + .cloned() + .collect::>>() + { + if src_face.id != face.id { + for sub in face_hat!(src_face).remote_subs.values() { + if sub.context.is_some() && sub.matches(res) { + let id = if mode.future() { + let id = + face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert(sub.clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(sub, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber( + DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }, + ), + }, + sub.expr(), + )); + } + } + } + } + } + } else { + for src_face in tables + .faces + .values() + .cloned() + .collect::>>() + { + if src_face.id != face.id { + for sub in face_hat!(src_face).remote_subs.values() { + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert(sub.clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(sub, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }), + }, + sub.expr(), + )); + } + } + } + } + } + if mode.future() { + face_hat_mut!(face) + .remote_sub_interests + .insert(id, (res.cloned(), aggregate)); + } + } + + fn undeclare_sub_interest( + &self, + _tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + face_hat_mut!(face).remote_sub_interests.remove(&id); + } + fn declare_subscription( &self, tables: &mut Tables, @@ -334,13 +577,7 @@ impl HatPubSubTrait for HatCode { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() - && match tables.whatami { - WhatAmI::Router => context.face.whatami != WhatAmI::Router, - _ => { - source_type == WhatAmI::Client - || context.face.whatami == WhatAmI::Client - } - } + && (source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client) { route.entry(*sid).or_insert_with(|| { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); @@ -365,4 +602,35 @@ impl HatPubSubTrait for HatCode { fn get_data_routes_entries(&self, _tables: &Tables) -> RoutesIndexes { get_routes_entries() } + + fn get_matching_subscriptions( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + ) -> HashMap> { + let mut matching_subscriptions = HashMap::new(); + if key_expr.ends_with('/') { + return matching_subscriptions; + } + tracing::trace!("get_matching_subscriptions({})", key_expr,); + let res = Resource::get_resource(&tables.root_res, key_expr); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + + for (sid, context) in &mres.session_ctxs { + if context.subs.is_some() { + matching_subscriptions + .entry(*sid) + .or_insert_with(|| context.face.clone()); + } + } + } + matching_subscriptions + } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index f0de12d7b9..1801f66c84 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -27,9 +27,12 @@ use zenoh_protocol::{ }, WhatAmI, WireExpr, }, - network::declare::{ - common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareQueryable, QueryableId, UndeclareQueryable, + network::{ + declare::{ + common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, + DeclareBody, DeclareQueryable, QueryableId, UndeclareQueryable, + }, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -41,7 +44,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{HatQueriesTrait, Sources}, + hat::{CurrentFutureTrait, HatQueriesTrait, Sources}, router::RoutesIndexes, RoutingContext, PREFIX_LIVELINESS, }; @@ -77,43 +80,62 @@ fn local_qabl_info( .unwrap_or(QueryableInfoType::DEFAULT) } +#[inline] +fn propagate_simple_queryable_to( + tables: &mut Tables, + dst_face: &mut Arc, + res: &Arc, + src_face: &Option<&mut Arc>, +) { + let info = local_qabl_info(tables, res, dst_face); + let current = face_hat!(dst_face).local_qabls.get(res); + if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) + && (current.is_none() || current.unwrap().1 != info) + && (dst_face.whatami != WhatAmI::Client + || face_hat!(dst_face) + .remote_qabl_interests + .values() + .any(|si| si.as_ref().map(|si| si.matches(res)).unwrap_or(true))) + && (src_face.is_none() + || src_face.as_ref().unwrap().whatami == WhatAmI::Client + || dst_face.whatami == WhatAmI::Client) + { + let id = current + .map(|c| c.0) + .unwrap_or(face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst)); + face_hat_mut!(dst_face) + .local_qabls + .insert(res.clone(), (id, info)); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr: key_expr, + ext_info: info, + }), + }, + res.expr(), + )); + } +} + fn propagate_simple_queryable( tables: &mut Tables, res: &Arc, src_face: Option<&mut Arc>, ) { - let faces = tables.faces.values().cloned(); + let faces = tables + .faces + .values() + .cloned() + .collect::>>(); for mut dst_face in faces { - let info = local_qabl_info(tables, res, &dst_face); - let current = face_hat!(dst_face).local_qabls.get(res); - if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) - && (current.is_none() || current.unwrap().1 != info) - && (src_face.is_none() - || src_face.as_ref().unwrap().whatami == WhatAmI::Client - || dst_face.whatami == WhatAmI::Client) - { - let id = current - .map(|c| c.0) - .unwrap_or(face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst)); - face_hat_mut!(&mut dst_face) - .local_qabls - .insert(res.clone(), (id, info)); - let key_expr = Resource::decl_key(res, &mut dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id, - wire_expr: key_expr, - ext_info: info, - }), - }, - res.expr(), - )); - } + propagate_simple_queryable_to(tables, &mut dst_face, res, &src_face); } } @@ -127,17 +149,11 @@ fn register_client_queryable( // Register queryable { let res = get_mut_unchecked(res); - get_mut_unchecked(res.session_ctxs.entry(face.id).or_insert_with(|| { - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: None, - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }) - })) + get_mut_unchecked( + res.session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))), + ) .qabl = Some(*qabl_info); } face_hat_mut!(face).remote_qabls.insert(id, res.clone()); @@ -168,6 +184,13 @@ fn client_qabls(res: &Arc) -> Vec> { .collect() } +#[inline] +fn remote_client_qabls(res: &Arc, face: &Arc) -> bool { + res.session_ctxs + .values() + .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) +} + fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { for face in tables.faces.values_mut() { if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { @@ -185,6 +208,33 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade() + .is_some_and(|m| m.context.is_some() && remote_client_qabls(&m, face)) + }) { + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } @@ -209,7 +259,7 @@ pub(super) fn undeclare_client_queryable( propagate_simple_queryable(tables, res, None); } if client_qabls.len() == 1 { - let face = &mut client_qabls[0]; + let mut face = &mut client_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -225,6 +275,33 @@ pub(super) fn undeclare_client_queryable( res.expr(), )); } + for res in face_hat!(face) + .local_qabls + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade() + .is_some_and(|m| m.context.is_some() && (remote_client_qabls(&m, face))) + }) { + if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } } @@ -242,15 +319,17 @@ fn forget_client_queryable( } } -pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) { - for face in tables - .faces - .values() - .cloned() - .collect::>>() - { - for qabl in face_hat!(face).remote_qabls.values() { - propagate_simple_queryable(tables, qabl, Some(&mut face.clone())); +pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { + if face.whatami != WhatAmI::Client { + for src_face in tables + .faces + .values() + .cloned() + .collect::>>() + { + for qabl in face_hat!(src_face).remote_qabls.values() { + propagate_simple_queryable_to(tables, face, qabl, &Some(&mut src_face.clone())); + } } } } @@ -260,6 +339,150 @@ lazy_static::lazy_static! { } impl HatQueriesTrait for HatCode { + fn declare_qabl_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + aggregate: bool, + ) { + if mode.current() && face.whatami == WhatAmI::Client { + let interest_id = mode.future().then_some(id); + if let Some(res) = res.as_ref() { + if aggregate { + if tables.faces.values().any(|src_face| { + src_face.id != face.id + && face_hat!(src_face) + .remote_qabls + .values() + .any(|qabl| qabl.context.is_some() && qabl.matches(res)) + }) { + let info = local_qabl_info(tables, res, face); + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert((*res).clone(), (id, info)); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(res, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr, + ext_info: info, + }), + }, + res.expr(), + )); + } + } else { + for src_face in tables + .faces + .values() + .cloned() + .collect::>>() + { + if src_face.id != face.id { + for qabl in face_hat!(src_face).remote_qabls.values() { + if qabl.context.is_some() && qabl.matches(res) { + let info = local_qabl_info(tables, qabl, face); + let id = if mode.future() { + let id = + face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); + id + } else { + 0 + }; + let key_expr = Resource::decl_key(qabl, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + )); + } + } + } + } + } + } else { + for src_face in tables + .faces + .values() + .cloned() + .collect::>>() + { + if src_face.id != face.id { + for qabl in face_hat!(src_face).remote_qabls.values() { + if qabl.context.is_some() { + let info = local_qabl_info(tables, qabl, face); + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); + id + } else { + 0 + }; + let key_expr = Resource::decl_key(qabl, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + )); + } + } + } + } + } + } + if mode.future() { + face_hat_mut!(face) + .remote_qabl_interests + .insert(id, res.cloned()); + } + } + + fn undeclare_qabl_interest( + &self, + _tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + face_hat_mut!(face).remote_qabl_interests.remove(&id); + } + fn declare_queryable( &self, tables: &mut Tables, @@ -337,10 +560,7 @@ impl HatQueriesTrait for HatCode { let mres = mres.upgrade().unwrap(); let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); for (sid, context) in &mres.session_ctxs { - if match tables.whatami { - WhatAmI::Router => context.face.whatami != WhatAmI::Router, - _ => source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client, - } { + if source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); if let Some(qabl_info) = context.qabl.as_ref() { route.push(QueryTargetQabl { diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index f573acee43..54b132f665 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -30,6 +30,7 @@ use zenoh_protocol::{ common::ZExtBody, network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId}, + interest::InterestId, oam::id::OAM_LINKSTATE, Oam, }, @@ -785,8 +786,10 @@ impl HatContext { struct HatFace { link_id: usize, next_id: AtomicU32, // @TODO: manage rollover and uniqueness + remote_sub_interests: HashMap>, bool)>, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, + remote_qabl_interests: HashMap>>, local_qabls: HashMap, (QueryableId, QueryableInfoType)>, remote_qabls: HashMap>, } @@ -796,8 +799,10 @@ impl HatFace { Self { link_id: 0, next_id: AtomicU32::new(0), + remote_sub_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), + remote_qabl_interests: HashMap::new(), local_qabls: HashMap::new(), remote_qabls: HashMap::new(), } diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 14726ac970..3bfb0fdd6f 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -20,9 +20,12 @@ use std::{ use petgraph::graph::NodeIndex; use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI, ZenohId}, - network::declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + network::{ + declare::{ + common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, + DeclareSubscriber, SubscriberId, UndeclareSubscriber, + }, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -31,16 +34,19 @@ use super::{ face_hat, face_hat_mut, get_peer, get_router, get_routes_entries, hat, hat_mut, network::Network, res_hat, res_hat_mut, HatCode, HatContext, HatFace, HatTables, }; -use crate::net::routing::{ - dispatcher::{ - face::FaceState, - pubsub::*, - resource::{NodeId, Resource, SessionContext}, - tables::{Route, RoutingExpr, Tables}, +use crate::{ + key_expr::KeyExpr, + net::routing::{ + dispatcher::{ + face::FaceState, + pubsub::*, + resource::{NodeId, Resource, SessionContext}, + tables::{Route, RoutingExpr, Tables}, + }, + hat::{CurrentFutureTrait, HatPubSubTrait, Sources}, + router::RoutesIndexes, + RoutingContext, PREFIX_LIVELINESS, }, - hat::{HatPubSubTrait, Sources}, - router::RoutesIndexes, - RoutingContext, PREFIX_LIVELINESS, }; #[inline] @@ -105,23 +111,59 @@ fn propagate_simple_subscription_to( || hat!(tables).failover_brokering(src_face.zid, dst_face.zid)) } { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); + if dst_face.whatami != WhatAmI::Client { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + )); + } else { + let matching_interests = face_hat!(dst_face) + .remote_sub_interests + .values() + .filter(|si| si.0.as_ref().map(|si| si.matches(res)).unwrap_or(true)) + .cloned() + .collect::>, bool)>>(); + + for (int_res, aggregate) in matching_interests { + let res = if aggregate { + int_res.as_ref().unwrap_or(res) + } else { + res + }; + if !face_hat!(dst_face).local_subs.contains_key(res) { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + )); + } + } + } } } @@ -272,18 +314,11 @@ fn register_client_subscription( } } None => { - res.session_ctxs.insert( - face.id, - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: Some(*sub_info), - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }), - ); + let ctx = res + .session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))); + get_mut_unchecked(ctx).subs = Some(*sub_info); } } } @@ -334,6 +369,13 @@ fn client_subs(res: &Arc) -> Vec> { .collect() } +#[inline] +fn remote_client_subs(res: &Arc, face: &Arc) -> bool { + res.session_ctxs + .values() + .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) +} + #[inline] fn send_forget_sourced_subscription_to_net_childs( tables: &Tables, @@ -374,8 +416,8 @@ fn send_forget_sourced_subscription_to_net_childs( } fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc) { - for face in tables.faces.values_mut() { - if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { + for mut face in tables.faces.values().cloned() { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { interest_id: None, @@ -390,6 +432,37 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc res.expr(), )); } + for res in face_hat!(&mut face) + .local_subs + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade().is_some_and(|m| { + m.context.is_some() + && (remote_client_subs(&m, &face) + || remote_peer_subs(tables, &m) + || remote_router_subs(tables, &m)) + }) + }) { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } @@ -563,8 +636,9 @@ pub(super) fn undeclare_client_subscription( } else { propagate_forget_simple_subscription_to_peers(tables, res); } + if client_subs.len() == 1 && !router_subs && !peer_subs { - let face = &mut client_subs[0]; + let mut face = &mut client_subs[0]; if !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) { if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( @@ -581,6 +655,37 @@ pub(super) fn undeclare_client_subscription( res.expr(), )); } + for res in face_hat!(face) + .local_subs + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade().is_some_and(|m| { + m.context.is_some() + && (remote_client_subs(&m, face) + || remote_peer_subs(tables, &m) + || remote_router_subs(tables, &m)) + }) + }) { + if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } } @@ -604,27 +709,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers }; - if face.whatami == WhatAmI::Client { - for sub in &hat!(tables).router_subs { - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert(sub.clone(), id); - let key_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: sub_info, - }), - }, - sub.expr(), - )); - } - } else if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { + if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { for sub in &hat!(tables).router_subs { if sub.context.is_some() && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) @@ -826,40 +911,135 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: } } -#[inline] -fn insert_faces_for_subs( - route: &mut Route, - expr: &RoutingExpr, - tables: &Tables, - net: &Network, - source: NodeId, - subs: &HashSet, -) { - if net.trees.len() > source as usize { - for sub in subs { - if let Some(sub_idx) = net.get_idx(sub) { - if net.trees[source as usize].directions.len() > sub_idx.index() { - if let Some(direction) = net.trees[source as usize].directions[sub_idx.index()] - { - if net.graph.contains_node(direction) { - if let Some(face) = tables.get_face(&net.graph[direction].zid) { - route.entry(face.id).or_insert_with(|| { - let key_expr = - Resource::get_best_key(expr.prefix, expr.suffix, face.id); - (face.clone(), key_expr.to_owned(), source) - }); - } +impl HatPubSubTrait for HatCode { + fn declare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + aggregate: bool, + ) { + if mode.current() && face.whatami == WhatAmI::Client { + let interest_id = mode.future().then_some(id); + let sub_info = SubscriberInfo { + reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers + }; + if let Some(res) = res.as_ref() { + if aggregate { + if hat!(tables).router_subs.iter().any(|sub| { + sub.context.is_some() + && sub.matches(res) + && (remote_client_subs(sub, face) + || remote_peer_subs(tables, sub) + || remote_router_subs(tables, sub)) + }) { + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert((*res).clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(res, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }), + }, + res.expr(), + )); + } + } else { + for sub in &hat!(tables).router_subs { + if sub.context.is_some() + && sub.matches(res) + && (remote_client_subs(sub, face) + || remote_peer_subs(tables, sub) + || remote_router_subs(tables, sub)) + { + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert(sub.clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(sub, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }), + }, + sub.expr(), + )); } } } + } else { + for sub in &hat!(tables).router_subs { + if sub.context.is_some() + && (remote_client_subs(sub, face) + || remote_peer_subs(tables, sub) + || remote_router_subs(tables, sub)) + { + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_subs.insert(sub.clone(), id); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(sub, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr, + ext_info: sub_info, + }), + }, + sub.expr(), + )); + } + } } } - } else { - tracing::trace!("Tree for node sid:{} not yet ready", source); + if mode.future() { + face_hat_mut!(face) + .remote_sub_interests + .insert(id, (res.cloned(), aggregate)); + } + } + + fn undeclare_sub_interest( + &self, + _tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + face_hat_mut!(face).remote_sub_interests.remove(&id); } -} -impl HatPubSubTrait for HatCode { fn declare_subscription( &self, tables: &mut Tables, @@ -973,6 +1153,43 @@ impl HatPubSubTrait for HatCode { source: NodeId, source_type: WhatAmI, ) -> Arc { + #[inline] + fn insert_faces_for_subs( + route: &mut Route, + expr: &RoutingExpr, + tables: &Tables, + net: &Network, + source: NodeId, + subs: &HashSet, + ) { + if net.trees.len() > source as usize { + for sub in subs { + if let Some(sub_idx) = net.get_idx(sub) { + if net.trees[source as usize].directions.len() > sub_idx.index() { + if let Some(direction) = + net.trees[source as usize].directions[sub_idx.index()] + { + if net.graph.contains_node(direction) { + if let Some(face) = tables.get_face(&net.graph[direction].zid) { + route.entry(face.id).or_insert_with(|| { + let key_expr = Resource::get_best_key( + expr.prefix, + expr.suffix, + face.id, + ); + (face.clone(), key_expr.to_owned(), source) + }); + } + } + } + } + } + } + } else { + tracing::trace!("Tree for node sid:{} not yet ready", source); + } + } + let mut route = HashMap::new(); let key_expr = expr.full_expr(); if key_expr.ends_with('/') { @@ -1064,4 +1281,91 @@ impl HatPubSubTrait for HatCode { fn get_data_routes_entries(&self, tables: &Tables) -> RoutesIndexes { get_routes_entries(tables) } + + fn get_matching_subscriptions( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + ) -> HashMap> { + #[inline] + fn insert_faces_for_subs( + route: &mut HashMap>, + tables: &Tables, + net: &Network, + source: usize, + subs: &HashSet, + ) { + if net.trees.len() > source { + for sub in subs { + if let Some(sub_idx) = net.get_idx(sub) { + if net.trees[source].directions.len() > sub_idx.index() { + if let Some(direction) = net.trees[source].directions[sub_idx.index()] { + if net.graph.contains_node(direction) { + if let Some(face) = tables.get_face(&net.graph[direction].zid) { + route.entry(face.id).or_insert_with(|| face.clone()); + } + } + } + } + } + } + } else { + tracing::trace!("Tree for node sid:{} not yet ready", source); + } + } + + let mut matching_subscriptions = HashMap::new(); + if key_expr.ends_with('/') { + return matching_subscriptions; + } + tracing::trace!("get_matching_subscriptions({})", key_expr,); + + let res = Resource::get_resource(&tables.root_res, key_expr); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); + + let master = !hat!(tables).full_net(WhatAmI::Peer) + || *hat!(tables).elect_router(&tables.zid, key_expr, hat!(tables).shared_nodes.iter()) + == tables.zid; + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + + if master { + let net = hat!(tables).routers_net.as_ref().unwrap(); + insert_faces_for_subs( + &mut matching_subscriptions, + tables, + net, + net.idx.index(), + &res_hat!(mres).router_subs, + ); + } + + if hat!(tables).full_net(WhatAmI::Peer) { + let net = hat!(tables).peers_net.as_ref().unwrap(); + insert_faces_for_subs( + &mut matching_subscriptions, + tables, + net, + net.idx.index(), + &res_hat!(mres).peer_subs, + ); + } + + if master { + for (sid, context) in &mres.session_ctxs { + if context.subs.is_some() && context.face.whatami != WhatAmI::Router { + matching_subscriptions + .entry(*sid) + .or_insert_with(|| context.face.clone()); + } + } + } + } + matching_subscriptions + } } diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 9defb80081..72e3a781e5 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -28,9 +28,12 @@ use zenoh_protocol::{ }, WhatAmI, WireExpr, ZenohId, }, - network::declare::{ - common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareQueryable, QueryableId, UndeclareQueryable, + network::{ + declare::{ + common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, + DeclareBody, DeclareQueryable, QueryableId, UndeclareQueryable, + }, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -46,7 +49,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{HatQueriesTrait, Sources}, + hat::{CurrentFutureTrait, HatQueriesTrait, Sources}, router::RoutesIndexes, RoutingContext, PREFIX_LIVELINESS, }; @@ -238,6 +241,11 @@ fn propagate_simple_queryable( let current = face_hat!(dst_face).local_qabls.get(res); if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) && (current.is_none() || current.unwrap().1 != info) + && (dst_face.whatami != WhatAmI::Client + || face_hat!(dst_face) + .remote_qabl_interests + .values() + .any(|si| si.as_ref().map(|si| si.matches(res)).unwrap_or(true))) && if full_peers_net { dst_face.whatami == WhatAmI::Client } else { @@ -404,17 +412,11 @@ fn register_client_queryable( // Register queryable { let res = get_mut_unchecked(res); - get_mut_unchecked(res.session_ctxs.entry(face.id).or_insert_with(|| { - Arc::new(SessionContext { - face: face.clone(), - local_expr_id: None, - remote_expr_id: None, - subs: None, - qabl: None, - in_interceptor_cache: None, - e_interceptor_cache: None, - }) - })) + get_mut_unchecked( + res.session_ctxs + .entry(face.id) + .or_insert_with(|| Arc::new(SessionContext::new(face.clone()))), + ) .qabl = Some(*qabl_info); } face_hat_mut!(face).remote_qabls.insert(id, res.clone()); @@ -465,6 +467,13 @@ fn client_qabls(res: &Arc) -> Vec> { .collect() } +#[inline] +fn remote_client_qabls(res: &Arc, face: &Arc) -> bool { + res.session_ctxs + .values() + .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) +} + #[inline] fn send_forget_sourced_queryable_to_net_childs( tables: &Tables, @@ -505,8 +514,8 @@ fn send_forget_sourced_queryable_to_net_childs( } fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { - for face in tables.faces.values_mut() { - if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { + for mut face in tables.faces.values().cloned() { + if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { interest_id: None, @@ -521,6 +530,37 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade().is_some_and(|m| { + m.context.is_some() + && (remote_client_qabls(&m, &face) + || remote_peer_qabls(tables, &m) + || remote_router_qabls(tables, &m)) + }) + }) { + if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } @@ -707,7 +747,7 @@ pub(super) fn undeclare_client_queryable( } if client_qabls.len() == 1 && !router_qabls && !peer_qabls { - let face = &mut client_qabls[0]; + let mut face = &mut client_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -723,6 +763,37 @@ pub(super) fn undeclare_client_queryable( res.expr(), )); } + for res in face_hat!(face) + .local_qabls + .keys() + .cloned() + .collect::>>() + { + if !res.context().matches.iter().any(|m| { + m.upgrade().is_some_and(|m| { + m.context.is_some() + && (remote_client_qabls(&m, face) + || remote_peer_qabls(tables, &m) + || remote_router_qabls(tables, &m)) + }) + }) { + if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } + } + } } } } @@ -741,32 +812,7 @@ fn forget_client_queryable( } pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { - if face.whatami == WhatAmI::Client { - for qabl in hat!(tables).router_qabls.iter() { - if qabl.context.is_some() { - let info = local_qabl_info(tables, qabl, face); - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face) - .local_qabls - .insert(qabl.clone(), (id, info)); - let key_expr = Resource::decl_key(qabl, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id, - wire_expr: key_expr, - ext_info: info, - }), - }, - qabl.expr(), - )); - } - } - } else if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { + if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { for qabl in hat!(tables).router_qabls.iter() { if qabl.context.is_some() && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) @@ -864,7 +910,8 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links { let dst_face = &mut get_mut_unchecked(ctx).face; if dst_face.whatami == WhatAmI::Peer && src_face.zid != dst_face.zid { - if let Some(id) = face_hat!(dst_face).local_subs.get(res).cloned() { + if let Some((id, _)) = face_hat!(dst_face).local_qabls.get(res).cloned() + { let forget = !HatTables::failover_brokering_to(links, dst_face.zid) && { let ctx_links = hat!(tables) @@ -1021,6 +1068,140 @@ lazy_static::lazy_static! { } impl HatQueriesTrait for HatCode { + fn declare_qabl_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + mode: InterestMode, + aggregate: bool, + ) { + if mode.current() && face.whatami == WhatAmI::Client { + let interest_id = mode.future().then_some(id); + if let Some(res) = res.as_ref() { + if aggregate { + if hat!(tables).router_qabls.iter().any(|qabl| { + qabl.context.is_some() + && qabl.matches(res) + && (remote_client_qabls(qabl, face) + || remote_peer_qabls(tables, qabl) + || remote_router_qabls(tables, qabl)) + }) { + let info = local_qabl_info(tables, res, face); + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert((*res).clone(), (id, info)); + id + } else { + 0 + }; + let wire_expr = Resource::decl_key(res, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr, + ext_info: info, + }), + }, + res.expr(), + )); + } + } else { + for qabl in hat!(tables).router_qabls.iter() { + if qabl.context.is_some() + && qabl.matches(res) + && (remote_client_qabls(qabl, face) + || remote_peer_qabls(tables, qabl) + || remote_router_qabls(tables, qabl)) + { + let info = local_qabl_info(tables, qabl, face); + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); + id + } else { + 0 + }; + let key_expr = Resource::decl_key(qabl, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + )); + } + } + } + } else { + for qabl in hat!(tables).router_qabls.iter() { + if qabl.context.is_some() + && (remote_client_qabls(qabl, face) + || remote_peer_qabls(tables, qabl) + || remote_router_qabls(tables, qabl)) + { + let info = local_qabl_info(tables, qabl, face); + let id = if mode.future() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); + id + } else { + 0 + }; + let key_expr = Resource::decl_key(qabl, face); + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + )); + } + } + } + } + if mode.future() { + face_hat_mut!(face) + .remote_qabl_interests + .insert(id, res.cloned()); + } + } + + fn undeclare_qabl_interest( + &self, + _tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + face_hat_mut!(face).remote_qabl_interests.remove(&id); + } + fn declare_queryable( &self, tables: &mut Tables, diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 62f6b7c8b4..12c1f26fdb 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -499,6 +499,11 @@ impl Primitives for AdminSpace { } impl crate::net::primitives::EPrimitives for AdminSpace { + #[inline] + fn send_interest(&self, ctx: crate::net::routing::RoutingContext) { + (self as &dyn Primitives).send_interest(ctx.msg) + } + #[inline] fn send_declare(&self, ctx: crate::net::routing::RoutingContext) { (self as &dyn Primitives).send_declare(ctx.msg) diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 5f04b73d53..1622c1eb52 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -534,6 +534,8 @@ impl Primitives for ClientPrimitives { } impl EPrimitives for ClientPrimitives { + fn send_interest(&self, _ctx: RoutingContext) {} + fn send_declare(&self, ctx: RoutingContext) { match ctx.msg.body { DeclareBody::DeclareKeyExpr(d) => {