diff --git a/Readme.md b/Readme.md index e81699e..c9dc896 100644 --- a/Readme.md +++ b/Readme.md @@ -36,7 +36,7 @@ If write calls will be synchronized - all messages will be ordered by that "sync ## Example -Write from multiple threads, read from multiple threads. +Write from multiple threads, read from multiple threads: ```rust const WRITERS : usize = 4; @@ -78,6 +78,8 @@ std::thread::scope(|s| { }); ``` +See [examples](examples). + ## Benchmarks Intel i4771 (3.5Ghz 4C/8T), DDR3 1600Mhz, Windows 10. See [benchmarks](benchmarks) sub-project. @@ -85,7 +87,8 @@ Intel i4771 (3.5Ghz 4C/8T), DDR3 1600Mhz, Windows 10. See [benchmarks](benchmark ![seq benchmark](doc/img/benchmarks/seq.svg) ![spsc benchmark](doc/img/benchmarks/spsc.svg) ![mpsc benchmark](doc/img/benchmarks/mpsc.svg) -![mpmc benchmark](doc/img/benchmarks/mpmc.svg) +![broadcast mpmc benchmark](doc/img/benchmarks/mpmc.svg) +![broadcast spmc benchmark](doc/img/benchmarks/spmc.svg) Benchmarks compare with a channels since chute can be used +/- as a channel, by spinning on the reader side. diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 49b6ffa..06993cb 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -20,17 +20,23 @@ charming = { git = "https://github.com/yuankunzhang/charming.git", rev = "aa18c2 #charming = { version = "0.4", features = ["ssr"] } [[bench]] -name = "mpmc" +name = "seq" +harness = false + +[[bench]] +name = "spsc" harness = false [[bench]] name = "mpsc" harness = false +# MULTICAST + [[bench]] -name = "spsc" +name = "mpmc" harness = false [[bench]] -name = "seq" +name = "spmc" harness = false \ No newline at end of file diff --git a/benchmarks/benches/mpmc.rs b/benchmarks/benches/mpmc.rs index f46e152..0582189 100644 --- a/benchmarks/benches/mpmc.rs +++ b/benchmarks/benches/mpmc.rs @@ -1,4 +1,4 @@ -//! Multiple-producers, multiple-consumers +//! Multicast multiple-producers, multiple-consumers use chute::LendingReader; use arrayvec::ArrayVec; diff --git a/benchmarks/benches/spmc.rs b/benchmarks/benches/spmc.rs new file mode 100644 index 0000000..928036a --- /dev/null +++ b/benchmarks/benches/spmc.rs @@ -0,0 +1,122 @@ +//! Multicast single-producer, multiple-consumers + +use chute::LendingReader; +use arrayvec::ArrayVec; +use criterion::{criterion_group, criterion_main, Criterion, Throughput}; +use std::sync::Arc; + +mod common; +use common::*; + +fn chute_spmc(reader_threads: usize){ + let mut queue: chute::spmc::Queue<_> = Default::default(); + + let mut joins: ArrayVec<_, 64> = Default::default(); + + // READ + for _ in 0..reader_threads { + let mut reader = queue.reader(); + joins.push(std::thread::spawn(move || { + for _ in 0..COUNT { + let msg = loop { + if let Some(msg) = reader.next() { + break msg; + } + yield_fn(); + }; + } + })); + } + + // WRITE + for i in 0..COUNT { + queue.push(message::new(i)); + } + + for join in joins{ + join.join().unwrap(); + } +} + +fn chute_mpmc(reader_threads: usize){ + let queue = chute::mpmc::Queue::new(); + + let mut joins: ArrayVec<_, 64> = Default::default(); + + // READ + for _ in 0..reader_threads { + let mut reader = queue.reader(); + joins.push(std::thread::spawn(move || { + for _ in 0..COUNT { + let msg = loop { + if let Some(msg) = reader.next() { + break msg; + } + yield_fn(); + }; + } + })); + } + + // WRITE + let mut writer = queue.writer(); + for i in 0..COUNT { + writer.push(message::new(i)); + } + + for join in joins{ + join.join().unwrap(); + } +} + +fn tokio_broadcast(reader_threads: usize){ + use tokio::sync::broadcast; + let (tx, _) = broadcast::channel(COUNT); + + let mut joins: ArrayVec<_, 64> = Default::default(); + + // READ + for _ in 0..reader_threads { + let mut reader = tx.subscribe(); + joins.push(std::thread::spawn(move || { + for _ in 0..COUNT { + reader.blocking_recv().unwrap(); + } + })); + } + + // WRITE + let mut writer = tx; + for i in 0..COUNT { + writer.send(message::new(i)); + } + + for join in joins{ + join.join().unwrap(); + } +} + +fn criterion_benchmark(c: &mut Criterion) { + use criterion::BenchmarkId; + + let mut group = c.benchmark_group("spmc"); + for reader_threads in [1, 2, 4, 8] { + let parameter_string = format!("w:1 r:{}", reader_threads); + + group.bench_with_input(BenchmarkId::new("chute::spmc", parameter_string.clone()), &reader_threads + , |b, rt| b.iter(|| chute_spmc(*rt)) + ); + + group.bench_with_input(BenchmarkId::new("chute::mpmc", parameter_string.clone()), &reader_threads + , |b, rt| b.iter(|| chute_mpmc(*rt)) + ); + + group.bench_with_input(BenchmarkId::new("tokio::broadcast", parameter_string.clone()), &reader_threads + , |b, rt| b.iter(|| tokio_broadcast(*rt)) + ); + } + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/benchmarks/src/main.rs b/benchmarks/src/main.rs index 2bb5ffc..07293c8 100644 --- a/benchmarks/src/main.rs +++ b/benchmarks/src/main.rs @@ -1,7 +1,9 @@ +mod spmc; mod mpmc; mod mpsc; mod spsc; mod seq; +mod multi_chart; use std::collections::{BTreeMap}; use std::fs; @@ -31,6 +33,7 @@ fn read_estimate(fname: &Path) -> f64 { point_estimate.as_f64().unwrap() } +/// [writer_count][reader_count] -> estimate type EstimatesMPMC = BTreeMap>; fn read_group(dir_name: &Path, writers: &[usize], readers: &[usize]) -> EstimatesMPMC { @@ -54,11 +57,12 @@ const CHART_WIDTH: u32 = 570; fn main(){ #[derive(Eq, PartialEq)] - enum Command{All, Mpmc, Mpsc, Spsc, Seq} + enum Command{All, Spmc, Mpmc, Mpsc, Spsc, Seq} let args: Vec = env::args().collect(); let command = match args.get(1).map(|s| s.as_str()) { None => Command::All, + Some("spmc") => Command::Spmc, Some("mpmc") => Command::Mpmc, Some("mpsc") => Command::Mpsc, Some("spsc") => Command::Spsc, @@ -70,7 +74,10 @@ fn main(){ let criterion_dir = current_dir.join("target/criterion"); let _ = fs::create_dir(current_dir.join("out")); - + + if command == Command::All || command == Command::Spmc { + spmc::spmc(criterion_dir.join("spmc")); + } if command == Command::All || command == Command::Mpmc { mpmc::mpmc(criterion_dir.join("mpmc")); } diff --git a/benchmarks/src/mpmc.rs b/benchmarks/src/mpmc.rs index ea9fd60..d5876a6 100644 --- a/benchmarks/src/mpmc.rs +++ b/benchmarks/src/mpmc.rs @@ -1,123 +1,31 @@ +use std::collections::BTreeMap; use std::path::Path; -use charming::{component::{Grid, Axis}, Chart, ImageRenderer}; -use charming::element::{AxisLabel, AxisType, Color, Formatter}; use charming::element::LabelPosition; -use charming::element::Label; -use charming::series::{Bar, Series}; -use charming::component::{Legend}; -use charming::component::Title; use str_macro::str; -use std::string::String; -use charming::element::font_settings::{FontFamily, FontStyle, FontWeight}; -use crate::{read_group, EstimatesMPMC}; -use crate::CHART_WIDTH; -use crate::CHART_THEME; -use crate::CHART_BACKGROUND; +use crate::{read_group}; +use crate::multi_chart::{multi_chart, MultiChartData, Visual}; pub fn mpmc(dir_name: impl AsRef) { + let rt = 4; let wts = [1,2,4,8]; - let rts = [1,2,4,8]; - let read = |dir: &str| -> EstimatesMPMC { - read_group( + let read = |dir: &str| -> BTreeMap { + let data = read_group( &std::path::Path::new(dir_name.as_ref()).join(dir) - ,&wts, &rts - ) + ,&wts, &[rt] + ); + data.iter().map(|(&wt, readers)| (wt, readers[&rt])).collect() }; - let all: Vec<(String, EstimatesMPMC)> = vec![ + let all: MultiChartData = vec![ (str!("chute::spmc\nw/ mutex"), read("chute__spmc_mutex")), (str!("chute::mpmc"), read("chute__mpmc")), (str!("tokio::\nbroadcast"), read("tokio__broadcast")), ]; - chart(&all, 4, str!("mpmc (4 readers)"), "out/mpmc"); -} - -/// `rt` - read thread count -pub fn chart( - all_estimates: &Vec<(String, EstimatesMPMC)>, - rt: usize, - title: String, - fname: impl AsRef -) { - let wts: Vec = all_estimates.first().unwrap().1 - .iter().map(|(wt, _)| *wt) - .collect(); - - let unit = String::from("ms"); - let ns_to_unit = 1.0 / 1_000_000.0; - - let mut chart = - Chart::new() - .background_color(CHART_BACKGROUND) - .title( - Title::new() - .text(title) - .left("center") - ) - .legend( - Legend::new().top("bottom") - ) - .grid( - Grid::new() - .left(100) - .right(40) - .top(40) - .bottom(60) - ) - .y_axis( - Axis::new() - .type_(AxisType::Category) - .data( - all_estimates.iter() - .map(|(name,_)| name.clone()) - .collect() - ) - .axis_label( - AxisLabel::new().show(true) - .font_size(13) - .font_weight(FontWeight::Bolder) - .color("#666666") - ) - ) - .x_axis( - Axis::new() - .type_(AxisType::Value) - .axis_label(AxisLabel::new().formatter( - Formatter::String( - "{value}".to_string() + &unit - ) - )) - ); - - for wt in wts { - let mut bar = - Bar::new() - .name(format!("{wt} writers")) - .label( - Label::new() - .show(true) - .font_size(11) - .font_weight(FontWeight::Bold) - .font_family(FontFamily::MonoSpace) - .position(LabelPosition::InsideRight) - .formatter(Formatter::Function( - ( - "function (param) { return param.data.toFixed(2); }" - ).into() - )) - ); - let mut datas = Vec::new(); - for (_, estimates) in all_estimates { - let data_ns = estimates[&wt][&rt]; - datas.push(data_ns * ns_to_unit); - } - bar = bar.data(datas); - chart = chart.series(Series::Bar(bar)); - } - - let height = all_estimates.len() as u32 * 80 + 100; - let mut renderer = ImageRenderer::new(CHART_WIDTH, height).theme(CHART_THEME); - renderer.save(&chart, fname.as_ref().with_extension("svg")).unwrap(); - renderer.save_format(charming::ImageFormat::Png, &chart, fname.as_ref().with_extension("png")).unwrap(); + let visual = Visual{ + title: format!("broadcast mpmc ({rt} readers)"), + sub_chart_name: str!("writers"), + label_pos: LabelPosition::Right, + }; + multi_chart(&all, "out/mpmc", visual); } \ No newline at end of file diff --git a/benchmarks/src/mpsc.rs b/benchmarks/src/mpsc.rs index 5e2c131..6e01123 100644 --- a/benchmarks/src/mpsc.rs +++ b/benchmarks/src/mpsc.rs @@ -1,25 +1,31 @@ +use std::collections::BTreeMap; use std::path::Path; use str_macro::str; -use std::string::String; -use crate::{read_group, EstimatesMPMC}; -use crate::mpmc; +use charming::element::LabelPosition; +use crate::{read_group}; +use crate::multi_chart::{multi_chart, MultiChartData, Visual}; pub fn mpsc(dir_name: impl AsRef) { let wts = [1,2,4,8]; - let rts = [1]; - let read = |dir: &str| -> EstimatesMPMC { - read_group( + let read = |dir: &str| -> BTreeMap { + let data = read_group( &std::path::Path::new(dir_name.as_ref()).join(dir) - ,&wts, &rts - ) + ,&wts, &[1] + ); + data.iter().map(|(&wt, readers)| (wt, readers[&1])).collect() }; - let all: Vec<(String, EstimatesMPMC)> = vec![ + let all: MultiChartData = vec![ (str!("chute::spmc\nw/ mutex"), read("chute__spmc_mutex")), (str!("chute::mpmc"), read("chute__mpmc")), (str!("crossbeam::\nunbounded"), read("crossbeam__unbounded")), (str!("flume::\nunbounded"), read("flume__unbounded")), ]; - mpmc::chart(&all, 1, str!("mpsc"), "out/mpsc"); + let visual = Visual{ + title: format!("mpsc"), + sub_chart_name: str!("writers"), + label_pos: LabelPosition::InsideRight, + }; + multi_chart(&all, "out/mpsc", visual); } \ No newline at end of file diff --git a/benchmarks/src/multi_chart.rs b/benchmarks/src/multi_chart.rs new file mode 100644 index 0000000..b694eb2 --- /dev/null +++ b/benchmarks/src/multi_chart.rs @@ -0,0 +1,113 @@ +use std::collections::BTreeMap; +use std::path::Path; +use charming::{Chart, ImageRenderer}; +use charming::component::{Axis, Grid, Legend, Title}; +use charming::element::{AxisLabel, AxisType, Formatter, Label, LabelPosition}; +use charming::element::font_settings::{FontFamily, FontWeight}; +use charming::series::{Bar, Series}; +use crate::{CHART_BACKGROUND, CHART_THEME, CHART_WIDTH}; + +pub struct Visual { + pub title: String, + pub label_pos: LabelPosition, + pub sub_chart_name: String, // This can be HashMap as well. +} +/*impl Default for Visual { + fn default() -> Self { + Visual{ + title: String::new(), + label_pos: LabelPosition::InsideRight, + sub_chart_name: + } + } +}*/ + +pub type MultiChartData = Vec<(String, BTreeMap)>; + +pub fn multi_chart( + all_estimates: &MultiChartData, + fname: impl AsRef, + visual: Visual, +) { + let sub_chart_ids: Vec = all_estimates.first().unwrap().1 + .iter().map(|(id, _)| *id) + .collect(); + + let unit = String::from("ms"); + let ns_to_unit = 1.0 / 1_000_000.0; + + let mut chart = + Chart::new() + .background_color(CHART_BACKGROUND) + .title( + Title::new() + .text(visual.title) + .left("center") + ) + .legend( + Legend::new().top("bottom") + ) + .grid( + Grid::new() + .left(100) + .right(40) + .top(40) + .bottom(60) + ) + .y_axis( + Axis::new() + .type_(AxisType::Category) + .data( + all_estimates.iter() + .map(|(name,_)| name.clone()) + .collect() + ) + .axis_label( + AxisLabel::new().show(true) + .font_size(13) + .font_weight(FontWeight::Bolder) + .color("#666666") + ) + ) + .x_axis( + Axis::new() + .type_(AxisType::Value) + .axis_label(AxisLabel::new().formatter( + Formatter::String( + "{value}".to_string() + &unit + ) + )) + ); + + // Sub charts + for sub_chart_id in sub_chart_ids { + let mut bar = + Bar::new() + .name(format!("{sub_chart_id} {:}", visual.sub_chart_name)) + .label( + Label::new() + .show(true) + .font_size(11) + .font_weight(FontWeight::Bold) + .font_family(FontFamily::MonoSpace) + .position(visual.label_pos.clone()) + .formatter(Formatter::Function( + ( + "function (param) { return param.data.toFixed(2); }" + ).into() + )) + ); + let mut datas = Vec::new(); + for (_, estimates) in all_estimates { + let data_ns = estimates[&sub_chart_id]; + datas.push(data_ns * ns_to_unit); + } + bar = bar.data(datas); + chart = chart.series(Series::Bar(bar)); + } + + let height = all_estimates.len() as u32 * 80 + 100; + let mut renderer = ImageRenderer::new(CHART_WIDTH, height).theme(CHART_THEME); + renderer.save(&chart, fname.as_ref().with_extension("svg")).unwrap(); + renderer.save_format(charming::ImageFormat::Png, &chart, fname.as_ref().with_extension("png")).unwrap(); +} \ No newline at end of file diff --git a/benchmarks/src/spmc.rs b/benchmarks/src/spmc.rs new file mode 100644 index 0000000..385572e --- /dev/null +++ b/benchmarks/src/spmc.rs @@ -0,0 +1,30 @@ +use std::collections::BTreeMap; +use std::path::Path; +use charming::element::LabelPosition; +use str_macro::str; +use crate::{read_group}; +use crate::multi_chart::{multi_chart, MultiChartData, Visual}; + +pub fn spmc(dir_name: impl AsRef) { + let rts = [1,2,4,8]; + let read = |dir: &str| -> BTreeMap { + let data = read_group( + &std::path::Path::new(dir_name.as_ref()).join(dir) + ,&[1], &rts + ); + data[&1].clone() + }; + + let all: MultiChartData = vec![ + (str!("chute::spmc"), read("chute__spmc")), + (str!("chute::mpmc"), read("chute__mpmc")), + (str!("tokio::\nbroadcast"), read("tokio__broadcast")), + ]; + + let visual = Visual{ + title: str!("broadcast spmc"), + sub_chart_name: str!("readers"), + label_pos: LabelPosition::Right, + }; + multi_chart(&all, "out/spmc", visual); +} \ No newline at end of file diff --git a/doc/img/benchmarks/mpmc.svg b/doc/img/benchmarks/mpmc.svg index 746167a..ad780cf 100644 --- a/doc/img/benchmarks/mpmc.svg +++ b/doc/img/benchmarks/mpmc.svg @@ -35,23 +35,23 @@ - - + + -5.09 -7.17 -28.87 -9.55 -6.80 -32.21 -12.28 -6.66 -72.85 -16.89 -5.59 -42.03 +5.09 +7.17 +28.87 +9.55 +6.80 +32.21 +12.28 +6.57 +71.00 +16.89 +5.59 +42.03 1 writers @@ -65,8 +65,8 @@ 8 writers - -mpmc (4 readers) + +broadcast mpmc (4 readers) + \ No newline at end of file