Skip to content

Commit

Permalink
Synthesize offline state on macOS
Browse files Browse the repository at this point in the history
  • Loading branch information
dlon committed Oct 9, 2023
1 parent e1fd9e1 commit 00fa03a
Showing 1 changed file with 78 additions and 34 deletions.
112 changes: 78 additions & 34 deletions talpid-core/src/offline/macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,24 @@
//! user from connecting to a relay.
//!
//! See [RouteManagerHandle::default_route_listener].
use futures::{channel::mpsc::UnboundedSender, StreamExt};
use std::sync::{Arc, Mutex};
//!
//! This offline monitor synthesizes an offline state between network switches and before coming
//! online from an offline state. This is done to work around issues with DNS being blocked due
//! to macOS's connectivity check. In the offline state, a DNS server on localhost prevents the
//! connectivity check from being blocked.
use futures::{
channel::mpsc::UnboundedSender,
future::{Fuse, FutureExt},
select, StreamExt,
};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use talpid_routing::{DefaultRouteEvent, RouteManagerHandle};

const SYNTHETIC_OFFLINE_DURATION: Duration = Duration::from_secs(1);

#[derive(err_derive::Error, Debug)]
pub enum Error {
#[error(display = "Failed to initialize route monitor")]
Expand All @@ -18,6 +32,7 @@ pub struct MonitorHandle {
_notify_tx: Arc<UnboundedSender<bool>>,
}

#[derive(Clone)]
struct ConnectivityState {
v4_connectivity: bool,
v6_connectivity: bool,
Expand Down Expand Up @@ -45,7 +60,7 @@ pub async fn spawn_monitor(
let notify_tx = Arc::new(notify_tx);

// note: begin observing before initializing the state
let mut route_listener = route_manager_handle.default_route_listener().await?;
let route_listener = route_manager_handle.default_route_listener().await?;

let (v4_connectivity, v6_connectivity) = match route_manager_handle.get_default_routes().await {
Ok((v4_route, v6_route)) => (v4_route.is_some(), v6_route.is_some()),
Expand All @@ -61,50 +76,79 @@ pub async fn spawn_monitor(
v4_connectivity,
v6_connectivity,
};
let mut real_state = state.clone();

let state = Arc::new(Mutex::new(state));

let weak_state = Arc::downgrade(&state);
let weak_notify_tx = Arc::downgrade(&notify_tx);

// Detect changes to the default route
tokio::spawn(async move {
while let Some(event) = route_listener.next().await {
let Some(state) = weak_state.upgrade() else {
break;
};
let mut state = state.lock().unwrap();
let mut timeout = Fuse::terminated();
let mut route_listener = route_listener.fuse();

loop {
select! {
_ = timeout => {
// Update shared state
let Some(state) = weak_state.upgrade() else {
break;
};
let mut state = state.lock().unwrap();
*state = real_state.clone();

if state.get_connectivity() {
log::info!("Connectivity changed: Connected");
let Some(tx) = weak_notify_tx.upgrade() else {
break;
};
let _ = tx.unbounded_send(false);
}
}

let previous_connectivity = state.get_connectivity();
route_event = route_listener.next() => {
let Some(event) = route_event else {
break;
};

// Update real state
match event {
DefaultRouteEvent::AddedOrChangedV4 => {
real_state.v4_connectivity = true;
}
DefaultRouteEvent::AddedOrChangedV6 => {
real_state.v6_connectivity = true;
}
DefaultRouteEvent::RemovedV4 => {
real_state.v4_connectivity = false;
}
DefaultRouteEvent::RemovedV6 => {
real_state.v6_connectivity = false;
}
}

match event {
DefaultRouteEvent::AddedOrChangedV4 => {
state.v4_connectivity = true;
}
DefaultRouteEvent::AddedOrChangedV6 => {
state.v6_connectivity = true;
}
DefaultRouteEvent::RemovedV4 => {
// Synthesize offline state
// Update shared state
let Some(state) = weak_state.upgrade() else {
break;
};
let mut state = state.lock().unwrap();
let previous_connectivity = state.get_connectivity();
state.v4_connectivity = false;
}
DefaultRouteEvent::RemovedV6 => {
state.v6_connectivity = false;
}
}

let new_connectivity = state.get_connectivity();
if previous_connectivity != new_connectivity {
log::info!(
"Connectivity changed: {}",
if new_connectivity {
"Connected"
} else {
"Offline"
if previous_connectivity {
let Some(tx) = weak_notify_tx.upgrade() else {
break;
};
let _ = tx.unbounded_send(true);
log::info!("Connectivity changed: Offline");
}
if real_state.get_connectivity() {
timeout = Box::pin(tokio::time::sleep(SYNTHETIC_OFFLINE_DURATION)).fuse();
}
);
let Some(tx) = weak_notify_tx.upgrade() else {
break;
};
let _ = tx.unbounded_send(!new_connectivity);
}
}
}

Expand Down

0 comments on commit 00fa03a

Please sign in to comment.