Skip to content

Commit 579945f

Browse files
committed
Merge branch 'routing-improvements' into main
2 parents 1fa40d1 + 9e0f9a1 commit 579945f

File tree

3 files changed

+65
-100
lines changed

3 files changed

+65
-100
lines changed

talpid-core/src/tunnel_state_machine/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,9 @@ impl TunnelStateMachine {
409409
}
410410
}
411411

412-
log::debug!("Exiting tunnel state machine loop");
412+
log::debug!("Tunnel state machine exited");
413+
414+
runtime.block_on(self.shared_values.route_manager.stop());
413415
}
414416
}
415417

talpid-routing/src/unix/mod.rs

+29-55
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use futures::channel::{
77
mpsc::{self, UnboundedSender},
88
oneshot,
99
};
10-
use std::{collections::HashSet, io, sync::Arc};
10+
use std::{collections::HashSet, sync::Arc};
1111

1212
#[cfg(any(target_os = "linux", target_os = "macos"))]
1313
use futures::stream::Stream;
@@ -41,12 +41,6 @@ pub enum Error {
4141
/// Platform specific error occurred
4242
#[error("Internal route manager error")]
4343
PlatformError(#[from] imp::Error),
44-
/// Failed to spawn route manager future
45-
#[error("Failed to spawn route manager on the provided executor")]
46-
FailedToSpawnManager,
47-
/// Failed to spawn route manager runtime
48-
#[error("Failed to spawn route manager runtime")]
49-
FailedToSpawnRuntime(#[from] io::Error),
5044
/// Attempt to use route manager that has been dropped
5145
#[error("Cannot send message to route manager since it is down")]
5246
RouteManagerDown,
@@ -257,7 +251,6 @@ pub enum CallbackMessage {
257251
/// the route will be adjusted dynamically when the default route changes.
258252
pub struct RouteManager {
259253
manage_tx: Option<Arc<UnboundedSender<RouteManagerCommand>>>,
260-
runtime: tokio::runtime::Handle,
261254
}
262255

263256
impl RouteManager {
@@ -280,7 +273,6 @@ impl RouteManager {
280273
tokio::spawn(manager.run(manage_rx));
281274

282275
Ok(Self {
283-
runtime: tokio::runtime::Handle::current(),
284276
manage_tx: Some(manage_tx),
285277
})
286278
}
@@ -289,77 +281,59 @@ impl RouteManager {
289281
pub async fn stop(&mut self) {
290282
if let Some(tx) = self.manage_tx.take() {
291283
let (wait_tx, wait_rx) = oneshot::channel();
292-
293-
if tx
294-
.unbounded_send(RouteManagerCommand::Shutdown(wait_tx))
295-
.is_err()
296-
{
297-
log::error!("RouteManager already down!");
298-
return;
299-
}
300-
301-
if wait_rx.await.is_err() {
302-
log::error!("{}", Error::ManagerChannelDown);
303-
}
284+
let _ = tx.unbounded_send(RouteManagerCommand::Shutdown(wait_tx));
285+
let _ = wait_rx.await;
304286
}
305287
}
306288

307289
/// Applies the given routes until [`RouteManager::stop`] is called.
308-
pub async fn add_routes(&mut self, routes: HashSet<RequiredRoute>) -> Result<(), Error> {
309-
if let Some(tx) = &self.manage_tx {
310-
let (result_tx, result_rx) = oneshot::channel();
311-
if tx
312-
.unbounded_send(RouteManagerCommand::AddRoutes(routes, result_tx))
313-
.is_err()
314-
{
315-
return Err(Error::RouteManagerDown);
316-
}
317-
318-
result_rx
319-
.await
320-
.map_err(|_| Error::ManagerChannelDown)?
321-
.map_err(Error::PlatformError)
322-
} else {
323-
Err(Error::RouteManagerDown)
324-
}
290+
pub async fn add_routes(&self, routes: HashSet<RequiredRoute>) -> Result<(), Error> {
291+
let tx = self.get_command_tx()?;
292+
let (result_tx, result_rx) = oneshot::channel();
293+
tx.unbounded_send(RouteManagerCommand::AddRoutes(routes, result_tx))
294+
.map_err(|_| Error::RouteManagerDown)?;
295+
296+
result_rx
297+
.await
298+
.map_err(|_| Error::ManagerChannelDown)?
299+
.map_err(Error::PlatformError)
325300
}
326301

327302
/// Removes all routes previously applied in [`RouteManager::add_routes`].
328-
pub fn clear_routes(&mut self) -> Result<(), Error> {
329-
if let Some(tx) = &self.manage_tx {
330-
if tx.unbounded_send(RouteManagerCommand::ClearRoutes).is_err() {
331-
return Err(Error::RouteManagerDown);
332-
}
333-
Ok(())
334-
} else {
335-
Err(Error::RouteManagerDown)
336-
}
303+
pub fn clear_routes(&self) -> Result<(), Error> {
304+
let tx = self.get_command_tx()?;
305+
tx.unbounded_send(RouteManagerCommand::ClearRoutes)
306+
.map_err(|_| Error::RouteManagerDown)
337307
}
338308

339309
/// Ensure that packets are routed using the correct tables.
340310
#[cfg(target_os = "linux")]
341-
pub async fn create_routing_rules(&mut self, enable_ipv6: bool) -> Result<(), Error> {
311+
pub async fn create_routing_rules(&self, enable_ipv6: bool) -> Result<(), Error> {
342312
self.handle()?.create_routing_rules(enable_ipv6).await
343313
}
344314

345315
/// Remove any routing rules created by [Self::create_routing_rules].
346316
#[cfg(target_os = "linux")]
347-
pub async fn clear_routing_rules(&mut self) -> Result<(), Error> {
317+
pub async fn clear_routing_rules(&self) -> Result<(), Error> {
348318
self.handle()?.clear_routing_rules().await
349319
}
350320

351321
/// Retrieve a sender directly to the command channel.
352322
pub fn handle(&self) -> Result<RouteManagerHandle, Error> {
353-
if let Some(tx) = &self.manage_tx {
354-
Ok(RouteManagerHandle { tx: tx.clone() })
355-
} else {
356-
Err(Error::RouteManagerDown)
357-
}
323+
let tx = self.get_command_tx()?;
324+
Ok(RouteManagerHandle { tx: tx.clone() })
325+
}
326+
327+
fn get_command_tx(&self) -> Result<&Arc<UnboundedSender<RouteManagerCommand>>, Error> {
328+
self.manage_tx.as_ref().ok_or(Error::RouteManagerDown)
358329
}
359330
}
360331

361332
impl Drop for RouteManager {
362333
fn drop(&mut self) {
363-
self.runtime.clone().block_on(self.stop());
334+
if let Some(tx) = self.manage_tx.take() {
335+
let (done_tx, _) = oneshot::channel();
336+
let _ = tx.unbounded_send(RouteManagerCommand::Shutdown(done_tx));
337+
}
364338
}
365339
}

talpid-routing/src/windows/mod.rs

+33-44
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ pub enum RouteManagerCommand {
151151
GetMtuForRoute(IpAddr, oneshot::Sender<Result<u16>>),
152152
ClearRoutes,
153153
RegisterDefaultRouteChangeCallback(Callback, oneshot::Sender<CallbackHandle>),
154-
Shutdown,
154+
Shutdown(oneshot::Sender<Result<()>>),
155155
}
156156

157157
impl RouteManager {
@@ -176,29 +176,19 @@ impl RouteManager {
176176
&self,
177177
callback: Callback,
178178
) -> Result<CallbackHandle> {
179-
if let Some(tx) = &self.manage_tx {
180-
let (result_tx, result_rx) = oneshot::channel();
181-
if tx
182-
.unbounded_send(RouteManagerCommand::RegisterDefaultRouteChangeCallback(
183-
callback, result_tx,
184-
))
185-
.is_err()
186-
{
187-
return Err(Error::RouteManagerDown);
188-
}
189-
Ok(result_rx.await.map_err(|_| Error::ManagerChannelDown)?)
190-
} else {
191-
Err(Error::RouteManagerDown)
192-
}
179+
let tx = self.get_command_tx()?;
180+
let (result_tx, result_rx) = oneshot::channel();
181+
tx.unbounded_send(RouteManagerCommand::RegisterDefaultRouteChangeCallback(
182+
callback, result_tx,
183+
))
184+
.map_err(|_| Error::RouteManagerDown)?;
185+
result_rx.await.map_err(|_| Error::ManagerChannelDown)
193186
}
194187

195188
/// Retrieve a sender directly to the command channel.
196189
pub fn handle(&self) -> Result<RouteManagerHandle> {
197-
if let Some(tx) = &self.manage_tx {
198-
Ok(RouteManagerHandle { tx: tx.clone() })
199-
} else {
200-
Err(Error::RouteManagerDown)
201-
}
190+
let tx = self.get_command_tx()?;
191+
Ok(RouteManagerHandle { tx: tx.clone() })
202192
}
203193

204194
async fn listen(
@@ -243,7 +233,9 @@ impl RouteManager {
243233
RouteManagerCommand::RegisterDefaultRouteChangeCallback(callback, tx) => {
244234
let _ = tx.send(internal.register_default_route_changed_callback(callback));
245235
}
246-
RouteManagerCommand::Shutdown => {
236+
RouteManagerCommand::Shutdown(tx) => {
237+
drop(internal);
238+
let _ = tx.send(Ok(()));
247239
break;
248240
}
249241
}
@@ -252,38 +244,32 @@ impl RouteManager {
252244

253245
/// Stops the routing manager and invalidates the route manager - no new default route callbacks
254246
/// can be added
255-
pub fn stop(&mut self) {
247+
pub async fn stop(&mut self) {
256248
if let Some(tx) = self.manage_tx.take() {
257-
if tx.unbounded_send(RouteManagerCommand::Shutdown).is_err() {
258-
log::error!("RouteManager channel already down or thread panicked");
259-
}
249+
let (result_tx, result_rx) = oneshot::channel();
250+
let _ = tx.unbounded_send(RouteManagerCommand::Shutdown(result_tx));
251+
_ = result_rx.await;
260252
}
261253
}
262254

263255
/// Applies the given routes until [`RouteManager::stop`] is called.
264256
pub async fn add_routes(&self, routes: HashSet<RequiredRoute>) -> Result<()> {
265-
if let Some(tx) = &self.manage_tx {
266-
let (result_tx, result_rx) = oneshot::channel();
267-
if tx
268-
.unbounded_send(RouteManagerCommand::AddRoutes(routes, result_tx))
269-
.is_err()
270-
{
271-
return Err(Error::RouteManagerDown);
272-
}
273-
result_rx.await.map_err(|_| Error::ManagerChannelDown)?
274-
} else {
275-
Err(Error::RouteManagerDown)
276-
}
257+
let tx = self.get_command_tx()?;
258+
let (result_tx, result_rx) = oneshot::channel();
259+
tx.unbounded_send(RouteManagerCommand::AddRoutes(routes, result_tx))
260+
.map_err(|_| Error::RouteManagerDown)?;
261+
result_rx.await.map_err(|_| Error::ManagerChannelDown)?
277262
}
278263

279264
/// Removes all routes previously applied in [`RouteManager::add_routes`].
280265
pub fn clear_routes(&self) -> Result<()> {
281-
if let Some(tx) = &self.manage_tx {
282-
tx.unbounded_send(RouteManagerCommand::ClearRoutes)
283-
.map_err(|_| Error::RouteManagerDown)
284-
} else {
285-
Err(Error::RouteManagerDown)
286-
}
266+
let tx = self.get_command_tx()?;
267+
tx.unbounded_send(RouteManagerCommand::ClearRoutes)
268+
.map_err(|_| Error::RouteManagerDown)
269+
}
270+
271+
fn get_command_tx(&self) -> Result<&UnboundedSender<RouteManagerCommand>> {
272+
self.manage_tx.as_ref().ok_or(Error::RouteManagerDown)
287273
}
288274
}
289275

@@ -309,6 +295,9 @@ fn get_mtu_for_route(addr_family: AddressFamily) -> Result<Option<u16>> {
309295

310296
impl Drop for RouteManager {
311297
fn drop(&mut self) {
312-
self.stop();
298+
if let Some(tx) = self.manage_tx.take() {
299+
let (done_tx, _) = oneshot::channel();
300+
let _ = tx.unbounded_send(RouteManagerCommand::Shutdown(done_tx));
301+
}
313302
}
314303
}

0 commit comments

Comments
 (0)