diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index d2a20db9d98a..c3c64ebacfa6 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -56,7 +56,7 @@ use mullvad_types::{ version::{AppVersion, AppVersionInfo}, wireguard::{PublicKey, QuantumResistantState, RotationInterval}, }; -use relay_list::updater::{self, RelayListUpdater, RelayListUpdaterHandle}; +use relay_list::{RelayListUpdater, RelayListUpdaterHandle, RELAYS_FILENAME}; use settings::SettingsPersister; #[cfg(target_os = "android")] use std::os::unix::io::RawFd; @@ -698,8 +698,8 @@ where let initial_selector_config = new_selector_config(&settings); let relay_selector = RelaySelector::new( initial_selector_config, - resource_dir.join(updater::RELAYS_FILENAME), - cache_dir.join(updater::RELAYS_FILENAME), + resource_dir.join(RELAYS_FILENAME), + cache_dir.join(RELAYS_FILENAME), ); let settings_relay_selector = relay_selector.clone(); diff --git a/mullvad-daemon/src/relay_list/mod.rs b/mullvad-daemon/src/relay_list/mod.rs index 8936941035c6..2b4be3db5432 100644 --- a/mullvad-daemon/src/relay_list/mod.rs +++ b/mullvad-daemon/src/relay_list/mod.rs @@ -1,3 +1,213 @@ -//! Relay list +//! Relay list updater -pub mod updater; +use futures::{ + channel::mpsc, + future::{Fuse, FusedFuture}, + Future, FutureExt, SinkExt, StreamExt, +}; +use std::{ + path::{Path, PathBuf}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::fs::File; + +use mullvad_api::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle, RelayListProxy}; +use mullvad_relay_selector::RelaySelector; +use mullvad_types::relay_list::RelayList; +use talpid_future::retry::{retry_future, ExponentialBackoff, Jittered}; +use talpid_types::ErrorExt; + +/// How often the updater should wake up to check the in-memory cache of relays. +/// This check is very cheap. The only reason to not have it very often is because if downloading +/// constantly fails it will try very often and fill the logs etc. 
+const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15); +/// How old the cached relays need to be to trigger an update +const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); + +const DOWNLOAD_RETRY_STRATEGY: Jittered = Jittered::jitter( + ExponentialBackoff::new(Duration::from_secs(16), 8) + .max_delay(Some(Duration::from_secs(2 * 60 * 60))), +); + +/// Where the relay list is cached on disk. +pub(crate) const RELAYS_FILENAME: &str = "relays.json"; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Downloader already shut down")] + DownloaderShutdown, + + #[error("Mullvad relay selector error")] + RelaySelector(#[from] mullvad_relay_selector::Error), +} + +#[derive(Clone)] +pub struct RelayListUpdaterHandle { + tx: mpsc::Sender<()>, +} + +impl RelayListUpdaterHandle { + pub async fn update(&mut self) { + if let Err(error) = self + .tx + .send(()) + .await + .map_err(|_| Error::DownloaderShutdown) + { + log::error!( + "{}", + error.display_chain_with_msg("Unable to send update command to relay list updater") + ); + } + } +} + +pub struct RelayListUpdater { + api_client: RelayListProxy, + cache_path: PathBuf, + relay_selector: RelaySelector, + on_update: Box, + last_check: SystemTime, + api_availability: ApiAvailabilityHandle, +} + +impl RelayListUpdater { + pub fn spawn( + selector: RelaySelector, + api_handle: MullvadRestHandle, + cache_dir: &Path, + on_update: impl Fn(&RelayList) + Send + 'static, + ) -> RelayListUpdaterHandle { + let (tx, cmd_rx) = mpsc::channel(1); + let api_availability = api_handle.availability.clone(); + let api_client = RelayListProxy::new(api_handle); + let updater = RelayListUpdater { + api_client, + cache_path: cache_dir.join(RELAYS_FILENAME), + relay_selector: selector, + on_update: Box::new(on_update), + last_check: UNIX_EPOCH, + api_availability, + }; + + tokio::spawn(updater.run(cmd_rx)); + + RelayListUpdaterHandle { tx } + } + + async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) { + let 
mut download_future = Box::pin(Fuse::terminated()); + loop { + let next_check = tokio::time::sleep(UPDATE_CHECK_INTERVAL).fuse(); + tokio::pin!(next_check); + + futures::select! { + _check_update = next_check => { + if download_future.is_terminated() && self.should_update() { + let tag = self.relay_selector.etag(); + download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.api_client.clone(), tag).fuse()); + self.last_check = SystemTime::now(); + } + }, + + new_relay_list = download_future => { + self.consume_new_relay_list(new_relay_list).await; + }, + + cmd = cmd_rx.next() => { + match cmd { + Some(()) => { + let tag = self.relay_selector.etag(); + download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.api_client.clone(), tag).fuse()); + self.last_check = SystemTime::now(); + }, + None => { + log::trace!("Relay list updater shutting down"); + return; + } + } + } + + }; + } + } + + async fn consume_new_relay_list( + &mut self, + result: Result, mullvad_api::Error>, + ) { + match result { + Ok(Some(relay_list)) => { + if let Err(err) = self.update_cache(relay_list).await { + log::error!("Failed to update relay list cache: {}", err); + } + } + Ok(None) => log::debug!("Relay list is up-to-date"), + Err(error) => log::error!( + "{}", + error.display_chain_with_msg("Failed to fetch new relay list") + ), + } + } + + /// Returns true if the current parsed_relays is older than UPDATE_INTERVAL + fn should_update(&mut self) -> bool { + let last_check = std::cmp::max(self.relay_selector.last_updated(), self.last_check); + match SystemTime::now().duration_since(last_check) { + Ok(duration) => duration >= UPDATE_INTERVAL, + // If the clock is skewed we have no idea by how much or when the last update + // actually was, better download again to get in sync and get a `last_updated` + // timestamp corresponding to the new time. 
+ Err(_) => true, + } + } + + fn download_relay_list( + api_handle: ApiAvailabilityHandle, + proxy: RelayListProxy, + tag: Option, + ) -> impl Future, mullvad_api::Error>> + 'static { + let download_futures = move || { + let available = api_handle.wait_background(); + let req = proxy.relay_list(tag.clone()); + async move { + available.await?; + req.await.map_err(mullvad_api::Error::from) + } + }; + + retry_future( + download_futures, + |result| result.is_err(), + DOWNLOAD_RETRY_STRATEGY, + ) + } + + async fn update_cache(&mut self, new_relay_list: RelayList) -> Result<(), Error> { + if let Err(error) = Self::cache_relays(&self.cache_path, &new_relay_list).await { + log::error!( + "{}", + error.display_chain_with_msg("Failed to update relay cache on disk") + ); + } + + self.relay_selector.set_relays(new_relay_list.clone()); + (self.on_update)(&new_relay_list); + Ok(()) + } + + /// Write a `RelayList` to the cache file. + async fn cache_relays(cache_path: &Path, relays: &RelayList) -> Result<(), Error> { + log::debug!("Writing relays cache to {}", cache_path.display()); + let mut file = File::create(cache_path) + .await + .map_err(mullvad_relay_selector::Error::OpenRelayCache)?; + let bytes = + serde_json::to_vec_pretty(relays).map_err(mullvad_relay_selector::Error::Serialize)?; + let mut slice: &[u8] = bytes.as_slice(); + let _ = tokio::io::copy(&mut slice, &mut file) + .await + .map_err(mullvad_relay_selector::Error::WriteRelayCache)?; + Ok(()) + } +} diff --git a/mullvad-daemon/src/relay_list/updater.rs b/mullvad-daemon/src/relay_list/updater.rs deleted file mode 100644 index 317dbd45cdc5..000000000000 --- a/mullvad-daemon/src/relay_list/updater.rs +++ /dev/null @@ -1,201 +0,0 @@ -use futures::{ - channel::mpsc, - future::{Fuse, FusedFuture}, - Future, FutureExt, SinkExt, StreamExt, -}; -use std::{ - path::{Path, PathBuf}, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; -use tokio::fs::File; - -use mullvad_api::{availability::ApiAvailabilityHandle, 
rest::MullvadRestHandle, RelayListProxy}; -use mullvad_relay_selector::{Error, RelaySelector}; -use mullvad_types::relay_list::RelayList; -use talpid_future::retry::{retry_future, ExponentialBackoff, Jittered}; -use talpid_types::ErrorExt; - -/// How often the updater should wake up to check the cache of the in-memory cache of relays. -/// This check is very cheap. The only reason to not have it very often is because if downloading -/// constantly fails it will try very often and fill the logs etc. -const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15); -/// How old the cached relays need to be to trigger an update -const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); - -const DOWNLOAD_RETRY_STRATEGY: Jittered = Jittered::jitter( - ExponentialBackoff::new(Duration::from_secs(16), 8) - .max_delay(Some(Duration::from_secs(2 * 60 * 60))), -); - -/// Where the relay list is cached on disk. -pub(crate) const RELAYS_FILENAME: &str = "relays.json"; - -#[derive(Clone)] -pub struct RelayListUpdaterHandle { - tx: mpsc::Sender<()>, -} - -impl RelayListUpdaterHandle { - pub async fn update(&mut self) { - if let Err(error) = self - .tx - .send(()) - .await - .map_err(|_| Error::DownloaderShutDown) - { - log::error!( - "{}", - error.display_chain_with_msg("Unable to send update command to relay list updater") - ); - } - } -} - -pub struct RelayListUpdater { - api_client: RelayListProxy, - cache_path: PathBuf, - relay_selector: RelaySelector, - on_update: Box, - last_check: SystemTime, - api_availability: ApiAvailabilityHandle, -} - -impl RelayListUpdater { - pub fn spawn( - selector: RelaySelector, - api_handle: MullvadRestHandle, - cache_dir: &Path, - on_update: impl Fn(&RelayList) + Send + 'static, - ) -> RelayListUpdaterHandle { - let (tx, cmd_rx) = mpsc::channel(1); - let api_availability = api_handle.availability.clone(); - let api_client = RelayListProxy::new(api_handle); - let updater = RelayListUpdater { - api_client, - cache_path: 
cache_dir.join(RELAYS_FILENAME), - relay_selector: selector, - on_update: Box::new(on_update), - last_check: UNIX_EPOCH, - api_availability, - }; - - tokio::spawn(updater.run(cmd_rx)); - - RelayListUpdaterHandle { tx } - } - - async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) { - let mut download_future = Box::pin(Fuse::terminated()); - loop { - let next_check = tokio::time::sleep(UPDATE_CHECK_INTERVAL).fuse(); - tokio::pin!(next_check); - - futures::select! { - _check_update = next_check => { - if download_future.is_terminated() && self.should_update() { - let tag = self.relay_selector.etag(); - download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.api_client.clone(), tag).fuse()); - self.last_check = SystemTime::now(); - } - }, - - new_relay_list = download_future => { - self.consume_new_relay_list(new_relay_list).await; - }, - - cmd = cmd_rx.next() => { - match cmd { - Some(()) => { - let tag = self.relay_selector.etag(); - download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.api_client.clone(), tag).fuse()); - self.last_check = SystemTime::now(); - }, - None => { - log::trace!("Relay list updater shutting down"); - return; - } - } - } - - }; - } - } - - async fn consume_new_relay_list( - &mut self, - result: Result, mullvad_api::Error>, - ) { - match result { - Ok(Some(relay_list)) => { - if let Err(err) = self.update_cache(relay_list).await { - log::error!("Failed to update relay list cache: {}", err); - } - } - Ok(None) => log::debug!("Relay list is up-to-date"), - Err(error) => log::error!( - "{}", - error.display_chain_with_msg("Failed to fetch new relay list") - ), - } - } - - /// Returns true if the current parsed_relays is older than UPDATE_INTERVAL - fn should_update(&mut self) -> bool { - let last_check = std::cmp::max(self.relay_selector.last_updated(), self.last_check); - match SystemTime::now().duration_since(last_check) { - Ok(duration) => duration >= UPDATE_INTERVAL, - // 
If the clock is skewed we have no idea by how much or when the last update - // actually was, better download again to get in sync and get a `last_updated` - // timestamp corresponding to the new time. - Err(_) => true, - } - } - - fn download_relay_list( - api_handle: ApiAvailabilityHandle, - proxy: RelayListProxy, - tag: Option, - ) -> impl Future, mullvad_api::Error>> + 'static { - let download_futures = move || { - let available = api_handle.wait_background(); - let req = proxy.relay_list(tag.clone()); - async move { - available.await?; - req.await.map_err(mullvad_api::Error::from) - } - }; - - retry_future( - download_futures, - |result| result.is_err(), - DOWNLOAD_RETRY_STRATEGY, - ) - } - - async fn update_cache(&mut self, new_relay_list: RelayList) -> Result<(), Error> { - if let Err(error) = Self::cache_relays(&self.cache_path, &new_relay_list).await { - log::error!( - "{}", - error.display_chain_with_msg("Failed to update relay cache on disk") - ); - } - - self.relay_selector.set_relays(new_relay_list.clone()); - (self.on_update)(&new_relay_list); - Ok(()) - } - - /// Write a `RelayList` to the cache file. 
- async fn cache_relays(cache_path: &Path, relays: &RelayList) -> Result<(), Error> { - log::debug!("Writing relays cache to {}", cache_path.display()); - let mut file = File::create(cache_path) - .await - .map_err(Error::OpenRelayCache)?; - let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?; - let mut slice: &[u8] = bytes.as_slice(); - let _ = tokio::io::copy(&mut slice, &mut file) - .await - .map_err(Error::WriteRelayCache)?; - Ok(()) - } -} diff --git a/mullvad-relay-selector/src/error.rs b/mullvad-relay-selector/src/error.rs index 87598877cd79..fdc3cdc01238 100644 --- a/mullvad-relay-selector/src/error.rs +++ b/mullvad-relay-selector/src/error.rs @@ -23,40 +23,41 @@ pub enum Error { NoObfuscator, #[error("No endpoint could be constructed for relay {0:?}")] - NoEndpoint(Box), + NoEndpoint(EndpointError), #[error("Failure in serialization of the relay list")] Serialize(#[from] serde_json::Error), - #[error("Downloader already shut down")] - DownloaderShutDown, - #[error("Invalid bridge settings")] InvalidBridgeSettings(#[from] MissingCustomBridgeSettings), } /// Special type which only shows up in [`Error`]. This error variant signals that no valid -/// endpoint could be constructed from the selected relay. See [`detailer`] for more info. -/// -/// [`detailer`]: mullvad_relay_selector::relay_selector::detailer.rs +/// endpoint could be constructed from the selected relay. #[derive(Debug)] pub enum EndpointError { - /// No valid Wireguard endpoint could be constructed from this [`WireguardConfig`] - Wireguard(WireguardConfig), + /// No valid Wireguard endpoint could be constructed from this [`WireguardConfig`]. + /// + /// # Note + /// The inner value is boxed to not bloat the size of [`Error`] due to the size of [`WireguardConfig`]. 
+ Wireguard(Box), /// No valid OpenVPN endpoint could be constructed from this [`Relay`] - OpenVpn(Relay), + /// + /// # Note + /// The inner value is boxed to not bloat the size of [`Error`] due to the size of [`Relay`]. + OpenVpn(Box), } impl EndpointError { /// Helper function for constructing an [`Error::NoEndpoint`] from `relay`. /// Takes care of boxing the [`WireguardConfig`] for you! pub(crate) fn from_wireguard(relay: WireguardConfig) -> Error { - Error::NoEndpoint(Box::new(EndpointError::Wireguard(relay))) + Error::NoEndpoint(EndpointError::Wireguard(Box::new(relay))) } /// Helper function for constructing an [`Error::NoEndpoint`] from `relay`. /// Takes care of boxing the [`Relay`] for you! pub(crate) fn from_openvpn(relay: Relay) -> Error { - Error::NoEndpoint(Box::new(EndpointError::OpenVpn(relay))) + Error::NoEndpoint(EndpointError::OpenVpn(Box::new(relay))) } } diff --git a/mullvad-relay-selector/src/lib.rs b/mullvad-relay-selector/src/lib.rs index ba59e84d233d..5d170e6c7b54 100644 --- a/mullvad-relay-selector/src/lib.rs +++ b/mullvad-relay-selector/src/lib.rs @@ -3,10 +3,7 @@ mod constants; mod error; -#[cfg(not(target_os = "android"))] -mod relay_selector; -#[cfg(target_os = "android")] -#[allow(unused)] +#[cfg_attr(target_os = "android", allow(unused))] mod relay_selector; // Re-exports diff --git a/mullvad-relay-selector/src/relay_selector/matcher.rs b/mullvad-relay-selector/src/relay_selector/matcher.rs index c1e085e965e9..b2bcd48fc13a 100644 --- a/mullvad-relay-selector/src/relay_selector/matcher.rs +++ b/mullvad-relay-selector/src/relay_selector/matcher.rs @@ -31,7 +31,7 @@ impl RelayMatcher { pub fn new( constraints: RelayQuery, openvpn_data: OpenVpnEndpointData, - brige_state: BridgeState, + bridge_state: BridgeState, wireguard_data: WireguardEndpointData, custom_lists: &CustomListsSettings, ) -> Self { @@ -47,7 +47,7 @@ impl RelayMatcher { openvpn: OpenVpnMatcher::new( constraints.openvpn_constraints, openvpn_data, - brige_state, + 
bridge_state, ), tunnel_type: constraints.tunnel_protocol, }, @@ -262,7 +262,7 @@ pub const fn filter_bridge(relay: &Relay) -> bool { // --- OpenVPN specific filter --- -/// Returns wheter a relay (endpoint) satisfy the port constraints (transport protocol + port +/// Returns whether a relay (endpoint) satisfy the port constraints (transport protocol + port /// number) posed by `filter`. fn openvpn_filter_on_port(port: Constraint, endpoint: &OpenVpnEndpointData) -> bool { let compatible_port = diff --git a/mullvad-relay-selector/src/relay_selector/mod.rs b/mullvad-relay-selector/src/relay_selector/mod.rs index 30d6360071a8..deb47f43fe0c 100644 --- a/mullvad-relay-selector/src/relay_selector/mod.rs +++ b/mullvad-relay-selector/src/relay_selector/mod.rs @@ -528,7 +528,7 @@ impl RelaySelector { }; let endpoint = Self::get_wireguard_endpoint(query, &inner, parsed_relays)?; let obfuscator = - Self::get_wireguard_obfuscator(query, inner.clone(), &endpoint, parsed_relays); + Self::get_wireguard_obfuscator(query, inner.clone(), &endpoint, parsed_relays)?; Ok(GetRelay::Wireguard { endpoint, @@ -632,21 +632,24 @@ impl RelaySelector { relay: WireguardConfig, endpoint: &MullvadWireguardEndpoint, parsed_relays: &ParsedRelays, - ) -> Option { + ) -> Result, Error> { match query.wireguard_constraints.obfuscation { - SelectedObfuscation::Off | SelectedObfuscation::Auto => None, + SelectedObfuscation::Off | SelectedObfuscation::Auto => Ok(None), SelectedObfuscation::Udp2Tcp => { let obfuscator_relay = match relay { WireguardConfig::Singlehop { exit } => exit, WireguardConfig::Multihop { entry, .. 
} => entry, }; let udp2tcp_ports = &parsed_relays.parsed_list().wireguard.udp2tcp_ports; + helpers::get_udp2tcp_obfuscator( &query.wireguard_constraints.udp2tcp_port, udp2tcp_ports, obfuscator_relay, endpoint, ) + .map(Some) + .ok_or(Error::NoObfuscator) } } } diff --git a/mullvad-relay-selector/src/relay_selector/query.rs b/mullvad-relay-selector/src/relay_selector/query.rs index 67db5068be18..c1862d5ae65e 100644 --- a/mullvad-relay-selector/src/relay_selector/query.rs +++ b/mullvad-relay-selector/src/relay_selector/query.rs @@ -16,22 +16,6 @@ //! - [Builder patterns][builder]: The module also provides builder patterns for creating instances //! of `RelayQuery`, `WireguardRelayQuery`, and `OpenVpnRelayQuery` with a fluent API. //! -//! ## Examples -//! -//! Creating a basic `RelayQuery` to filter relays by location, ownership and tunnel protocol: -//! -//! ```rust -//! use mullvad_relay_selector::query::RelayQuery; -//! use mullvad_relay_selector::query::builder::RelayQueryBuilder; -//! use mullvad_relay_selector::query::builder::{Ownership, GeographicLocationConstraint}; -//! -//! let query: RelayQuery = RelayQueryBuilder::new() -//! .openvpn() // The relay should use OpenVPN, .. -//! .location(GeographicLocationConstraint::country("no")) // .. be locatated in Norway .. -//! .ownership(Ownership::MullvadOwned) // .. and it must be owned by Mullvad. -//! .build(); // Construct the query -//! ``` -//! //! ## Design //! //! This module has been built in such a way that it should be easy to reason about, @@ -64,7 +48,7 @@ use talpid_types::net::{proxy::CustomProxy, IpVersion, TunnelType}; /// /// # Examples /// -/// Basic usage: +/// Creating a basic `RelayQuery` to filter relays by location, ownership and tunnel protocol: /// /// ```rust /// // Create a query for a Wireguard relay that is owned by Mullvad and located in Norway. 
@@ -83,7 +67,7 @@ use talpid_types::net::{proxy::CustomProxy, IpVersion, TunnelType}; /// /// This example demonstrates creating a `RelayQuery` which can then be passed /// to the [`crate::RelaySelector`] to find a relay that matches the criteria. -/// See [`builder`] for more info on how to build queries and. +/// See [`builder`] for more info on how to construct queries. #[derive(Debug, Clone, Eq, PartialEq)] pub struct RelayQuery { pub location: Constraint, diff --git a/mullvad-types/src/endpoint.rs b/mullvad-types/src/endpoint.rs index 84a2b1cbc4a6..283ceb6ee338 100644 --- a/mullvad-types/src/endpoint.rs +++ b/mullvad-types/src/endpoint.rs @@ -29,21 +29,4 @@ impl MullvadEndpoint { ), } } - - pub fn unwrap_wireguard(&self) -> &MullvadWireguardEndpoint { - match self { - Self::Wireguard(endpoint) => endpoint, - other => { - panic!("Expected WireGuard enum variant but got {other:?}"); - } - } - } - pub fn unwrap_openvpn(&self) -> &Endpoint { - match self { - Self::OpenVpn(endpoint) => endpoint, - other => { - panic!("Expected WireGuard enum variant but got {other:?}"); - } - } - } } diff --git a/mullvad-types/src/relay_list.rs b/mullvad-types/src/relay_list.rs index a1d6f810cedf..7d7545e97f63 100644 --- a/mullvad-types/src/relay_list.rs +++ b/mullvad-types/src/relay_list.rs @@ -106,16 +106,14 @@ pub struct Relay { pub location: Option, } -// TODO(markus): Justify -// TODO(markus): Document impl PartialEq for Relay { fn eq(&self, other: &Self) -> bool { + // Hostnames are assumed to be unique per relay, i.e. a relay can be uniquely identified by its hostname. self.hostname == other.hostname } } -// TODO(markus): Justify -// TODO(markus): Document +// See uniqueness argument in the implementation of [`PartialEq`] for [`Relay`]. impl Eq for Relay {} /// Specifies the type of a relay or relay-specific endpoint data.