Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support global registry #17

Merged
merged 5 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ async fn bar(i: i32) {

async fn baz(i: i32) {
// runtime `String` span is also supported
pending()
.instrument_await(format!("pending in baz {i}"))
work()
.instrument_await(format!("working in baz {i}"))
.await
}

Expand All @@ -32,18 +32,19 @@ async fn foo() {
.await;
}

let root = register("foo");
tokio::spawn(root.instrument(foo()));
init_global_registry(Config::default());

await_tree::spawn("foo", "foo", foo());

sleep(Duration::from_secs(1)).await;
let tree = get_tree("foo");
let tree = Registry::current().get("foo").unwrap();

// foo [1.006s]
// bar [1.006s]
// baz in bar [1.006s]
// pending in baz 3 [1.006s]
// working in baz 3 [1.006s]
// baz [1.006s]
// pending in baz 2 [1.006s]
// working in baz 2 [1.006s]
println!("{tree}");
```

Expand All @@ -52,6 +53,7 @@ println!("{tree}");
[`tokio-rs/async-backtrace`](https://github.com/tokio-rs/async-backtrace) is a similar crate that also provides the ability to dump the execution tree of async tasks. Here are some differences between `await-tree` and `async-backtrace`:

**Pros of `await-tree`**:

- `await-tree` support customizing the span with runtime `String`, while `async-backtrace` only supports function name and line number.

This is useful when we want to annotate the span with some dynamic information, such as the identifier of a shared resource (e.g., a lock), to see how the contention happens among different tasks.
Expand All @@ -67,6 +69,7 @@ println!("{tree}");
- `await-tree` maintains the tree structure separately from the `Future` itself, which enables developers to dump the tree at any time with nearly no contention, no matter the `Future` is under active polling or has been pending. For comparison, `async-backtrace` has to [wait](https://docs.rs/async-backtrace/0.2.5/async_backtrace/fn.taskdump_tree.html) for the polling to complete before dumping the tree, which may cause a long delay.

**Pros of `async-backtrace`**:

- `async-backtrace` is under the Tokio organization.

## License
Expand Down
67 changes: 67 additions & 0 deletions examples/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This example shows the usage of the global registry.

use std::time::Duration;

use await_tree::{init_global_registry, Config, InstrumentAwait, Registry};
use futures::future::pending;

async fn bar() {
pending::<()>().instrument_await("pending").await;
}

async fn foo() {
await_tree::spawn_anonymous("spawn bar", bar());
bar().instrument_await("bar").await;
}

async fn print() {
tokio::time::sleep(Duration::from_secs(1)).await;

// Access the registry anywhere and collect all trees.
for (key, tree) in Registry::current().collect_all() {
// [Actor 42]
// foo [1.003s]
// bar [1.003s]
// pending [1.003s]
//
// [Anonymous #2]
// spawn bar [1.003s]
// pending [1.003s]
//
// [Print]
// print [1.003s]
println!("[{}]\n{}\n", key, tree);
}
}

#[tokio::main]
async fn main() {
init_global_registry(Config::default());

// After global registry is initialized, the tasks can be spawned everywhere, being
// registered in the global registry.
await_tree::spawn("Actor 42", "foo", foo());

// The line above is a shorthand for the following:
tokio::spawn(
Registry::current()
.register("Print", "print")
.instrument(print()),
)
.await
.unwrap();
}
2 changes: 1 addition & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl SpanNode {
/// Also used as the key for anonymous trees in the registry. Intentionally made private to prevent
/// users from reusing the same id when registering a new tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct ContextId(u64);
pub(crate) struct ContextId(pub(crate) u64);

/// An await-tree for a task.
#[derive(Debug, Clone)]
Expand Down
35 changes: 35 additions & 0 deletions src/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::OnceLock;

use crate::{Config, Registry};

static GLOBAL_REGISTRY: OnceLock<Registry> = OnceLock::new();

/// Initialize the global registry with the given configuration.
/// Panics if the global registry has already been initialized.
///
/// This is **optional** and only needed if you want to use the global registry.
/// You can always create a new registry with [`Registry::new`] and pass it around to achieve
/// better encapsulation.
pub fn init_global_registry(config: Config) {
if let Err(_r) = GLOBAL_REGISTRY.set(Registry::new(config)) {
panic!("global registry already initialized")
}
}

pub(crate) fn global_registry() -> Option<Registry> {
GLOBAL_REGISTRY.get().cloned()
}
12 changes: 7 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ use std::future::Future;

mod context;
mod future;
mod global;
mod obj_utils;
mod registry;
mod root;
mod spawn;

pub use context::*;
pub use future::*;
pub use registry::*;
pub use root::*;
pub use spawn::*;
pub use context::{current_tree, Tree};
pub use future::Instrumented;
pub use global::init_global_registry;
pub use registry::{AnyKey, Config, ConfigBuilder, ConfigBuilderError, Key, Registry};
pub use root::TreeRoot;
pub use spawn::{spawn, spawn_anonymous};

/// A cheaply cloneable span in the await-tree.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
Expand Down
51 changes: 43 additions & 8 deletions src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ impl<T> Key for T where T: Hash + Eq + Debug + Send + Sync + 'static {}
trait ObjKey: DynHash + DynEq + Debug + Send + Sync + 'static {}
impl<T> ObjKey for T where T: DynHash + DynEq + Debug + Send + Sync + 'static {}

/// Key type for anonymous await-trees.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct AnonymousKey(ContextId);

impl Display for AnonymousKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Anonymous #{}", self.0 .0)
}
}

/// Type-erased key for the [`Registry`].
#[derive(Clone)]
pub struct AnyKey(Arc<dyn ObjKey>);
Expand Down Expand Up @@ -79,13 +89,18 @@ impl Debug for AnyKey {
impl Display for AnyKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// TODO: for all `impl Display`?
if let Some(s) = self.as_any().downcast_ref::<String>() {
write!(f, "{}", s)
} else if let Some(s) = self.as_any().downcast_ref::<&str>() {
write!(f, "{}", s)
} else {
write!(f, "{:?}", self)
macro_rules! delegate_to_display {
($($t:ty),* $(,)?) => {
$(
if let Some(k) = self.as_any().downcast_ref::<$t>() {
return write!(f, "{}", k);
}
)*
};
}
delegate_to_display!(String, &str, AnonymousKey);

write!(f, "{:?}", self)
}
}

Expand All @@ -108,7 +123,7 @@ impl AnyKey {

/// Returns whether the key corresponds to an anonymous await-tree.
pub fn is_anonymous(&self) -> bool {
self.as_any().is::<ContextId>()
self.as_any().is::<AnonymousKey>()
}

/// Returns the key as a reference to type `K`, if it is of type `K`.
Expand Down Expand Up @@ -167,6 +182,24 @@ impl Registry {
)
}

/// Returns the current registry, if exists.
///
/// 1. If the current task is registered with a registry, returns the registry.
/// 2. If the global registry is initialized with
/// [`init_global_registry`](crate::global::init_global_registry), returns the global
/// registry.
/// 3. Otherwise, returns `None`.
pub fn try_current() -> Option<Self> {
crate::root::current_registry()
}

/// Returns the current registry, panics if not exists.
///
/// See [`Registry::try_current`] for more information.
pub fn current() -> Self {
Self::try_current().expect("no current registry")
}

fn register_inner(&self, key: impl Key, context: Arc<TreeContext>) -> TreeRoot {
self.contexts()
.write()
Expand All @@ -192,9 +225,11 @@ impl Registry {
///
/// Anonymous await-trees are not able to be retrieved through the [`Registry::get`] method. Use
/// [`Registry::collect_anonymous`] or [`Registry::collect_all`] to collect them.
// TODO: we have keyed and anonymous, should we also have a typed-anonymous (for classification
// only)?
pub fn register_anonymous(&self, root_span: impl Into<Span>) -> TreeRoot {
let context = Arc::new(TreeContext::new(root_span.into(), self.config().verbose));
self.register_inner(context.id(), context) // use the private id as the key
self.register_inner(AnonymousKey(context.id()), context) // use the private id as the key
}

/// Get a clone of the await-tree with given key.
Expand Down
8 changes: 6 additions & 2 deletions src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::future::Future;
use std::sync::Arc;

use crate::context::TreeContext;
use crate::global::global_registry;
use crate::registry::WeakRegistry;
use crate::Registry;

Expand All @@ -26,15 +27,18 @@ pub struct TreeRoot {
}

tokio::task_local! {
pub(crate) static ROOT: TreeRoot
static ROOT: TreeRoot
}

pub(crate) fn current_context() -> Option<Arc<TreeContext>> {
ROOT.try_with(|r| r.context.clone()).ok()
}

pub(crate) fn current_registry() -> Option<Registry> {
ROOT.try_with(|r| r.registry.upgrade()).ok().flatten()
let local = || ROOT.try_with(|r| r.registry.upgrade()).ok().flatten();
let global = global_registry;

local().or_else(global)
}

impl TreeRoot {
Expand Down
17 changes: 9 additions & 8 deletions src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ use std::future::Future;

use tokio::task::JoinHandle;

use crate::root::current_registry;
use crate::{Key, Span};
use crate::{Key, Registry, Span};

/// Spawns a new asynchronous task instrumented with the given root [`Span`], returning a
/// [`JoinHandle`] for it.
///
/// The spawned task will be registered in the current [`Registry`](crate::Registry) with the given
/// [`Key`], if it exists. Otherwise, this is equivalent to [`tokio::spawn`].
/// The spawned task will be registered in the current [`Registry`](crate::Registry) returned by
/// [`Registry::try_current`] with the given [`Key`], if it exists. Otherwise, this is equivalent to
/// [`tokio::spawn`].
pub fn spawn<T>(key: impl Key, root_span: impl Into<Span>, future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
if let Some(registry) = current_registry() {
if let Some(registry) = Registry::try_current() {
tokio::spawn(registry.register(key, root_span).instrument(future))
} else {
tokio::spawn(future)
Expand All @@ -44,14 +44,15 @@ where
/// Spawns a new asynchronous task instrumented with the given root [`Span`], returning a
/// [`JoinHandle`] for it.
///
/// The spawned task will be registered in the current [`Registry`](crate::Registry) anonymously, if
/// it exists. Otherwise, this is equivalent to [`tokio::spawn`].
/// The spawned task will be registered in the current [`Registry`](crate::Registry) returned by
/// [`Registry::try_current`] anonymously, if it exists. Otherwise, this is equivalent to
/// [`tokio::spawn`].
pub fn spawn_anonymous<T>(root_span: impl Into<Span>, future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
if let Some(registry) = current_registry() {
if let Some(registry) = Registry::try_current() {
tokio::spawn(registry.register_anonymous(root_span).instrument(future))
} else {
tokio::spawn(future)
Expand Down
Loading