Skip to content

Commit

Permalink
Add SubsystemBuilder::detach()
Browse files Browse the repository at this point in the history
  • Loading branch information
Finomnis committed Feb 7, 2024
1 parent f10debd commit ff51290
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 2 deletions.
1 change: 0 additions & 1 deletion examples/19_sequential_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ async fn nested3(subsys: SubsystemHandle, nested2_finished: SubsystemFinishedFut
}

async fn root(subsys: SubsystemHandle) -> Result<()> {
// This subsystem shuts down the nested subsystem after 5 seconds.
tracing::info!("Root started.");

tracing::info!("Starting nested subsystems ...");
Expand Down
81 changes: 81 additions & 0 deletions examples/20_orchestrated_shutdown_order.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! This example demonstrates how a parent subsystem could orchestrate the shutdown order of its children manually.
//!
//! This is done by spawning the children in 'detached' mode to prevent that the shutdown signal gets passed to the children.
//! Then, the parent calls `initialize_shutdown` on each child manually.
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel};

async fn counter(id: &str) {
let mut i = 0;
loop {
tracing::info!("{id}: {i}");
i += 1;
sleep(Duration::from_millis(50)).await;
}
}

async fn child(name: &str, subsys: SubsystemHandle) -> Result<()> {
tracing::info!("{name} started.");
if counter(name).cancel_on_shutdown(&subsys).await.is_ok() {
tracing::info!("{name} counter finished.");
} else {
tracing::info!("{name} shutting down ...");
sleep(Duration::from_millis(200)).await;
}
subsys.on_shutdown_requested().await;
tracing::info!("{name} stopped.");
Ok(())
}

async fn parent(subsys: SubsystemHandle) -> Result<()> {
tracing::info!("Parent started.");

tracing::info!("Starting detached nested subsystems ...");
let nested1 =
subsys.start(SubsystemBuilder::new("Nested1", |s| child("Nested1", s)).detached());
let nested2 =
subsys.start(SubsystemBuilder::new("Nested2", |s| child("Nested2", s)).detached());
let nested3 =
subsys.start(SubsystemBuilder::new("Nested3", |s| child("Nested3", s)).detached());
tracing::info!("Nested subsystems started.");

// Wait for the shutdown to happen
subsys.on_shutdown_requested().await;

// Shut down children sequentially. As they are detached, they will not shutdown on their own,
// but need to be shut down manually via `initiate_shutdown`.
tracing::info!("Initiating Nested1 shutdown ...");
nested1.initiate_shutdown();
nested1.join().await?;
tracing::info!("Initiating Nested2 shutdown ...");
nested2.initiate_shutdown();
nested2.join().await?;
tracing::info!("Initiating Nested3 shutdown ...");
nested3.initiate_shutdown();
nested3.join().await?;

tracing::info!("All children finished, stopping Root ...");
sleep(Duration::from_millis(200)).await;
tracing::info!("Root stopped.");

Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
// Init logging
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("parent", parent));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
12 changes: 12 additions & 0 deletions src/subsystem/subsystem_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ where
pub(crate) subsystem: Subsys,
pub(crate) failure_action: ErrorAction,
pub(crate) panic_action: ErrorAction,
pub(crate) detached: bool,
#[allow(clippy::type_complexity)]
_phantom: PhantomData<fn() -> (Fut, ErrType, Err)>,
}
Expand All @@ -40,6 +41,7 @@ where
subsystem,
failure_action: ErrorAction::Forward,
panic_action: ErrorAction::Forward,
detached: false,
_phantom: Default::default(),
}
}
Expand All @@ -65,4 +67,14 @@ where
self.panic_action = action;
self
}

/// Detaches the subsystem from the parent, causing a shutdown request to not
/// be propagated from the parent to the child.
///
/// If this option is set, the parent needs to call [`NestedSubsystem::initiate_shutdown`](crate::NestedSubsystem::initiate_shutdown)
/// manually to perform a correct shutdown. So use this option with care.
pub fn detached(mut self) -> Self {
self.detached = true;
self
}
}
8 changes: 7 additions & 1 deletion src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
on_failure: Atomic::new(builder.failure_action),
on_panic: Atomic::new(builder.panic_action),
},
builder.detached,
)
}

Expand All @@ -96,6 +97,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
name: Arc<str>,
subsystem: Subsys,
error_actions: ErrorActions,
detached: bool,
) -> NestedSubsystem<ErrType>
where
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
Expand All @@ -106,7 +108,11 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {

let (error_sender, errors) = mpsc::unbounded_channel();

let cancellation_token = self.inner.cancellation_token.child_token();
let cancellation_token = if detached {
CancellationToken::new()
} else {
self.inner.cancellation_token.child_token()
};

let error_actions = Arc::new(error_actions);

Expand Down
1 change: 1 addition & 0 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
on_failure: Atomic::new(ErrorAction::Forward),
on_panic: Atomic::new(ErrorAction::Forward),
},
false,
);

Self {
Expand Down
45 changes: 45 additions & 0 deletions tests/integration_test_2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,48 @@ async fn subsystem_finished_works_correctly() {
.await;
assert!(result.is_ok());
}

#[tokio::test]
#[traced_test]
async fn shutdown_does_not_propagate_to_detached_subsystem() {
let (nested_started, set_nested_started) = Event::create();
let (nested_finished, set_nested_finished) = Event::create();

let detached_subsystem = |subsys: SubsystemHandle| async move {
set_nested_started();
subsys.on_shutdown_requested().await;
set_nested_finished();
BoxedResult::Ok(())
};

let subsystem = |subsys: SubsystemHandle| async move {
let nested = subsys.start(SubsystemBuilder::new("detached", detached_subsystem).detached());
sleep(Duration::from_millis(20)).await;
assert!(nested_started.get());
assert!(!nested_finished.get());

subsys.on_shutdown_requested().await;

sleep(Duration::from_millis(20)).await;
assert!(!nested_finished.get());

nested.initiate_shutdown();

sleep(Duration::from_millis(20)).await;
assert!(nested_finished.get());

BoxedResult::Ok(())
};

let toplevel = Toplevel::new(move |s| async move {
s.start(SubsystemBuilder::new("subsys", subsystem));

sleep(Duration::from_millis(100)).await;
s.request_shutdown();
});

let result = toplevel
.handle_shutdown_requests(Duration::from_millis(400))
.await;
assert!(result.is_ok());
}

0 comments on commit ff51290

Please sign in to comment.