Skip to content

Commit

Permalink
Merge branch 'fixup-retry-strat' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
dlon committed Sep 19, 2023
2 parents a93b710 + 2ce9379 commit c6306c5
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 113 deletions.
96 changes: 34 additions & 62 deletions mullvad-daemon/src/device/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ use mullvad_api::{
rest::{self, Error as RestError, MullvadRestHandle},
AccountsProxy, DevicesProxy,
};
use talpid_core::future_retry::{
constant_interval, retry_future, retry_future_n, ExponentialBackoff, Jittered,
};
const RETRY_ACTION_INTERVAL: Duration = Duration::ZERO;
const RETRY_ACTION_MAX_RETRIES: usize = 2;

const RETRY_BACKOFF_INTERVAL_INITIAL: Duration = Duration::from_secs(4);
const RETRY_BACKOFF_INTERVAL_FACTOR: u32 = 5;
const RETRY_BACKOFF_INTERVAL_MAX: Duration = Duration::from_secs(24 * 60 * 60);
use talpid_core::future_retry::{retry_future, ConstantInterval, ExponentialBackoff, Jittered};
/// Retry strategy used for user-initiated actions that require immediate feedback
const RETRY_ACTION_STRATEGY: ConstantInterval = ConstantInterval::new(Duration::ZERO, Some(3));
/// Retry strategy used for background tasks
const RETRY_BACKOFF_STRATEGY: Jittered<ExponentialBackoff> = Jittered::jitter(
ExponentialBackoff::new(Duration::from_secs(4), 5)
.max_delay(Some(Duration::from_secs(24 * 60 * 60))),
);

