From 156c60dfb9e387b29ad58e5341e1272eb63c5a31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Tue, 3 Oct 2023 14:33:56 +0200 Subject: [PATCH] Coalesce route events in macOS route monitor --- talpid-core/src/offline/macos.rs | 8 --- talpid-routing/src/debounce.rs | 10 +++- talpid-routing/src/lib.rs | 2 +- talpid-routing/src/unix/macos/mod.rs | 59 +++++++++++++------ talpid-routing/src/unix/mod.rs | 11 +++- .../src/windows/default_route_monitor.rs | 6 +- 6 files changed, 60 insertions(+), 36 deletions(-) diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs index 7f13638ec94a..9be5aae8a89d 100644 --- a/talpid-core/src/offline/macos.rs +++ b/talpid-core/src/offline/macos.rs @@ -12,9 +12,6 @@ use std::sync::{ }; use talpid_routing::{DefaultRouteEvent, RouteManagerHandle}; -/// How long to wait before announcing changes to the offline state -//const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(2); - #[derive(err_derive::Error, Debug)] pub enum Error { #[error(display = "Failed to initialize route monitor")] @@ -120,11 +117,6 @@ pub async fn spawn_monitor( None => return, }; - // Debounce event updates - // FIXME: Debounce is disabled because the DNS config can get messed up - // when switching between networks otherwise. - //tokio::time::sleep(DEBOUNCE_INTERVAL).await; - if prev_notified.swap(new_connectivity, Ordering::AcqRel) == new_connectivity { // We don't care about network changes here return; diff --git a/talpid-routing/src/debounce.rs b/talpid-routing/src/debounce.rs index 6ded8ece2f16..ba1e52250ce2 100644 --- a/talpid-routing/src/debounce.rs +++ b/talpid-routing/src/debounce.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use std::{ sync::mpsc::{channel, RecvTimeoutError, Sender}, time::{Duration, Instant}, @@ -70,7 +72,7 @@ impl BurstGuard { /// When `stop` returns an then the `BurstGuard` thread is guaranteed to not make any further /// calls to `callback`. - pub fn stop(&self) { + pub fn stop(self) { let (sender, listener) = channel(); // If we could not send then it means the thread has already shut down and we can return if self.sender.send(BurstGuardEvent::Shutdown(sender)).is_ok() { @@ -80,6 +82,12 @@ impl BurstGuard { } } + /// Stop without waiting for in-flight events to complete. + pub fn stop_nonblocking(self) { + let (sender, _listener) = channel(); + let _ = self.sender.send(BurstGuardEvent::Shutdown(sender)); + } + /// Asynchronously trigger burst pub fn trigger(&self) { self.sender.send(BurstGuardEvent::Trigger).unwrap(); diff --git a/talpid-routing/src/lib.rs b/talpid-routing/src/lib.rs index d6997a230107..dd5fd3a7616a 100644 --- a/talpid-routing/src/lib.rs +++ b/talpid-routing/src/lib.rs @@ -6,7 +6,7 @@ use ipnetwork::IpNetwork; use std::{fmt, net::IpAddr}; -#[cfg(target_os = "windows")] +#[cfg(any(target_os = "windows", target_os = "macos"))] mod debounce; #[cfg(target_os = "windows")] diff --git a/talpid-routing/src/unix/macos/mod.rs b/talpid-routing/src/unix/macos/mod.rs index c2cd486f7151..7aceeba78f03 100644 --- a/talpid-routing/src/unix/macos/mod.rs +++ b/talpid-routing/src/unix/macos/mod.rs @@ -1,4 +1,4 @@ -use crate::{NetNode, Node, RequiredRoute, Route}; +use crate::{debounce::BurstGuard, NetNode, Node, RequiredRoute, Route}; use futures::{ channel::mpsc, @@ -7,11 +7,11 @@ use futures::{ }; use ipnetwork::IpNetwork; use nix::sys::socket::{AddressFamily, SockaddrLike, SockaddrStorage}; -use std::pin::Pin; use std::{ collections::{BTreeMap, HashSet}, time::Duration, }; +use std::{pin::Pin, sync::Weak}; use talpid_types::ErrorExt; use watch::RoutingTable; @@ -85,6 +85,7 @@ pub struct RouteManagerImpl { applied_routes: BTreeMap, v4_default_route: Option, v6_default_route: Option, + update_trigger: BurstGuard, default_route_listeners: Vec>, check_default_routes_restored: Pin + Send>>, } @@ -92,8 +93,16 @@ pub struct RouteManagerImpl { impl RouteManagerImpl { /// Create new route manager #[allow(clippy::unused_async)] - pub async fn new() -> Result { + pub(crate) async fn new( + manage_tx: Weak>, + ) -> Result { let routing_table = RoutingTable::new().map_err(Error::RoutingTable)?; + let update_trigger = BurstGuard::new(move || { + let Some(manage_tx) = manage_tx.upgrade() else { + return; + }; + let _ = manage_tx.unbounded_send(RouteManagerCommand::RefreshRoutes); + }); Ok(Self { routing_table, non_tunnel_routes: HashSet::new(), @@ -102,6 +111,7 @@ impl RouteManagerImpl { applied_routes: BTreeMap::new(), v4_default_route: None, v6_default_route: None, + update_trigger, default_route_listeners: vec![], check_default_routes_restored: Box::pin(futures::stream::pending()), }) @@ -129,10 +139,12 @@ impl RouteManagerImpl { ); }); + let mut completion_tx = None; + loop { futures::select_biased! { route_message = self.routing_table.next_message().fuse() => { - self.handle_route_message(route_message).await; + self.handle_route_message(route_message); } _ = self.check_default_routes_restored.next() => { @@ -148,11 +160,8 @@ impl RouteManagerImpl { command = manage_rx.next() => { match command { Some(RouteManagerCommand::Shutdown(tx)) => { - if let Err(err) = self.cleanup_routes().await { - log::error!("Failed to clean up routes: {err}"); - } - let _ = tx.send(()); - return; + completion_tx = Some(tx); + break; }, Some(RouteManagerCommand::NewDefaultRouteListener(tx)) => { @@ -214,6 +223,11 @@ impl RouteManagerImpl { log::error!("Failed to clean up rotues: {err}"); } }, + Some(RouteManagerCommand::RefreshRoutes) => { + if let Err(error) = self.refresh_routes().await { + log::error!("Failed to refresh routes: {error}") + } + }, None => { break; } @@ -225,6 +239,12 @@ impl RouteManagerImpl { if let Err(err) = self.cleanup_routes().await { log::error!("Failed to clean up routing table when shutting down: {err}"); } + + self.update_trigger.stop_nonblocking(); + + if let Some(tx) = completion_tx { + let _ = tx.send(()); + } } async fn add_required_routes(&mut self, required_routes: HashSet) -> Result<()> { @@ -287,7 +307,7 @@ impl RouteManagerImpl { Ok(()) } - async fn handle_route_message( + fn handle_route_message( &mut self, message: std::result::Result, ) { @@ -303,18 +323,19 @@ impl RouteManagerImpl { log::error!("Failed to process deleted route: {err}"); } } - - if let Err(error) = self.handle_route_socket_message().await { - log::error!("Failed to process route change: {error}"); + if route.errno() == 0 && route.is_default().unwrap_or(true) { + self.update_trigger.trigger(); } } - Ok(RouteSocketMessage::AddRoute(_)) - | Ok(RouteSocketMessage::ChangeRoute(_)) - | Ok(RouteSocketMessage::AddAddress(_) | RouteSocketMessage::DeleteAddress(_)) => { - if let Err(error) = self.handle_route_socket_message().await { - log::error!("Failed to process route/address change: {error}"); + Ok(RouteSocketMessage::AddRoute(route)) + | Ok(RouteSocketMessage::ChangeRoute(route)) => { + if route.errno() == 0 && route.is_default().unwrap_or(true) { + self.update_trigger.trigger(); } } + Ok(RouteSocketMessage::AddAddress(_) | RouteSocketMessage::DeleteAddress(_)) => { + self.update_trigger.trigger(); + } // ignore all other message types Ok(_) => {} Err(err) => { @@ -329,7 +350,7 @@ impl RouteManagerImpl { /// * At the same time, update the route used by non-tunnel interfaces to reach the relay/VPN /// server. The gateway of the relay route is set to the first interface in the network /// service order that has a working ifscoped default route. - async fn handle_route_socket_message(&mut self) -> Result<()> { + async fn refresh_routes(&mut self) -> Result<()> { self.update_best_default_route(interface::Family::V4) .await?; self.update_best_default_route(interface::Family::V6) diff --git a/talpid-routing/src/unix/mod.rs b/talpid-routing/src/unix/mod.rs index 757d3775fcf1..02dac8ac0fcd 100644 --- a/talpid-routing/src/unix/mod.rs +++ b/talpid-routing/src/unix/mod.rs @@ -7,7 +7,7 @@ use futures::channel::{ mpsc::{self, UnboundedSender}, oneshot, }; -use std::{collections::HashSet, io}; +use std::{collections::HashSet, io, sync::Arc}; #[cfg(any(target_os = "linux", target_os = "macos"))] use futures::stream::Stream; @@ -55,7 +55,7 @@ pub enum Error { /// Handle to a route manager. #[derive(Clone)] pub struct RouteManagerHandle { - tx: UnboundedSender, + tx: Arc>, } impl RouteManagerHandle { @@ -181,6 +181,8 @@ pub(crate) enum RouteManagerCommand { ClearRoutes, Shutdown(oneshot::Sender<()>), #[cfg(target_os = "macos")] + RefreshRoutes, + #[cfg(target_os = "macos")] NewDefaultRouteListener(oneshot::Sender>), #[cfg(target_os = "macos")] GetDefaultRoutes(oneshot::Sender<(Option, Option)>), @@ -227,7 +229,7 @@ pub enum CallbackMessage { /// If a destination has to be routed through the default node, /// the route will be adjusted dynamically when the default route changes. pub struct RouteManager { - manage_tx: Option>, + manage_tx: Option>>, runtime: tokio::runtime::Handle, } @@ -238,11 +240,14 @@ impl RouteManager { #[cfg(target_os = "linux")] table_id: u32, ) -> Result { let (manage_tx, manage_rx) = mpsc::unbounded(); + let manage_tx = Arc::new(manage_tx); let manager = imp::RouteManagerImpl::new( #[cfg(target_os = "linux")] fwmark, #[cfg(target_os = "linux")] table_id, + #[cfg(target_os = "macos")] + Arc::downgrade(&manage_tx), ) .await?; tokio::spawn(manager.run(manage_rx)); diff --git a/talpid-routing/src/windows/default_route_monitor.rs b/talpid-routing/src/windows/default_route_monitor.rs index e43c26733b71..d42dbc91de7e 100644 --- a/talpid-routing/src/windows/default_route_monitor.rs +++ b/talpid-routing/src/windows/default_route_monitor.rs @@ -122,10 +122,8 @@ impl Drop for DefaultRouteMonitor { let context = unsafe { Box::from_raw(self.context as *mut ContextAndBurstGuard) }; // Stop the burst guard - context.burst_guard.lock().unwrap().stop(); - - // Drop the context now that we are guaranteed nothing might try to access the context - drop(context); + let context = context.burst_guard.into_inner().unwrap(); + context.stop(); } }