Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tokio taskname registration for use in tokio-console #89

Merged
merged 19 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ jobs:
uses: taiki-e/install-action@cross

- name: Build
run: cross build --all-features --release --target=${{ matrix.target }}
run: cross build --release --target=${{ matrix.target }}

build-examples:
name: Build Examples
runs-on: ubuntu-latest
needs: [lints, docs]
env:
RUSTFLAGS: "-D warnings"
RUSTFLAGS: "-D warnings --cfg tokio_unstable"
steps:
- name: Checkout sources
uses: actions/checkout@v4
Expand Down Expand Up @@ -112,9 +112,6 @@ jobs:
- name: Check with minimal versions
run: cargo minimal-versions check --workspace --ignore-private

- name: Test with minimal versions
run: cargo minimal-versions test -- --test-threads 1

min-versions-msrv:
name: Minimal Dependency Versions (MSRV)
runs-on: ubuntu-latest
Expand Down Expand Up @@ -165,7 +162,7 @@ jobs:
run: cargo fmt --all -- --check

- name: Run cargo clippy
run: cargo clippy --all-features --all-targets -- -D warnings
run: cargo clippy --all-targets -- -D warnings

docs:
name: Documentation
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ jobs:
uses: actions/checkout@v4
- name: Install llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
#- uses: Swatinem/rust-cache@v1
- name: Compute Coverage
run:
cargo llvm-cov --all-features --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json
cargo llvm-cov --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/rust-clippy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ jobs:
- name: Run rust-clippy
run:
cargo clippy
--all-features
--all-targets
--message-format=json | clippy-sarif | tee rust-clippy-results.sarif | sarif-fmt
continue-on-error: true
Expand Down
18 changes: 18 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ exclude = [
"/UPCOMING_VERSION_CHANGES.txt",
]

[features]
# Enable task naming and task caller location.
tracing = ["tokio/tracing"]

[[example]]
name = "tokio_console"
required-features = ["tracing"]


[dependencies]
tracing = { version = "0.1.37", default-features = false }

Expand Down Expand Up @@ -67,10 +76,19 @@ headers = ">= 0.3.5" # Required to fix minimal-versions
serde_urlencoded = ">= 0.7.1" # Required to fix minimal-versions
unicode-linebreak = ">= 0.1.5" # Required to fix minimal-versions

gcc = ">= 0.3.4" # Required to fix minimal-versions

# tokio-console
console-subscriber = "0.2.0"

# For testing unix signals
[target.'cfg(unix)'.dev-dependencies]
nix = { version = "0.29.0", default-features = false, features = ["signal"] }

# Make leak sanitizer more reliable
[profile.dev]
opt-level = 1

# Define `tokio_unstable` config for linter
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
66 changes: 66 additions & 0 deletions examples/tokio_console.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//! This example demonstrates how to use the tokio-console application for tracing tokio tasks's
//! runtime behaviour. Subsystems will appear under their registration names.
//!
//! Run this example with:
//!
//! ```
//! RUSTFLAGS="--cfg tokio_unstable" cargo run --features "tracing" --example tokio_console
//! ```
//!
//! Then, open the `tokio-console` application (see https://crates.io/crates/tokio-console) to
//! follow the subsystem tasks live.

use miette::Result;
Fixed Show fixed Hide fixed
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel};
use tracing::Level;
use tracing_subscriber::{fmt::writer::MakeWriterExt, prelude::*};

async fn child(subsys: SubsystemHandle) -> Result<()> {
sleep(Duration::from_millis(3000))
.cancel_on_shutdown(&subsys)
.await
.ok();
Ok(())
}

