From 00fa03a4ebcf942e3106f670bd32cf965f733048 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Mon, 9 Oct 2023 22:52:05 +0200 Subject: [PATCH] Synthesize offline state on macOS --- talpid-core/src/offline/macos.rs | 112 +++++++++++++++++++++---------- 1 file changed, 78 insertions(+), 34 deletions(-) diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs index b679a52b226e..926d68918c46 100644 --- a/talpid-core/src/offline/macos.rs +++ b/talpid-core/src/offline/macos.rs @@ -3,10 +3,24 @@ //! user from connecting to a relay. //! //! See [RouteManagerHandle::default_route_listener]. -use futures::{channel::mpsc::UnboundedSender, StreamExt}; -use std::sync::{Arc, Mutex}; +//! +//! This offline monitor synthesizes an offline state between network switches and before coming +//! online from an offline state. This is done to work around issues with DNS being blocked due +//! to macOS's connectivity check. In the offline state, a DNS server on localhost prevents the +//! connectivity check from being blocked. +use futures::{ + channel::mpsc::UnboundedSender, + future::{Fuse, FutureExt}, + select, StreamExt, +}; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; use talpid_routing::{DefaultRouteEvent, RouteManagerHandle}; +const SYNTHETIC_OFFLINE_DURATION: Duration = Duration::from_secs(1); + #[derive(err_derive::Error, Debug)] pub enum Error { #[error(display = "Failed to initialize route monitor")] @@ -18,6 +32,7 @@ pub struct MonitorHandle { _notify_tx: Arc>, } +#[derive(Clone)] struct ConnectivityState { v4_connectivity: bool, v6_connectivity: bool, @@ -45,7 +60,7 @@ pub async fn spawn_monitor( let notify_tx = Arc::new(notify_tx); // note: begin observing before initializing the state - let mut route_listener = route_manager_handle.default_route_listener().await?; + let route_listener = route_manager_handle.default_route_listener().await?; let (v4_connectivity, v6_connectivity) = match route_manager_handle.get_default_routes().await { Ok((v4_route, v6_route)) => (v4_route.is_some(), v6_route.is_some()), @@ -61,6 +76,8 @@ pub async fn spawn_monitor( v4_connectivity, v6_connectivity, }; + let mut real_state = state.clone(); + let state = Arc::new(Mutex::new(state)); let weak_state = Arc::downgrade(&state); @@ -68,43 +85,70 @@ pub async fn spawn_monitor( // Detect changes to the default route tokio::spawn(async move { - while let Some(event) = route_listener.next().await { - let Some(state) = weak_state.upgrade() else { - break; - }; - let mut state = state.lock().unwrap(); + let mut timeout = Fuse::terminated(); + let mut route_listener = route_listener.fuse(); + + loop { + select! { + _ = timeout => { + // Update shared state + let Some(state) = weak_state.upgrade() else { + break; + }; + let mut state = state.lock().unwrap(); + *state = real_state.clone(); + + if state.get_connectivity() { + log::info!("Connectivity changed: Connected"); + let Some(tx) = weak_notify_tx.upgrade() else { + break; + }; + let _ = tx.unbounded_send(false); + } + } - let previous_connectivity = state.get_connectivity(); + route_event = route_listener.next() => { + let Some(event) = route_event else { + break; + }; + + // Update real state + match event { + DefaultRouteEvent::AddedOrChangedV4 => { + real_state.v4_connectivity = true; + } + DefaultRouteEvent::AddedOrChangedV6 => { + real_state.v6_connectivity = true; + } + DefaultRouteEvent::RemovedV4 => { + real_state.v4_connectivity = false; + } + DefaultRouteEvent::RemovedV6 => { + real_state.v6_connectivity = false; + } + } - match event { - DefaultRouteEvent::AddedOrChangedV4 => { - state.v4_connectivity = true; - } - DefaultRouteEvent::AddedOrChangedV6 => { - state.v6_connectivity = true; - } - DefaultRouteEvent::RemovedV4 => { + // Synthesize offline state + // Update shared state + let Some(state) = weak_state.upgrade() else { + break; + }; + let mut state = state.lock().unwrap(); + let previous_connectivity = state.get_connectivity(); state.v4_connectivity = false; - } - DefaultRouteEvent::RemovedV6 => { state.v6_connectivity = false; - } - } - let new_connectivity = state.get_connectivity(); - if previous_connectivity != new_connectivity { - log::info!( - "Connectivity changed: {}", - if new_connectivity { - "Connected" - } else { - "Offline" + if previous_connectivity { + let Some(tx) = weak_notify_tx.upgrade() else { + break; + }; + let _ = tx.unbounded_send(true); + log::info!("Connectivity changed: Offline"); + } + if real_state.get_connectivity() { + timeout = Box::pin(tokio::time::sleep(SYNTHETIC_OFFLINE_DURATION)).fuse(); } - ); - let Some(tx) = weak_notify_tx.upgrade() else { - break; - }; - let _ = tx.unbounded_send(!new_connectivity); + } } }