From c58eaf545ba17f3016e481381498064a38dc3c80 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 28 Mar 2024 14:56:21 +0800 Subject: [PATCH] improve usability of registry (#12) --- benches/basic.rs | 14 ++-- examples/basic.rs | 4 +- examples/detach.rs | 2 +- examples/multiple.rs | 10 ++- examples/verbose.rs | 2 +- src/lib.rs | 7 +- src/obj_utils.rs | 74 ++++++++++++++++++ src/registry.rs | 182 +++++++++++++++++++++++++++++++++++-------- src/tests.rs | 12 ++- 9 files changed, 256 insertions(+), 51 deletions(-) create mode 100644 src/obj_utils.rs diff --git a/benches/basic.rs b/benches/basic.rs index 288217f..01694e4 100644 --- a/benches/basic.rs +++ b/benches/basic.rs @@ -69,13 +69,15 @@ async fn test_baseline() { } async fn spawn_many(size: usize) { - let mut root = Registry::new(Config::default()); + let registry = Registry::new(Config::default()); let mut handles = vec![]; for i in 0..size { let task = async { tokio::time::sleep(Duration::from_millis(10)).await; }; - handles.push(tokio::spawn(root.register(i, "new_task").instrument(task))); + handles.push(tokio::spawn( + registry.register(i, "new_task").instrument(task), + )); } futures::future::try_join_all(handles) .await @@ -102,9 +104,9 @@ fn bench_basic(c: &mut Criterion) { c.bench_function("basic", |b| { b.to_async(runtime()).iter(|| async { let config = ConfigBuilder::default().verbose(false).build().unwrap(); - let mut mgr = Registry::new(config); + let registry = Registry::new(config); - let root = mgr.register(233, "root"); + let root = registry.register(233, "root"); root.instrument(test()).await; }) }); @@ -114,9 +116,9 @@ fn bench_basic_baseline(c: &mut Criterion) { c.bench_function("basic_baseline", |b| { b.to_async(runtime()).iter(|| async { let config = ConfigBuilder::default().verbose(false).build().unwrap(); - let mut mgr = Registry::new(config); + let registry = Registry::new(config); - let root = mgr.register(233, "root"); + let root = registry.register(233, "root"); black_box(root); test_baseline().await }) diff --git a/examples/basic.rs b/examples/basic.rs index 82f81f2..1a365a2 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -43,12 +43,12 @@ async fn foo() { #[tokio::main] async fn main() { - let mut registry = Registry::new(Config::default()); + let registry = Registry::new(Config::default()); let root = registry.register((), "foo"); tokio::spawn(root.instrument(foo())); sleep(Duration::from_secs(1)).await; - let tree = registry.get(&()).unwrap().to_string(); + let tree = registry.get(()).unwrap().to_string(); // foo [1.006s] // bar [1.006s] diff --git a/examples/detach.rs b/examples/detach.rs index d698bbe..3d01a42 100644 --- a/examples/detach.rs +++ b/examples/detach.rs @@ -47,7 +47,7 @@ async fn work(rx: Receiver<()>) { #[tokio::main] async fn main() { - let mut registry = Registry::new(Config::default()); + let registry = Registry::new(Config::default()); let root = registry.register((), "work"); let (tx, rx) = oneshot::channel(); tokio::spawn(root.instrument(work(rx))); diff --git a/examples/multiple.rs b/examples/multiple.rs index 72572bf..bf57ba9 100644 --- a/examples/multiple.rs +++ b/examples/multiple.rs @@ -31,8 +31,8 @@ async fn foo() { #[tokio::main] async fn main() { - let mut registry = Registry::new(Config::default()); - for i in 0..3 { + let registry = Registry::new(Config::default()); + for i in 0_i32..3 { let root = registry.register(i, format!("actor {i}")); tokio::spawn(root.instrument(work(i))); } @@ -50,7 +50,11 @@ async fn main() { // actor 2 [1.007s] // actor work 2 [1.007s] // pending [1.007s] - for (_, tree) in registry.iter().sorted_by_key(|(i, _)| *i) { + for (_, tree) in registry + .collect::() + .into_iter() + .sorted_by_key(|(i, _)| *i) + { println!("{tree}"); } } diff --git a/examples/verbose.rs b/examples/verbose.rs index 38e72db..6b6a785 100644 --- a/examples/verbose.rs +++ b/examples/verbose.rs @@ -27,7 +27,7 @@ async fn foo() { async fn work(verbose: bool) -> String { let config = ConfigBuilder::default().verbose(verbose).build().unwrap(); - let mut registry = Registry::new(config); + let registry = Registry::new(config); let root = registry.register((), "foo"); tokio::spawn(root.instrument(foo())); diff --git a/src/lib.rs b/src/lib.rs index 4fe2d2b..39e4c93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,16 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Instrument await-tree for actor-based applications. + +#![forbid(missing_docs)] + use std::future::Future; mod context; mod future; +mod obj_utils; mod registry; pub use context::{current_tree, TreeContext}; use flexstr::SharedStr; pub use future::Instrumented; -pub use registry::{Config, ConfigBuilder, ConfigBuilderError, Registry, TreeRoot}; +pub use registry::{AnyKey, Config, ConfigBuilder, ConfigBuilderError, Key, Registry, TreeRoot}; /// A cheaply cloneable span in the await-tree. #[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/obj_utils.rs b/src/obj_utils.rs new file mode 100644 index 0000000..f44d8c3 --- /dev/null +++ b/src/obj_utils.rs @@ -0,0 +1,74 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for using `Any` as the key of a `HashMap`. + +// Adopted from: +// https://github.com/bevyengine/bevy/blob/56bcbb097552b45e3ff48c48947ed8ee4e2c24b1/crates/bevy_utils/src/label.rs + +use std::any::Any; +use std::hash::{Hash, Hasher}; + +/// An object safe version of [`Eq`]. This trait is automatically implemented +/// for any `'static` type that implements `Eq`. +pub(crate) trait DynEq: Any { + /// Casts the type to `dyn Any`. + fn as_any(&self) -> &dyn Any; + + /// This method tests for `self` and `other` values to be equal. + /// + /// Implementers should avoid returning `true` when the underlying types are + /// not the same. + fn dyn_eq(&self, other: &dyn DynEq) -> bool; +} + +impl DynEq for T +where + T: Any + Eq, +{ + fn as_any(&self) -> &dyn Any { + self + } + + fn dyn_eq(&self, other: &dyn DynEq) -> bool { + if let Some(other) = other.as_any().downcast_ref::() { + return self == other; + } + false + } +} + +/// An object safe version of [`Hash`]. This trait is automatically implemented +/// for any `'static` type that implements `Hash`. +pub(crate) trait DynHash: DynEq { + /// Casts the type to `dyn Any`. + fn as_dyn_eq(&self) -> &dyn DynEq; + + /// Feeds this value into the given [`Hasher`]. + fn dyn_hash(&self, state: &mut dyn Hasher); +} + +impl DynHash for T +where + T: DynEq + Hash, +{ + fn as_dyn_eq(&self) -> &dyn DynEq { + self + } + + fn dyn_hash(&self, mut state: &mut dyn Hasher) { + T::hash(self, &mut state); + self.type_id().hash(&mut state); + } +} diff --git a/src/registry.rs b/src/registry.rs index 9ddafc7..e5586c2 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -12,15 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Borrow; +use std::any::Any; +use std::fmt::Debug; use std::future::Future; use std::hash::Hash; use std::sync::{Arc, Weak}; use derive_builder::Builder; +use parking_lot::RwLock; use weak_table::WeakValueHashMap; use crate::context::{Tree, TreeContext, CONTEXT}; +use crate::obj_utils::{DynEq, DynHash}; use crate::Span; /// Configuration for an await-tree registry, which affects the behavior of all await-trees in the @@ -42,6 +45,8 @@ impl Default for Config { /// The root of an await-tree. pub struct TreeRoot { context: Arc, + #[allow(dead_code)] + registry: Weak, } impl TreeRoot { @@ -51,59 +56,168 @@ impl TreeRoot { } } -/// The registry of multiple await-trees. +/// A key that can be used to identify a task and its await-tree in the [`Registry`]. +/// +/// All thread-safe types that can be used as a key of a hash map are automatically implemented with +/// this trait. +pub trait Key: Hash + Eq + Debug + Send + Sync + 'static {} +impl Key for T where T: Hash + Eq + Debug + Send + Sync + 'static {} + +/// The object-safe version of [`Key`], automatically implemented. +trait ObjKey: DynHash + DynEq + Debug + Send + Sync + 'static {} +impl ObjKey for T where T: DynHash + DynEq + Debug + Send + Sync + 'static {} + +/// Type-erased key for the [`Registry`]. +#[derive(Debug, Clone)] +pub struct AnyKey(Arc); + +impl PartialEq for AnyKey { + fn eq(&self, other: &Self) -> bool { + self.0.dyn_eq(other.0.as_dyn_eq()) + } +} + +impl Eq for AnyKey {} + +impl Hash for AnyKey { + fn hash(&self, state: &mut H) { + self.0.dyn_hash(state); + } +} + +impl AnyKey { + fn new(key: impl ObjKey) -> Self { + Self(Arc::new(key)) + } + + /// Cast the key to `dyn Any`. + pub fn as_any(&self) -> &dyn Any { + self.0.as_ref().as_any() + } +} + +type Contexts = RwLock>>; + #[derive(Debug)] -pub struct Registry { - contexts: WeakValueHashMap>, +struct RegistryCore { + contexts: Contexts, config: Config, } -impl Registry -where - K: std::hash::Hash + Eq + std::fmt::Debug, -{ +/// The registry of multiple await-trees. +/// +/// Can be cheaply cloned to share the same registry. +#[derive(Debug)] +pub struct Registry(Arc); + +impl Clone for Registry { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +impl Registry { + fn contexts(&self) -> &Contexts { + &self.0.contexts + } + + fn config(&self) -> &Config { + &self.0.config + } +} + +impl Registry { /// Create a new registry with given `config`. pub fn new(config: Config) -> Self { - Self { - contexts: WeakValueHashMap::new(), - config, - } + Self( + RegistryCore { + contexts: Default::default(), + config, + } + .into(), + ) } -} -impl Registry -where - K: std::hash::Hash + Eq + std::fmt::Debug, -{ /// Register with given key. Returns a [`TreeRoot`] that can be used to instrument a future. /// /// If the key already exists, a new [`TreeRoot`] is returned and the reference to the old /// [`TreeRoot`] is dropped. - pub fn register(&mut self, key: K, root_span: impl Into) -> TreeRoot { - let context = Arc::new(TreeContext::new(root_span.into(), self.config.verbose)); - self.contexts.insert(key, Arc::clone(&context)); + pub fn register(&self, key: impl Key, root_span: impl Into) -> TreeRoot { + let context = Arc::new(TreeContext::new(root_span.into(), self.config().verbose)); + self.contexts() + .write() + .insert(AnyKey::new(key), Arc::clone(&context)); - TreeRoot { context } - } - - /// Iterate over the clones of all registered await-trees. - pub fn iter(&self) -> impl Iterator { - self.contexts.iter().map(|(k, v)| (k, v.tree().clone())) + TreeRoot { + context, + registry: Arc::downgrade(&self.0), + } } /// Get a clone of the await-tree with given key. /// /// Returns `None` if the key does not exist or the tree root has been dropped. - pub fn get(&self, k: &Q) -> Option - where - K: Borrow, - Q: Hash + Eq, - { - self.contexts.get(k).map(|v| v.tree().clone()) + pub fn get(&self, key: impl Key) -> Option { + self.contexts() + .read() + .get(&AnyKey::new(key)) // TODO: accept ref can? + .map(|v| v.tree().clone()) } /// Remove all the registered await-trees. - pub fn clear(&mut self) { - self.contexts.clear(); + pub fn clear(&self) { + self.contexts().write().clear(); + } + + /// Collect the snapshots of all await-trees with the key of type `K`. + pub fn collect(&self) -> Vec<(K, Tree)> { + self.contexts() + .read() + .iter() + .filter_map(|(k, v)| { + k.0.as_ref() + .as_any() + .downcast_ref::() + .map(|k| (k.clone(), v.tree().clone())) + }) + .collect() + } + + /// Collect the snapshots of all await-trees regardless of the key type. + pub fn collect_all(&self) -> Vec<(AnyKey, Tree)> { + self.contexts() + .read() + .iter() + .map(|(k, v)| (k.clone(), v.tree().clone())) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_registry() { + let registry = Registry::new(Config::default()); + + let _0_i32 = registry.register(0_i32, "0"); + let _1_i32 = registry.register(1_i32, "1"); + let _2_i32 = registry.register(2_i32, "2"); + + let _0_str = registry.register("0", "0"); + let _1_str = registry.register("1", "1"); + + let _unit = registry.register((), "()"); + let _unit_replaced = registry.register((), "[]"); + + let i32s = registry.collect::(); + assert_eq!(i32s.len(), 3); + + let strs = registry.collect::<&'static str>(); + assert_eq!(strs.len(), 2); + + let units = registry.collect::<()>(); + assert_eq!(units.len(), 1); } } diff --git a/src/tests.rs b/src/tests.rs index cc99a34..cc99260 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -137,8 +137,8 @@ async fn hello() { #[tokio::test] async fn test_await_tree() { - let mut registry = Registry::new(Config::default()); - let root = registry.register(233, "actor 233"); + let registry = Registry::new(Config::default()); + let root = registry.register((), "actor 233"); let fut = root.instrument(hello()); pin_mut!(fut); @@ -175,7 +175,13 @@ async fn test_await_tree() { let mut actual_counts = vec![]; poll_fn(|cx| { - let tree = registry.iter().exactly_one().ok().unwrap().1; + let tree = registry + .collect::<()>() + .into_iter() + .exactly_one() + .ok() + .unwrap() + .1; println!("{tree}"); actual_counts.push((tree.active_node_count(), tree.detached_node_count())); fut.poll_unpin(cx)