async fn parent(subsys: SubsystemHandle) -> Result<()> {
tracing::info!("Parent started.");

let mut iteration = 0;
while !subsys.is_shutdown_requested() {
subsys.start(SubsystemBuilder::new(format!("child{iteration}"), child));
iteration += 1;

sleep(Duration::from_millis(1000))
.cancel_on_shutdown(&subsys)
.await
.ok();
}

tracing::info!("Parent stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
// Init tokio-console server and tracing
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(
tracing_subscriber::fmt::layer()
.with_writer(std::io::stdout.with_max_level(Level::DEBUG))
.compact(),
)
.init();

// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("parent", parent));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ mod into_subsystem;
mod runner;
mod signal_handling;
mod subsystem;
mod tokio_task;
mod toplevel;
mod utils;

Expand Down
101 changes: 54 additions & 47 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) struct SubsystemRunner {
}

impl SubsystemRunner {
#[track_caller]
pub(crate) fn new<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
name: Arc<str>,
subsystem: Subsys,
Expand All @@ -32,8 +33,8 @@ impl SubsystemRunner {
Fut: 'static + Future<Output = Result<(), Err>> + Send,
Err: Into<ErrType>,
{
let future = async { run_subsystem(name, subsystem, subsystem_handle, guard).await };
let aborthandle = tokio::spawn(future).abort_handle();
let future = run_subsystem(name, subsystem, subsystem_handle, guard);
let aborthandle = crate::tokio_task::spawn(future, "subsystem_runner").abort_handle();
SubsystemRunner { aborthandle }
}
}
Expand All @@ -44,67 +45,73 @@ impl Drop for SubsystemRunner {
}
}

async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
#[track_caller]
fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
name: Arc<str>,
subsystem: Subsys,
mut subsystem_handle: SubsystemHandle<ErrType>,
guard: AliveGuard,
) where
) -> impl Future<Output = ()> + 'static
where
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
Fut: 'static + Future<Output = Result<(), Err>> + Send,
Err: Into<ErrType>,
{
let mut redirected_subsystem_handle = subsystem_handle.delayed_clone();

let future = async { subsystem(subsystem_handle).await.map_err(|e| e.into()) };
let join_handle = tokio::spawn(future);
let join_handle = crate::tokio_task::spawn(future, &name);

// Abort on drop
guard.on_cancel({
let abort_handle = join_handle.abort_handle();
let name = Arc::clone(&name);
move || {
if !abort_handle.is_finished() {
tracing::warn!("Subsystem cancelled: '{}'", name);
async move {
// Abort on drop
guard.on_cancel({
let abort_handle = join_handle.abort_handle();
let name = Arc::clone(&name);
move || {
if !abort_handle.is_finished() {
tracing::warn!("Subsystem cancelled: '{}'", name);
}
abort_handle.abort();
}
abort_handle.abort();
}
});
});

let failure = match join_handle.await {
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};
let failure = match join_handle.await {
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};

// Retrieve the handle that was passed into the subsystem.
// Originally it was intended to pass the handle as reference, but due
// to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters)
// it was decided to pass ownership instead.
//
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => {
tracing::error!("The SubsystemHandle object must not be leaked out of the subsystem!");
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
// Retrieve the handle that was passed into the subsystem.
// Originally it was intended to pass the handle as reference, but due
// to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters)
// it was decided to pass ownership instead.
//
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => {
tracing::error!(
"The SubsystemHandle object must not be leaked out of the subsystem!"
);
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
}
};

// Raise potential errors
let joiner_token = subsystem_handle.joiner_token;
if let Some(failure) = failure {
joiner_token.raise_failure(failure);
}
};

// Raise potential errors
let joiner_token = subsystem_handle.joiner_token;
if let Some(failure) = failure {
joiner_token.raise_failure(failure);
// Wait for children to finish before we destroy the `SubsystemHandle` object.
// Otherwise the children would be cancelled immediately.
//
// This is the main mechanism that forwards a cancellation to all the children.
joiner_token.downgrade().join().await;
}

// Wait for children to finish before we destroy the `SubsystemHandle` object.
// Otherwise the children would be cancelled immediately.
//
// This is the main mechanism that forwards a cancellation to all the children.
joiner_token.downgrade().join().await;
}
8 changes: 7 additions & 1 deletion src/subsystem/subsystem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
/// Ok(())
/// }
/// ```
#[track_caller]
pub fn start<Err, Fut, Subsys>(
&self,
builder: SubsystemBuilder<ErrType, Err, Fut, Subsys>,
Expand All @@ -82,7 +83,11 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
Err: Into<ErrType>,
{
self.start_with_abs_name(
Arc::from(format!("{}/{}", self.inner.name, builder.name)),
if self.inner.name.as_ref() == "/" {
Arc::from(format!("/{}", builder.name))
} else {
Arc::from(format!("{}/{}", self.inner.name, builder.name))
},
builder.subsystem,
ErrorActions {
on_failure: Atomic::new(builder.failure_action),
Expand All @@ -92,6 +97,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
)
}

#[track_caller]
pub(crate) fn start_with_abs_name<Err, Fut, Subsys>(
&self,
name: Arc<str>,
Expand Down
23 changes: 23 additions & 0 deletions src/tokio_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::future::Future;
use tokio::task::JoinHandle;

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
#[track_caller]
pub(crate) fn spawn<F: Future + Send + 'static>(f: F, _name: &str) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
{
tokio::spawn(f)
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
#[track_caller]
pub(crate) fn spawn<F: Future + Send + 'static>(f: F, name: &str) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
{
tokio::task::Builder::new()
.name(name)
.spawn(f)
.expect("a task should be spawned")
}
15 changes: 10 additions & 5 deletions src/toplevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
/// * `subsystem` - The subsystem that should be spawned as the root node.
/// Usually the job of this subsystem is to spawn further subsystems.
#[allow(clippy::new_without_default)]
#[track_caller]
pub fn new<Fut, Subsys>(subsystem: Subsys) -> Self
where
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
Expand All @@ -78,7 +79,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
});

let toplevel_subsys = root_handle.start_with_abs_name(
Arc::from(""),
Arc::from("/"),
move |s| async move {
subsystem(s).await;
Result::<(), ErrType>::Ok(())
Expand Down Expand Up @@ -118,13 +119,17 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
///
/// Especially the caveats from [tokio::signal::unix::Signal] are important for Unix targets.
///
#[track_caller]
pub fn catch_signals(self) -> Self {
let shutdown_token = self.root_handle.get_cancellation_token().clone();

tokio::spawn(async move {
wait_for_signal().await;
shutdown_token.cancel();
});
crate::tokio_task::spawn(
async move {
wait_for_signal().await;
shutdown_token.cancel();
},
"catch_signals",
);

self
}
Expand Down
Loading