Skip to content

Commit

Permalink
Handle am.i.mullvad.net with internal daemon event
Browse files Browse the repository at this point in the history
Add `geoip::GeoIpHandler`, which sends an
`InternalDaemonEvent::LocationEvent` when the location arrives. It also
handles aborting in-flight requests and retries.
  • Loading branch information
Serock3 committed Dec 19, 2023
1 parent b5c6f3d commit 3fd67d4
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 75 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Line wrap the file at 100 chars. Th

### Changed
- Remove `--location` flag from `mullvad status` CLI. Location and IP will now always
be printed (if available). `mullvad status listen` does no longer print location info.
be printed (if available). `mullvad status listen` no longer prints location info.

#### Android
- Migrated to Compose Navigation
Expand Down
4 changes: 1 addition & 3 deletions mullvad-cli/src/cmds/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub struct StatusArgs {
#[arg(long, short = 'v')]
verbose: bool,

// TODO: changelog about removing location flag
/// Enable debug output
#[arg(long, short = 'd')]
debug: bool,
Expand All @@ -36,7 +35,7 @@ impl Status {
} else {
// When we enter the connected or disconnected state, am.i.mullvad.net will
// be polled to get IP information. When it arrives, we will get another
// tunnel state of the same enum type, but with the IP filled in. This
// tunnel state of the same enum type, but with the IP filled in. This
// match statement checks for duplicate tunnel states and skips the second
// print to avoid spamming the user.
match (&previous_tunnel_state, &new_state) {
Expand Down Expand Up @@ -92,7 +91,6 @@ pub async fn handle(cmd: Option<Status>, args: StatusArgs) -> Result<()> {
if args.debug {
println!("Tunnel state: {state:#?}");
} else {
// TODO: respect location arg?
format::print_state(&state, args.verbose);
format::print_location(&state);
}
Expand Down
2 changes: 1 addition & 1 deletion mullvad-cli/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn print_location(state: &TunnelState) {
_ => return,
};
if let Some(location) = location {
print!("Your connection appears from: {}", location.country);
print!("Your connection appears to be from: {}", location.country);
if let Some(city) = &location.city {
print!(", {}", city);
}
Expand Down
66 changes: 59 additions & 7 deletions mullvad-daemon/src/geoip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ use std::time::Duration;
use futures::join;
use mullvad_api::{
self,
availability::ApiAvailabilityHandle,
rest::{Error, RequestServiceHandle},
};
use mullvad_types::location::{AmIMullvad, GeoIpLocation};
use mullvad_types::location::{AmIMullvad, GeoIpLocation, LocationEventData};
use once_cell::sync::Lazy;
use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered};
use talpid_core::{
future_retry::{retry_future, ExponentialBackoff, Jittered},
mpsc::Sender,
};
use talpid_types::ErrorExt;

use crate::{DaemonEventSender, InternalDaemonEvent};

// Define the Mullvad connection checking api endpoint.
//
// In a development build the host name for the connection checking endpoint can
Expand Down Expand Up @@ -43,17 +47,64 @@ static MULLVAD_CONNCHECK_HOST: Lazy<String> = Lazy::new(|| {
const LOCATION_RETRY_STRATEGY: Jittered<ExponentialBackoff> =
Jittered::jitter(ExponentialBackoff::new(Duration::from_secs(1), 4));

/// Fetch the current `GeoIpLocation` with retrys
pub async fn get_geo_location_with_retry(
/// Handler for request to am.i.mullvad.net, manages in-flight request and validity of responses.
pub(crate) struct GeoIpHandler {
/// Unique ID for each request. If the ID attached to the
/// [`InternalDaemonEvent::LocationEvent`] used by [`crate::Daemon::handle_location_event`] to
/// determine if the location belongs to the current tunnel state.
pub request_id: usize,
rest_service: RequestServiceHandle,
location_sender: DaemonEventSender,
}

impl GeoIpHandler {
pub fn new(rest_service: RequestServiceHandle, location_sender: DaemonEventSender) -> Self {
Self {
request_id: 0,
rest_service,
location_sender,
}
}

/// Send a location request to am.i.mullvad.net. When it arrives, send an
/// [`InternalDaemonEvent::LocationEvent`], which triggers an update of the current
/// tunnel state with the `ipv4` and/or `ipv6` fields filled in.
pub fn send_geo_location_request(&mut self, use_ipv6: bool) {
// Increment request ID
self.request_id = self.request_id.wrapping_add(1);

self.abort_current_request();

let request_id = self.request_id;
let rest_service = self.rest_service.clone();
let location_sender = self.location_sender.clone();
tokio::spawn(async move {
if let Ok(location) = get_geo_location_with_retry(use_ipv6, rest_service).await {
let _ =
location_sender.send(InternalDaemonEvent::LocationEvent(LocationEventData {
request_id,
location,
}));
}
});
}

/// Abort any ongoing call to am.i.mullvad.net
pub fn abort_current_request(&mut self) {
self.rest_service.reset();
}
}

/// Fetch the current `GeoIpLocation` from am.i.mullvad.net. Handles retries on network errors.
async fn get_geo_location_with_retry(
use_ipv6: bool,
api_handle: ApiAvailabilityHandle,
rest_service: RequestServiceHandle,
) -> Result<GeoIpLocation, Error> {
log::debug!("Fetching GeoIpLocation");
retry_future(
move || send_location_request(rest_service.clone(), use_ipv6),
move |result| match result {
Err(error) if error.is_network_error() => !api_handle.get_state().is_offline(),
Err(error) => error.is_network_error(),
_ => false,
},
LOCATION_RETRY_STRATEGY,
Expand All @@ -76,6 +127,7 @@ async fn send_location_request(
if use_ipv6 {
let uri_v6 = format!("https://ipv6.{}/json", *MULLVAD_CONNCHECK_HOST);
let location = send_location_request_internal(&uri_v6, v6_sender).await;
log::warn!("{location:?}");
Some(location.map(GeoIpLocation::from))
} else {
None
Expand Down
149 changes: 87 additions & 62 deletions mullvad-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use futures::{
future::{abortable, AbortHandle, Future, LocalBoxFuture},
StreamExt,
};
use geoip::GeoIpHandler;
use mullvad_relay_selector::{
updater::{RelayListUpdater, RelayListUpdaterHandle},
RelaySelector, SelectorConfig,
Expand All @@ -46,7 +47,7 @@ use mullvad_types::{
auth_failed::AuthFailed,
custom_list::CustomList,
device::{Device, DeviceEvent, DeviceEventCause, DeviceId, DeviceState, RemoveDeviceEvent},
location::GeoIpLocation,
location::{GeoIpLocation, LocationEventData},
relay_constraints::{
BridgeSettings, BridgeState, ObfuscationSettings, RelayOverride, RelaySettings,
},
Expand Down Expand Up @@ -80,7 +81,7 @@ use talpid_types::android::AndroidContext;
#[cfg(target_os = "windows")]
use talpid_types::split_tunnel::ExcludedProcess;
use talpid_types::{
net::{TunnelEndpoint, TunnelType},
net::{IpVersion, TunnelEndpoint, TunnelType},
tunnel::{ErrorStateCause, TunnelStateTransition},
ErrorExt,
};
Expand Down Expand Up @@ -369,6 +370,8 @@ pub(crate) enum InternalDaemonEvent {
DeviceEvent(AccountEvent),
/// Handles updates from versions without devices.
DeviceMigrationEvent(Result<PrivateAccountAndDevice, device::Error>),
/// A geographical location has has been received from am.i.mullvad.net
LocationEvent(LocationEventData),
/// The split tunnel paths or state were updated.
#[cfg(target_os = "windows")]
ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender<Result<(), Error>>),
Expand Down Expand Up @@ -615,7 +618,7 @@ pub struct Daemon<L: EventListener> {
tunnel_state_machine_handle: TunnelStateMachineHandle,
#[cfg(target_os = "windows")]
volume_update_tx: mpsc::UnboundedSender<()>,
location_abort_handle: Option<tokio::task::AbortHandle>,
location_handler: GeoIpHandler,
}

impl<L> Daemon<L>
Expand Down Expand Up @@ -829,6 +832,11 @@ where
// Attempt to download a fresh relay list
relay_list_updater.update().await;

let location_handler = GeoIpHandler::new(
api_runtime.rest_handle().await,
internal_event_tx.clone().to_specialized_sender(),
);

let daemon = Daemon {
tunnel_state: TunnelState::Disconnected(None),
target_state,
Expand Down Expand Up @@ -856,7 +864,7 @@ where
tunnel_state_machine_handle,
#[cfg(target_os = "windows")]
volume_update_tx,
location_abort_handle: None,
location_handler,
};

api_availability.unsuspend();
Expand All @@ -867,8 +875,16 @@ where
/// Consume the `Daemon` and run the main event loop. Blocks until an error happens or a
/// shutdown event is received.
pub async fn run(mut self) -> Result<(), Error> {
if *self.target_state == TargetState::Secured {
self.connect_tunnel();
match *self.target_state {
TargetState::Secured => {
self.connect_tunnel();
}
TargetState::Unsecured => {
// Fetching GeoIpLocation is automatically done when connecting.
// If TargetState is Unsecured we will not connect on lauch and
// so we have to explicitly fetch this information.
self.fetch_am_i_mullvad()
}
}

while let Some(event) = self.rx.next().await {
Expand Down Expand Up @@ -946,6 +962,7 @@ where
}
DeviceEvent(event) => self.handle_device_event(event).await,
DeviceMigrationEvent(event) => self.handle_device_migration_event(event),
LocationEvent(location_data) => self.handle_location_event(location_data),
#[cfg(windows)]
ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await,
}
Expand All @@ -960,25 +977,14 @@ where
.handle_state_transition(&tunnel_state_transition);

let tunnel_state = match tunnel_state_transition {
TunnelStateTransition::Disconnected => {
self.notify_tunnel_state_when_ip_arrives(None, TunnelState::Disconnected)
.await;
TunnelState::Disconnected(None)
}
TunnelStateTransition::Disconnected => TunnelState::Disconnected(None),
TunnelStateTransition::Connecting(endpoint) => TunnelState::Connecting {
endpoint,
location: self.parameters_generator.get_last_location().await,
},
TunnelStateTransition::Connected(endpoint) => {
let location = self.parameters_generator.get_last_location().await;
let make_tunnel_state = |location| TunnelState::Connected { endpoint, location };
self.notify_tunnel_state_when_ip_arrives(
location.clone(),
make_tunnel_state.clone(),
)
.await;

make_tunnel_state(location)
TunnelState::Connected { endpoint, location }
}
TunnelStateTransition::Disconnecting(after_disconnect) => {
TunnelState::Disconnecting(after_disconnect)
Expand Down Expand Up @@ -1033,53 +1039,72 @@ where

self.tunnel_state = tunnel_state.clone();
self.event_listener.notify_new_state(tunnel_state);
self.fetch_am_i_mullvad();
}

/// Get the geographical location from am.i.mullvad.net. When it arrives,
/// update the "Out IP" field of the front ends by sending a
/// [`InternalDaemonEvent::LocationEvent`].
///
/// See [`Daemon::handle_location_event()`]
fn fetch_am_i_mullvad(&mut self) {
// Always abort any ongoing request when entering a new tunnel state
self.location_handler.abort_current_request();

// Whether or not to poll for an IPv6 exit IP
let use_ipv6 = match &self.tunnel_state {
// If connected, refer to the tunnel setting
TunnelState::Connected { .. } => self.settings.tunnel_options.generic.enable_ipv6,
// If not connected, we have to guess whether the users local connection supports IPv6.
// The only thing we have to go on is the wireguard setting.
TunnelState::Disconnected(_) => {
if let RelaySettings::Normal(relay_constraints) = &self.settings.relay_settings {
// Note that `Constraint::Any` corresponds to just IPv4
matches!(
relay_constraints.wireguard_constraints.ip_version,
mullvad_types::relay_constraints::Constraint::Only(IpVersion::V6)
)
} else {
false
}
}
// Fetching IP from am.i.mullvad.net should only be done from a tunnel state where a
// connection is available. Otherwise we just exist.
_ => return,
};

self.location_handler.send_geo_location_request(use_ipv6);
}

/// Get the out IP from am.i.mullvad.net. When it arrives, send another
/// TunnelState with the IP filled in.
async fn notify_tunnel_state_when_ip_arrives(
&mut self,
old_location: Option<GeoIpLocation>,
make_tunnel_state: impl FnOnce(Option<GeoIpLocation>) -> TunnelState + Send + 'static,
) {
// Abort any ongoing calls to `self.notify_tunnel_state_when_ip_arrives` from
// previous tunnel state transitions to avoid sending an outdated information.
if let Some(handle) = self.location_abort_handle.take() {
handle.abort();
/// Recieves and handles the geographical exit location received from am.i.mullvad.net, i.e. the
/// [`InternalDaemonEvent::LocationEvent`] event.
fn handle_location_event(&mut self, location_data: LocationEventData) {
let LocationEventData {
request_id,
location: fetched_location,
} = location_data;

if self.location_handler.request_id != request_id {
log::debug!("Location from am.i.mullvad.net belongs to an outdated tunnel state");
return;
}
let rest_service = self.api_runtime.rest_handle().await;
let use_ipv6 = self.settings.tunnel_options.generic.enable_ipv6;
let api_handle = self.api_handle.availability.clone();
let even_listener = self.event_listener.clone();

self.location_abort_handle = Some(
tokio::spawn(async move {
let merged_location =
geoip::get_geo_location_with_retry(rest_service, use_ipv6, api_handle)
.await
.ok()
// Replace the old location with new information from am.i.mullvad.net,
// but keep the hostname as it is always none
.map(|new_location| match old_location {
Some(old_location) => GeoIpLocation {
hostname: old_location.hostname,
bridge_hostname: old_location.bridge_hostname,
entry_hostname: old_location.entry_hostname,
obfuscator_hostname: old_location.obfuscator_hostname,
..new_location
},
None => GeoIpLocation {
hostname: None,
bridge_hostname: None,
entry_hostname: None,
obfuscator_hostname: None,
..new_location
},
});
even_listener.notify_new_state(make_tunnel_state(merged_location));
})
.abort_handle(),
);
match self.tunnel_state {
TunnelState::Disconnected(ref mut location) => *location = Some(fetched_location),
TunnelState::Connected {
ref mut location, ..
} => {
*location = Some(GeoIpLocation {
ipv4: fetched_location.ipv4,
ipv6: fetched_location.ipv6,
..location.clone().unwrap_or(fetched_location)
})
}
_ => return,
};

self.event_listener
.notify_new_state(self.tunnel_state.clone());
}

fn reset_rpc_sockets_on_tunnel_state_transition(
Expand Down
1 change: 0 additions & 1 deletion mullvad-jni/src/daemon_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use mullvad_daemon::{device, DaemonCommand, DaemonCommandSender};
use mullvad_types::{
account::{AccountData, AccountToken, PlayPurchase, VoucherSubmission},
device::{Device, DeviceState},
location::GeoIpLocation,
relay_constraints::{ObfuscationSettings, RelaySettings},
relay_list::RelayList,
settings::{DnsOptions, Settings},
Expand Down
7 changes: 7 additions & 0 deletions mullvad-types/src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ impl From<AmIMullvad> for GeoIpLocation {
}
}

pub struct LocationEventData {
/// Keep track of which request led to this event being triggered
pub request_id: usize,
/// New location information
pub location: GeoIpLocation,
}

#[cfg(test)]
mod tests {
use super::Coordinates;
Expand Down

0 comments on commit 3fd67d4

Please sign in to comment.