Skip to content

Commit

Permalink
Admin space: Show known origins of Subscribers and Queryables. (#959)
Browse files Browse the repository at this point in the history
* Subscribers are reported with known sources in adminspace

* Return valid empty json in case of serialisation failure

* Queryables are reported with known sources in adminspace

* Address review comments
  • Loading branch information
OlivierHecart authored Apr 22, 2024
1 parent 9ecc903 commit 81217c7
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 39 deletions.
18 changes: 13 additions & 5 deletions zenoh/src/net/routing/hat/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use crate::net::routing::dispatcher::face::FaceState;
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
use crate::net::routing::dispatcher::tables::Tables;
use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
use crate::net::routing::hat::HatPubSubTrait;
use crate::net::routing::hat::{HatPubSubTrait, Sources};
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use zenoh_protocol::core::key_expr::OwnedKeyExpr;
use zenoh_protocol::{
Expand Down Expand Up @@ -274,11 +274,19 @@ impl HatPubSubTrait for HatCode {
forget_client_subscription(tables, face, res);
}

fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>> {
let mut subs = HashSet::new();
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
// Compute the list of known suscriptions (keys)
let mut subs = HashMap::new();
for src_face in tables.faces.values() {
for sub in &face_hat!(src_face).remote_subs {
subs.insert(sub.clone());
// Insert the key in the list of known suscriptions
let srcs = subs.entry(sub.clone()).or_insert_with(Sources::empty);
// Append src_face as a suscription source in the proper list
match src_face.whatami {
WhatAmI::Router => srcs.routers.push(src_face.zid),
WhatAmI::Peer => srcs.peers.push(src_face.zid),
WhatAmI::Client => srcs.clients.push(src_face.zid),
}
}
}
Vec::from_iter(subs)
Expand Down
18 changes: 13 additions & 5 deletions zenoh/src/net/routing/hat/client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use crate::net::routing::dispatcher::face::FaceState;
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
use crate::net::routing::dispatcher::tables::Tables;
use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr};
use crate::net::routing::hat::HatQueriesTrait;
use crate::net::routing::hat::{HatQueriesTrait, Sources};
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use ordered_float::OrderedFloat;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use zenoh_buffers::ZBuf;
use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER};
Expand Down Expand Up @@ -272,11 +272,19 @@ impl HatQueriesTrait for HatCode {
forget_client_queryable(tables, face, res);
}

fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>> {
let mut qabls = HashSet::new();
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
// Compute the list of known queryables (keys)
let mut qabls = HashMap::new();
for src_face in tables.faces.values() {
for qabl in &face_hat!(src_face).remote_qabls {
qabls.insert(qabl.clone());
// Insert the key in the list of known queryables
let srcs = qabls.entry(qabl.clone()).or_insert_with(Sources::empty);
// Append src_face as a queryable source in the proper list
match src_face.whatami {
WhatAmI::Router => srcs.routers.push(src_face.zid),
WhatAmI::Peer => srcs.peers.push(src_face.zid),
WhatAmI::Client => srcs.clients.push(src_face.zid),
}
}
}
Vec::from_iter(qabls)
Expand Down
29 changes: 26 additions & 3 deletions zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::pubsub::*;
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
use crate::net::routing::dispatcher::tables::Tables;
use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
use crate::net::routing::hat::HatPubSubTrait;
use crate::net::routing::hat::{HatPubSubTrait, Sources};
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use petgraph::graph::NodeIndex;
Expand Down Expand Up @@ -605,8 +605,31 @@ impl HatPubSubTrait for HatCode {
}
}

fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>> {
hat!(tables).peer_subs.iter().cloned().collect()
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
// Compute the list of known suscriptions (keys)
hat!(tables)
.peer_subs
.iter()
.map(|s| {
(
s.clone(),
// Compute the list of routers, peers and clients that are known
// sources of those subscriptions
Sources {
routers: vec![],
peers: Vec::from_iter(res_hat!(s).peer_subs.iter().cloned()),
clients: s
.session_ctxs
.values()
.filter_map(|f| {
(f.face.whatami == WhatAmI::Client && f.subs.is_some())
.then_some(f.face.zid)
})
.collect(),
},
)
})
.collect()
}

fn compute_data_route(
Expand Down
29 changes: 26 additions & 3 deletions zenoh/src/net/routing/hat/linkstate_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::queries::*;
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
use crate::net::routing::dispatcher::tables::Tables;
use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr};
use crate::net::routing::hat::HatQueriesTrait;
use crate::net::routing::hat::{HatQueriesTrait, Sources};
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use ordered_float::OrderedFloat;
Expand Down Expand Up @@ -670,8 +670,31 @@ impl HatQueriesTrait for HatCode {
}
}

fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>> {
hat!(tables).peer_qabls.iter().cloned().collect()
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
// Compute the list of known queryables (keys)
hat!(tables)
.peer_qabls
.iter()
.map(|s| {
(
s.clone(),
// Compute the list of routers, peers and clients that are known
// sources of those queryables
Sources {
routers: vec![],
peers: Vec::from_iter(res_hat!(s).peer_qabls.keys().cloned()),
clients: s
.session_ctxs
.values()
.filter_map(|f| {
(f.face.whatami == WhatAmI::Client && f.qabl.is_some())
.then_some(f.face.zid)
})
.collect(),
},
)
})
.collect()
}

fn compute_query_route(
Expand Down
23 changes: 20 additions & 3 deletions zenoh/src/net/routing/hat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::{
use crate::runtime::Runtime;
use std::{any::Any, sync::Arc};
use zenoh_buffers::ZBuf;
use zenoh_config::{unwrap_or_default, Config, WhatAmI};
use zenoh_config::{unwrap_or_default, Config, WhatAmI, ZenohId};
use zenoh_protocol::{
core::WireExpr,
network::{
Expand All @@ -47,6 +47,23 @@ zconfigurable! {
pub static ref TREES_COMPUTATION_DELAY_MS: u64 = 100;
}

#[derive(serde::Serialize)]
pub(crate) struct Sources {
routers: Vec<ZenohId>,
peers: Vec<ZenohId>,
clients: Vec<ZenohId>,
}

impl Sources {
pub(crate) fn empty() -> Self {
Self {
routers: vec![],
peers: vec![],
clients: vec![],
}
}
}

pub(crate) trait HatTrait: HatBaseTrait + HatPubSubTrait + HatQueriesTrait {}

pub(crate) trait HatBaseTrait {
Expand Down Expand Up @@ -129,7 +146,7 @@ pub(crate) trait HatPubSubTrait {
node_id: NodeId,
);

fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>>;
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)>;

fn compute_data_route(
&self,
Expand Down Expand Up @@ -159,7 +176,7 @@ pub(crate) trait HatQueriesTrait {
node_id: NodeId,
);

fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>>;
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)>;

fn compute_query_route(
&self,
Expand Down
18 changes: 13 additions & 5 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use crate::net::routing::dispatcher::face::FaceState;
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
use crate::net::routing::dispatcher::tables::Tables;
use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
use crate::net::routing::hat::HatPubSubTrait;
use crate::net::routing::hat::{HatPubSubTrait, Sources};
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use zenoh_protocol::core::key_expr::OwnedKeyExpr;
use zenoh_protocol::{
Expand Down Expand Up @@ -275,11 +275,19 @@ impl HatPubSubTrait for HatCode {
forget_client_subscription(tables, face, res);
}

fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>> {
let mut subs = HashSet::new();
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
// Compute the list of known suscriptions (keys)
let mut subs = HashMap::new();
for src_face in tables.faces.values() {
for sub in &face_hat!(src_face).remote_subs {
subs.insert(sub.clone());
// Insert the key in the list of known suscriptions
let srcs = subs.entry(sub.clone()).or_insert_with(Sources::empty);
// Append src_face as a suscription source in the proper list
match src_face.whatami {
WhatAmI::Router => srcs.routers.push(src_face.zid),
WhatAmI::Peer => srcs.peers.push(src_face.zid),
WhatAmI::Client => srcs.clients.push(src_face.zid),
}
}
}
Vec::from_iter(subs)
Expand Down
18 changes: 13 additions & 5 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use crate::net::routing::dispatcher::face::FaceState;
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
use crate::net::routing::dispatcher::tables::Tables;
use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr};
use crate::net::routing::hat::HatQueriesTrait;
use crate::net::routing::hat::{HatQueriesTrait, Sources};
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use ordered_float::OrderedFloat;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use zenoh_buffers::ZBuf;
use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER};
Expand Down Expand Up @@ -272,11 +272,19 @@ impl HatQueriesTrait for HatCode {
forget_client_queryable(tables, face, res);
}

fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>> {
let mut qabls = HashSet::new();
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
// Compute the list of known queryables (keys)
let mut qabls = HashMap::new();
for src_face in tables.faces.values() {
for qabl in &face_hat!(src_face).remote_qabls {
qabls.insert(qabl.clone());
// Insert the key in the list of known queryables
let srcs = qabls.entry(qabl.clone()).or_insert_with(Sources::empty);
// Append src_face as a queryable source in the proper list
match src_face.whatami {
WhatAmI::Router => srcs.routers.push(src_face.zid),
WhatAmI::Peer => srcs.peers.push(src_face.zid),
WhatAmI::Client => srcs.clients.push(src_face.zid),
}
}
}
Vec::from_iter(qabls)
Expand Down
39 changes: 36 additions & 3 deletions zenoh/src/net/routing/hat/router/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::pubsub::*;
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
use crate::net::routing::dispatcher::tables::Tables;
use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
use crate::net::routing::hat::HatPubSubTrait;
use crate::net::routing::hat::{HatPubSubTrait, Sources};
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use petgraph::graph::NodeIndex;
Expand Down Expand Up @@ -925,8 +925,41 @@ impl HatPubSubTrait for HatCode {
}
}

fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>> {
hat!(tables).router_subs.iter().cloned().collect()
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
// Compute the list of known suscriptions (keys)
hat!(tables)
.router_subs
.iter()
.map(|s| {
(
s.clone(),
// Compute the list of routers, peers and clients that are known
// sources of those subscriptions
Sources {
routers: Vec::from_iter(res_hat!(s).router_subs.iter().cloned()),
peers: if hat!(tables).full_net(WhatAmI::Peer) {
Vec::from_iter(res_hat!(s).peer_subs.iter().cloned())
} else {
s.session_ctxs
.values()
.filter_map(|f| {
(f.face.whatami == WhatAmI::Peer && f.subs.is_some())
.then_some(f.face.zid)
})
.collect()
},
clients: s
.session_ctxs
.values()
.filter_map(|f| {
(f.face.whatami == WhatAmI::Client && f.subs.is_some())
.then_some(f.face.zid)
})
.collect(),
},
)
})
.collect()
}

fn compute_data_route(
Expand Down
39 changes: 36 additions & 3 deletions zenoh/src/net/routing/hat/router/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::queries::*;
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
use crate::net::routing::dispatcher::tables::Tables;
use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr};
use crate::net::routing::hat::HatQueriesTrait;
use crate::net::routing::hat::{HatQueriesTrait, Sources};
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use ordered_float::OrderedFloat;
Expand Down Expand Up @@ -1073,8 +1073,41 @@ impl HatQueriesTrait for HatCode {
}
}

fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>> {
hat!(tables).router_qabls.iter().cloned().collect()
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
// Compute the list of known queryables (keys)
hat!(tables)
.router_qabls
.iter()
.map(|s| {
(
s.clone(),
// Compute the list of routers, peers and clients that are known
// sources of those queryables
Sources {
routers: Vec::from_iter(res_hat!(s).router_qabls.keys().cloned()),
peers: if hat!(tables).full_net(WhatAmI::Peer) {
Vec::from_iter(res_hat!(s).peer_qabls.keys().cloned())
} else {
s.session_ctxs
.values()
.filter_map(|f| {
(f.face.whatami == WhatAmI::Peer && f.qabl.is_some())
.then_some(f.face.zid)
})
.collect()
},
clients: s
.session_ctxs
.values()
.filter_map(|f| {
(f.face.whatami == WhatAmI::Client && f.qabl.is_some())
.then_some(f.face.zid)
})
.collect(),
},
)
})
.collect()
}

fn compute_query_route(
Expand Down
Loading

0 comments on commit 81217c7

Please sign in to comment.