Skip to content

Commit

Permalink
Coalesce route events in macOS route monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
dlon committed Oct 5, 2023
1 parent 1653110 commit 156c60d
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 36 deletions.
8 changes: 0 additions & 8 deletions talpid-core/src/offline/macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ use std::sync::{
};
use talpid_routing::{DefaultRouteEvent, RouteManagerHandle};

/// How long to wait before announcing changes to the offline state
//const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(2);

#[derive(err_derive::Error, Debug)]
pub enum Error {
#[error(display = "Failed to initialize route monitor")]
Expand Down Expand Up @@ -120,11 +117,6 @@ pub async fn spawn_monitor(
None => return,
};

// Debounce event updates
// FIXME: Debounce is disabled because the DNS config can get messed up
// when switching between networks otherwise.
//tokio::time::sleep(DEBOUNCE_INTERVAL).await;

if prev_notified.swap(new_connectivity, Ordering::AcqRel) == new_connectivity {
// We don't care about network changes here
return;
Expand Down
10 changes: 9 additions & 1 deletion talpid-routing/src/debounce.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

use std::{
sync::mpsc::{channel, RecvTimeoutError, Sender},
time::{Duration, Instant},
Expand Down Expand Up @@ -70,7 +72,7 @@ impl BurstGuard {

/// When `stop` returns an then the `BurstGuard` thread is guaranteed to not make any further
/// calls to `callback`.
pub fn stop(&self) {
pub fn stop(self) {
let (sender, listener) = channel();
// If we could not send then it means the thread has already shut down and we can return
if self.sender.send(BurstGuardEvent::Shutdown(sender)).is_ok() {
Expand All @@ -80,6 +82,12 @@ impl BurstGuard {
}
}

/// Stop without waiting for in-flight events to complete.
pub fn stop_nonblocking(self) {
let (sender, _listener) = channel();
let _ = self.sender.send(BurstGuardEvent::Shutdown(sender));
}

/// Asynchronously trigger burst
pub fn trigger(&self) {
self.sender.send(BurstGuardEvent::Trigger).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion talpid-routing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use ipnetwork::IpNetwork;
use std::{fmt, net::IpAddr};

#[cfg(target_os = "windows")]
#[cfg(any(target_os = "windows", target_os = "macos"))]
mod debounce;

#[cfg(target_os = "windows")]
Expand Down
59 changes: 40 additions & 19 deletions talpid-routing/src/unix/macos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{NetNode, Node, RequiredRoute, Route};
use crate::{debounce::BurstGuard, NetNode, Node, RequiredRoute, Route};

use futures::{
channel::mpsc,
Expand All @@ -7,11 +7,11 @@ use futures::{
};
use ipnetwork::IpNetwork;
use nix::sys::socket::{AddressFamily, SockaddrLike, SockaddrStorage};
use std::pin::Pin;
use std::{
collections::{BTreeMap, HashSet},
time::Duration,
};
use std::{pin::Pin, sync::Weak};
use talpid_types::ErrorExt;
use watch::RoutingTable;

Expand Down Expand Up @@ -85,15 +85,24 @@ pub struct RouteManagerImpl {
applied_routes: BTreeMap<RouteDestination, RouteMessage>,
v4_default_route: Option<data::RouteMessage>,
v6_default_route: Option<data::RouteMessage>,
update_trigger: BurstGuard,
default_route_listeners: Vec<mpsc::UnboundedSender<DefaultRouteEvent>>,
check_default_routes_restored: Pin<Box<dyn FusedStream<Item = ()> + Send>>,
}

impl RouteManagerImpl {
/// Create new route manager
#[allow(clippy::unused_async)]
pub async fn new() -> Result<Self> {
pub(crate) async fn new(
manage_tx: Weak<mpsc::UnboundedSender<RouteManagerCommand>>,
) -> Result<Self> {
let routing_table = RoutingTable::new().map_err(Error::RoutingTable)?;
let update_trigger = BurstGuard::new(move || {
let Some(manage_tx) = manage_tx.upgrade() else {
return;
};
let _ = manage_tx.unbounded_send(RouteManagerCommand::RefreshRoutes);
});
Ok(Self {
routing_table,
non_tunnel_routes: HashSet::new(),
Expand All @@ -102,6 +111,7 @@ impl RouteManagerImpl {
applied_routes: BTreeMap::new(),
v4_default_route: None,
v6_default_route: None,
update_trigger,
default_route_listeners: vec![],
check_default_routes_restored: Box::pin(futures::stream::pending()),
})
Expand Down Expand Up @@ -129,10 +139,12 @@ impl RouteManagerImpl {
);
});

let mut completion_tx = None;

loop {
futures::select_biased! {
route_message = self.routing_table.next_message().fuse() => {
self.handle_route_message(route_message).await;
self.handle_route_message(route_message);
}

_ = self.check_default_routes_restored.next() => {
Expand All @@ -148,11 +160,8 @@ impl RouteManagerImpl {
command = manage_rx.next() => {
match command {
Some(RouteManagerCommand::Shutdown(tx)) => {
if let Err(err) = self.cleanup_routes().await {
log::error!("Failed to clean up routes: {err}");
}
let _ = tx.send(());
return;
completion_tx = Some(tx);
break;
},

Some(RouteManagerCommand::NewDefaultRouteListener(tx)) => {
Expand Down Expand Up @@ -214,6 +223,11 @@ impl RouteManagerImpl {
log::error!("Failed to clean up rotues: {err}");
}
},
Some(RouteManagerCommand::RefreshRoutes) => {
if let Err(error) = self.refresh_routes().await {
log::error!("Failed to refresh routes: {error}")
}
},
None => {
break;
}
Expand All @@ -225,6 +239,12 @@ impl RouteManagerImpl {
if let Err(err) = self.cleanup_routes().await {
log::error!("Failed to clean up routing table when shutting down: {err}");
}

self.update_trigger.stop_nonblocking();

if let Some(tx) = completion_tx {
let _ = tx.send(());
}
}

async fn add_required_routes(&mut self, required_routes: HashSet<RequiredRoute>) -> Result<()> {
Expand Down Expand Up @@ -287,7 +307,7 @@ impl RouteManagerImpl {
Ok(())
}

async fn handle_route_message(
fn handle_route_message(
&mut self,
message: std::result::Result<RouteSocketMessage, watch::Error>,
) {
Expand All @@ -303,18 +323,19 @@ impl RouteManagerImpl {
log::error!("Failed to process deleted route: {err}");
}
}

if let Err(error) = self.handle_route_socket_message().await {
log::error!("Failed to process route change: {error}");
if route.errno() == 0 && route.is_default().unwrap_or(true) {
self.update_trigger.trigger();
}
}
Ok(RouteSocketMessage::AddRoute(_))
| Ok(RouteSocketMessage::ChangeRoute(_))
| Ok(RouteSocketMessage::AddAddress(_) | RouteSocketMessage::DeleteAddress(_)) => {
if let Err(error) = self.handle_route_socket_message().await {
log::error!("Failed to process route/address change: {error}");
Ok(RouteSocketMessage::AddRoute(route))
| Ok(RouteSocketMessage::ChangeRoute(route)) => {
if route.errno() == 0 && route.is_default().unwrap_or(true) {
self.update_trigger.trigger();
}
}
Ok(RouteSocketMessage::AddAddress(_) | RouteSocketMessage::DeleteAddress(_)) => {
self.update_trigger.trigger();
}
// ignore all other message types
Ok(_) => {}
Err(err) => {
Expand All @@ -329,7 +350,7 @@ impl RouteManagerImpl {
/// * At the same time, update the route used by non-tunnel interfaces to reach the relay/VPN
/// server. The gateway of the relay route is set to the first interface in the network
/// service order that has a working ifscoped default route.
async fn handle_route_socket_message(&mut self) -> Result<()> {
async fn refresh_routes(&mut self) -> Result<()> {
self.update_best_default_route(interface::Family::V4)
.await?;
self.update_best_default_route(interface::Family::V6)
Expand Down
11 changes: 8 additions & 3 deletions talpid-routing/src/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::channel::{
mpsc::{self, UnboundedSender},
oneshot,
};
use std::{collections::HashSet, io};
use std::{collections::HashSet, io, sync::Arc};

#[cfg(any(target_os = "linux", target_os = "macos"))]
use futures::stream::Stream;
Expand Down Expand Up @@ -55,7 +55,7 @@ pub enum Error {
/// Handle to a route manager.
#[derive(Clone)]
pub struct RouteManagerHandle {
tx: UnboundedSender<RouteManagerCommand>,
tx: Arc<UnboundedSender<RouteManagerCommand>>,
}

impl RouteManagerHandle {
Expand Down Expand Up @@ -181,6 +181,8 @@ pub(crate) enum RouteManagerCommand {
ClearRoutes,
Shutdown(oneshot::Sender<()>),
#[cfg(target_os = "macos")]
RefreshRoutes,
#[cfg(target_os = "macos")]
NewDefaultRouteListener(oneshot::Sender<mpsc::UnboundedReceiver<DefaultRouteEvent>>),
#[cfg(target_os = "macos")]
GetDefaultRoutes(oneshot::Sender<(Option<Route>, Option<Route>)>),
Expand Down Expand Up @@ -227,7 +229,7 @@ pub enum CallbackMessage {
/// If a destination has to be routed through the default node,
/// the route will be adjusted dynamically when the default route changes.
pub struct RouteManager {
manage_tx: Option<UnboundedSender<RouteManagerCommand>>,
manage_tx: Option<Arc<UnboundedSender<RouteManagerCommand>>>,
runtime: tokio::runtime::Handle,
}

Expand All @@ -238,11 +240,14 @@ impl RouteManager {
#[cfg(target_os = "linux")] table_id: u32,
) -> Result<Self, Error> {
let (manage_tx, manage_rx) = mpsc::unbounded();
let manage_tx = Arc::new(manage_tx);
let manager = imp::RouteManagerImpl::new(
#[cfg(target_os = "linux")]
fwmark,
#[cfg(target_os = "linux")]
table_id,
#[cfg(target_os = "macos")]
Arc::downgrade(&manage_tx),
)
.await?;
tokio::spawn(manager.run(manage_rx));
Expand Down
6 changes: 2 additions & 4 deletions talpid-routing/src/windows/default_route_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,8 @@ impl Drop for DefaultRouteMonitor {
let context = unsafe { Box::from_raw(self.context as *mut ContextAndBurstGuard) };

// Stop the burst guard
context.burst_guard.lock().unwrap().stop();

// Drop the context now that we are guaranteed nothing might try to access the context
drop(context);
let context = context.burst_guard.into_inner().unwrap();
context.stop();
}
}

Expand Down

0 comments on commit 156c60d

Please sign in to comment.