#[derive(Clone)]
pub struct DeviceService {
Expand Down Expand Up @@ -51,11 +50,10 @@ impl DeviceService {
let api_handle = self.api_availability.clone();
let token_copy = account_token.clone();
async move {
let (device, addresses) = retry_future_n(
let (device, addresses) = retry_future(
move || proxy.create(token_copy.clone(), pubkey.clone()),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
.await
.map_err(map_rest_error)?;
Expand Down Expand Up @@ -87,7 +85,7 @@ impl DeviceService {
let (device, addresses) = retry_future(
move || api_handle.when_online(proxy.create(token_copy.clone(), pubkey.clone())),
should_retry_backoff,
retry_strategy(),
RETRY_BACKOFF_STRATEGY,
)
.await
.map_err(map_rest_error)?;
Expand Down Expand Up @@ -122,11 +120,10 @@ impl DeviceService {
) -> Result<(), Error> {
let proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();
retry_future_n(
retry_future(
move || proxy.remove(token.clone(), device.clone()),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
.await
.map_err(map_rest_error)?;
Expand All @@ -141,18 +138,12 @@ impl DeviceService {
let proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();

let retry_strategy = Jittered::jitter(
ExponentialBackoff::new(
RETRY_BACKOFF_INTERVAL_INITIAL,
RETRY_BACKOFF_INTERVAL_FACTOR,
), // Not setting a maximum interval
);

retry_future(
// NOTE: Not honoring "paused" state, because the account may have no time on it.
move || api_handle.when_online(proxy.remove(token.clone(), device.clone())),
should_retry_backoff,
retry_strategy,
// Not setting a maximum interval
RETRY_BACKOFF_STRATEGY.clone().max_delay(None),
)
.await
.map_err(map_rest_error)?;
Expand All @@ -170,11 +161,10 @@ impl DeviceService {
let proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();
let pubkey = private_key.public_key();
let addresses = retry_future_n(
let addresses = retry_future(
move || proxy.replace_wg_key(token.clone(), device.clone(), pubkey.clone()),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
.await
.map_err(map_rest_error)?;
Expand All @@ -197,6 +187,8 @@ impl DeviceService {
let api_handle = self.api_availability.clone();
let pubkey = private_key.public_key();

let rotate_retry_strategy = std::iter::repeat(Duration::from_secs(24 * 60 * 60));

let addresses = retry_future(
move || {
api_handle.when_bg_resumes(proxy.replace_wg_key(
Expand All @@ -206,7 +198,7 @@ impl DeviceService {
))
},
should_retry_backoff,
rotate_retry_strategy(),
rotate_retry_strategy,
)
.await
.map_err(map_rest_error)?;
Expand All @@ -221,11 +213,10 @@ impl DeviceService {
pub async fn list_devices(&self, token: AccountToken) -> Result<Vec<Device>, Error> {
let proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();
retry_future_n(
retry_future(
move || proxy.list(token.clone()),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
.await
.map_err(map_rest_error)
Expand All @@ -241,7 +232,7 @@ impl DeviceService {
retry_future(
move || api_handle.when_online(proxy.list(token.clone())),
should_retry_backoff,
retry_strategy(),
RETRY_BACKOFF_STRATEGY,
)
.await
.map_err(map_rest_error)
Expand All @@ -250,11 +241,10 @@ impl DeviceService {
pub async fn get(&self, token: AccountToken, device: DeviceId) -> Result<Device, Error> {
let proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();
retry_future_n(
retry_future(
move || proxy.get(token.clone(), device.clone()),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
.await
.map_err(map_rest_error)
Expand All @@ -272,11 +262,10 @@ impl AccountService {
pub fn create_account(&self) -> impl Future<Output = Result<AccountToken, rest::Error>> {
let mut proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();
retry_future_n(
retry_future(
move || proxy.create_account(),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
}

Expand All @@ -286,22 +275,20 @@ impl AccountService {
) -> impl Future<Output = Result<String, rest::Error>> {
let proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();
retry_future_n(
retry_future(
move || proxy.get_www_auth_token(account.clone()),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
}

pub async fn check_expiry(&self, token: AccountToken) -> Result<DateTime<Utc>, rest::Error> {
let proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();
let result = retry_future_n(
let result = retry_future(
move || proxy.get_expiry(token.clone()),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
.await;
if handle_expiry_result_inner(&result, &self.api_availability) {
Expand All @@ -321,11 +308,10 @@ impl AccountService {
) -> Result<VoucherSubmission, Error> {
let mut proxy = self.proxy.clone();
let api_handle = self.api_availability.clone();
let result = retry_future_n(
let result = retry_future(
move || proxy.submit_voucher(account_token.clone(), voucher.clone()),
move |result| should_retry(result, &api_handle),
constant_interval(RETRY_ACTION_INTERVAL),
RETRY_ACTION_MAX_RETRIES,
RETRY_ACTION_STRATEGY,
)
.await;
if result.is_ok() {
Expand Down Expand Up @@ -361,7 +347,7 @@ pub fn spawn_account_service(
async move { handle_expiry_result_inner(&expiry_fut.await, &api_availability_copy) }
};
let should_retry = move |state_was_updated: &bool| -> bool { !*state_was_updated };
retry_future(future_generator, should_retry, retry_strategy()).await;
retry_future(future_generator, should_retry, RETRY_BACKOFF_STRATEGY).await;
});
tokio::spawn(future);

Expand Down Expand Up @@ -433,17 +419,3 @@ fn map_rest_error(error: rest::Error) -> Error {
error => Error::OtherRestError(error),
}
}

fn retry_strategy() -> Jittered<ExponentialBackoff> {
Jittered::jitter(
ExponentialBackoff::new(
RETRY_BACKOFF_INTERVAL_INITIAL,
RETRY_BACKOFF_INTERVAL_FACTOR,
)
.max_delay(RETRY_BACKOFF_INTERVAL_MAX),
)
}

fn rotate_retry_strategy() -> impl Iterator<Item = Duration> {
std::iter::repeat(RETRY_BACKOFF_INTERVAL_MAX)
}
12 changes: 5 additions & 7 deletions mullvad-daemon/src/version_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
str::FromStr,
time::Duration,
};
use talpid_core::mpsc::Sender;
use talpid_core::{future_retry::ConstantInterval, mpsc::Sender};
use talpid_types::ErrorExt;
use tokio::fs::{self, File};

Expand All @@ -31,9 +31,8 @@ const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15);
const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24);
/// Wait this long until next try if an update failed
const UPDATE_INTERVAL_ERROR: Duration = Duration::from_secs(60 * 60 * 6);
/// Retry interval for `RunVersionCheck`.
const IMMEDIATE_UPDATE_INTERVAL_ERROR: Duration = Duration::ZERO;
const IMMEDIATE_UPDATE_MAX_RETRIES: usize = 2;
/// Retry strategy for `RunVersionCheck`.
const IMMEDIATE_RETRY_STRATEGY: ConstantInterval = ConstantInterval::new(Duration::ZERO, Some(3));

#[cfg(target_os = "linux")]
const PLATFORM: &str = "linux";
Expand Down Expand Up @@ -194,11 +193,10 @@ impl VersionUpdater {
.map_err(Error::Download)
};

Box::pin(talpid_core::future_retry::retry_future_n(
Box::pin(talpid_core::future_retry::retry_future(
download_future_factory,
move |result| Self::should_retry_immediate(result, &api_handle),
std::iter::repeat(IMMEDIATE_UPDATE_INTERVAL_ERROR),
IMMEDIATE_UPDATE_MAX_RETRIES,
IMMEDIATE_RETRY_STRATEGY,
))
}

Expand Down
12 changes: 5 additions & 7 deletions mullvad-relay-selector/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15);
/// How old the cached relays need to be to trigger an update
const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60);

const EXPONENTIAL_BACKOFF_INITIAL: Duration = Duration::from_secs(16);
const EXPONENTIAL_BACKOFF_FACTOR: u32 = 8;
const DOWNLOAD_RETRY_STRATEGY: Jittered<ExponentialBackoff> = Jittered::jitter(
ExponentialBackoff::new(Duration::from_secs(16), 8)
.max_delay(Some(Duration::from_secs(2 * 60 * 60))),
);

#[derive(Clone)]
pub struct RelayListUpdaterHandle {
Expand Down Expand Up @@ -161,14 +163,10 @@ impl RelayListUpdater {
}
};

let exponential_backoff =
ExponentialBackoff::new(EXPONENTIAL_BACKOFF_INITIAL, EXPONENTIAL_BACKOFF_FACTOR)
.max_delay(UPDATE_INTERVAL * 2);

retry_future(
download_futures,
|result| result.is_err(),
Jittered::jitter(exponential_backoff),
DOWNLOAD_RETRY_STRATEGY,
)
}

Expand Down
10 changes: 4 additions & 6 deletions mullvad-setup/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use once_cell::sync::Lazy;
use std::{path::PathBuf, process, str::FromStr, time::Duration};
use talpid_core::{
firewall::{self, Firewall},
future_retry::{constant_interval, retry_future_n},
future_retry::{retry_future, ConstantInterval},
};
use talpid_types::ErrorExt;

static APP_VERSION: Lazy<ParsedAppVersion> =
Lazy::new(|| ParsedAppVersion::from_str(mullvad_version::VERSION).unwrap());

const KEY_RETRY_INTERVAL: Duration = Duration::ZERO;
const KEY_RETRY_MAX_RETRIES: usize = 4;
const DEVICE_REMOVAL_STRATEGY: ConstantInterval = ConstantInterval::new(Duration::ZERO, Some(5));

#[repr(i32)]
enum ExitStatus {
Expand Down Expand Up @@ -171,14 +170,13 @@ async fn remove_device() -> Result<(), Error> {
.await,
);

let device_removal = retry_future_n(
let device_removal = retry_future(
move || proxy.remove(device.account_token.clone(), device.device.id.clone()),
move |result| match result {
Err(error) => error.is_network_error(),
_ => false,
},
constant_interval(KEY_RETRY_INTERVAL),
KEY_RETRY_MAX_RETRIES,
DEVICE_REMOVAL_STRATEGY,
)
.await;

Expand Down
Loading

0 comments on commit c6306c5

Please sign in to comment.