Skip to content

Commit

Permalink
Move relay_updater to mullvad-daemon
Browse files Browse the repository at this point in the history
Move `mullvad-relay-selector::relay_updater` to the `mullvad-daemon`.
The implications of this is that `mullvad-relay-selector` can drop the
dependency on both `mullvad-api` and `tokio`, which brings down the
total amount of dependencies when running a simple `Cargo check` from
250+ down to a mere 75. :-)
  • Loading branch information
MarkusPettersson98 committed Feb 20, 2024
1 parent 40c41a1 commit 16831a2
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 57 deletions.
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions mullvad-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod macos;
#[cfg(not(target_os = "android"))]
pub mod management_interface;
mod migrations;
mod relay_list;
#[cfg(not(target_os = "android"))]
pub mod rpc_uniqueness_check;
pub mod runtime;
Expand All @@ -36,10 +37,7 @@ use futures::{
StreamExt,
};
use geoip::GeoIpHandler;
use mullvad_relay_selector::{
updater::{RelayListUpdater, RelayListUpdaterHandle},
RelaySelector, SelectorConfig,
};
use mullvad_relay_selector::{RelaySelector, SelectorConfig};
#[cfg(target_os = "android")]
use mullvad_types::account::{PlayPurchase, PlayPurchasePaymentToken};
use mullvad_types::{
Expand All @@ -58,6 +56,7 @@ use mullvad_types::{
version::{AppVersion, AppVersionInfo},
wireguard::{PublicKey, QuantumResistantState, RotationInterval},
};
use relay_list::updater::{self, RelayListUpdater, RelayListUpdaterHandle};
use settings::SettingsPersister;
#[cfg(target_os = "android")]
use std::os::unix::io::RawFd;
Expand Down Expand Up @@ -698,7 +697,11 @@ where
let app_version_info = version_check::load_cache(&cache_dir).await;

let initial_selector_config = new_selector_config(&settings);
let relay_selector = RelaySelector::new(initial_selector_config, &resource_dir, &cache_dir);
let relay_selector = RelaySelector::new(
initial_selector_config,
resource_dir.join(updater::RELAYS_FILENAME),
cache_dir.join(updater::RELAYS_FILENAME),
);

let settings_relay_selector = relay_selector.clone();
settings.register_change_listener(move |settings| {
Expand Down Expand Up @@ -1569,7 +1572,7 @@ where
}

fn on_get_relay_locations(&mut self, tx: oneshot::Sender<RelayList>) {
Self::oneshot_send(tx, self.relay_selector.get_locations(), "relay locations");
Self::oneshot_send(tx, self.relay_selector.get_relays(), "relay locations");
}

async fn on_update_relay_locations(&mut self) {
Expand Down
3 changes: 3 additions & 0 deletions mullvad-daemon/src/relay_list/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Relay list
pub mod updater;
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use super::{Error, ParsedRelays};
use futures::{
channel::mpsc,
future::{Fuse, FusedFuture},
Future, FutureExt, SinkExt, StreamExt,
};
use mullvad_api::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle, RelayListProxy};
use mullvad_types::relay_list::RelayList;
use std::{
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::fs::File;

use mullvad_api::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle, RelayListProxy};
use mullvad_relay_selector::{Error, RelaySelector};
use mullvad_types::relay_list::RelayList;
use talpid_future::retry::{retry_future, ExponentialBackoff, Jittered};
use talpid_types::ErrorExt;
use tokio::fs::File;

/// How often the updater should wake up to check the cache of the in-memory cache of relays.
/// This check is very cheap. The only reason to not have it very often is because if downloading
Expand All @@ -27,6 +27,9 @@ const DOWNLOAD_RETRY_STRATEGY: Jittered<ExponentialBackoff> = Jittered::jitter(
.max_delay(Some(Duration::from_secs(2 * 60 * 60))),
);

/// Where the relay list is cached on disk.
pub(crate) const RELAYS_FILENAME: &str = "relays.json";

#[derive(Clone)]
pub struct RelayListUpdaterHandle {
tx: mpsc::Sender<()>,
Expand All @@ -51,15 +54,15 @@ impl RelayListUpdaterHandle {
pub struct RelayListUpdater {
api_client: RelayListProxy,
cache_path: PathBuf,
parsed_relays: Arc<Mutex<ParsedRelays>>,
relay_selector: RelaySelector,
on_update: Box<dyn Fn(&RelayList) + Send + 'static>,
last_check: SystemTime,
api_availability: ApiAvailabilityHandle,
}

impl RelayListUpdater {
pub fn spawn(
selector: super::RelaySelector,
selector: RelaySelector,
api_handle: MullvadRestHandle,
cache_dir: &Path,
on_update: impl Fn(&RelayList) + Send + 'static,
Expand All @@ -69,8 +72,8 @@ impl RelayListUpdater {
let api_client = RelayListProxy::new(api_handle);
let updater = RelayListUpdater {
api_client,
cache_path: cache_dir.join(super::RELAYS_FILENAME),
parsed_relays: selector.parsed_relays,
cache_path: cache_dir.join(RELAYS_FILENAME),
relay_selector: selector,
on_update: Box::new(on_update),
last_check: UNIX_EPOCH,
api_availability,
Expand All @@ -90,7 +93,7 @@ impl RelayListUpdater {
futures::select! {
_check_update = next_check => {
if download_future.is_terminated() && self.should_update() {
let tag = self.parsed_relays.lock().unwrap().parsed_list.etag.clone();
let tag = self.relay_selector.etag();
download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.api_client.clone(), tag).fuse());
self.last_check = SystemTime::now();
}
Expand All @@ -103,7 +106,7 @@ impl RelayListUpdater {
cmd = cmd_rx.next() => {
match cmd {
Some(()) => {
let tag = self.parsed_relays.lock().unwrap().parsed_list.etag.clone();
let tag = self.relay_selector.etag();
download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.api_client.clone(), tag).fuse());
self.last_check = SystemTime::now();
},
Expand Down Expand Up @@ -138,10 +141,7 @@ impl RelayListUpdater {

/// Returns true if the current parsed_relays is older than UPDATE_INTERVAL
fn should_update(&mut self) -> bool {
let last_check = std::cmp::max(
self.parsed_relays.lock().unwrap().last_updated(),
self.last_check,
);
let last_check = std::cmp::max(self.relay_selector.last_updated(), self.last_check);
match SystemTime::now().duration_since(last_check) {
Ok(duration) => duration >= UPDATE_INTERVAL,
// If the clock is skewed we have no idea by how much or when the last update
Expand Down Expand Up @@ -180,9 +180,8 @@ impl RelayListUpdater {
);
}

let mut parsed_relays = self.parsed_relays.lock().unwrap();
parsed_relays.update(new_relay_list);
(self.on_update)(&parsed_relays.original_list);
self.relay_selector.set_relays(new_relay_list.clone());
(self.on_update)(&new_relay_list);
Ok(())
}

Expand Down
4 changes: 0 additions & 4 deletions mullvad-relay-selector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,12 @@ workspace = true
[dependencies]
chrono = { workspace = true }
err-derive = { workspace = true }
futures = "0.3"
ipnetwork = "0.16"
log = { workspace = true }
rand = "0.8.5"
serde_json = "1.0"
tokio = { workspace = true, features = ["fs", "io-util", "time"] }

talpid-future = { path = "../talpid-future" }
talpid-types = { path = "../talpid-types" }
mullvad-api = { path = "../mullvad-api" }
mullvad-types = { path = "../mullvad-types" }

[dev-dependencies]
Expand Down
36 changes: 24 additions & 12 deletions mullvad-relay-selector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ use talpid_types::{
use matcher::{BridgeMatcher, EndpointMatcher, OpenVpnMatcher, RelayMatcher, WireguardMatcher};

mod matcher;
pub mod updater;

const DATE_TIME_FORMAT_STR: &str = "%Y-%m-%d %H:%M:%S%.3f";
const RELAYS_FILENAME: &str = "relays.json";

const WIREGUARD_EXIT_PORT: Constraint<u16> = Constraint::Only(51820);
const WIREGUARD_EXIT_IP_VERSION: Constraint<IpVersion> = Constraint::Only(IpVersion::V4);
Expand Down Expand Up @@ -108,6 +106,10 @@ impl ParsedRelays {
self.last_updated
}

pub fn etag(&self) -> Option<String> {
self.parsed_list.etag.clone()
}

fn set_overrides(&mut self, new_overrides: &[RelayOverride]) {
self.parsed_list = Self::parse_relay_list(&self.original_list, new_overrides);
self.overrides = new_overrides.to_vec();
Expand All @@ -123,15 +125,15 @@ impl ParsedRelays {
}

/// Try to read the relays from disk, preferring the newer ones.
fn from_dir(
cache_path: &Path,
resource_path: &Path,
fn from_file(
cache_path: impl AsRef<Path>,
resource_path: impl AsRef<Path>,
overrides: &[RelayOverride],
) -> Result<Self, Error> {
// prefer the resource path's relay list if the cached one doesn't exist or was modified
// before the resource one was created.
let cached_relays = Self::from_file(cache_path, overrides);
let bundled_relays = match Self::from_file(resource_path, overrides) {
let cached_relays = Self::from_file_inner(cache_path, overrides);
let bundled_relays = match Self::from_file_inner(resource_path, overrides) {
Ok(bundled_relays) => bundled_relays,
Err(e) => {
log::error!("Failed to load bundled relays: {}", e);
Expand All @@ -150,7 +152,7 @@ impl ParsedRelays {
}
}

fn from_file(path: impl AsRef<Path>, overrides: &[RelayOverride]) -> Result<Self, Error> {
fn from_file_inner(path: impl AsRef<Path>, overrides: &[RelayOverride]) -> Result<Self, Error> {
log::debug!("Reading relays from {}", path.as_ref().display());
let (last_modified, file) =
Self::open_file(path.as_ref()).map_err(Error::OpenRelayCache)?;
Expand Down Expand Up @@ -253,11 +255,13 @@ pub struct RelaySelector {

impl RelaySelector {
/// Returns a new `RelaySelector` backed by relays cached on disk.
pub fn new(config: SelectorConfig, resource_dir: &Path, cache_dir: &Path) -> Self {
let cache_path = cache_dir.join(RELAYS_FILENAME);
let resource_path = resource_dir.join(RELAYS_FILENAME);
pub fn new(
config: SelectorConfig,
resource_path: impl AsRef<Path>,
cache_path: impl AsRef<Path>,
) -> Self {
let unsynchronized_parsed_relays =
ParsedRelays::from_dir(&cache_path, &resource_path, &config.relay_overrides)
ParsedRelays::from_file(&cache_path, &resource_path, &config.relay_overrides)
.unwrap_or_else(|error| {
log::error!(
"{}",
Expand Down Expand Up @@ -312,6 +316,14 @@ impl RelaySelector {
parsed_relays.original_list.clone()
}

pub fn etag(&self) -> Option<String> {
self.parsed_relays.lock().unwrap().etag()
}

pub fn last_updated(&self) -> SystemTime {
self.parsed_relays.lock().unwrap().last_updated()
}

/// Returns a random relay and relay endpoint matching the current constraints.
pub fn get_relay(
&self,
Expand Down
13 changes: 0 additions & 13 deletions test/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 16831a2

Please sign in to comment.