Skip to content

Commit

Permalink
Full rewrite (#64)
Browse files Browse the repository at this point in the history
## Full rewrite of this crate

The main problem was that the API followed the implementation and not the other way round. Some API improvements weren't possible because they weren't implementable.

With the new implementation, several API decisions that bothered me were finally improved.

### Breaking Changes
- Remove `Toplevel::nested` (which was a rather ugly hack), replace with configurable `ErrorAction`s.
- Remove partial shutdown mechanism; replace with `NestedSubsystem::initiate_shutdown()` and `NestedSubsystem::join()`.
- Add `SubsystemHandle::wait_for_children()`.
- Remove `Toplevel::start`. Instead, add a single subsystem to `Toplevel::new`, which can then spawn all further subsystems.
- Rework `SubsystemHandle::start`; it now takes a `SubsystemBuilder` object that provides further subsystem configuration options.
- Minimum Supported Rust Version (MSRV) is now `1.63.0`.


The rest of the API is almost identical and migration should be mostly straight forward.
  • Loading branch information
Finomnis authored Oct 20, 2023
1 parent 15bfcb0 commit 926d497
Show file tree
Hide file tree
Showing 52 changed files with 2,349 additions and 2,339 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/audit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/audit-check@v1
- uses: rustsec/audit-check@v1.4.1
with:
token: ${{ secrets.GITHUB_TOKEN }}
19 changes: 18 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ jobs:
- name: Run cargo test
run: cargo test -- --test-threads 1

msrv:
name: Minimum Supported Rust Version
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-D warnings"
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Install MSRV toolchain
uses: dtolnay/[email protected]

#- uses: Swatinem/rust-cache@v1

- name: Run cargo build
run: cargo build

lints:
name: Lints
runs-on: ubuntu-latest
Expand Down Expand Up @@ -115,7 +132,7 @@ jobs:
runs-on: ubuntu-latest
environment: production
if: github.event_name == 'release'
needs: [build, test, lints, docs, leaks]
needs: [build, test, msrv, lints, docs, leaks]
steps:
- name: Checkout sources
uses: actions/checkout@v3
Expand Down
37 changes: 17 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
[package]
name = "tokio-graceful-shutdown"
authors = ["Finomnis <[email protected]>"]
version = "0.13.0"
edition = "2018"
version = "0.14.0"
edition = "2021"
rust-version = "1.63"
license = "MIT OR Apache-2.0"
readme = "README.md"
repository = "https://github.com/Finomnis/tokio-graceful-shutdown"
Expand All @@ -20,39 +21,35 @@ exclude = [
]

[dependencies]
# Error definitions
thiserror = "1.0.32"
miette = "5.3.0"
tracing = { version = "0.1.37", default-features = false }

# For async utilities
tokio = { version = "1.20.1", default-features = false, features = [
tokio = { version = "1.32.0", default-features = false, features = [
"signal",
"rt",
"macros",
"time",
] }
tokio-util = { version = "0.7.2", default-features = false }
futures = "0.3.23"
async-recursion = "1.0.0"
pin-project-lite = "0.2.9"
tokio-util = { version = "0.7.8", default-features = false }

# For 'IntoSubsystem' trait
async-trait = "0.1.57"

# For logging
log = "0.4.17"
pin-project-lite = "0.2.13"
thiserror = "1.0.49"
miette = "5.10.0"
async-trait = "0.1.73"
atomic = "0.6.0"
bytemuck = { version = "1.14.0", features = ["derive"] }

[dev-dependencies]
# Error propagation
anyhow = "1.0.61"
anyhow = "1.0.75"
eyre = "0.6.8"
miette = { version = "5.3.0", features = ["fancy"] }
miette = { version = "5.10.0", features = ["fancy"] }

# Logging
env_logger = "0.10.0"
tracing-subscriber = "0.3.17"
tracing-test = "0.2.4"

# Tokio
tokio = { version = "1.20.1", features = ["full"] }
tokio = { version = "1.32.0", features = ["full"] }

# Hyper example
hyper = { version = "0.14.20", features = ["full"] }
Expand Down
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ This subsystem can now be executed like this:
```rust
#[tokio::main]
async fn main() -> Result<()> {
Toplevel::new()
.start("Subsys1", subsys1)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1))
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
```

The `Toplevel` object is the root object of the subsystem tree.
Subsystems can then be started using the `start()` functionality of the toplevel object.
Subsystems can then be started in it using the `start()` method
of its `SubsystemHandle` object.

The `catch_signals()` method signals the `Toplevel` object to listen for SIGINT/SIGTERM/Ctrl+C and initiate a shutdown thereafter.

Expand Down
36 changes: 19 additions & 17 deletions examples/01_normal_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,42 @@
//! If custom arguments for the subsystem coroutines are required,
//! a struct has to be used instead, as seen in other examples.
use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started.");
tracing::info!("Subsystem1 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(400)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}

async fn subsys2(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem2 started.");
tracing::info!("Subsystem2 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem2 ...");
tracing::info!("Shutting down Subsystem2 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem2 stopped.");
tracing::info!("Subsystem2 stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Create toplevel
Toplevel::new()
.start("Subsys1", subsys1)
.start("Subsys2", subsys2)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1));
s.start(SubsystemBuilder::new("Subsys2", subsys2));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
38 changes: 20 additions & 18 deletions examples/02_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@
//! custom parameters to be passed to the subsystem.
//!
//! There are two ways of using structs as subsystems, by either
//! wrapping them in an async closure, or by implementing the
//! wrapping them in a closure, or by implementing the
//! IntoSubsystem trait. Note, though, that the IntoSubsystem
//! trait requires an additional dependency, `async-trait`.
use async_trait::async_trait;
use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemBuilder, SubsystemHandle, Toplevel};

struct Subsystem1 {
arg: u32,
}

impl Subsystem1 {
async fn run(self, subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started. Extra argument: {}", self.arg);
tracing::info!("Subsystem1 started. Extra argument: {}", self.arg);
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}
}
Expand All @@ -34,29 +33,32 @@ struct Subsystem2 {
#[async_trait]
impl IntoSubsystem<miette::Report> for Subsystem2 {
async fn run(self, subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem2 started. Extra argument: {}", self.arg);
tracing::info!("Subsystem2 started. Extra argument: {}", self.arg);
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem2 ...");
tracing::info!("Shutting down Subsystem2 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem2 stopped.");
tracing::info!("Subsystem2 stopped.");
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

let subsys1 = Subsystem1 { arg: 42 };
let subsys2 = Subsystem2 { arg: 69 };

// Create toplevel
Toplevel::new()
.start("Subsys1", |a| subsys1.run(a))
.start("Subsys2", subsys2.into_subsystem())
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", |a| subsys1.run(a)));
s.start(SubsystemBuilder::new("Subsys2", subsys2.into_subsystem()));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
28 changes: 15 additions & 13 deletions examples/03_shutdown_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,32 @@
//! so the subsystem gets cancelled and the program returns an appropriate
//! error code.
use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started.");
tracing::info!("Subsystem1 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(2000)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Create toplevel
Toplevel::new()
.start("Subsys1", subsys1)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(500))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(500))
.await
.map_err(Into::into)
}
30 changes: 16 additions & 14 deletions examples/04_subsystem_finished.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
//! This subsystem demonstrates that subsystems can also stop
//! This example demonstrates that subsystems can also stop
//! prematurely.
//!
//! Returning Ok(()) from a subsystem indicates that the subsystem
//! stopped intentionally, and no further measures by the runtime are performed.
//! (unless there are no more subsystems left, in that case TopLevel would shut down anyway)
use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

async fn subsys1(_subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started.");
tracing::info!("Subsystem1 started.");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");

// Task ends without an error. This should not cause the main program to shutdown,
// because Subsys2 is still running.
Expand All @@ -28,14 +27,17 @@ async fn subsys2(subsys: SubsystemHandle) -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Create toplevel
Toplevel::new()
.start("Subsys1", subsys1)
.start("Subsys2", subsys2)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1));
s.start(SubsystemBuilder::new("Subsys2", subsys2));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
Loading

0 comments on commit 926d497

Please sign in to comment.