Skip to content

Commit

Permalink
Add burst guard to IP registration ioctl
Browse files Browse the repository at this point in the history
  • Loading branch information
dlon committed Dec 15, 2022
1 parent ac7cb5a commit 100f9c6
Showing 1 changed file with 70 additions and 8 deletions.
78 changes: 70 additions & 8 deletions talpid-core/src/split_tunnel/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use crate::{
tunnel::TunnelMetadata,
window::{PowerManagementEvent, PowerManagementListener},
};
use futures::channel::{mpsc, oneshot};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use std::{
collections::HashMap,
convert::TryFrom,
Expand Down Expand Up @@ -821,18 +824,27 @@ impl Drop for SplitTunnel {
}

struct SplitTunnelDefaultRouteChangeHandlerContext {
request_tx: RequestTx,
tx: mpsc::UnboundedSender<InterfaceAddresses>,
abort_handle: tokio::task::JoinHandle<()>,
pub addresses: InterfaceAddresses,
}

impl Drop for SplitTunnelDefaultRouteChangeHandlerContext {
fn drop(&mut self) {
self.abort_handle.abort();
}
}

impl SplitTunnelDefaultRouteChangeHandlerContext {
pub fn new(
request_tx: RequestTx,
tunnel_ipv4: Option<Ipv4Addr>,
tunnel_ipv6: Option<Ipv6Addr>,
) -> Self {
let (tx, abort_handle) = Self::create_burst_guard(request_tx);
SplitTunnelDefaultRouteChangeHandlerContext {
request_tx,
tx,
abort_handle,
addresses: InterfaceAddresses {
tunnel_ipv4,
tunnel_ipv6,
Expand All @@ -842,6 +854,60 @@ impl SplitTunnelDefaultRouteChangeHandlerContext {
}
}

fn create_burst_guard(
request_tx: RequestTx,
) -> (
mpsc::UnboundedSender<InterfaceAddresses>,
tokio::task::JoinHandle<()>,
) {
let (tx, mut rx) = mpsc::unbounded();

let send_request = move |addresses| {
if request_tx
.send((Request::RegisterIps(addresses), None))
.is_err()
{
log::error!("Split tunnel request thread is down");
}
};

let abort_handle = tokio::spawn(async move {
const GRACE_PERIOD: Duration = Duration::from_secs(5);
const MAX_PERIOD: Duration = Duration::from_secs(10);

while let Some(mut addresses) = rx.next().await {
let initial_time = tokio::time::Instant::now();
loop {
if initial_time.elapsed() >= MAX_PERIOD {
send_request(addresses);
break;
}

let next = rx.next();
let delay = tokio::time::sleep(GRACE_PERIOD);
futures::pin_mut!(delay);

match futures::future::select(next, delay).await {
futures::future::Either::Left((Some(new_addresses), _)) => {
// TODO: combine?
addresses = new_addresses;
continue;
}
futures::future::Either::Left((None, _)) => {
// Return from function
return;
}
futures::future::Either::Right((..)) => {
send_request(addresses);
break;
}
}
}
}
});
(tx, abort_handle)
}

pub fn initialize_internet_addresses(&mut self) -> Result<(), Error> {
// Identify IP address that gives us Internet access
let internet_ipv4 = get_best_default_route(AddressFamily::Ipv4)
Expand Down Expand Up @@ -929,11 +995,7 @@ fn split_tunnel_default_route_change_handler<'a>(
return;
}

if ctx
.request_tx
.send((Request::RegisterIps(ctx.addresses.clone()), None))
.is_err()
{
if ctx.tx.unbounded_send(ctx.addresses.clone()).is_err() {
log::error!("Split tunnel request thread is down");
}
}

0 comments on commit 100f9c6

Please sign in to comment.