Skip to content

Commit

Permalink
spawn instrumented task contextually (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao authored Mar 28, 2024
1 parent e2b0cf9 commit 3f1915f
Show file tree
Hide file tree
Showing 10 changed files with 439 additions and 216 deletions.
58 changes: 58 additions & 0 deletions examples/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This example shows how to spawn tasks with `await_tree::spawn` that are automatically registered
//! to the current registry of the scope.
use std::time::Duration;

use await_tree::{Config, InstrumentAwait, Registry};
use futures::future::pending;
use tokio::time::sleep;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Actor(usize);

async fn actor(i: usize) {
// Since we're already inside the scope of a registered/instrumented task, we can directly spawn
// new tasks with `await_tree::spawn` to also register them in the same registry.
await_tree::spawn_anonymous(format!("background task {i}"), async {
pending::<()>().await;
})
.instrument_await("waiting for background task")
.await
.unwrap();
}

#[tokio::main]
async fn main() {
let registry = Registry::new(Config::default());

for i in 0..3 {
let root = registry.register(Actor(i), format!("actor {i}"));
tokio::spawn(root.instrument(actor(i)));
}

sleep(Duration::from_secs(1)).await;

for (_actor, tree) in registry.collect::<Actor>() {
// actor 0 [1.004s]
// waiting for background task [1.004s]
println!("{tree}");
}
for tree in registry.collect_anonymous() {
// background task 0 [1.004s]
println!("{tree}");
}
}
24 changes: 9 additions & 15 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

use std::fmt::{Debug, Write};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use indextree::{Arena, NodeId};
use itertools::Itertools;
use parking_lot::{Mutex, MutexGuard};

use crate::root::current_context;
use crate::Span;

/// Node in the span tree.
Expand All @@ -42,11 +42,13 @@ impl SpanNode {
}
}

/// The id of an await-tree context. We will check the id recorded in the instrumented future
/// against the current task-local context before trying to update the tree.
// Also used as the key for anonymous trees in the registry.
// Intentionally made private to prevent users from reusing the same id when registering a new tree.
/// The id of an await-tree context.
///
/// We will check the id recorded in the instrumented future against the current task-local context
/// before trying to update the tree.
///
/// Also used as the key for anonymous trees in the registry. Intentionally made private to prevent
/// users from reusing the same id when registering a new tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct ContextId(u64);

Expand Down Expand Up @@ -253,17 +255,9 @@ impl TreeContext {
}
}

tokio::task_local! {
pub(crate) static CONTEXT: Arc<TreeContext>
}

pub(crate) fn context() -> Option<Arc<TreeContext>> {
CONTEXT.try_with(Arc::clone).ok()
}

/// Get the await-tree of current task. Returns `None` if we're not instrumented.
///
/// This is useful if you want to check which component or runtime task is calling this function.
pub fn current_tree() -> Option<Tree> {
context().map(|c| c.tree().clone())
current_context().map(|c| c.tree().clone())
}
7 changes: 4 additions & 3 deletions src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::task::Poll;
use indextree::NodeId;
use pin_project::{pin_project, pinned_drop};

use crate::context::{context, ContextId};
use crate::context::ContextId;
use crate::root::current_context;
use crate::Span;

