diff --git a/README.md b/README.md index 3eeeb6a..8e05b9b 100644 --- a/README.md +++ b/README.md @@ -18,8 +18,8 @@ async fn bar(i: i32) { async fn baz(i: i32) { // runtime `String` span is also supported - pending() - .instrument_await(format!("pending in baz {i}")) + work() + .instrument_await(format!("working in baz {i}")) .await } @@ -32,18 +32,19 @@ async fn foo() { .await; } -let root = register("foo"); -tokio::spawn(root.instrument(foo())); +init_global_registry(Config::default()); + +await_tree::spawn("foo", "foo", foo()); sleep(Duration::from_secs(1)).await; -let tree = get_tree("foo"); +let tree = Registry::current().get("foo").unwrap(); // foo [1.006s] // bar [1.006s] // baz in bar [1.006s] -// pending in baz 3 [1.006s] +// working in baz 3 [1.006s] // baz [1.006s] -// pending in baz 2 [1.006s] +// working in baz 2 [1.006s] println!("{tree}"); ``` @@ -52,6 +53,7 @@ println!("{tree}"); [`tokio-rs/async-backtrace`](https://github.com/tokio-rs/async-backtrace) is a similar crate that also provides the ability to dump the execution tree of async tasks. Here are some differences between `await-tree` and `async-backtrace`: **Pros of `await-tree`**: + - `await-tree` support customizing the span with runtime `String`, while `async-backtrace` only supports function name and line number. This is useful when we want to annotate the span with some dynamic information, such as the identifier of a shared resource (e.g., a lock), to see how the contention happens among different tasks. @@ -67,6 +69,7 @@ println!("{tree}"); - `await-tree` maintains the tree structure separately from the `Future` itself, which enables developers to dump the tree at any time with nearly no contention, no matter the `Future` is under active polling or has been pending. For comparison, `async-backtrace` has to [wait](https://docs.rs/async-backtrace/0.2.5/async_backtrace/fn.taskdump_tree.html) for the polling to complete before dumping the tree, which may cause a long delay. **Pros of `async-backtrace`**: + - `async-backtrace` is under the Tokio organization. ## License diff --git a/examples/global.rs b/examples/global.rs new file mode 100644 index 0000000..7b79d17 --- /dev/null +++ b/examples/global.rs @@ -0,0 +1,67 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This example shows the usage of the global registry. + +use std::time::Duration; + +use await_tree::{init_global_registry, Config, InstrumentAwait, Registry}; +use futures::future::pending; + +async fn bar() { + pending::<()>().instrument_await("pending").await; +} + +async fn foo() { + await_tree::spawn_anonymous("spawn bar", bar()); + bar().instrument_await("bar").await; +} + +async fn print() { + tokio::time::sleep(Duration::from_secs(1)).await; + + // Access the registry anywhere and collect all trees. + for (key, tree) in Registry::current().collect_all() { + // [Actor 42] + // foo [1.003s] + // bar [1.003s] + // pending [1.003s] + // + // [Anonymous #2] + // spawn bar [1.003s] + // pending [1.003s] + // + // [Print] + // print [1.003s] + println!("[{}]\n{}\n", key, tree); + } +} + +#[tokio::main] +async fn main() { + init_global_registry(Config::default()); + + // After global registry is initialized, the tasks can be spawned everywhere, being + // registered in the global registry. + await_tree::spawn("Actor 42", "foo", foo()); + + // The line above is a shorthand for the following: + tokio::spawn( + Registry::current() + .register("Print", "print") + .instrument(print()), + ) + .await + .unwrap(); +} diff --git a/src/context.rs b/src/context.rs index 695143b..8c8c321 100644 --- a/src/context.rs +++ b/src/context.rs @@ -50,7 +50,7 @@ impl SpanNode { /// Also used as the key for anonymous trees in the registry. Intentionally made private to prevent /// users from reusing the same id when registering a new tree. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub(crate) struct ContextId(u64); +pub(crate) struct ContextId(pub(crate) u64); /// An await-tree for a task. #[derive(Debug, Clone)] diff --git a/src/global.rs b/src/global.rs new file mode 100644 index 0000000..fa65a34 --- /dev/null +++ b/src/global.rs @@ -0,0 +1,35 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::OnceLock; + +use crate::{Config, Registry}; + +static GLOBAL_REGISTRY: OnceLock = OnceLock::new(); + +/// Initialize the global registry with the given configuration. +/// Panics if the global registry has already been initialized. +/// +/// This is **optional** and only needed if you want to use the global registry. +/// You can always create a new registry with [`Registry::new`] and pass it around to achieve +/// better encapsulation. +pub fn init_global_registry(config: Config) { + if let Err(_r) = GLOBAL_REGISTRY.set(Registry::new(config)) { + panic!("global registry already initialized") + } +} + +pub(crate) fn global_registry() -> Option { + GLOBAL_REGISTRY.get().cloned() +} diff --git a/src/lib.rs b/src/lib.rs index d297563..caf082a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,16 +20,18 @@ use std::future::Future; mod context; mod future; +mod global; mod obj_utils; mod registry; mod root; mod spawn; -pub use context::*; -pub use future::*; -pub use registry::*; -pub use root::*; -pub use spawn::*; +pub use context::{current_tree, Tree}; +pub use future::Instrumented; +pub use global::init_global_registry; +pub use registry::{AnyKey, Config, ConfigBuilder, ConfigBuilderError, Key, Registry}; +pub use root::TreeRoot; +pub use spawn::{spawn, spawn_anonymous}; /// A cheaply cloneable span in the await-tree. #[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/registry.rs b/src/registry.rs index 39d076c..6b68999 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -52,6 +52,16 @@ impl Key for T where T: Hash + Eq + Debug + Send + Sync + 'static {} trait ObjKey: DynHash + DynEq + Debug + Send + Sync + 'static {} impl ObjKey for T where T: DynHash + DynEq + Debug + Send + Sync + 'static {} +/// Key type for anonymous await-trees. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +struct AnonymousKey(ContextId); + +impl Display for AnonymousKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Anonymous #{}", self.0 .0) + } +} + /// Type-erased key for the [`Registry`]. #[derive(Clone)] pub struct AnyKey(Arc); @@ -79,13 +89,18 @@ impl Debug for AnyKey { impl Display for AnyKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // TODO: for all `impl Display`? - if let Some(s) = self.as_any().downcast_ref::() { - write!(f, "{}", s) - } else if let Some(s) = self.as_any().downcast_ref::<&str>() { - write!(f, "{}", s) - } else { - write!(f, "{:?}", self) + macro_rules! delegate_to_display { + ($($t:ty),* $(,)?) => { + $( + if let Some(k) = self.as_any().downcast_ref::<$t>() { + return write!(f, "{}", k); + } + )* + }; } + delegate_to_display!(String, &str, AnonymousKey); + + write!(f, "{:?}", self) } } @@ -108,7 +123,7 @@ impl AnyKey { /// Returns whether the key corresponds to an anonymous await-tree. pub fn is_anonymous(&self) -> bool { - self.as_any().is::() + self.as_any().is::() } /// Returns the key as a reference to type `K`, if it is of type `K`. @@ -167,6 +182,24 @@ impl Registry { ) } + /// Returns the current registry, if exists. + /// + /// 1. If the current task is registered with a registry, returns the registry. + /// 2. If the global registry is initialized with + /// [`init_global_registry`](crate::global::init_global_registry), returns the global + /// registry. + /// 3. Otherwise, returns `None`. + pub fn try_current() -> Option { + crate::root::current_registry() + } + + /// Returns the current registry, panics if not exists. + /// + /// See [`Registry::try_current`] for more information. + pub fn current() -> Self { + Self::try_current().expect("no current registry") + } + fn register_inner(&self, key: impl Key, context: Arc) -> TreeRoot { self.contexts() .write() @@ -192,9 +225,11 @@ impl Registry { /// /// Anonymous await-trees are not able to be retrieved through the [`Registry::get`] method. Use /// [`Registry::collect_anonymous`] or [`Registry::collect_all`] to collect them. + // TODO: we have keyed and anonymous, should we also have a typed-anonymous (for classification + // only)? pub fn register_anonymous(&self, root_span: impl Into) -> TreeRoot { let context = Arc::new(TreeContext::new(root_span.into(), self.config().verbose)); - self.register_inner(context.id(), context) // use the private id as the key + self.register_inner(AnonymousKey(context.id()), context) // use the private id as the key } /// Get a clone of the await-tree with given key. diff --git a/src/root.rs b/src/root.rs index 0d12aed..2f6a3a8 100644 --- a/src/root.rs +++ b/src/root.rs @@ -16,6 +16,7 @@ use std::future::Future; use std::sync::Arc; use crate::context::TreeContext; +use crate::global::global_registry; use crate::registry::WeakRegistry; use crate::Registry; @@ -26,7 +27,7 @@ pub struct TreeRoot { } tokio::task_local! { - pub(crate) static ROOT: TreeRoot + static ROOT: TreeRoot } pub(crate) fn current_context() -> Option> { @@ -34,7 +35,10 @@ pub(crate) fn current_context() -> Option> { } pub(crate) fn current_registry() -> Option { - ROOT.try_with(|r| r.registry.upgrade()).ok().flatten() + let local = || ROOT.try_with(|r| r.registry.upgrade()).ok().flatten(); + let global = global_registry; + + local().or_else(global) } impl TreeRoot { diff --git a/src/spawn.rs b/src/spawn.rs index 4e41f26..86003d0 100644 --- a/src/spawn.rs +++ b/src/spawn.rs @@ -21,20 +21,20 @@ use std::future::Future; use tokio::task::JoinHandle; -use crate::root::current_registry; -use crate::{Key, Span}; +use crate::{Key, Registry, Span}; /// Spawns a new asynchronous task instrumented with the given root [`Span`], returning a /// [`JoinHandle`] for it. /// -/// The spawned task will be registered in the current [`Registry`](crate::Registry) with the given -/// [`Key`], if it exists. Otherwise, this is equivalent to [`tokio::spawn`]. +/// The spawned task will be registered in the current [`Registry`](crate::Registry) returned by +/// [`Registry::try_current`] with the given [`Key`], if it exists. Otherwise, this is equivalent to +/// [`tokio::spawn`]. pub fn spawn(key: impl Key, root_span: impl Into, future: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { - if let Some(registry) = current_registry() { + if let Some(registry) = Registry::try_current() { tokio::spawn(registry.register(key, root_span).instrument(future)) } else { tokio::spawn(future) @@ -44,14 +44,15 @@ where /// Spawns a new asynchronous task instrumented with the given root [`Span`], returning a /// [`JoinHandle`] for it. /// -/// The spawned task will be registered in the current [`Registry`](crate::Registry) anonymously, if -/// it exists. Otherwise, this is equivalent to [`tokio::spawn`]. +/// The spawned task will be registered in the current [`Registry`](crate::Registry) returned by +/// [`Registry::try_current`] anonymously, if it exists. Otherwise, this is equivalent to +/// [`tokio::spawn`]. pub fn spawn_anonymous(root_span: impl Into, future: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { - if let Some(registry) = current_registry() { + if let Some(registry) = Registry::try_current() { tokio::spawn(registry.register_anonymous(root_span).instrument(future)) } else { tokio::spawn(future)