Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spmc bench added #5

Merged
merged 1 commit into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ If write calls will be synchronized - all messages will be ordered by that "sync

## Example

Write from multiple threads, read from multiple threads.
Write from multiple threads, read from multiple threads:

```rust
const WRITERS : usize = 4;
Expand Down Expand Up @@ -78,14 +78,17 @@ std::thread::scope(|s| {
});
```

See [examples](examples).

## Benchmarks

Intel i4771 (3.5Ghz 4C/8T), DDR3 1600Mhz, Windows 10. See [benchmarks](benchmarks) sub-project.

![seq benchmark](doc/img/benchmarks/seq.svg)
![spsc benchmark](doc/img/benchmarks/spsc.svg)
![mpsc benchmark](doc/img/benchmarks/mpsc.svg)
![mpmc benchmark](doc/img/benchmarks/mpmc.svg)
![broadcast mpmc benchmark](doc/img/benchmarks/mpmc.svg)
![broadcast spmc benchmark](doc/img/benchmarks/spmc.svg)

Benchmarks compare with a channels since chute can be used +/- as a channel, by
spinning on the reader side.
Expand Down
12 changes: 9 additions & 3 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,23 @@ charming = { git = "https://github.com/yuankunzhang/charming.git", rev = "aa18c2
#charming = { version = "0.4", features = ["ssr"] }

[[bench]]
name = "mpmc"
name = "seq"
harness = false

[[bench]]
name = "spsc"
harness = false

[[bench]]
name = "mpsc"
harness = false

# MULTICAST

[[bench]]
name = "spsc"
name = "mpmc"
harness = false

[[bench]]
name = "seq"
name = "spmc"
harness = false
2 changes: 1 addition & 1 deletion benchmarks/benches/mpmc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Multiple-producers, multiple-consumers
//! Multicast multiple-producers, multiple-consumers

use chute::LendingReader;
use arrayvec::ArrayVec;
Expand Down
122 changes: 122 additions & 0 deletions benchmarks/benches/spmc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//! Multicast single-producer, multiple-consumers

use chute::LendingReader;
use arrayvec::ArrayVec;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use std::sync::Arc;

mod common;
use common::*;

fn chute_spmc(reader_threads: usize){
let mut queue: chute::spmc::Queue<_> = Default::default();

let mut joins: ArrayVec<_, 64> = Default::default();

// READ
for _ in 0..reader_threads {
let mut reader = queue.reader();
joins.push(std::thread::spawn(move || {
for _ in 0..COUNT {
let msg = loop {
if let Some(msg) = reader.next() {
break msg;
}
yield_fn();
};
}
}));
}

// WRITE
for i in 0..COUNT {
queue.push(message::new(i));
}

for join in joins{
join.join().unwrap();
}
}

fn chute_mpmc(reader_threads: usize){
let queue = chute::mpmc::Queue::new();

let mut joins: ArrayVec<_, 64> = Default::default();

// READ
for _ in 0..reader_threads {
let mut reader = queue.reader();
joins.push(std::thread::spawn(move || {
for _ in 0..COUNT {
let msg = loop {
if let Some(msg) = reader.next() {
break msg;
}
yield_fn();
};
}
}));
}

// WRITE
let mut writer = queue.writer();
for i in 0..COUNT {
writer.push(message::new(i));
}

for join in joins{
join.join().unwrap();
}
}

fn tokio_broadcast(reader_threads: usize){
use tokio::sync::broadcast;
let (tx, _) = broadcast::channel(COUNT);

let mut joins: ArrayVec<_, 64> = Default::default();

// READ
for _ in 0..reader_threads {
let mut reader = tx.subscribe();
joins.push(std::thread::spawn(move || {
for _ in 0..COUNT {
reader.blocking_recv().unwrap();
}
}));
}

// WRITE
let mut writer = tx;
for i in 0..COUNT {
writer.send(message::new(i));
}

for join in joins{
join.join().unwrap();
}
}

fn criterion_benchmark(c: &mut Criterion) {
use criterion::BenchmarkId;

let mut group = c.benchmark_group("spmc");
for reader_threads in [1, 2, 4, 8] {
let parameter_string = format!("w:1 r:{}", reader_threads);

group.bench_with_input(BenchmarkId::new("chute::spmc", parameter_string.clone()), &reader_threads
, |b, rt| b.iter(|| chute_spmc(*rt))
);

group.bench_with_input(BenchmarkId::new("chute::mpmc", parameter_string.clone()), &reader_threads
, |b, rt| b.iter(|| chute_mpmc(*rt))
);

group.bench_with_input(BenchmarkId::new("tokio::broadcast", parameter_string.clone()), &reader_threads
, |b, rt| b.iter(|| tokio_broadcast(*rt))
);
}
group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
11 changes: 9 additions & 2 deletions benchmarks/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod spmc;
mod mpmc;
mod mpsc;
mod spsc;
mod seq;
mod multi_chart;

use std::collections::{BTreeMap};
use std::fs;
Expand Down Expand Up @@ -31,6 +33,7 @@ fn read_estimate(fname: &Path) -> f64 {
point_estimate.as_f64().unwrap()
}

/// [writer_count][reader_count] -> estimate
type EstimatesMPMC = BTreeMap<usize, BTreeMap<usize, f64>>;

fn read_group(dir_name: &Path, writers: &[usize], readers: &[usize]) -> EstimatesMPMC {
Expand All @@ -54,11 +57,12 @@ const CHART_WIDTH: u32 = 570;

fn main(){
#[derive(Eq, PartialEq)]
enum Command{All, Mpmc, Mpsc, Spsc, Seq}
enum Command{All, Spmc, Mpmc, Mpsc, Spsc, Seq}

let args: Vec<String> = env::args().collect();
let command = match args.get(1).map(|s| s.as_str()) {
None => Command::All,
Some("spmc") => Command::Spmc,
Some("mpmc") => Command::Mpmc,
Some("mpsc") => Command::Mpsc,
Some("spsc") => Command::Spsc,
Expand All @@ -70,7 +74,10 @@ fn main(){
let criterion_dir = current_dir.join("target/criterion");

let _ = fs::create_dir(current_dir.join("out"));


if command == Command::All || command == Command::Spmc {
spmc::spmc(criterion_dir.join("spmc"));
}
if command == Command::All || command == Command::Mpmc {
mpmc::mpmc(criterion_dir.join("mpmc"));
}
Expand Down
124 changes: 16 additions & 108 deletions benchmarks/src/mpmc.rs
Original file line number Diff line number Diff line change
@@ -1,123 +1,31 @@
use std::collections::BTreeMap;
use std::path::Path;
use charming::{component::{Grid, Axis}, Chart, ImageRenderer};
use charming::element::{AxisLabel, AxisType, Color, Formatter};
use charming::element::LabelPosition;
use charming::element::Label;
use charming::series::{Bar, Series};
use charming::component::{Legend};
use charming::component::Title;
use str_macro::str;
use std::string::String;
use charming::element::font_settings::{FontFamily, FontStyle, FontWeight};
use crate::{read_group, EstimatesMPMC};
use crate::CHART_WIDTH;
use crate::CHART_THEME;
use crate::CHART_BACKGROUND;
use crate::{read_group};
use crate::multi_chart::{multi_chart, MultiChartData, Visual};

pub fn mpmc(dir_name: impl AsRef<Path>) {
let rt = 4;
let wts = [1,2,4,8];
let rts = [1,2,4,8];
let read = |dir: &str| -> EstimatesMPMC {
read_group(
let read = |dir: &str| -> BTreeMap<usize, f64> {
let data = read_group(
&std::path::Path::new(dir_name.as_ref()).join(dir)
,&wts, &rts
)
,&wts, &[rt]
);
data.iter().map(|(&wt, readers)| (wt, readers[&rt])).collect()
};

let all: Vec<(String, EstimatesMPMC)> = vec![
let all: MultiChartData = vec![
(str!("chute::spmc\nw/ mutex"), read("chute__spmc_mutex")),
(str!("chute::mpmc"), read("chute__mpmc")),
(str!("tokio::\nbroadcast"), read("tokio__broadcast")),
];

chart(&all, 4, str!("mpmc (4 readers)"), "out/mpmc");
}

/// `rt` - read thread count
pub fn chart(
all_estimates: &Vec<(String, EstimatesMPMC)>,
rt: usize,
title: String,
fname: impl AsRef<Path>
) {
let wts: Vec<usize> = all_estimates.first().unwrap().1
.iter().map(|(wt, _)| *wt)
.collect();

let unit = String::from("ms");
let ns_to_unit = 1.0 / 1_000_000.0;

let mut chart =
Chart::new()
.background_color(CHART_BACKGROUND)
.title(
Title::new()
.text(title)
.left("center")
)
.legend(
Legend::new().top("bottom")
)
.grid(
Grid::new()
.left(100)
.right(40)
.top(40)
.bottom(60)
)
.y_axis(
Axis::new()
.type_(AxisType::Category)
.data(
all_estimates.iter()
.map(|(name,_)| name.clone())
.collect()
)
.axis_label(
AxisLabel::new().show(true)
.font_size(13)
.font_weight(FontWeight::Bolder)
.color("#666666")
)
)
.x_axis(
Axis::new()
.type_(AxisType::Value)
.axis_label(AxisLabel::new().formatter(
Formatter::String(
"{value}".to_string() + &unit
)
))
);

for wt in wts {
let mut bar =
Bar::new()
.name(format!("{wt} writers"))
.label(
Label::new()
.show(true)
.font_size(11)
.font_weight(FontWeight::Bold)
.font_family(FontFamily::MonoSpace)
.position(LabelPosition::InsideRight)
.formatter(Formatter::Function(
(
"function (param) { return param.data.toFixed(2); }"
).into()
))
);
let mut datas = Vec::new();
for (_, estimates) in all_estimates {
let data_ns = estimates[&wt][&rt];
datas.push(data_ns * ns_to_unit);
}
bar = bar.data(datas);
chart = chart.series(Series::Bar(bar));
}

let height = all_estimates.len() as u32 * 80 + 100;
let mut renderer = ImageRenderer::new(CHART_WIDTH, height).theme(CHART_THEME);
renderer.save(&chart, fname.as_ref().with_extension("svg")).unwrap();
renderer.save_format(charming::ImageFormat::Png, &chart, fname.as_ref().with_extension("png")).unwrap();
let visual = Visual{
title: format!("broadcast mpmc ({rt} readers)"),
sub_chart_name: str!("writers"),
label_pos: LabelPosition::Right,
};
multi_chart(&all, "out/mpmc", visual);
}
Loading