From ff51290908e359d5f58254104d91fd35fe62670d Mon Sep 17 00:00:00 2001 From: Finomnis Date: Wed, 7 Feb 2024 13:34:20 +0100 Subject: [PATCH] Add `SubsystemBuilder::detach()` --- examples/19_sequential_shutdown.rs | 1 - examples/20_orchestrated_shutdown_order.rs | 81 ++++++++++++++++++++++ src/subsystem/subsystem_builder.rs | 12 ++++ src/subsystem/subsystem_handle.rs | 8 ++- src/toplevel.rs | 1 + tests/integration_test_2.rs | 45 ++++++++++++ 6 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 examples/20_orchestrated_shutdown_order.rs diff --git a/examples/19_sequential_shutdown.rs b/examples/19_sequential_shutdown.rs index ca0da8d..5d8c248 100644 --- a/examples/19_sequential_shutdown.rs +++ b/examples/19_sequential_shutdown.rs @@ -83,7 +83,6 @@ async fn nested3(subsys: SubsystemHandle, nested2_finished: SubsystemFinishedFut } async fn root(subsys: SubsystemHandle) -> Result<()> { - // This subsystem shuts down the nested subsystem after 5 seconds. tracing::info!("Root started."); tracing::info!("Starting nested subsystems ..."); diff --git a/examples/20_orchestrated_shutdown_order.rs b/examples/20_orchestrated_shutdown_order.rs new file mode 100644 index 0000000..00f3494 --- /dev/null +++ b/examples/20_orchestrated_shutdown_order.rs @@ -0,0 +1,81 @@ +//! This example demonstrates how a parent subsystem could orchestrate the shutdown order of its children manually. +//! +//! This is done by spawning the children in 'detached' mode to prevent that the shutdown signal gets passed to the children. +//! Then, the parent calls `initialize_shutdown` on each child manually. + +use miette::Result; +use tokio::time::{sleep, Duration}; +use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel}; + +async fn counter(id: &str) { + let mut i = 0; + loop { + tracing::info!("{id}: {i}"); + i += 1; + sleep(Duration::from_millis(50)).await; + } +} + +async fn child(name: &str, subsys: SubsystemHandle) -> Result<()> { + tracing::info!("{name} started."); + if counter(name).cancel_on_shutdown(&subsys).await.is_ok() { + tracing::info!("{name} counter finished."); + } else { + tracing::info!("{name} shutting down ..."); + sleep(Duration::from_millis(200)).await; + } + subsys.on_shutdown_requested().await; + tracing::info!("{name} stopped."); + Ok(()) +} + +async fn parent(subsys: SubsystemHandle) -> Result<()> { + tracing::info!("Parent started."); + + tracing::info!("Starting detached nested subsystems ..."); + let nested1 = + subsys.start(SubsystemBuilder::new("Nested1", |s| child("Nested1", s)).detached()); + let nested2 = + subsys.start(SubsystemBuilder::new("Nested2", |s| child("Nested2", s)).detached()); + let nested3 = + subsys.start(SubsystemBuilder::new("Nested3", |s| child("Nested3", s)).detached()); + tracing::info!("Nested subsystems started."); + + // Wait for the shutdown to happen + subsys.on_shutdown_requested().await; + + // Shut down children sequentially. As they are detached, they will not shutdown on their own, + // but need to be shut down manually via `initiate_shutdown`. + tracing::info!("Initiating Nested1 shutdown ..."); + nested1.initiate_shutdown(); + nested1.join().await?; + tracing::info!("Initiating Nested2 shutdown ..."); + nested2.initiate_shutdown(); + nested2.join().await?; + tracing::info!("Initiating Nested3 shutdown ..."); + nested3.initiate_shutdown(); + nested3.join().await?; + + tracing::info!("All children finished, stopping Root ..."); + sleep(Duration::from_millis(200)).await; + tracing::info!("Root stopped."); + + Ok(()) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + // Init logging + tracing_subscriber::fmt() + .with_max_level(tracing::Level::TRACE) + .init(); + + // Setup and execute subsystem tree + Toplevel::new(|s| async move { + s.start(SubsystemBuilder::new("parent", parent)); + }) + .catch_signals() + .handle_shutdown_requests(Duration::from_millis(1000)) + .await + .map_err(Into::into) +} diff --git a/src/subsystem/subsystem_builder.rs b/src/subsystem/subsystem_builder.rs index e2d0e73..3364a9a 100644 --- a/src/subsystem/subsystem_builder.rs +++ b/src/subsystem/subsystem_builder.rs @@ -15,6 +15,7 @@ where pub(crate) subsystem: Subsys, pub(crate) failure_action: ErrorAction, pub(crate) panic_action: ErrorAction, + pub(crate) detached: bool, #[allow(clippy::type_complexity)] _phantom: PhantomData (Fut, ErrType, Err)>, } @@ -40,6 +41,7 @@ where subsystem, failure_action: ErrorAction::Forward, panic_action: ErrorAction::Forward, + detached: false, _phantom: Default::default(), } } @@ -65,4 +67,14 @@ where self.panic_action = action; self } + + /// Detaches the subsystem from the parent, causing a shutdown request to not + /// be propagated from the parent to the child. + /// + /// If this option is set, the parent needs to call [`NestedSubsystem::initiate_shutdown`](crate::NestedSubsystem::initiate_shutdown) + /// manually to perform a correct shutdown. So use this option with care. + pub fn detached(mut self) -> Self { + self.detached = true; + self + } } diff --git a/src/subsystem/subsystem_handle.rs b/src/subsystem/subsystem_handle.rs index 86b032c..08dd32b 100644 --- a/src/subsystem/subsystem_handle.rs +++ b/src/subsystem/subsystem_handle.rs @@ -88,6 +88,7 @@ impl SubsystemHandle { on_failure: Atomic::new(builder.failure_action), on_panic: Atomic::new(builder.panic_action), }, + builder.detached, ) } @@ -96,6 +97,7 @@ impl SubsystemHandle { name: Arc, subsystem: Subsys, error_actions: ErrorActions, + detached: bool, ) -> NestedSubsystem where Subsys: 'static + FnOnce(SubsystemHandle) -> Fut + Send, @@ -106,7 +108,11 @@ impl SubsystemHandle { let (error_sender, errors) = mpsc::unbounded_channel(); - let cancellation_token = self.inner.cancellation_token.child_token(); + let cancellation_token = if detached { + CancellationToken::new() + } else { + self.inner.cancellation_token.child_token() + }; let error_actions = Arc::new(error_actions); diff --git a/src/toplevel.rs b/src/toplevel.rs index 91fbd73..0271e71 100644 --- a/src/toplevel.rs +++ b/src/toplevel.rs @@ -87,6 +87,7 @@ impl Toplevel { on_failure: Atomic::new(ErrorAction::Forward), on_panic: Atomic::new(ErrorAction::Forward), }, + false, ); Self { diff --git a/tests/integration_test_2.rs b/tests/integration_test_2.rs index 47b0802..9d55781 100644 --- a/tests/integration_test_2.rs +++ b/tests/integration_test_2.rs @@ -284,3 +284,48 @@ async fn subsystem_finished_works_correctly() { .await; assert!(result.is_ok()); } + +#[tokio::test] +#[traced_test] +async fn shutdown_does_not_propagate_to_detached_subsystem() { + let (nested_started, set_nested_started) = Event::create(); + let (nested_finished, set_nested_finished) = Event::create(); + + let detached_subsystem = |subsys: SubsystemHandle| async move { + set_nested_started(); + subsys.on_shutdown_requested().await; + set_nested_finished(); + BoxedResult::Ok(()) + }; + + let subsystem = |subsys: SubsystemHandle| async move { + let nested = subsys.start(SubsystemBuilder::new("detached", detached_subsystem).detached()); + sleep(Duration::from_millis(20)).await; + assert!(nested_started.get()); + assert!(!nested_finished.get()); + + subsys.on_shutdown_requested().await; + + sleep(Duration::from_millis(20)).await; + assert!(!nested_finished.get()); + + nested.initiate_shutdown(); + + sleep(Duration::from_millis(20)).await; + assert!(nested_finished.get()); + + BoxedResult::Ok(()) + }; + + let toplevel = Toplevel::new(move |s| async move { + s.start(SubsystemBuilder::new("subsys", subsystem)); + + sleep(Duration::from_millis(100)).await; + s.request_shutdown(); + }); + + let result = toplevel + .handle_shutdown_requests(Duration::from_millis(400)) + .await; + assert!(result.is_ok()); +}