Skip to content

Commit

Permalink
Move ingress/egress filters out of pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Nov 13, 2023
1 parent b0f999b commit 2a5466b
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 50 deletions.
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use super::super::hat::pubsub::{compute_data_route, compute_data_routes, compute
use super::face::FaceState;
use super::resource::{DataRoutes, Direction, PullCaches, Resource};
use super::tables::{RoutingContext, RoutingExpr, Tables};
use crate::net::routing::hat::map_routing_context;
use crate::net::routing::hat::pubsub::{compute_matching_pulls, egress_filter, ingress_filter};
use crate::net::routing::hat::pubsub::compute_matching_pulls;
use crate::net::routing::hat::{egress_filter, ingress_filter, map_routing_context};
use std::sync::Arc;
use std::sync::RwLock;
use zenoh_core::zread;
Expand Down
3 changes: 1 addition & 2 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use super::face::FaceState;
use super::resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource};
use super::tables::RoutingContext;
use super::tables::{RoutingExpr, Tables, TablesLock};
use crate::net::routing::hat::pubsub::egress_filter;
use crate::net::routing::hat::pubsub::ingress_filter;
use crate::net::routing::hat::{egress_filter, ingress_filter};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
Expand Down
47 changes: 46 additions & 1 deletion zenoh/src/net/routing/hat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use self::{
};
use super::dispatcher::{
face::FaceState,
tables::{Resource, RoutingContext, Tables, TablesLock},
tables::{Resource, RoutingContext, RoutingExpr, Tables, TablesLock},
};
use crate::{
hat, hat_mut,
Expand Down Expand Up @@ -758,3 +758,48 @@ fn get_peer(tables: &Tables, face: &Arc<FaceState>, nodeid: RoutingContext) -> O
}
}
}

#[inline]
pub(crate) fn ingress_filter(tables: &Tables, face: &FaceState, expr: &mut RoutingExpr) -> bool {
tables.whatami != WhatAmI::Router
|| face.whatami != WhatAmI::Peer
|| hat!(tables).peers_net.is_none()
|| tables.zid
== *hat!(tables).elect_router(
&tables.zid,
expr.full_expr(),
hat!(tables).get_router_links(face.zid),
)
}

#[inline]
pub(crate) fn egress_filter(
tables: &Tables,
src_face: &FaceState,
out_face: &Arc<FaceState>,
expr: &mut RoutingExpr,
) -> bool {
if src_face.id != out_face.id
&& match (src_face.mcast_group.as_ref(), out_face.mcast_group.as_ref()) {
(Some(l), Some(r)) => l != r,
_ => true,
}
{
let dst_master = tables.whatami != WhatAmI::Router
|| out_face.whatami != WhatAmI::Peer
|| hat!(tables).peers_net.is_none()
|| tables.zid
== *hat!(tables).elect_router(
&tables.zid,
expr.full_expr(),
hat!(tables).get_router_links(out_face.zid),
);

return dst_master
&& (src_face.whatami != WhatAmI::Peer
|| out_face.whatami != WhatAmI::Peer
|| hat!(tables).full_net(WhatAmI::Peer)
|| hat!(tables).failover_brokering(src_face.zid, out_face.zid));
}
false
}
45 changes: 0 additions & 45 deletions zenoh/src/net/routing/hat/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1608,48 +1608,3 @@ pub(crate) fn compute_data_routes(tables: &mut Tables, res: &mut Arc<Resource>)
res_mut.context_mut().matching_pulls = compute_matching_pulls(tables, &mut expr);
}
}

#[inline]
pub(crate) fn ingress_filter(tables: &Tables, face: &FaceState, expr: &mut RoutingExpr) -> bool {
tables.whatami != WhatAmI::Router
|| face.whatami != WhatAmI::Peer
|| hat!(tables).peers_net.is_none()
|| tables.zid
== *hat!(tables).elect_router(
&tables.zid,
expr.full_expr(),
hat!(tables).get_router_links(face.zid),
)
}

#[inline]
pub(crate) fn egress_filter(
tables: &Tables,
src_face: &FaceState,
out_face: &Arc<FaceState>,
expr: &mut RoutingExpr,
) -> bool {
if src_face.id != out_face.id
&& match (src_face.mcast_group.as_ref(), out_face.mcast_group.as_ref()) {
(Some(l), Some(r)) => l != r,
_ => true,
}
{
let dst_master = tables.whatami != WhatAmI::Router
|| out_face.whatami != WhatAmI::Peer
|| hat!(tables).peers_net.is_none()
|| tables.zid
== *hat!(tables).elect_router(
&tables.zid,
expr.full_expr(),
hat!(tables).get_router_links(out_face.zid),
);

return dst_master
&& (src_face.whatami != WhatAmI::Peer
|| out_face.whatami != WhatAmI::Peer
|| hat!(tables).full_net(WhatAmI::Peer)
|| hat!(tables).failover_brokering(src_face.zid, out_face.zid));
}
false
}

0 comments on commit 2a5466b

Please sign in to comment.