Skip to content

Commit

Permalink
Add .finished() to NestedSubsystem, add sequential shutdown examp…
Browse files Browse the repository at this point in the history
…le (#82)
  • Loading branch information
Finomnis authored Feb 7, 2024
1 parent 20f5671 commit f10debd
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/audit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
name: Audit
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: rustsec/[email protected]
with:
token: ${{ secrets.GITHUB_TOKEN }}
20 changes: 10 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- x86_64-unknown-linux-gnu
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install cross
uses: taiki-e/install-action@cross
Expand All @@ -40,7 +40,7 @@ jobs:
RUSTFLAGS: "-D warnings"
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable
Expand All @@ -58,7 +58,7 @@ jobs:
RUSTFLAGS: "-D warnings"
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install cargo-binstall
uses: taiki-e/install-action@cargo-binstall
Expand All @@ -77,7 +77,7 @@ jobs:
needs: [lints, docs]
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Check semver
uses: obi1kenobi/cargo-semver-checks-action@v2

Expand All @@ -87,7 +87,7 @@ jobs:
needs: [lints, docs]
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install nightly toolchain
uses: dtolnay/rust-toolchain@nightly
Expand All @@ -111,7 +111,7 @@ jobs:
RUSTFLAGS: "-D warnings"
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install nightly toolchain
uses: dtolnay/rust-toolchain@nightly
Expand Down Expand Up @@ -142,7 +142,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable
Expand All @@ -160,7 +160,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable
Expand All @@ -176,7 +176,7 @@ jobs:
needs: [lints, docs]
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install llvm
# Required to resolve symbols in sanitizer output
Expand Down Expand Up @@ -208,7 +208,7 @@ jobs:
needs: [build, test, msrv, lints, docs, leaks, semver, min-versions, min-versions-msrv]
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable
Expand Down
8 changes: 5 additions & 3 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@ jobs:
with:
components: llvm-tools-preview
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Install llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
#- uses: Swatinem/rust-cache@v1
- name: Compute Coverage
run:
cargo llvm-cov --all-features --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: codecov.json
fail_ci_if_error: true
- name: Archive code coverage results
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: code-coverage-report
path: codecov.json
6 changes: 3 additions & 3 deletions .github/workflows/rust-clippy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ jobs:
actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
components: clippy

- name: Install sarif tools
uses: taiki-e/install-action@v1
uses: taiki-e/install-action@v2
with:
tool: clippy-sarif,sarif-fmt

Expand All @@ -49,7 +49,7 @@ jobs:
continue-on-error: true

- name: Upload analysis results to GitHub
uses: github/codeql-action/upload-sarif@v2
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: rust-clippy-results.sarif
wait-for-processing: true
126 changes: 126 additions & 0 deletions examples/19_sequential_shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
//! This example demonstrates how multiple subsystems could be shut down sequentially.
//!
//! When a shutdown gets triggered (via Ctrl+C), Nested1 will shutdown first,
//! followed by Nested2 and Nested3. Only once the previous subsystem is finished shutting down,
//! the next subsystem will follow.
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{
FutureExt, SubsystemBuilder, SubsystemFinishedFuture, SubsystemHandle, Toplevel,
};

async fn counter(id: &str) {
let mut i = 0;
loop {
tracing::info!("{id}: {i}");
i += 1;
sleep(Duration::from_millis(50)).await;
}
}

async fn nested1(subsys: SubsystemHandle) -> Result<()> {
tracing::info!("Nested1 started.");
if counter("Nested1").cancel_on_shutdown(&subsys).await.is_ok() {
tracing::info!("Nested1 counter finished.");
} else {
tracing::info!("Nested1 shutting down ...");
sleep(Duration::from_millis(200)).await;
}
subsys.on_shutdown_requested().await;
tracing::info!("Nested1 stopped.");
Ok(())
}

async fn nested2(subsys: SubsystemHandle, nested1_finished: SubsystemFinishedFuture) -> Result<()> {
// Create a future that triggers once nested1 is finished **and** a shutdown is requested
let shutdown = {
let shutdown_requested = subsys.on_shutdown_requested();
async move {
tokio::join!(shutdown_requested, nested1_finished);
}
};

tracing::info!("Nested2 started.");
tokio::select! {
_ = shutdown => {
tracing::info!("Nested2 shutting down ...");
sleep(Duration::from_millis(200)).await;
}
_ = counter("Nested2") => {
tracing::info!("Nested2 counter finished.");
}
}

tracing::info!("Nested2 stopped.");
Ok(())
}

async fn nested3(subsys: SubsystemHandle, nested2_finished: SubsystemFinishedFuture) -> Result<()> {
// Create a future that triggers once nested2 is finished **and** a shutdown is requested
let shutdown = {
// This is an alternative to `on_shutdown_requested()` (as shown in nested2).
// Use this if `on_shutdown_requested()` gives you lifetime issues.
let cancellation_token = subsys.create_cancellation_token();
async move {
tokio::join!(cancellation_token.cancelled(), nested2_finished);
}
};

tracing::info!("Nested3 started.");
tokio::select! {
_ = shutdown => {
tracing::info!("Nested3 shutting down ...");
sleep(Duration::from_millis(200)).await;
}
_ = counter("Nested3") => {
tracing::info!("Nested3 counter finished.");
}
}

tracing::info!("Nested3 stopped.");
Ok(())
}

async fn root(subsys: SubsystemHandle) -> Result<()> {
// This subsystem shuts down the nested subsystem after 5 seconds.
tracing::info!("Root started.");

tracing::info!("Starting nested subsystems ...");
let nested1 = subsys.start(SubsystemBuilder::new("Nested1", nested1));
let nested1_finished = nested1.finished();
let nested2 = subsys.start(SubsystemBuilder::new("Nested2", |s| {
nested2(s, nested1_finished)
}));
let nested2_finished = nested2.finished();
subsys.start(SubsystemBuilder::new("Nested3", |s| {
nested3(s, nested2_finished)
}));
tracing::info!("Nested subsystems started.");

// Wait for all children to finish shutting down.
subsys.wait_for_children().await;

tracing::info!("All children finished, stopping Root ...");
sleep(Duration::from_millis(200)).await;
tracing::info!("Root stopped.");

Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
// Init logging
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Root", root));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
2 changes: 1 addition & 1 deletion src/future_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pin_project_lite::pin_project;
use tokio_util::sync::WaitForCancellationFuture;

pin_project! {
/// A Future that is resolved once the corresponding task is finished
/// A future that is resolved once the corresponding task is finished
/// or a shutdown is initiated.
#[must_use = "futures do nothing unless polled"]
pub struct CancelOnShutdownFuture<'a, T: std::future::Future>{
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,6 @@ pub use future_ext::FutureExt;
pub use into_subsystem::IntoSubsystem;
pub use subsystem::NestedSubsystem;
pub use subsystem::SubsystemBuilder;
pub use subsystem::SubsystemFinishedFuture;
pub use subsystem::SubsystemHandle;
pub use toplevel::Toplevel;
15 changes: 14 additions & 1 deletion src/subsystem/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
mod error_collector;
mod nested_subsystem;
mod subsystem_builder;
mod subsystem_finished_future;
mod subsystem_handle;

use std::sync::{Arc, Mutex};
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};

pub use subsystem_builder::SubsystemBuilder;
pub use subsystem_handle::SubsystemHandle;
Expand Down Expand Up @@ -35,3 +40,11 @@ pub(crate) struct ErrorActions {
pub(crate) on_failure: Atomic<ErrorAction>,
pub(crate) on_panic: Atomic<ErrorAction>,
}

/// A future that is resolved once the corresponding subsystem is finished.
///
/// Returned by [`NestedSubsystem::finished`].
#[must_use = "futures do nothing unless polled"]
pub struct SubsystemFinishedFuture {
future: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
}
14 changes: 11 additions & 3 deletions src/subsystem/nested_subsystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::atomic::Ordering;

use crate::{errors::SubsystemJoinError, ErrTypeTraits, ErrorAction};

use super::NestedSubsystem;
use super::{NestedSubsystem, SubsystemFinishedFuture};

impl<ErrType: ErrTypeTraits> NestedSubsystem<ErrType> {
/// Wait for the subsystem to be finished.
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<ErrType: ErrTypeTraits> NestedSubsystem<ErrType> {
/// Changes the way this subsystem should react to failures,
/// meaning if it or one of its children returns an `Err` value.
///
/// For more information, see [ErrorAction].
/// For more information, see [`ErrorAction`].
pub fn change_failure_action(&self, action: ErrorAction) {
self.error_actions
.on_failure
Expand All @@ -78,8 +78,16 @@ impl<ErrType: ErrTypeTraits> NestedSubsystem<ErrType> {
/// Changes the way this subsystem should react if it or one
/// of its children panic.
///
/// For more information, see [ErrorAction].
/// For more information, see [`ErrorAction`].
pub fn change_panic_action(&self, action: ErrorAction) {
self.error_actions.on_panic.store(action, Ordering::Relaxed);
}

/// Returns a future that resolves once the subsystem is finished.
///
/// Similar to [`join`](NestedSubsystem::join), but more light-weight
/// as it does not return any information about subsystem errors.
pub fn finished(&self) -> SubsystemFinishedFuture {
SubsystemFinishedFuture::new(self.joiner.clone())
}
}
25 changes: 25 additions & 0 deletions src/subsystem/subsystem_finished_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use crate::utils::JoinerTokenRef;

use super::SubsystemFinishedFuture;

impl SubsystemFinishedFuture {
pub(crate) fn new(joiner: JoinerTokenRef) -> Self {
Self {
future: Box::pin(async move { joiner.join().await }),
}
}
}

impl Future for SubsystemFinishedFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.future.as_mut().poll(cx)
}
}
1 change: 1 addition & 0 deletions src/utils/joiner_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub(crate) struct JoinerToken<ErrType: ErrTypeTraits> {

/// A reference version that does not keep the content alive; purely for
/// joining the subtree.
#[derive(Clone)]
pub(crate) struct JoinerTokenRef {
counter: watch::Receiver<(bool, u32)>,
}
Expand Down
Loading

0 comments on commit f10debd

Please sign in to comment.