diff --git a/Cargo.toml b/Cargo.toml index 7440b3bc..05dd06e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,3 +119,7 @@ rustdoc-args = ["--cfg", "docsrs"] [[bench]] name = "throughput" harness = false + +[[bench]] +name = "xtra_vs_tokio_mpsc" +harness = false diff --git a/benches/xtra_vs_tokio_mpsc.rs b/benches/xtra_vs_tokio_mpsc.rs new file mode 100644 index 00000000..e683d2fc --- /dev/null +++ b/benches/xtra_vs_tokio_mpsc.rs @@ -0,0 +1,116 @@ +use std::future::Future; + +use async_trait::async_trait; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; +use xtra::{Actor, Context, Handler}; + +struct Counter(u64); + +#[async_trait] +impl Actor for Counter { + type Stop = (); + async fn stopped(self) -> Self::Stop {} +} + +#[derive(Debug)] +struct Increment {} +struct Stop; + +#[async_trait::async_trait] +impl Handler for Counter { + type Return = (); + + async fn handle(&mut self, _: Increment, _ctx: &mut Context) { + self.0 += 1; + } +} + +#[async_trait::async_trait] +impl Handler for Counter { + type Return = (); + + async fn handle(&mut self, _: Stop, ctx: &mut Context) { + ctx.stop_self(); + } +} + +fn mpsc_counter() -> (mpsc::UnboundedSender, impl Future) { + let (sender, mut receiver) = mpsc::unbounded_channel(); + + let actor = async move { + let mut _counter = 0; + + while let Some(Increment {}) = receiver.recv().await { + _counter += 1; + } + }; + + (sender, actor) +} + +fn xtra_throughput(c: &mut Criterion) { + let mut group = c.benchmark_group("increment"); + let runtime = Runtime::new().unwrap(); + let _g = runtime.enter(); + + for num_messages in [100, 1000, 10000] { + group.bench_with_input( + BenchmarkId::new("xtra", num_messages), + &num_messages, + |b, &num_messages| { + b.to_async(&runtime).iter_batched( + || { + let (xtra_address, xtra_context) = Context::new(None); + runtime.spawn(xtra_context.run(Counter(0))); + + xtra_address + }, + |xtra_address| async move { + for _ in 0..num_messages - 1 { + xtra_address.send(Increment {}).await.unwrap(); + } + + xtra_address.send(Stop).await.unwrap(); + }, + BatchSize::SmallInput, + ); + }, + ); + } +} + +fn mpsc_throughput(c: &mut Criterion) { + let mut group = c.benchmark_group("increment"); + let runtime = Runtime::new().unwrap(); + let _g = runtime.enter(); + + for num_messages in [100, 1000, 10000] { + group.bench_with_input( + BenchmarkId::new("mpsc", num_messages), + &num_messages, + |b, &num_messages| { + b.to_async(&runtime).iter_batched( + || { + let (mpsc_address, mpsc_actor) = mpsc_counter(); + runtime.spawn(mpsc_actor); + + mpsc_address + }, + |mpsc_address| async move { + for _ in 0..num_messages - 1 { + mpsc_address.send(Increment {}).unwrap(); + } + + drop(mpsc_address); + }, + BatchSize::SmallInput, + ); + }, + ); + } +} + +criterion_group!(benches, xtra_throughput, mpsc_throughput); +criterion_main!(benches);