Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance subscribers, queryables and liveliness tokens propagation to improve scalability #814

Merged
merged 43 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1eb8629
Router implements interests protocol for clients
OlivierHecart Mar 11, 2024
16f7789
Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients fo…
OlivierHecart Mar 11, 2024
cf1f579
Fix WireExprExt M flag encoding/decoding
OlivierHecart Mar 12, 2024
6047d75
Fix decl_key
OlivierHecart Mar 15, 2024
5de298f
Clients send all samples and queries to routers and peers
OlivierHecart Mar 15, 2024
961bec7
Avoid self declaration loop on interest
OlivierHecart Mar 19, 2024
eb976c8
Fix query/replies copy/paste bugs
OlivierHecart Mar 21, 2024
b8f1a9c
Peers implement interests protocol for clients
OlivierHecart Mar 21, 2024
83a51e4
Merge branch 'protocol_changes' into interests
OlivierHecart Mar 21, 2024
26bbd8e
Don't send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients
OlivierHecart Mar 22, 2024
cede672
Add client writer-side filtering (#863)
OlivierHecart Mar 27, 2024
76fb3ed
Merge branch 'protocol_changes' into interests
OlivierHecart Mar 27, 2024
df2ea58
Fix pubsub interest based routing after router failover
OlivierHecart Mar 27, 2024
41f59d3
Declare message can be Push/Request/RequestContinuous/Response
Mallets Apr 4, 2024
43a61c7
Address review comments
Mallets Apr 4, 2024
bce8855
Remove F: Future flag from DeclareInterest
Mallets Apr 5, 2024
3da2aed
cargo fmt --all
Mallets Apr 5, 2024
c753e82
Remove unused Interest flags field
OlivierHecart Apr 5, 2024
52ff7d0
Update doc
OlivierHecart Apr 5, 2024
8c9abc1
Remove unneeded interest_id field
OlivierHecart Apr 5, 2024
3a4161b
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 5, 2024
9aa2079
Update commons/zenoh-protocol/src/network/declare.rs
Mallets Apr 5, 2024
dd2ef80
Remove unused UndeclareInterest
OlivierHecart Apr 5, 2024
b6dc311
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 5, 2024
83d781f
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 5, 2024
7f55917
Implement proper Declare Request/Response id correlation
OlivierHecart Apr 5, 2024
62192b9
Add new Interest network message
OlivierHecart Apr 8, 2024
3ebad65
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 8, 2024
e3a8eb2
Update doc
OlivierHecart Apr 8, 2024
a80ce2b
Update codec
OlivierHecart Apr 8, 2024
bac3acb
Merge branch 'protocol_declare' into interests
OlivierHecart Apr 18, 2024
4e0ccae
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 18, 2024
d8ba33c
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 18, 2024
59ae98f
Fix stable build
OlivierHecart Apr 18, 2024
6a9c4f7
Fix test_acl
OlivierHecart Apr 18, 2024
9a2a539
Fix writer side filtering
OlivierHecart Apr 23, 2024
d4dcf14
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 26, 2024
c6e8b53
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 26, 2024
0d5df18
Add separate functions to compute matching status
OlivierHecart May 2, 2024
1141291
Merge branch 'protocol_changes' into interests
OlivierHecart May 3, 2024
b43b159
Fix unstable imports
OlivierHecart May 3, 2024
0eb4e98
Remove useless checks
OlivierHecart May 3, 2024
902c958
Merge branch 'protocol_changes' into interests
OlivierHecart May 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ where
if x.wire_expr.has_suffix() {
flags |= 1;
}
if let Mapping::Receiver = wire_expr.mapping {
if let Mapping::Sender = wire_expr.mapping {
flags |= 1 << 1;
}
codec.write(&mut zriter, flags)?;
Expand Down Expand Up @@ -996,9 +996,9 @@ where
String::new()
};
let mapping = if imsg::has_flag(flags, 1 << 1) {
Mapping::Receiver
} else {
Mapping::Sender
} else {
Mapping::Receiver
};

Ok((
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-codec/src/network/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use zenoh_protocol::{
core::WireExpr,
network::{
declare, id,
interest::{self, InterestMode, InterestOptions},
Interest, Mapping,
interest::{self, Interest, InterestMode, InterestOptions},
Mapping,
},
};

Expand Down
13 changes: 13 additions & 0 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,19 @@ pub mod common {
pub mod ext {
use super::*;

/// Flags:
/// - N: Named If N==1 then the key expr has name/suffix
/// - M: Mapping if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |X|X|X|X|X|X|M|N|
/// +-+-+-+---------+
/// ~ key_scope:z16 ~
/// +---------------+
/// ~ key_suffix ~ if N==1 -- <u8;z16>
/// +---------------+
///
pub type WireExprExt = zextzbuf!(0x0f, true);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WireExprType {
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/network/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub mod flag {
pub type DeclareRequestId = u32;
pub type AtomicDeclareRequestId = AtomicU32;

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterestMode {
Final,
Current,
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl TransportPeerEventHandler for DeMux {
match msg.body {
NetworkBody::Push(m) => self.face.send_push(m),
NetworkBody::Declare(m) => self.face.send_declare(m),
NetworkBody::Interest(_) => todo!(),
NetworkBody::Interest(m) => self.face.send_interest(m),
NetworkBody::Request(m) => self.face.send_request(m),
NetworkBody::Response(m) => self.face.send_response(m),
NetworkBody::ResponseFinal(m) => self.face.send_response_final(m),
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub trait Primitives: Send + Sync {
pub(crate) trait EPrimitives: Send + Sync {
fn as_any(&self) -> &dyn Any;

fn send_interest(&self, ctx: RoutingContext<Interest>);

fn send_declare(&self, ctx: RoutingContext<Declare>);

fn send_push(&self, msg: Push);
Expand Down Expand Up @@ -76,6 +78,8 @@ impl Primitives for DummyPrimitives {
}

impl EPrimitives for DummyPrimitives {
fn send_interest(&self, _ctx: RoutingContext<Interest>) {}

fn send_declare(&self, _ctx: RoutingContext<Declare>) {}

fn send_push(&self, _msg: Push) {}
Expand Down
50 changes: 50 additions & 0 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ impl Primitives for Mux {
}

impl EPrimitives for Mux {
fn send_interest(&self, ctx: RoutingContext<Interest>) {
let ctx = RoutingContext {
msg: NetworkMessage {
body: NetworkBody::Interest(ctx.msg),
#[cfg(feature = "stats")]
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}

fn send_declare(&self, ctx: RoutingContext<Declare>) {
let ctx = RoutingContext {
msg: NetworkMessage {
Expand Down Expand Up @@ -495,6 +520,31 @@ impl Primitives for McastMux {
}

impl EPrimitives for McastMux {
fn send_interest(&self, ctx: RoutingContext<Interest>) {
let ctx = RoutingContext {
msg: NetworkMessage {
body: NetworkBody::Interest(ctx.msg),
#[cfg(feature = "stats")]
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}

fn send_declare(&self, ctx: RoutingContext<Declare>) {
let ctx = RoutingContext {
msg: NetworkMessage {
Expand Down
93 changes: 88 additions & 5 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use super::tables::TablesLock;
use super::{resource::*, tables};
use crate::net::primitives::{McastMux, Mux, Primitives};
use crate::net::routing::interceptor::{InterceptorTrait, InterceptorsChain};
use crate::net::routing::RoutingContext;
use crate::KeyExpr;
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Weak};
use tokio_util::sync::CancellationToken;
use zenoh_protocol::network::interest::{InterestId, InterestMode, InterestOptions};
use zenoh_protocol::network::{ext, Declare, DeclareBody, DeclareFinal};
use zenoh_protocol::zenoh::RequestBody;
use zenoh_protocol::{
core::{ExprId, WhatAmI, ZenohId},
Expand All @@ -33,13 +36,21 @@ use zenoh_transport::multicast::TransportMulticast;
#[cfg(feature = "stats")]
use zenoh_transport::stats::TransportStats;

pub(crate) struct InterestState {
pub(crate) options: InterestOptions,
pub(crate) res: Option<Arc<Resource>>,
pub(crate) finalized: bool,
}

pub struct FaceState {
pub(crate) id: usize,
pub(crate) zid: ZenohId,
pub(crate) whatami: WhatAmI,
#[cfg(feature = "stats")]
pub(crate) stats: Option<Arc<TransportStats>>,
pub(crate) primitives: Arc<dyn crate::net::primitives::EPrimitives + Send + Sync>,
pub(crate) local_interests: HashMap<InterestId, InterestState>,
pub(crate) remote_key_interests: HashMap<InterestId, Option<Arc<Resource>>>,
pub(crate) local_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) remote_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) next_qid: RequestId,
Expand Down Expand Up @@ -69,6 +80,8 @@ impl FaceState {
#[cfg(feature = "stats")]
stats,
primitives,
local_interests: HashMap::new(),
remote_key_interests: HashMap::new(),
local_mappings: HashMap::new(),
remote_mappings: HashMap::new(),
next_qid: 0,
Expand Down Expand Up @@ -185,8 +198,67 @@ impl Face {
}

impl Primitives for Face {
fn send_interest(&self, _msg: zenoh_protocol::network::Interest) {
todo!()
fn send_interest(&self, msg: zenoh_protocol::network::Interest) {
let ctrl_lock = zlock!(self.tables.ctrl_lock);
if msg.mode != InterestMode::Final {
if msg.options.keyexprs() && msg.mode != InterestMode::Current {
register_expr_interest(
&self.tables,
&mut self.state.clone(),
msg.id,
msg.wire_expr.as_ref(),
);
}
if msg.options.subscribers() {
declare_sub_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
msg.id,
msg.wire_expr.as_ref(),
msg.mode,
msg.options.aggregate(),
);
}
if msg.options.queryables() {
declare_qabl_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
msg.id,
msg.wire_expr.as_ref(),
msg.mode,
msg.options.aggregate(),
);
}
if msg.mode != InterestMode::Future {
self.state.primitives.send_declare(RoutingContext::new_out(
Declare {
interest_id: Some(msg.id),
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareFinal(DeclareFinal),
},
self.clone(),
));
}
} else {
unregister_expr_interest(&self.tables, &mut self.state.clone(), msg.id);
undeclare_sub_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
msg.id,
);
undeclare_qabl_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
msg.id,
);
}
drop(ctrl_lock);
}

fn send_declare(&self, msg: zenoh_protocol::network::Declare) {
Expand Down Expand Up @@ -240,9 +312,20 @@ impl Primitives for Face {
msg.ext_nodeid.node_id,
);
}
zenoh_protocol::network::DeclareBody::DeclareToken(_m) => todo!(),
zenoh_protocol::network::DeclareBody::UndeclareToken(_m) => todo!(),
zenoh_protocol::network::DeclareBody::DeclareFinal(_) => todo!(),
zenoh_protocol::network::DeclareBody::DeclareToken(m) => {
tracing::warn!("Received unsupported {m:?}")
}
zenoh_protocol::network::DeclareBody::UndeclareToken(m) => {
tracing::warn!("Received unsupported {m:?}")
}
zenoh_protocol::network::DeclareBody::DeclareFinal(_) => {
if let Some(id) = msg.interest_id {
get_mut_unchecked(&mut self.state.clone())
.local_interests
.entry(id)
.and_modify(|interest| interest.finalized = true);
}
}
}
drop(ctrl_lock);
}
Expand Down
81 changes: 81 additions & 0 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,94 @@ use zenoh_core::zread;
use zenoh_protocol::core::key_expr::keyexpr;
use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo;
use zenoh_protocol::network::declare::SubscriberId;
use zenoh_protocol::network::interest::{InterestId, InterestMode};
use zenoh_protocol::{
core::{WhatAmI, WireExpr},
network::{declare::ext, Push},
zenoh::PushBody,
};
use zenoh_sync::get_mut_unchecked;

pub(crate) fn declare_sub_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
expr: Option<&WireExpr>,
mode: InterestMode,
aggregate: bool,
) {
if let Some(expr) = expr {
let rtables = zread!(tables.tables);
match rtables
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => {
tracing::debug!(
"{} Declare sub interest {} ({}{})",
face,
id,
prefix.expr(),
expr.suffix
);
let res = Resource::get_resource(&prefix, &expr.suffix);
let (mut res, mut wtables) = if res
.as_ref()
.map(|r| r.context.is_some())
.unwrap_or(false)
{
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

hat_code.declare_sub_interest(
&mut wtables,
face,
id,
Some(&mut res),
mode,
aggregate,
);
}
None => tracing::error!(
"{} Declare sub interest {} for unknown scope {}!",
face,
id,
expr.scope
),
}
} else {
let mut wtables = zwrite!(tables.tables);
hat_code.declare_sub_interest(&mut wtables, face, id, None, mode, aggregate);
}
}

pub(crate) fn undeclare_sub_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
) {
tracing::debug!("{} Undeclare sub interest {}", face, id,);
let mut wtables = zwrite!(tables.tables);
hat_code.undeclare_sub_interest(&mut wtables, face, id);
}

pub(crate) fn declare_subscription(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
Expand Down
Loading