Commit d116754: fixes

tower120 committed Nov 20, 2024
1 parent e505da3 commit d116754
Showing 4 changed files with 216 additions and 101 deletions.
8 changes: 7 additions & 1 deletion benchmarks/Cargo.toml
@@ -31,7 +31,13 @@ harness = false
name = "mpsc"
harness = false

# MULTICAST
# UNICAST ONLY

[[bench]]
name = "unicast_spmc"
harness = false

# MULTICAST ONLY

[[bench]]
name = "mpmc"
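With the new [[bench]] target registered, this benchmark can be run on its own via "cargo bench --bench unicast_spmc"; harness = false disables the default libtest harness so that criterion_main! can supply main.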
86 changes: 86 additions & 0 deletions benchmarks/benches/unicast_spmc.rs
@@ -0,0 +1,86 @@
//! Unicast single-producer, multiple-consumers
use chute::LendingReader;
use arrayvec::ArrayVec;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use std::sync::Arc;

mod common;
use common::*;

fn chute_spmc(reader_threads: usize) {
    let mut queue: chute::unicast::spmc::Queue<_> = Default::default();

    let mut joins: ArrayVec<_, 64> = Default::default();

    // READ
    let read_len = COUNT / reader_threads;
    for _ in 0..reader_threads {
        let mut reader = queue.reader();
        joins.push(std::thread::spawn(move || {
            for _ in 0..read_len {
                let _ = loop {
                    if let Some(msg) = reader.next() {
                        break msg;
                    }
                    yield_fn();
                };
            }
        }));
    }

    // WRITE
    for i in 0..COUNT {
        queue.push(message::new(i));
    }

    for join in joins {
        join.join().unwrap();
    }
}

pub fn crossbeam_unbounded(reader_threads: usize) {
    let (tx, rx) = crossbeam::channel::unbounded();

    let mut joins: ArrayVec<_, 64> = Default::default();

    // READ
    let read_len = COUNT / reader_threads;
    for _ in 0..reader_threads {
        let rx = rx.clone();
        joins.push(std::thread::spawn(move || {
            for _ in 0..read_len {
                rx.recv().unwrap();
            }
        }));
    }

    // WRITE
    for i in 0..COUNT {
        tx.send(message::new(i)).unwrap();
    }

    for join in joins {
        join.join().unwrap();
    }
}

fn criterion_benchmark(c: &mut Criterion) {
    use criterion::BenchmarkId;

    let mut group = c.benchmark_group("spmc");
    for reader_threads in [1, 2, 4, 8] {
        let parameter_string = format!("w:1 r:{}", reader_threads);

        group.bench_with_input(BenchmarkId::new("chute::spmc", parameter_string.clone()), &reader_threads,
            |b, rt| b.iter(|| chute_spmc(*rt))
        );
        group.bench_with_input(BenchmarkId::new("crossbeam::unbounded", parameter_string.clone()), &reader_threads,
            |b, rt| b.iter(|| crossbeam_unbounded(*rt))
        );
    }
    group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
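The benchmark leans on a shared common module (mod common; use common::*;) that this diff does not touch: COUNT, message::new, and yield_fn come from there. A minimal stand-in that would let the file above compile might look like the sketch below; every name and value in it is inferred from the usage above or simply assumed, not taken from the repository.

// Hypothetical stand-in for benchmarks/benches/common/mod.rs -- not the repo's real module.
#![allow(dead_code)]

/// Total messages pushed per iteration (assumed value; divisible by 1, 2, 4, 8 readers).
pub const COUNT: usize = 65_536;

/// Called by readers while spinning on an empty queue.
pub fn yield_fn() {
    std::thread::yield_now();
}

pub mod message {
    /// Assumed payload type; the real benchmarks may use something larger.
    #[derive(Clone, Copy, Debug)]
    pub struct Message(pub usize);

    pub fn new(i: usize) -> Message {
        Message(i)
    }
}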
35 changes: 18 additions & 17 deletions src/unicast/block.rs
@@ -8,29 +8,34 @@ use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use branch_hints::unlikely;
use crate::block::CacheLineAlign;

pub const BLOCK_SIZE: usize = if cfg!(miri) { 128 } else { 4096 };
pub const BLOCK_SIZE: usize = if cfg!(miri) { 128 } else { 4096/*1024*/ };

pub type BlockMem<T> = [UnsafeCell<MaybeUninit<T>>; BLOCK_SIZE];

// Lightweight block header. Shared amongst readers and writers.
pub struct Block<T>{
    /// Set to None, when the first reader enters the next block.
    pub next: spin::Mutex<Option<Arc<Block<T>>>>,

    pub read_succ: CacheLineAlign<AtomicUsize>,

    pub write_counter: AtomicUsize,

    // CacheLineAlign is CRUCIAL here for performance.
    pub read_counter : CacheLineAlign<AtomicUsize>,

    /// Freed as soon as read_counter == BLOCK_SIZE.
    mem_ptr: NonNull<UnsafeCell<[MaybeUninit<T>; BLOCK_SIZE]>>,
    /// Freed as soon as read_succ == BLOCK_SIZE.
    mem_ptr: NonNull<BlockMem<T>>,
}

impl<T> Default for Block<T>{
    fn default() -> Self {
        let mem = Box::new(
            UnsafeCell::new(
                [const{ MaybeUninit::uninit() }; BLOCK_SIZE]
            )
            [const{ UnsafeCell::new(MaybeUninit::uninit()) }; BLOCK_SIZE]
        );
        Self{
            next: Default::default(),
            read_succ: Default::default(),
            write_counter: Default::default(),
            read_counter : Default::default(),
            mem_ptr: unsafe{ NonNull::new_unchecked(Box::into_raw(mem)) },
@@ -39,9 +44,8 @@ impl<T> Default for Block<T>{
}
impl<T> Drop for Block<T>{
    fn drop(&mut self) {
        // Drop mem
        let read_counter = self.read_counter.load(Ordering::Acquire);
        let mem_deallocated = read_counter == BLOCK_SIZE;
        let read_succ = self.read_succ.load(Ordering::Acquire);
        let mem_deallocated = read_succ == BLOCK_SIZE;
        // This could happen either in the very last block,
        // or if the whole queue was dropped.
        if unlikely(!mem_deallocated) {
@@ -51,9 +55,9 @@ impl<T> Drop for Block<T>{
            if mem::needs_drop::<T>(){
                unsafe{
                    let len = self.write_counter.load(Ordering::Acquire);
                    let mem = (*mem).get_mut();
                    for i in read_counter..len {
                        ptr::drop_in_place(mem.get_unchecked_mut(i).assume_init_mut());
                    let mem = self.mem_unchecked().cast_mut();
                    for i in read_succ..len {
                        ptr::drop_in_place(mem.add(i));
                    }
                }
            }
@@ -66,19 +70,16 @@ impl<T> Drop for Block<T>{
    }
}


impl<T> Block<T>{
    /// Should be called ONCE.
    /// All `mem` elements must already be destructed.
    #[inline]
    pub unsafe fn dealloc_destructed_mem(&self) {
        debug_assert!(self.read_counter.load(Ordering::Acquire) == BLOCK_SIZE);
        unsafe{ drop(Box::from_raw(self.mem_ptr.as_ptr())); }
    }

    /// `mem` must exist.
    #[inline]
    pub unsafe fn mem_unchecked(&self) -> *const T {
        self.mem_ptr.as_ref().get().cast()
        self.mem_ptr.as_ptr().cast()
    }
}
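Besides tracking deallocation with the new read_succ counter instead of read_counter, the substantive change above is the BlockMem<T> alias: storage moves from UnsafeCell<[MaybeUninit<T>; BLOCK_SIZE]> (one cell around the whole array) to [UnsafeCell<MaybeUninit<T>>; BLOCK_SIZE] (one cell per slot), so a thread can initialize or read a single element through a shared reference without asserting unique access to the entire block. A standalone sketch of that per-slot pattern follows; the Slots type and all values in it are made up for illustration, this is not chute's actual code.

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;

const N: usize = 4;

// One UnsafeCell per slot: a shared &Slots lets each thread initialize
// its own element; nobody needs a &mut over the whole array.
struct Slots([UnsafeCell<MaybeUninit<u64>>; N]);
// Sound only because every thread below writes a distinct slot.
unsafe impl Sync for Slots {}

fn main() {
    let slots = Slots(std::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit())));
    std::thread::scope(|s| {
        for i in 0..N {
            let slots = &slots;
            s.spawn(move || unsafe {
                // Write through this slot's cell only.
                (*slots.0[i].get()).write(i as u64 * 10);
            });
        }
    });
    // All writers joined at the end of the scope, so plain reads are safe now.
    for i in 0..N {
        let v = unsafe { (*slots.0[i].get()).assume_init() };
        println!("slot {i} = {v}");
    }
}

With one cell per slot, pointer arithmetic over the block becomes straightforward: mem_unchecked() can hand out a *const T directly, which is exactly what the new drop path uses via mem_unchecked().cast_mut() and mem.add(i).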
(Diff for the fourth changed file was not loaded on the page.)