Skip to content

Commit

Permalink
Handle am.i.mullvad.net with internal daemon event
Browse files Browse the repository at this point in the history
Add `geoip::GeoIpHandler`, which sends an
`InternalDaemonEvent::LocationEvent` when the location arrives. It also
handles aborting in-flight requests and retries.
  • Loading branch information
Serock3 committed Dec 15, 2023
1 parent 5ebdecb commit e4952b5
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 66 deletions.
63 changes: 57 additions & 6 deletions mullvad-daemon/src/geoip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ use std::time::Duration;
use futures::join;
use mullvad_api::{
self,
availability::ApiAvailabilityHandle,
rest::{Error, RequestServiceHandle},
};
use mullvad_types::location::{AmIMullvad, GeoIpLocation};
use once_cell::sync::Lazy;
use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered};
use talpid_core::{
future_retry::{retry_future, ExponentialBackoff, Jittered},
mpsc::Sender,
};
use talpid_types::ErrorExt;

use crate::{DaemonEventSender, InternalDaemonEvent};

// Define the Mullvad connection checking api endpoint.
//
// In a development build the host name for the connection checking endpoint can
Expand Down Expand Up @@ -43,17 +47,63 @@ static MULLVAD_CONNCHECK_HOST: Lazy<String> = Lazy::new(|| {
const LOCATION_RETRY_STRATEGY: Jittered<ExponentialBackoff> =
Jittered::jitter(ExponentialBackoff::new(Duration::from_secs(1), 4));

/// Fetch the current `GeoIpLocation` with retrys
pub async fn get_geo_location_with_retry(
/// Handler for request to am.i.mullvad.net, manages in-flight request and validity of responses.
pub(crate) struct GeoIpHandler {
/// Unique ID for each request. If the ID attached to the
/// [`InternalDaemonEvent::LocationEvent`] used by [`crate::Daemon::handle_location_event`] to
/// determine if the location belongs to the current tunnel state.
pub request_id: usize,
rest_service: RequestServiceHandle,
location_sender: DaemonEventSender,
}

impl GeoIpHandler {
pub fn new(rest_service: RequestServiceHandle, location_sender: DaemonEventSender) -> Self {
Self {
request_id: 0,
rest_service,
location_sender,
}
}

/// Send a location request to am.i.mullvad.net. When it arrives, send an
/// [`InternalDaemonEvent::LocationEvent`], which triggers an update of the current
/// tunnel state with the `ipv4` and/or `ipv6` fields filled in.
pub fn send_geo_location_request(&mut self, use_ipv6: bool) {
// Increment request ID
self.request_id = self.request_id.wrapping_add(1);
let request_id_copy = self.request_id;

self.abort_current_request();

let rest_service = self.rest_service.clone();
let location_sender = self.location_sender.clone();
tokio::spawn(async move {
if let Ok(merged_location) = get_geo_location_with_retry(use_ipv6, rest_service).await {
let _ = location_sender.send(InternalDaemonEvent::LocationEvent((
request_id_copy,
merged_location,
)));
}
});
}

/// Abort any ongoing call to am.i.mullvad.net
pub fn abort_current_request(&mut self) {
self.rest_service.reset();
}
}

/// Fetch the current `GeoIpLocation` from am.i.mullvad.net. Handles retries on network errors.
async fn get_geo_location_with_retry(
use_ipv6: bool,
api_handle: ApiAvailabilityHandle,
rest_service: RequestServiceHandle,
) -> Result<GeoIpLocation, Error> {
log::debug!("Fetching GeoIpLocation");
retry_future(
move || send_location_request(rest_service.clone(), use_ipv6),
move |result| match result {
Err(error) if error.is_network_error() => !api_handle.get_state().is_offline(),
Err(error) => error.is_network_error(),
_ => false,
},
LOCATION_RETRY_STRATEGY,
Expand All @@ -76,6 +126,7 @@ async fn send_location_request(
if use_ipv6 {
let uri_v6 = format!("https://ipv6.{}/json", *MULLVAD_CONNCHECK_HOST);
let location = send_location_request_internal(&uri_v6, v6_sender).await;
log::warn!("{location:?}");
Some(location.map(GeoIpLocation::from))
} else {
None
Expand Down
134 changes: 75 additions & 59 deletions mullvad-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use futures::{
future::{abortable, AbortHandle, Future, LocalBoxFuture},
StreamExt,
};
use geoip::GeoIpHandler;
use mullvad_relay_selector::{
updater::{RelayListUpdater, RelayListUpdaterHandle},
RelaySelector, SelectorConfig,
Expand Down Expand Up @@ -80,7 +81,7 @@ use talpid_types::android::AndroidContext;
#[cfg(target_os = "windows")]
use talpid_types::split_tunnel::ExcludedProcess;
use talpid_types::{
net::{TunnelEndpoint, TunnelType},
net::{IpVersion, TunnelEndpoint, TunnelType},
tunnel::{ErrorStateCause, TunnelStateTransition},
ErrorExt,
};
Expand Down Expand Up @@ -369,6 +370,8 @@ pub(crate) enum InternalDaemonEvent {
DeviceEvent(AccountEvent),
/// Handles updates from versions without devices.
DeviceMigrationEvent(Result<PrivateAccountAndDevice, device::Error>),
/// A geographical location has has been received from am.i.mullvad.net
LocationEvent((usize, GeoIpLocation)),
/// The split tunnel paths or state were updated.
#[cfg(target_os = "windows")]
ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender<Result<(), Error>>),
Expand Down Expand Up @@ -615,7 +618,7 @@ pub struct Daemon<L: EventListener> {
tunnel_state_machine_handle: TunnelStateMachineHandle,
#[cfg(target_os = "windows")]
volume_update_tx: mpsc::UnboundedSender<()>,
location_abort_handle: Option<tokio::task::AbortHandle>,
location_handler: GeoIpHandler,
}

impl<L> Daemon<L>
Expand Down Expand Up @@ -829,6 +832,11 @@ where
// Attempt to download a fresh relay list
relay_list_updater.update().await;

let location_handler = GeoIpHandler::new(
api_runtime.rest_handle().await,
internal_event_tx.clone().to_specialized_sender(),
);

let daemon = Daemon {
tunnel_state: TunnelState::Disconnected(None),
target_state,
Expand Down Expand Up @@ -856,7 +864,7 @@ where
tunnel_state_machine_handle,
#[cfg(target_os = "windows")]
volume_update_tx,
location_abort_handle: None,
location_handler,
};

api_availability.unsuspend();
Expand All @@ -869,6 +877,8 @@ where
pub async fn run(mut self) -> Result<(), Error> {
if *self.target_state == TargetState::Secured {
self.connect_tunnel();
} else {
self.fetch_am_i_mullvad()
}

while let Some(event) = self.rx.next().await {
Expand Down Expand Up @@ -946,6 +956,9 @@ where
}
DeviceEvent(event) => self.handle_device_event(event).await,
DeviceMigrationEvent(event) => self.handle_device_migration_event(event),
LocationEvent((request_id, fetched_location)) => {
self.handle_location_event(request_id, fetched_location)
}
#[cfg(windows)]
ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await,
}
Expand All @@ -960,25 +973,14 @@ where
.handle_state_transition(&tunnel_state_transition);

let tunnel_state = match tunnel_state_transition {
TunnelStateTransition::Disconnected => {
self.notify_tunnel_state_when_ip_arrives(None, TunnelState::Disconnected)
.await;
TunnelState::Disconnected(None)
}
TunnelStateTransition::Disconnected => TunnelState::Disconnected(None),
TunnelStateTransition::Connecting(endpoint) => TunnelState::Connecting {
endpoint,
location: self.parameters_generator.get_last_location().await,
},
TunnelStateTransition::Connected(endpoint) => {
let location = self.parameters_generator.get_last_location().await;
let make_tunnel_state = |location| TunnelState::Connected { endpoint, location };
self.notify_tunnel_state_when_ip_arrives(
location.clone(),
make_tunnel_state.clone(),
)
.await;

make_tunnel_state(location)
TunnelState::Connected { endpoint, location }
}
TunnelStateTransition::Disconnecting(after_disconnect) => {
TunnelState::Disconnecting(after_disconnect)
Expand Down Expand Up @@ -1033,53 +1035,67 @@ where

self.tunnel_state = tunnel_state.clone();
self.event_listener.notify_new_state(tunnel_state);
self.fetch_am_i_mullvad();
}

/// Get the geographical location from am.i.mullvad.net. When it arrives,
/// update the "Out IP" field of the front ends by sending a
/// [`InternalDaemonEvent::LocationEvent`].
///
/// See [`Daemon::handle_location_event()`]
fn fetch_am_i_mullvad(&mut self) {
// Always abort any ongoing request when entering a new tunnel state
self.location_handler.abort_current_request();

// Whether or not to poll for an IPv6 exit IP
let use_ipv6 = match &self.tunnel_state {
// If connected, refer to the tunnel setting
TunnelState::Connected { .. } => self.settings.tunnel_options.generic.enable_ipv6,
// If not connected, we have to guess whether the users local connection supports IPv6.
// The only thing we have to go on is the wireguard setting.
TunnelState::Disconnected(_) => {
if let RelaySettings::Normal(relay_constraints) = &self.settings.relay_settings {
// Note that `Constraint::Any` corresponds to just IPv4
matches!(
relay_constraints.wireguard_constraints.ip_version,
mullvad_types::relay_constraints::Constraint::Only(IpVersion::V6)
)
} else {
false
}
}
// Fetching IP from am.i.mullvad.net should only be done from a tunnel state where a
// connection is available. Otherwise we just exist.
_ => return,
};

self.location_handler.send_geo_location_request(use_ipv6);
}

/// Get the out IP from am.i.mullvad.net. When it arrives, send another
/// TunnelState with the IP filled in.
async fn notify_tunnel_state_when_ip_arrives(
&mut self,
old_location: Option<GeoIpLocation>,
make_tunnel_state: impl FnOnce(Option<GeoIpLocation>) -> TunnelState + Send + 'static,
) {
// Abort any ongoing calls to `self.notify_tunnel_state_when_ip_arrives` from
// previous tunnel state transitions to avoid sending an outdated information.
if let Some(handle) = self.location_abort_handle.take() {
handle.abort();
/// Recieves and handles the geographical exit location received from am.i.mullvad.net, i.e. the
/// [`InternalDaemonEvent::LocationEvent`] event.
fn handle_location_event(&mut self, request_id: usize, fetched_location: GeoIpLocation) {
if self.location_handler.request_id != request_id {
log::debug!("Location from am.i.mullvad.net belongs to an outdated tunnel state");
return;
}
let rest_service = self.api_runtime.rest_handle().await;
let use_ipv6 = self.settings.tunnel_options.generic.enable_ipv6;
let api_handle = self.api_handle.availability.clone();
let even_listener = self.event_listener.clone();

self.location_abort_handle = Some(
tokio::spawn(async move {
let merged_location =
geoip::get_geo_location_with_retry(rest_service, use_ipv6, api_handle)
.await
.ok()
// Replace the old location with new information from am.i.mullvad.net,
// but keep the hostname as it is always none
.map(|new_location| match old_location {
Some(old_location) => GeoIpLocation {
hostname: old_location.hostname,
bridge_hostname: old_location.bridge_hostname,
entry_hostname: old_location.entry_hostname,
obfuscator_hostname: old_location.obfuscator_hostname,
..new_location
},
None => GeoIpLocation {
hostname: None,
bridge_hostname: None,
entry_hostname: None,
obfuscator_hostname: None,
..new_location
},
});
even_listener.notify_new_state(make_tunnel_state(merged_location));
})
.abort_handle(),
);
match self.tunnel_state {
TunnelState::Disconnected(ref mut location) => *location = Some(fetched_location),
TunnelState::Connected {
ref mut location, ..
} => {
*location = Some(GeoIpLocation {
ipv4: fetched_location.ipv4,
ipv6: fetched_location.ipv6,
..location.clone().unwrap_or(fetched_location)
})
}
_ => return,
};

self.event_listener
.notify_new_state(self.tunnel_state.clone());
}

fn reset_rpc_sockets_on_tunnel_state_transition(
Expand Down
1 change: 0 additions & 1 deletion mullvad-jni/src/daemon_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use mullvad_daemon::{device, DaemonCommand, DaemonCommandSender};
use mullvad_types::{
account::{AccountData, AccountToken, PlayPurchase, VoucherSubmission},
device::{Device, DeviceState},
location::GeoIpLocation,
relay_constraints::{ObfuscationSettings, RelaySettings},
relay_list::RelayList,
settings::{DnsOptions, Settings},
Expand Down

0 comments on commit e4952b5

Please sign in to comment.