Skip to content

Commit

Permalink
fixup: select on new tunnel events
Browse files Browse the repository at this point in the history
  • Loading branch information
hulthe committed Dec 16, 2024
1 parent 819a46b commit bece7a9
Showing 1 changed file with 49 additions and 39 deletions.
88 changes: 49 additions & 39 deletions mullvad-daemon/src/leak_checker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::anyhow;
use futures::{select, FutureExt};
use leak_checker::traceroute::TracerouteOpt;
pub use leak_checker::LeakInfo;
use std::net::IpAddr;
use std::ops::ControlFlow;
use std::time::Duration;
use talpid_types::tunnel::TunnelStateTransition;
use tokio::runtime::Handle;
use tokio::sync::mpsc;

/// An actor that tries to leak traffic outside the tunnel while we are connected.
Expand Down Expand Up @@ -75,11 +75,7 @@ impl Task {
};

match event {
TaskEvent::NewTunnelState(s) => {
if self.on_new_tunnel_state(s).await.is_break() {
break;
}
}
TaskEvent::NewTunnelState(s) => self.on_new_tunnel_state(s).await,
TaskEvent::AddCallback(c) => self.on_add_callback(c),
}
}
Expand All @@ -89,17 +85,12 @@ impl Task {
self.callbacks.push(c);
}

async fn on_new_tunnel_state(
&mut self,
mut tunnel_state: TunnelStateTransition,
) -> ControlFlow<()> {
async fn on_new_tunnel_state(&mut self, mut tunnel_state: TunnelStateTransition) {
'leak_test: loop {
let TunnelStateTransition::Connected(tunnel) = &tunnel_state else {
return ControlFlow::Continue(());
break 'leak_test;
};

tokio::time::sleep(std::time::Duration::from_millis(500)).await;

let ping_destination = tunnel.endpoint.address.ip();

// TODO (linux):
Expand All @@ -121,44 +112,63 @@ impl Task {

let interface = "wlan0"; // TODO

let leak_info = match check_for_leaks(interface, ping_destination).await {
// Make sure the tunnel state doesn't change while we're doing the leak test.
// If that happens, then our results might be invalid.
let another_tunnel_state = async {
'listen_for_events: while let Some(event) = self.events_rx.recv().await {
let new_state = match event {
TaskEvent::NewTunnelState(tunnel_state) => tunnel_state,
TaskEvent::AddCallback(c) => {
self.on_add_callback(c);
continue 'listen_for_events;
}
};

if let TunnelStateTransition::Connected(..) = new_state {
// Still connected, all is well...
} else {
// Tunnel state changed! We have to discard the leak test and try again.
tunnel_state = new_state;
break 'listen_for_events;
}
}
};

let leak_test = async {
// Give the connection a little time to settle before starting the test.
// TODO: is this necessary? is there some better way?
// TODO: ether remove this or add some concrete motivation.
tokio::time::sleep(Duration::from_millis(500)).await;

check_for_leaks(interface, ping_destination).await
};

let leak_result = select! {
// If tunnel state changes, restart the test.
_ = another_tunnel_state.fuse() => continue 'leak_test,

leak_result = leak_test.fuse() => leak_result,
};

let leak_info = match leak_result {
Ok(Some(leak_info)) => leak_info,
Ok(None) => {
log::debug!("No leak detected");
continue;
break 'leak_test;
}
Err(e) => {
log::debug!("Leak check errored: {e:#?}");
return ControlFlow::Continue(());
break 'leak_test;
}
};

log::debug!("leak detected: {leak_info:?}");

// Make sure the tunnel state didn't change while we were doing the leak test.
// If that happened, then our results might be invalid.
while let Ok(event) = self.events_rx.try_recv() {
let new_state = match event {
TaskEvent::NewTunnelState(tunnel_state) => tunnel_state,
TaskEvent::AddCallback(c) => {
self.on_add_callback(c);
continue;
}
};

if let TunnelStateTransition::Connected(..) = new_state {
// Still connected, all is well...
} else {
// Tunnel state changed! We have to discard the leak test and try again.
tunnel_state = new_state;
continue 'leak_test;
}
}
log::debug!("Leak detected: {leak_info:?}");

for callback in &mut self.callbacks {
callback.on_leak(leak_info.clone());
}
return ControlFlow::Continue(());

break 'leak_test;
}
}
}
Expand Down

0 comments on commit bece7a9

Please sign in to comment.