Skip to content

Commit

Permalink
Enhance subscribers, queryables and liveliness tokens propagation to …
Browse files Browse the repository at this point in the history
…improve scalability (#814)

* Router implements interests protocol for clients

* Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients for pico

* Fix WireExprExt M flag encoding/decoding

* Fix decl_key

* Clients send all samples and queries to routers and peers

* Avoid self declaration loop on interest

* Fix query/replies copy/paste bugs

* Peers implement interests protocol for clients

* Don't send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients

* Add client writer-side filtering (#863)

* Add client writer-side filtering

* Reimplement liveliness with interests

* Fix writer-side filtering before receiving FinalInterest

* Fix pubsub interest based routing after router failover

* Declare message can be Push/Request/RequestContinuous/Response

* Address review comments

* Remove F: Future flag from DeclareInterest

* cargo fmt --all

* Remove unused Interest flags field

* Update doc

* Remove unneeded interest_id field

* Update commons/zenoh-protocol/src/network/declare.rs

* Remove unused UndeclareInterest

* Implement proper Declare Request/Response id correlation

* Add new Interest network message

* Update doc

* Update codec

* Fix stable build

* Fix test_acl

* Fix writer side filtering

* Add separate functions to compute matching status

* Fix unstable imports

* Remove useless checks

---------

Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
OlivierHecart and Mallets authored May 13, 2024
1 parent 511bc67 commit cfb86a8
Show file tree
Hide file tree
Showing 30 changed files with 2,856 additions and 664 deletions.
6 changes: 3 additions & 3 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ where
if x.wire_expr.has_suffix() {
flags |= 1;
}
if let Mapping::Receiver = wire_expr.mapping {
if let Mapping::Sender = wire_expr.mapping {
flags |= 1 << 1;
}
codec.write(&mut zriter, flags)?;
Expand Down Expand Up @@ -998,9 +998,9 @@ where
String::new()
};
let mapping = if imsg::has_flag(flags, 1 << 1) {
Mapping::Receiver
} else {
Mapping::Sender
} else {
Mapping::Receiver
};

Ok((
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-codec/src/network/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use zenoh_protocol::{
core::WireExpr,
network::{
declare, id,
interest::{self, InterestMode, InterestOptions},
Interest, Mapping,
interest::{self, Interest, InterestMode, InterestOptions},
Mapping,
},
};

Expand Down
13 changes: 13 additions & 0 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ pub mod common {
pub mod ext {
use super::*;

/// Flags:
/// - N: Named If N==1 then the key expr has name/suffix
/// - M: Mapping if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |X|X|X|X|X|X|M|N|
/// +-+-+-+---------+
/// ~ key_scope:z16 ~
/// +---------------+
/// ~ key_suffix ~ if N==1 -- <u8;z16>
/// +---------------+
///
pub type WireExprExt = zextzbuf!(0x0f, true);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WireExprType {
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/network/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub mod flag {
pub type DeclareRequestId = u32;
pub type AtomicDeclareRequestId = AtomicU32;

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterestMode {
Final,
Current,
Expand Down
29 changes: 11 additions & 18 deletions zenoh/src/api/builders/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
fn create_one_shot_publisher(self) -> ZResult<Publisher<'a>> {
Ok(Publisher {
session: self.session,
#[cfg(feature = "unstable")]
eid: 0, // This is a one shot Publisher
id: 0, // This is a one shot Publisher
key_expr: self.key_expr?,
congestion_control: self.congestion_control,
priority: self.priority,
Expand Down Expand Up @@ -363,22 +362,16 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
}
}
self.session
.declare_publication_intent(key_expr.clone())
.wait()?;
#[cfg(feature = "unstable")]
let eid = self.session.runtime.next_id();
let publisher = Publisher {
session: self.session,
#[cfg(feature = "unstable")]
eid,
key_expr,
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
};
tracing::trace!("publish({:?})", publisher.key_expr);
Ok(publisher)
.declare_publisher_inner(key_expr.clone(), self.destination)
.map(|id| Publisher {
session: self.session,
id,
key_expr,
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
})
}
}

Expand Down
37 changes: 23 additions & 14 deletions zenoh/src/api/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::{
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
pin::Pin,
task::{Context, Poll},
Expand All @@ -32,9 +33,7 @@ use zenoh_result::{Error, ZResult};
use {
crate::api::handlers::{Callback, DefaultHandler, IntoHandler},
crate::api::sample::SourceInfo,
crate::api::Id,
zenoh_protocol::core::EntityGlobalId,
zenoh_protocol::core::EntityId,
};

use super::{
Expand All @@ -48,7 +47,23 @@ use super::{
sample::{DataInfo, Locality, QoS, Sample, SampleFields, SampleKind},
session::{SessionRef, Undeclarable},
};
use crate::net::primitives::Primitives;
use crate::{api::Id, net::primitives::Primitives};

pub(crate) struct PublisherState {
pub(crate) id: Id,
pub(crate) remote_id: Id,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) destination: Locality,
}

impl fmt::Debug for PublisherState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Publisher")
.field("id", &self.id)
.field("key_expr", &self.key_expr)
.finish()
}
}

#[zenoh_macros::unstable]
#[derive(Clone)]
Expand Down Expand Up @@ -113,8 +128,7 @@ impl std::fmt::Debug for PublisherRef<'_> {
#[derive(Debug, Clone)]
pub struct Publisher<'a> {
pub(crate) session: SessionRef<'a>,
#[cfg(feature = "unstable")]
pub(crate) eid: EntityId,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) congestion_control: CongestionControl,
pub(crate) priority: Priority,
Expand Down Expand Up @@ -142,7 +156,7 @@ impl<'a> Publisher<'a> {
pub fn id(&self) -> EntityGlobalId {
EntityGlobalId {
zid: self.session.zid(),
eid: self.eid,
eid: self.id,
}
}

Expand Down Expand Up @@ -459,11 +473,9 @@ impl Resolvable for PublisherUndeclaration<'_> {
impl Wait for PublisherUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
let Publisher {
session, key_expr, ..
session, id: eid, ..
} = &self.publisher;
session
.undeclare_publication_intent(key_expr.clone())
.wait()?;
session.undeclare_publisher_inner(*eid)?;
self.publisher.key_expr = unsafe { keyexpr::from_str_unchecked("") }.into();
Ok(())
}
Expand All @@ -481,10 +493,7 @@ impl IntoFuture for PublisherUndeclaration<'_> {
impl Drop for Publisher<'_> {
fn drop(&mut self) {
if !self.key_expr.is_empty() {
let _ = self
.session
.undeclare_publication_intent(self.key_expr.clone())
.wait();
let _ = self.session.undeclare_publisher_inner(self.id);
}
}
}
Expand Down
Loading

0 comments on commit cfb86a8

Please sign in to comment.