enum State {
Expand Down Expand Up @@ -57,7 +58,7 @@ impl<F: Future, const VERBOSE: bool> Future for Instrumented<F, VERBOSE> {

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let context = context();
let context = current_context();

let (context, this_node) = match this.state {
State::Initial(span) => {
Expand Down Expand Up @@ -140,7 +141,7 @@ impl<F: Future, const VERBOSE: bool> PinnedDrop for Instrumented<F, VERBOSE> {
State::Polled {
this_node,
this_context_id,
} => match context() {
} => match current_context() {
// Context correct
Some(c) if c.id() == *this_context_id => {
c.tree().remove_and_detach(*this_node);
Expand Down
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ mod context;
mod future;
mod obj_utils;
mod registry;
mod root;
mod spawn;

pub use context::current_tree;
use flexstr::SharedStr;
pub use future::Instrumented;
pub use registry::{AnyKey, Config, ConfigBuilder, ConfigBuilderError, Key, Registry, TreeRoot};
pub use registry::{AnyKey, Config, ConfigBuilder, ConfigBuilderError, Key, Registry};
pub use root::TreeRoot;
pub use spawn::{spawn, spawn_anonymous};

/// A cheaply cloneable span in the await-tree.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
Expand Down
39 changes: 19 additions & 20 deletions src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@

use std::any::Any;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::{Arc, Weak};

use derive_builder::Builder;
use parking_lot::RwLock;
use weak_table::WeakValueHashMap;

use crate::context::{ContextId, Tree, TreeContext, CONTEXT};
use crate::context::{ContextId, Tree, TreeContext};
use crate::obj_utils::{DynEq, DynHash};
use crate::Span;
use crate::{Span, TreeRoot};

/// Configuration for an await-tree registry, which affects the behavior of all await-trees in the
/// registry.
Expand All @@ -42,20 +41,6 @@ impl Default for Config {
}
}

/// The root of an await-tree.
pub struct TreeRoot {
context: Arc<TreeContext>,
#[allow(dead_code)]
registry: Weak<RegistryCore>,
}

impl TreeRoot {
/// Instrument the given future with the context of this tree root.
pub async fn instrument<F: Future>(self, future: F) -> F::Output {
CONTEXT.scope(self.context, future).await
}
}

/// A key that can be used to identify a task and its await-tree in the [`Registry`].
///
/// All thread-safe types that can be used as a key of a hash map are automatically implemented with
Expand Down Expand Up @@ -103,7 +88,6 @@ impl AnyKey {

type Contexts = RwLock<WeakValueHashMap<AnyKey, Weak<TreeContext>>>;

#[derive(Debug)]
struct RegistryCore {
contexts: Contexts,
config: Config,
Expand All @@ -112,9 +96,16 @@ struct RegistryCore {
/// The registry of multiple await-trees.
///
/// Can be cheaply cloned to share the same registry.
#[derive(Debug)]
pub struct Registry(Arc<RegistryCore>);

impl Debug for Registry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Registry")
.field("config", self.config())
.finish_non_exhaustive()
}
}

impl Clone for Registry {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
Expand Down Expand Up @@ -150,7 +141,7 @@ impl Registry {

TreeRoot {
context,
registry: Arc::downgrade(&self.0),
registry: WeakRegistry(Arc::downgrade(&self.0)),
}
}

Expand Down Expand Up @@ -227,6 +218,14 @@ impl Registry {
}
}

pub(crate) struct WeakRegistry(Weak<RegistryCore>);

impl WeakRegistry {
pub fn upgrade(&self) -> Option<Registry> {
self.0.upgrade().map(Registry)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
45 changes: 45 additions & 0 deletions src/root.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::sync::Arc;

use crate::context::TreeContext;
use crate::registry::WeakRegistry;
use crate::Registry;

/// The root of an await-tree.
pub struct TreeRoot {
pub(crate) context: Arc<TreeContext>,
pub(crate) registry: WeakRegistry,
}

tokio::task_local! {
pub(crate) static ROOT: TreeRoot
}

pub(crate) fn current_context() -> Option<Arc<TreeContext>> {
ROOT.try_with(|r| r.context.clone()).ok()
}

pub(crate) fn current_registry() -> Option<Registry> {
ROOT.try_with(|r| r.registry.upgrade()).ok().flatten()
}

impl TreeRoot {
/// Instrument the given future with the context of this tree root.
pub async fn instrument<F: Future>(self, future: F) -> F::Output {
ROOT.scope(self, future).await
}
}
59 changes: 59 additions & 0 deletions src/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO: should we consider exposing `current_registry`
// so that users can not only spawn tasks but also get and collect trees?

// TODO: should we support "global registry" for users to quick start?

use std::future::Future;

use tokio::task::JoinHandle;

use crate::root::current_registry;
use crate::{Key, Span};

/// Spawns a new asynchronous task instrumented with the given root [`Span`], returning a
/// [`JoinHandle`] for it.
///
/// The spawned task will be registered in the current [`Registry`](crate::Registry) with the given
/// [`Key`], if it exists. Otherwise, this is equivalent to [`tokio::spawn`].
pub fn spawn<T>(key: impl Key, root_span: impl Into<Span>, future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
if let Some(registry) = current_registry() {
tokio::spawn(registry.register(key, root_span).instrument(future))
} else {
tokio::spawn(future)
}
}

/// Spawns a new asynchronous task instrumented with the given root [`Span`], returning a
/// [`JoinHandle`] for it.
///
/// The spawned task will be registered in the current [`Registry`](crate::Registry) anonymously, if
/// it exists. Otherwise, this is equivalent to [`tokio::spawn`].
pub fn spawn_anonymous<T>(root_span: impl Into<Span>, future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
if let Some(registry) = current_registry() {
tokio::spawn(registry.register_anonymous(root_span).instrument(future))
} else {
tokio::spawn(future)
}
}
Loading

0 comments on commit 3f1915f

Please sign in to comment.