From 7ae76c574725b58a6d889f15df6b7f46201789c3 Mon Sep 17 00:00:00 2001
From: Pierre Brouca
Date: Sat, 13 Nov 2021 11:20:28 +0100
Subject: [PATCH] Initial implementation (#1)

---
 .github/workflows/ci.yml                |  58 +++++
 .github/workflows/pr-security-audit.yml |  21 ++
 .github/workflows/security-audit.yml    |  22 ++
 .gitignore                              |   2 +
 CHANGELOG.md                            |   1 +
 Cargo.toml                              |  12 +
 README.md                               | 183 +++++++++++++++
 benches/queue.rs                        |  51 ++++
 src/cache_pad.rs                        |  62 +++++
 src/lib.rs                              | 173 ++++++++++++++
 src/node.rs                             |  99 ++++++++
 src/queue.rs                            | 295 ++++++++++++++++++++++++
 src/slot.rs                             |  97 ++++++++
 src/variant.rs                          |  79 +++++++
 tests/loom_queue.rs                     | 121 ++++++++++
 tests/queue.rs                          | 131 +++++++++++
 16 files changed, 1407 insertions(+)
 create mode 100644 .github/workflows/ci.yml
 create mode 100644 .github/workflows/pr-security-audit.yml
 create mode 100644 .github/workflows/security-audit.yml
 create mode 100644 .gitignore
 create mode 100644 CHANGELOG.md
 create mode 100644 Cargo.toml
 create mode 100644 benches/queue.rs
 create mode 100644 src/cache_pad.rs
 create mode 100644 src/lib.rs
 create mode 100644 src/node.rs
 create mode 100644 src/queue.rs
 create mode 100644 src/slot.rs
 create mode 100644 src/variant.rs
 create mode 100644 tests/loom_queue.rs
 create mode 100644 tests/queue.rs

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..414bc0b
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,58 @@
+name: CI
+
+on:
+  push:
+    branches: ['main', 'v*.x']
+  pull_request:
+    branches: ['main', 'v*.x']
+
+env:
+  RUSTFLAGS: -Dwarnings
+  RUST_BACKTRACE: 1
+
+jobs:
+  ci-msrv:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        rust:
+          - stable
+          - beta
+          - nightly
+          - 1.56.1 # MSRV
+
+    steps:
+      - uses: actions/checkout@v2
+
+      - uses: actions-rs/toolchain@v1
+        with:
+          profile: minimal
+          toolchain: ${{ matrix.rust }}
+          override: true
+          components: rustfmt, clippy
+
+      - uses: actions-rs/cargo@v1
+        with:
+          command: build
+
+      - uses: actions-rs/cargo@v1
+        with:
+          command: test
+
+      - uses: actions-rs/cargo@v1
+        env:
+          RUSTFLAGS: --cfg loom -Dwarnings
+          LOOM_MAX_PREEMPTIONS: 2
+          SCOPE: ${{ matrix.scope }}
+        with:
+          command: test
+          args: --lib --release -- --nocapture $SCOPE
+
+      - uses: actions-rs/cargo@v1
+        with:
+          command: fmt
+          args: --all -- --check
+
+      - uses: actions-rs/cargo@v1
+        with:
+          command: clippy
diff --git a/.github/workflows/pr-security-audit.yml b/.github/workflows/pr-security-audit.yml
new file mode 100644
index 0000000..5fcdfba
--- /dev/null
+++ b/.github/workflows/pr-security-audit.yml
@@ -0,0 +1,21 @@
+name: PR Security Audit
+
+on:
+  push:
+    paths:
+      - '**/Cargo.toml'
+  pull_request:
+    paths:
+      - '**/Cargo.toml'
+
+jobs:
+  security-audit:
+    runs-on: ubuntu-latest
+    if: "!contains(github.event.head_commit.message, 'ci skip')"
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Audit Check
+        uses: actions-rs/audit-check@v1
+        with:
+          token: ${{ secrets.GITHUB_TOKEN }}
diff --git a/.github/workflows/security-audit.yml b/.github/workflows/security-audit.yml
new file mode 100644
index 0000000..e596d2b
--- /dev/null
+++ b/.github/workflows/security-audit.yml
@@ -0,0 +1,22 @@
+name: Security Audit
+
+on:
+  push:
+    branches:
+      - main
+    paths:
+      - '**/Cargo.toml'
+  schedule:
+    - cron: '0 7 * * 1' # run at 7 AM UTC on Monday
+
+jobs:
+  security-audit:
+    runs-on: ubuntu-latest
+    if: "!contains(github.event.head_commit.message, 'ci skip')"
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Audit Check
+        uses: actions-rs/audit-check@v1
+        with:
+          token: ${{ secrets.GITHUB_TOKEN }}
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..96ef6c0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/target
+Cargo.lock
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..825c32f
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1 @@
+# Changelog
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..7e94567
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "lf-queue"
+description = "A lock-free multi-producer multi-consumer unbounded queue."
+version = "0.1.0"
+license = "MIT"
+edition = "2021"
+authors = ["Pierre Brouca"]
+categories = ["concurrency", "data-structures"]
+keywords = ["spsc", "mpsc", "spmc", "mpmc"]
+
+[target.'cfg(loom)'.dependencies]
+loom = "0.5"
diff --git a/README.md b/README.md
index b7fd665..ac4db08 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,185 @@
 # lf-queue
+
+[![Crates.io](https://img.shields.io/crates/v/lf-queue)](https://crates.io/crates/lf-queue)
+[![Documentation](https://docs.rs/lf-queue/badge.svg)](https://docs.rs/lf-queue)
+[![Build Status](https://github.com/broucz/lf-queue/workflows/CI/badge.svg)](https://github.com/broucz/lf-queue/actions/workflows/ci.yml?query=branch%3Amain)
+[![MIT licensed](https://img.shields.io/crates/l/lf-queue)](LICENSE)
+
 A lock-free multi-producer multi-consumer unbounded queue.
+
+## Examples
+
+```toml
+[dependencies]
+lf-queue = "0.1"
+```
+
+Single Producer - Single Consumer:
+
+```rust
+use lf_queue::Queue;
+
+fn main() {
+    const COUNT: usize = 1_000;
+    let queue: Queue<usize> = Queue::new();
+
+    for i in 0..COUNT {
+        queue.push(i);
+    }
+
+    for i in 0..COUNT {
+        assert_eq!(i, queue.pop().unwrap());
+    }
+
+    assert!(queue.pop().is_none());
+}
+```
+
+Multi Producer - Single Consumer:
+
+```rust
+use lf_queue::Queue;
+use std::thread;
+
+fn main() {
+    const COUNT: usize = 1_000;
+    const CONCURRENCY: usize = 4;
+
+    let queue: Queue<usize> = Queue::new();
+
+    let ths: Vec<_> = (0..CONCURRENCY)
+        .map(|_| {
+            let q = queue.clone();
+            thread::spawn(move || {
+                for i in 0..COUNT {
+                    q.push(i);
+                }
+            })
+        })
+        .collect();
+
+    for th in ths {
+        th.join().unwrap();
+    }
+
+    for _ in 0..COUNT * CONCURRENCY {
+        assert!(queue.pop().is_some());
+    }
+
+    assert!(queue.pop().is_none());
+}
+```
+
+Single Producer - Multi Consumer:
+
+```rust
+use lf_queue::Queue;
+use std::thread;
+
+fn main() {
+    const COUNT: usize = 1_000;
+    const CONCURRENCY: usize = 4;
+
+    let queue: Queue<usize> = Queue::new();
+
+    for i in 0..COUNT * CONCURRENCY {
+        queue.push(i);
+    }
+
+    let ths: Vec<_> = (0..CONCURRENCY)
+        .map(|_| {
+            let q = queue.clone();
+            thread::spawn(move || {
+                for _ in 0..COUNT {
+                    loop {
+                        if q.pop().is_some() {
+                            break;
+                        }
+                    }
+                }
+            })
+        })
+        .collect();
+
+    for th in ths {
+        th.join().unwrap();
+    }
+
+    assert!(queue.pop().is_none());
+}
+```
+
+Multi Producer - Multi Consumer:
+
+```rust
+use lf_queue::Queue;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::thread;
+
+fn main() {
+    const COUNT: usize = 1_000;
+    const CONCURRENCY: usize = 4;
+
+    let queue: Queue<usize> = Queue::new();
+    let items = Arc::new((0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());
+
+    let ths: Vec<_> = (0..CONCURRENCY)
+        .map(|_| {
+            let q = queue.clone();
+            let its = items.clone();
+            thread::spawn(move || {
+                for _ in 0..COUNT {
+                    let n = loop {
+                        if let Some(x) = q.pop() {
+                            break x;
+                        } else {
+                            thread::yield_now();
+                        }
+                    };
+                    its[n].fetch_add(1, Ordering::SeqCst);
+                }
+            })
+        })
+        .chain((0..CONCURRENCY).map(|_| {
+            let q = queue.clone();
+            thread::spawn(move || {
+                for i in 0..COUNT {
+                    q.push(i);
+                }
+            })
+        }))
+        .collect();
+
+    for th in ths {
+        th.join().unwrap();
+    }
+
+    thread::sleep(std::time::Duration::from_millis(10));
+
+    for c in &*items {
+        assert_eq!(c.load(Ordering::SeqCst), CONCURRENCY);
+    }
+
+    assert!(queue.pop().is_none());
+}
+```
+
+## Acknowledgement
+
+This implementation of a lock-free queue in Rust took inspiration from the
+[`concurrent-queue`](https://github.com/smol-rs/concurrent-queue) crate and is meant to be used
+for educational purposes. The code documentation helps you discover the algorithm used to
+implement a concurrent lock-free queue in Rust, but it might not yet be beginner-friendly. More
+details and learning materials will be added over time.
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+## Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you shall be licensed as above, without any additional
+terms or conditions.
+
+Note that, as of now, my focus is on improving the documentation of this crate, not on adding
+features. Please open an issue and start a discussion before working on any significant PR.
+
+Contributions are welcome.
diff --git a/benches/queue.rs b/benches/queue.rs
new file mode 100644
index 0000000..c7a1f02
--- /dev/null
+++ b/benches/queue.rs
@@ -0,0 +1,51 @@
+#![feature(test)]
+extern crate test;
+
+use lf_queue::Queue;
+
+// cargo +nightly bench
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use test::Bencher;
+
+    // cargo +nightly bench --package lf-queue --bench queue -- tests::mpmc --exact
+    //
+    // Latest results:
+    // - MacBook Air (M1, 2020): 260,718 ns/iter (+/- 16,344)
+    #[bench]
+    fn mpmc(b: &mut Bencher) {
+        const COUNT: usize = 1_000;
+        const CONCURRENCY: usize = 4;
+        let queue: Queue<usize> = Queue::new();
+
+        b.iter(|| {
+            let ths: Vec<_> = (0..CONCURRENCY)
+                .map(|_| {
+                    let q = queue.clone();
+                    std::thread::spawn(move || {
+                        for _ in 0..COUNT {
+                            loop {
+                                if q.pop().is_some() {
+                                    break;
+                                }
+                            }
+                        }
+                    })
+                })
+                .chain((0..CONCURRENCY).map(|_| {
+                    let q = queue.clone();
+                    std::thread::spawn(move || {
+                        for i in 0..COUNT {
+                            q.push(i);
+                        }
+                    })
+                }))
+                .collect();
+
+            for th in ths {
+                th.join().unwrap();
+            }
+        });
+    }
+}
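The bench above requires a nightly toolchain for `test::Bencher`. As a rough sanity check on a
stable toolchain, a minimal wall-clock sketch like the following could be used instead; it is an
illustration, not part of this patch, and the numbers it prints are indicative only:

```rust
use lf_queue::Queue;
use std::time::Instant;

fn main() {
    const COUNT: usize = 1_000_000;
    let queue: Queue<usize> = Queue::new();

    // Single-threaded push/pop round trip; indicative numbers only.
    let start = Instant::now();
    for i in 0..COUNT {
        queue.push(i);
    }
    while queue.pop().is_some() {}
    println!("{} push/pop pairs in {:?}", COUNT, start.elapsed());
}
```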
diff --git a/src/cache_pad.rs b/src/cache_pad.rs
new file mode 100644
index 0000000..6b8bbd6
--- /dev/null
+++ b/src/cache_pad.rs
@@ -0,0 +1,62 @@
+//! Prevents [false sharing](https://en.wikipedia.org/wiki/False_sharing) by adding padding
+//! (unused bytes) between variables.
+//!
+//! As CPU memory is cached in lines of some small power-of-two size (e.g., 128 bytes),
+//! padding prevents two processors from operating on independent data that would otherwise
+//! be stored on the same cache line. Such operations could result in the invalidation of the
+//! whole cache line. Reducing cache invalidation on a frequently accessed shared data structure
+//! helps improve performance by reducing memory stalls and wasted system bandwidth.
+//!
+//! # Size and alignment
+//!
+//! Cache lines are assumed to be N bytes long, depending on the architecture:
+//!
+//! - On x86_64 and aarch64, N = 128.
+//! - On all others, N = 64.
+//!
+//! The size of `CachePad<T>` is the smallest multiple of N bytes large enough to accommodate
+//! a value of type `T`.
+//!
+//! # Notes
+//!
+//! CPU cache line:
+//!
+//! - MacBook Air (M1, 2020)
+//!
+//! ```bash
+//! sysctl -a | grep cachelinesize
+//! hw.cachelinesize: 128
+//! ```

+use std::fmt;
+use std::ops::Deref;
+
+/// Pads and aligns data to the length of a cache line.
+#[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))]
+#[cfg_attr(
+    not(any(target_arch = "x86_64", target_arch = "aarch64")),
+    repr(align(64))
+)]
+pub(crate) struct CachePad<T>(T);
+
+impl<T> CachePad<T> {
+    /// Creates a padded representation of the data aligned with the
+    /// length of a cache line.
+    pub(crate) fn new(t: T) -> CachePad<T> {
+        CachePad(t)
+    }
+}
+
+impl<T> Deref for CachePad<T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        &self.0
+    }
+}
+
+impl<T: fmt::Debug> fmt::Debug for CachePad<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_tuple("CachePad").field(&self.0).finish()
+    }
+}
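A quick way to see what the padding buys is to check the layout numbers directly. The sketch
below is self-contained (it re-declares a stand-in `CachePad` rather than using the crate's
private type) and assumes the 128-byte x86_64/aarch64 case:

```rust
use std::mem::{align_of, size_of};

// Stand-in mirroring the patch's CachePad for the x86_64/aarch64 case.
#[repr(align(128))]
struct CachePad<T>(T);

fn main() {
    // The wrapper is aligned to the assumed 128-byte cache line...
    assert_eq!(align_of::<CachePad<u64>>(), 128);
    // ...and its size is rounded up to a multiple of that alignment, so
    // two neighboring CachePad values never share a cache line.
    assert_eq!(size_of::<CachePad<u64>>(), 128);
    assert_eq!(size_of::<CachePad<[u64; 20]>>(), 256);
}
```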
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..ec4bb06
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,173 @@
+#![deny(
+    warnings,
+    rustdoc::broken_intra_doc_links,
+    rustdoc::private_intra_doc_links,
+    missing_docs,
+    missing_debug_implementations,
+    trivial_casts,
+    trivial_numeric_casts,
+    unreachable_pub,
+    unsafe_op_in_unsafe_fn,
+    unused_crate_dependencies,
+    unused_extern_crates,
+    unused_import_braces,
+    unused_lifetimes,
+    unused_qualifications,
+    unused_results,
+    rust_2018_idioms
+)]
+
+//! A lock-free multi-producer multi-consumer unbounded queue.
+//!
+//! # Examples
+//!
+//! Single Producer - Single Consumer:
+//!
+//! ```
+//! use lf_queue::Queue;
+//!
+//! const COUNT: usize = 1_000;
+//! let queue: Queue<usize> = Queue::new();
+//!
+//! for i in 0..COUNT {
+//!     queue.push(i);
+//! }
+//!
+//! for i in 0..COUNT {
+//!     assert_eq!(i, queue.pop().unwrap());
+//! }
+//!
+//! assert!(queue.pop().is_none());
+//! ```
+//!
+//! Multi Producer - Single Consumer:
+//!
+//! ```
+//! use lf_queue::Queue;
+//! use std::thread;
+//!
+//! const COUNT: usize = 1_000;
+//! const CONCURRENCY: usize = 4;
+//!
+//! let queue: Queue<usize> = Queue::new();
+//!
+//! let ths: Vec<_> = (0..CONCURRENCY)
+//!     .map(|_| {
+//!         let q = queue.clone();
+//!         thread::spawn(move || {
+//!             for i in 0..COUNT {
+//!                 q.push(i);
+//!             }
+//!         })
+//!     })
+//!     .collect();
+//!
+//! for th in ths {
+//!     th.join().unwrap();
+//! }
+//!
+//! for _ in 0..COUNT * CONCURRENCY {
+//!     assert!(queue.pop().is_some());
+//! }
+//!
+//! assert!(queue.pop().is_none());
+//! ```
+//!
+//! Single Producer - Multi Consumer:
+//!
+//! ```
+//! use lf_queue::Queue;
+//! use std::thread;
+//!
+//! const COUNT: usize = 1_000;
+//! const CONCURRENCY: usize = 4;
+//!
+//! let queue: Queue<usize> = Queue::new();
+//!
+//! for i in 0..COUNT * CONCURRENCY {
+//!     queue.push(i);
+//! }
+//!
+//! let ths: Vec<_> = (0..CONCURRENCY)
+//!     .map(|_| {
+//!         let q = queue.clone();
+//!         thread::spawn(move || {
+//!             for _ in 0..COUNT {
+//!                 loop {
+//!                     if q.pop().is_some() {
+//!                         break;
+//!                     }
+//!                 }
+//!             }
+//!         })
+//!     })
+//!     .collect();
+//!
+//! for th in ths {
+//!     th.join().unwrap();
+//! }
+//!
+//! assert!(queue.pop().is_none());
+//! ```
+//!
+//! Multi Producer - Multi Consumer:
+//!
+//! ```
+//! use lf_queue::Queue;
+//! use std::sync::atomic::{AtomicUsize, Ordering};
+//! use std::sync::Arc;
+//! use std::thread;
+//!
+//! const COUNT: usize = 1_000;
+//! const CONCURRENCY: usize = 4;
+//!
+//! let queue: Queue<usize> = Queue::new();
+//! let items = Arc::new((0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());
+//!
+//! let ths: Vec<_> = (0..CONCURRENCY)
+//!     .map(|_| {
+//!         let q = queue.clone();
+//!         let its = items.clone();
+//!         thread::spawn(move || {
+//!             for _ in 0..COUNT {
+//!                 let n = loop {
+//!                     if let Some(x) = q.pop() {
+//!                         break x;
+//!                     } else {
+//!                         thread::yield_now();
+//!                     }
+//!                 };
+//!                 its[n].fetch_add(1, Ordering::SeqCst);
+//!             }
+//!         })
+//!     })
+//!     .chain((0..CONCURRENCY).map(|_| {
+//!         let q = queue.clone();
+//!         thread::spawn(move || {
+//!             for i in 0..COUNT {
+//!                 q.push(i);
+//!             }
+//!         })
+//!     }))
+//!     .collect();
+//!
+//! for th in ths {
+//!     th.join().unwrap();
+//! }
+//!
+//! thread::sleep(std::time::Duration::from_millis(10));
+//!
+//! for c in &*items {
+//!     assert_eq!(c.load(Ordering::SeqCst), CONCURRENCY);
+//! }
+//!
+//! assert!(queue.pop().is_none());
+//! ```
+
+mod queue;
+
+pub(crate) mod cache_pad;
+pub(crate) mod node;
+pub(crate) mod slot;
+pub(crate) mod variant;
+
+pub use queue::Queue;
diff --git a/src/node.rs b/src/node.rs
new file mode 100644
index 0000000..256ade6
--- /dev/null
+++ b/src/node.rs
@@ -0,0 +1,99 @@
+//! Holds a collection of [`Slot`]s.
+//!
+//! The [`Node`]'s container holds [`NODE_CAPACITY`] [`Slot`]s. Each [`Slot`]
+//! is used once before being scheduled for deletion. Each [`Node`] holds a pointer
+//! to the next [`Node`] of the [`Queue`], if any.
+//!
+//! [`Queue`]: crate::queue::Queue
+
+use crate::cache_pad::CachePad;
+use crate::slot::{Slot, DRAINING, READING};
+use crate::variant::sync::atomic::{AtomicPtr, Ordering};
+use crate::variant::thread;
+
+/// Holds a collection of [`Slot`]s.
+#[derive(Debug)]
+pub(crate) struct Node<T> {
+    /// A pointer to the next [`Node`] of the [`Queue`], if any.
+    ///
+    /// [`Queue`]: crate::queue::Queue
+    pub(crate) next: AtomicPtr<CachePad<Node<T>>>,
+
+    /// A collection of [`Slot`]s.
+    pub(crate) container: [Slot<T>; NODE_CAPACITY],
+}
+
+impl<T> Node<T> {
+    /// New uninitialized [`Node`]s are frequently added to the queue.
+    /// Using a constant helps us reduce the cost of this operation.
+    #[cfg(not(loom))]
+    pub(crate) const UNINIT: Node<T> = Self {
+        next: AtomicPtr::new(std::ptr::null_mut()),
+        container: [Slot::UNINIT; NODE_CAPACITY],
+    };
+
+    // Loom model checking can't work with constants as it needs to keep
+    // track of the initialized items. In loom, `AtomicUsize::new` is therefore
+    // not a `const fn`. `Slot::UNINIT` can't be used in a loom context, as a
+    // constant item can't be initialized with calls to non-const expressions.
+    #[cfg(loom)]
+    pub(crate) fn new() -> Self {
+        Self {
+            next: AtomicPtr::new(std::ptr::null_mut()),
+            container: Default::default(),
+        }
+    }
+
+    /// Waits until the next pointer is set.
+    pub(crate) fn wait_next(&self) -> *mut CachePad<Node<T>> {
+        loop {
+            let next = self.next.load(Ordering::Acquire);
+            if !next.is_null() {
+                return next;
+            }
+            thread::yield_now();
+        }
+    }
+
+    /// Drains the [`Node`] container starting from `start` and drops the [`Node`] when possible.
+    pub(crate) unsafe fn drain(node: *mut CachePad<Node<T>>, start: usize) {
+        // We don't need to set the `DRAINING` bit in the last slot because that slot has
+        // begun the draining of the node.
+        for i in start..NODE_CAPACITY - 1 {
+            let slot = unsafe { (*node).container.get_unchecked(i) };
+
+            // Add the `DRAINING` bit if a thread is still using the slot (i.e., the
+            // state is not `READING` now and after we add the `DRAINING` flag).
+            // If a thread is still using the slot, it will be responsible for continuing
+            // the destruction of the node.
+            if slot.state.load(Ordering::Acquire) & READING == 0
+                && slot.state.fetch_or(DRAINING, Ordering::AcqRel) & READING == 0
+            {
+                return;
+            }
+        }
+
+        // No thread is using the node, it's safe to destroy it.
+        drop(unsafe { Box::from_raw(node) });
+    }
+}
+
+/// Each [`Node`] holds [`NODE_SIZE`] indices.
+///
+/// A [`Node`] has one reference to the next [`Node`] and [`NODE_SIZE`] - 1
+/// (i.e., [`NODE_CAPACITY`]) [`Slot`]s available in its container.
+#[cfg(not(loom))]
+pub(crate) const NODE_SIZE: usize = 8;
+
+/// Each [`Node`] holds [`NODE_SIZE`] indices.
+///
+/// A [`Node`] has one reference to the next [`Node`] and [`NODE_SIZE`] - 1
+/// (i.e., [`NODE_CAPACITY`]) [`Slot`]s available in its container.
+///
+/// When using loom, we shrink the size of the local queue. This shouldn't impact
+/// the logic, but it allows loom to test more edge cases in a reasonable amount of time.
+#[cfg(loom)]
+pub(crate) const NODE_SIZE: usize = 4;
+
+/// Reports the capacity (max number of items) a [`Node`] container can hold.
+pub(crate) const NODE_CAPACITY: usize = NODE_SIZE - 1;
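The two-step check in `drain` is the subtle part of node reclamation. The sketch below isolates
that decision sequentially (the real code races a draining thread against a reader); the helper
name is illustrative and not part of the crate:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Bit flags as in slot.rs.
const READING: usize = 2;
const DRAINING: usize = 4;

// Mirrors the condition in Node::drain: if the slot has not been read yet,
// set DRAINING and stop; the reader that later sets READING will see the
// DRAINING bit and resume the drain itself. The fetch_or re-check closes
// the window where READING is set between the load and the read-modify-write.
fn drain_continues(state: &AtomicUsize) -> bool {
    !(state.load(Ordering::Acquire) & READING == 0
        && state.fetch_or(DRAINING, Ordering::AcqRel) & READING == 0)
}

fn main() {
    let unread = AtomicUsize::new(0);
    assert!(!drain_continues(&unread)); // handoff: the reader will finish
    assert_eq!(unread.load(Ordering::Relaxed), DRAINING);

    let read = AtomicUsize::new(READING);
    assert!(drain_continues(&read)); // reader done: keep draining
}
```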
diff --git a/src/queue.rs b/src/queue.rs
new file mode 100644
index 0000000..56c72b2
--- /dev/null
+++ b/src/queue.rs
@@ -0,0 +1,295 @@
+//! A lock-free multi-producer multi-consumer unbounded queue.
+
+use crate::cache_pad::CachePad;
+use crate::node::{Node, NODE_CAPACITY, NODE_SIZE};
+use crate::slot::{DRAINING, FILLED, READING};
+use crate::variant::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering};
+use crate::variant::sync::Arc;
+use crate::variant::thread;
+
+use std::mem::MaybeUninit;
+
+/// A lock-free multi-producer multi-consumer unbounded queue.
+#[derive(Clone, Debug)]
+pub struct Queue<T> {
+    inner: Arc<Inner<T>>,
+}
+
+impl<T> Queue<T> {
+    /// Creates a new [`Queue`].
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use lf_queue::Queue;
+    ///
+    /// let queue = Queue::<usize>::new();
+    /// ```
+    pub fn new() -> Self {
+        Self {
+            inner: Arc::new(Inner::new()),
+        }
+    }
+
+    /// Pushes an item into the [`Queue`].
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use lf_queue::Queue;
+    ///
+    /// let queue = Queue::<usize>::new();
+    ///
+    /// queue.push(1);
+    /// queue.push(2);
+    /// queue.push(3);
+    /// ```
+    pub fn push(&self, item: T) {
+        self.inner.push(item)
+    }
+
+    /// Pops an item from the [`Queue`]. Returns `None` if the [`Queue`] is empty.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use lf_queue::Queue;
+    ///
+    /// let queue = Queue::<usize>::new();
+    /// for i in 0..8 {
+    ///     queue.push(i);
+    /// }
+    ///
+    /// for i in 0..8 {
+    ///     assert_eq!(i, queue.pop().unwrap());
+    /// }
+    ///
+    /// assert!(queue.pop().is_none());
+    /// ```
+    pub fn pop(&self) -> Option<T> {
+        self.inner.pop()
+    }
+}
+
+impl<T> Default for Queue<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[derive(Debug)]
+struct Inner<T> {
+    head: CachePad<Cursor<T>>,
+    tail: CachePad<Cursor<T>>,
+}
+
+impl<T> Inner<T> {
+    fn new() -> Self {
+        #[cfg(not(loom))]
+        let node: Node<T> = Node::UNINIT;
+        #[cfg(loom)]
+        let node: Node<T> = Node::new();
+
+        let first_node: *mut CachePad<Node<T>> = Box::into_raw(Box::new(CachePad::new(node)));
+
+        Self {
+            head: CachePad::new(Cursor {
+                index: AtomicUsize::new(0),
+                node: AtomicPtr::new(first_node),
+            }),
+            tail: CachePad::new(Cursor {
+                index: AtomicUsize::new(0),
+                node: AtomicPtr::new(first_node),
+            }),
+        }
+    }
+
+    fn push(&self, item: T) {
+        let mut tail_index = self.tail.index.load(Ordering::Acquire);
+        let mut tail_node = self.tail.node.load(Ordering::Acquire);
+
+        loop {
+            // Defines the node container offset of the slot where the provided item should be stored.
+            let offset = (tail_index >> MARK_BIT_SHIFT) % NODE_SIZE;
+
+            // If the node container is full, we wait until the next node is
+            // installed before moving forward, and then update our local references.
+            if offset == NODE_CAPACITY {
+                thread::yield_now();
+                tail_index = self.tail.index.load(Ordering::Acquire);
+                tail_node = self.tail.node.load(Ordering::Acquire);
+                continue;
+            }
+
+            // Increments the tail index.
+            let next_tail_index = tail_index + (1 << MARK_BIT_SHIFT);
+            match self.tail.index.compare_exchange_weak(
+                tail_index,
+                next_tail_index,
+                Ordering::SeqCst,
+                Ordering::Acquire,
+            ) {
+                // The tail index has been updated successfully so we can now use
+                // the offset to store the item in the next available slot.
+                Ok(_) => unsafe {
+                    // If we're filling the last available slot of the node container,
+                    // we install a new one and update both the tail index and the
+                    // tail node to point to this new node.
+                    if offset + 1 == NODE_CAPACITY {
+                        #[cfg(not(loom))]
+                        let node: Node<T> = Node::UNINIT;
+                        #[cfg(loom)]
+                        let node: Node<T> = Node::new();
+
+                        let next_node = Box::into_raw(Box::new(CachePad::new(node)));
+                        self.tail.node.store(next_node, Ordering::Release);
+                        let _ = self
+                            .tail
+                            .index
+                            .fetch_add(1 << MARK_BIT_SHIFT, Ordering::Release);
+                        (*tail_node).next.store(next_node, Ordering::Release);
+                    }
+
+                    // We can now safely store the provided item into the slot.
+                    let slot = (*tail_node).container.get_unchecked(offset);
+                    slot.item.with_mut(|p| p.write(MaybeUninit::new(item)));
+                    let _ = slot.state.fetch_or(FILLED, Ordering::Release);
+
+                    return;
+                },
+                // While trying to push the next item, the tail index
+                // has been updated by another thread. We update our local
+                // references with the value stored when we tried to make
+                // the exchange and what is now the current tail's node.
+                Err(current_tail_index) => {
+                    tail_index = current_tail_index;
+                    tail_node = self.tail.node.load(Ordering::Acquire);
+                }
+            }
+        }
+    }
+
+    fn pop(&self) -> Option<T> {
+        let mut head_index = self.head.index.load(Ordering::Acquire);
+        let mut head_node = self.head.node.load(Ordering::Acquire);
+
+        loop {
+            // Defines the offset of the slot from where the next item should be gathered.
+            let offset = (head_index >> MARK_BIT_SHIFT) % NODE_SIZE;
+
+            // If we reach the end of the node container, we wait until the next
+            // one is installed.
+            if offset == NODE_CAPACITY {
+                thread::yield_now();
+                head_index = self.head.index.load(Ordering::Acquire);
+                head_node = self.head.node.load(Ordering::Acquire);
+                continue;
+            }
+
+            // Increments the head index.
+            let mut next_head_index = head_index + (1 << MARK_BIT_SHIFT);
+
+            // If the mark bit is not set in the head index, we check whether
+            // there is a pending item in the queue.
+            if next_head_index & MARK_BIT == 0 {
+                // Synchronizes with all threads and loads the current tail index.
+                fence(Ordering::SeqCst);
+                let tail_index = self.tail.index.load(Ordering::Acquire);
+
+                // If the head index equals the tail index, the queue is empty.
+                if head_index >> MARK_BIT_SHIFT == tail_index >> MARK_BIT_SHIFT {
+                    return None;
+                }
+
+                // If the head and the tail are not pointing to the same node,
+                // we set the `MARK_BIT` in the head to skip checking whether
+                // there is any item pending on the next iteration.
+                if (head_index >> MARK_BIT_SHIFT) / NODE_SIZE
+                    != (tail_index >> MARK_BIT_SHIFT) / NODE_SIZE
+                {
+                    next_head_index |= MARK_BIT;
+                }
+            }
+
+            // Tries to update the head index.
+            match self.head.index.compare_exchange_weak(
+                head_index,
+                next_head_index,
+                Ordering::SeqCst,
+                Ordering::Acquire,
+            ) {
+                // The head index has been updated successfully so we can now use
+                // the offset to pop the next item.
+                Ok(_) => unsafe {
+                    // If we're returning the last item of the node container, we
+                    // update the head cursor to point to the next node.
+                    if offset + 1 == NODE_CAPACITY {
+                        let next_node = (*head_node).wait_next();
+
+                        // Removes the mark bit if any and increments the index.
+                        let mut next_index =
+                            (next_head_index & !MARK_BIT).wrapping_add(1 << MARK_BIT_SHIFT);
+
+                        // If the next node points to another node, we can already
+                        // update the index to report that the next node that will
+                        // be installed is not the last one.
+                        if !(*next_node).next.load(Ordering::Relaxed).is_null() {
+                            next_index |= MARK_BIT;
+                        }
+
+                        self.head.node.store(next_node, Ordering::Release);
+                        self.head.index.store(next_index, Ordering::Release);
+                    }
+
+                    // Reads and returns the item.
+                    let slot = (*head_node).container.get_unchecked(offset);
+                    slot.wait_filled();
+                    let item = slot.item.with(|p| p.read().assume_init());
+
+                    // Drains and drops the node if we've reached the end of its container, or if another
+                    // thread wanted to do so but couldn't because this thread was busy reading from the slot.
+                    if offset + 1 == NODE_CAPACITY {
+                        Node::drain(head_node, 0);
+                    } else if slot.state.fetch_or(READING, Ordering::AcqRel) & DRAINING != 0 {
+                        Node::drain(head_node, offset + 1);
+                    }
+
+                    return Some(item);
+                },
+                // While trying to pop the next item, the head index
+                // has been updated by another thread. We update our local
+                // references with the value stored when we tried to make
+                // the exchange and what is now the current head's node.
+                Err(current_head_index) => {
+                    head_index = current_head_index;
+                    head_node = self.head.node.load(Ordering::Acquire);
+                }
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+struct Cursor<T> {
+    /// Reports the index of the next [`Slot`].
+    ///
+    /// Shifting its value right by [`MARK_BIT_SHIFT`] and taking it modulo
+    /// [`NODE_SIZE`] gives the offset of the slot in the current [`Node`]
+    /// container.
+    ///
+    /// [`Slot`]: crate::slot::Slot
+    index: AtomicUsize,
+
+    /// Points to the current [`Node`].
+    node: AtomicPtr<CachePad<Node<T>>>,
+}
+
+/// Defines how many lower bits are reserved for metadata.
+const MARK_BIT_SHIFT: usize = 1;
+
+/// The [`MARK_BIT`] indicates that the [`Node`] is not the last one.
+///
+/// The [`MARK_BIT`] helps to avoid loading both the tail and the head
+/// to check whether or not the queue is empty when calling the `pop` method.
+///
+/// [`Node`]: crate::node::Node
+const MARK_BIT: usize = 1;
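The index arithmetic above packs one metadata bit below a monotonically increasing slot count.
A small sequential sketch of how `push` and `pop` derive their container offset from such an
index (constants copied from node.rs and queue.rs; illustration only):

```rust
const MARK_BIT_SHIFT: usize = 1;
const MARK_BIT: usize = 1;
const NODE_SIZE: usize = 8;
const NODE_CAPACITY: usize = NODE_SIZE - 1;

fn main() {
    let mut index = 0usize;

    // Each successful push/pop advances the index by one slot.
    for expected in 0..NODE_CAPACITY {
        assert_eq!((index >> MARK_BIT_SHIFT) % NODE_SIZE, expected);
        index += 1 << MARK_BIT_SHIFT;
    }

    // Offset NODE_CAPACITY means "wait until the next node is installed".
    assert_eq!((index >> MARK_BIT_SHIFT) % NODE_SIZE, NODE_CAPACITY);

    // Setting the mark bit does not disturb the offset arithmetic.
    assert_eq!((index | MARK_BIT) >> MARK_BIT_SHIFT, index >> MARK_BIT_SHIFT);
}
```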
diff --git a/src/slot.rs b/src/slot.rs
new file mode 100644
index 0000000..870c2cf
--- /dev/null
+++ b/src/slot.rs
@@ -0,0 +1,97 @@
+//! Holds an item of the [`Queue`].
+//!
+//! The [`Slot`]'s state uses the bit flags below to report its progress:
+//!
+//! ```txt
+//! INITIAL  0b00000000
+//! FILLED   0b00000001 -> Bit flag added after a successful write of the item into the slot.
+//! READING  0b00000010 -> Bit flag added from the time a thread starts to read the value.
+//! DRAINING 0b00000100 -> Bit flag added when a draining of the slot has been scheduled.
+//! ```
+//!
+//! The state of the [`Slot`] can only move forward, resulting in bit flags being added in
+//! the order below:
+//!
+//! ```txt
+//! INITIAL             0b00000000
+//! INITIAL -> FILLED   0b00000001
+//! FILLED -> READING   0b00000011
+//! READING -> DRAINING 0b00000111
+//! ```
+//!
+//! [`Node`]: crate::node::Node
+//! [`NODE_CAPACITY`]: crate::node::NODE_CAPACITY
+//! [`Queue`]: crate::queue::Queue
+
+use crate::variant::cell::UnsafeCell;
+use crate::variant::sync::atomic::AtomicUsize;
+use crate::variant::thread;
+
+use std::mem::MaybeUninit;
+use std::sync::atomic::Ordering;
+
+/// Holds an item of the [`Queue`].
+///
+/// [`Queue`]: crate::queue::Queue
+#[derive(Debug)]
+pub(crate) struct Slot<T> {
+    /// Holds an item pushed to the [`Queue`].
+    ///
+    /// [`Queue`]: crate::queue::Queue
+    pub(crate) item: UnsafeCell<MaybeUninit<T>>,
+
+    /// Reports the state of the [`Slot`].
+    pub(crate) state: AtomicUsize,
+}
+
+impl<T> Slot<T> {
+    /// When creating a new [`Node`], [`NODE_CAPACITY`] uninitialized
+    /// [`Slot`]s are added to the [`Node`] container. Using a constant
+    /// helps us reduce the cost of this operation.
+    ///
+    /// [`Node`]: crate::node::Node
+    /// [`NODE_CAPACITY`]: crate::node::NODE_CAPACITY
+    #[cfg(not(loom))]
+    pub(crate) const UNINIT: Slot<T> = Self {
+        item: UnsafeCell::new(MaybeUninit::uninit()),
+        state: AtomicUsize::new(0),
+    };
+
+    // Loom model checking can't work with constants as it needs to keep
+    // track of the initialized items. In loom, `AtomicUsize::new` is therefore
+    // not a `const fn`. `Slot::UNINIT` can't be used in a loom context, as a
+    // constant item can't be initialized with calls to non-const expressions.
+    #[cfg(loom)]
+    pub(crate) fn new() -> Self {
+        Self {
+            item: UnsafeCell::new(MaybeUninit::uninit()),
+            state: AtomicUsize::new(0),
+        }
+    }
+
+    /// Waits until the [`FILLED`] bit is set on the state.
+    pub(crate) fn wait_filled(&self) {
+        while self.state.load(Ordering::Acquire) & FILLED == 0 {
+            thread::yield_now()
+        }
+    }
+}
+
+// As we can't use compile-time constants and functions to create a new
+// Node container in loom, we implement the Default trait to simplify
+// the creation of a new Node container with uninitialized Slots.
+#[cfg(loom)]
+impl<T> Default for Slot<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Bit flag added when the [`Slot`] holds an item.
+pub(crate) const FILLED: usize = 1;
+
+/// Bit flag added when the [`Slot`] item is being used by a thread.
+pub(crate) const READING: usize = 2;
+
+/// Bit flag added when the [`Slot`] is scheduled for deletion.
+pub(crate) const DRAINING: usize = 4;
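A sequential walk through the forward-only state machine described above, from both the
producer's and the consumer's side (flags copied from the file; the interleaving shown is one
of many the real queue allows):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const FILLED: usize = 1;
const READING: usize = 2;
const DRAINING: usize = 4;

fn main() {
    let state = AtomicUsize::new(0); // INITIAL

    // Producer: mark the slot once the item has been written.
    let _ = state.fetch_or(FILLED, Ordering::Release);

    // Consumer: spin until FILLED, then mark READING. The previous value
    // reveals whether a drain was already scheduled for this slot.
    while state.load(Ordering::Acquire) & FILLED == 0 {}
    let prev = state.fetch_or(READING, Ordering::AcqRel);
    assert_eq!(prev & DRAINING, 0); // no drain pending in this interleaving

    // The state only moves forward: 0b000 -> 0b001 -> 0b011 (-> 0b111).
    assert_eq!(state.load(Ordering::Relaxed), FILLED | READING);
}
```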
diff --git a/src/variant.rs b/src/variant.rs
new file mode 100644
index 0000000..3a958d2
--- /dev/null
+++ b/src/variant.rs
@@ -0,0 +1,79 @@
+//! Switches from [`std`] to [`loom`] for [`std::cell`], [`std::sync`] and [`std::thread`] when
+//! using the `--cfg loom` flag.
+//!
+//! [`loom`]: https://docs.rs/loom/
+
+#[cfg(not(loom))]
+pub(crate) mod cell {
+    #[derive(Debug)]
+    #[repr(transparent)]
+    pub(crate) struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
+
+    impl<T> UnsafeCell<T> {
+        pub(crate) const fn new(data: T) -> UnsafeCell<T> {
+            UnsafeCell(std::cell::UnsafeCell::new(data))
+        }
+
+        pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
+            f(self.0.get())
+        }
+
+        pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
+            f(self.0.get())
+        }
+    }
+}
+
+#[cfg(not(loom))]
+pub(crate) mod sync {
+    pub(crate) use std::sync::Arc;
+
+    pub(crate) mod atomic {
+        pub(crate) use std::sync::atomic::{fence, AtomicPtr, Ordering};
+
+        #[derive(Debug)]
+        #[repr(transparent)]
+        pub(crate) struct AtomicUsize(std::sync::atomic::AtomicUsize);
+
+        impl AtomicUsize {
+            pub(crate) const fn new(v: usize) -> Self {
+                Self(std::sync::atomic::AtomicUsize::new(v))
+            }
+
+            pub(crate) fn load(&self, order: Ordering) -> usize {
+                self.0.load(order)
+            }
+
+            pub(crate) fn store(&self, val: usize, order: Ordering) {
+                self.0.store(val, order)
+            }
+
+            pub(crate) fn compare_exchange_weak(
+                &self,
+                current: usize,
+                new: usize,
+                success: Ordering,
+                failure: Ordering,
+            ) -> Result<usize, usize> {
+                self.0.compare_exchange_weak(current, new, success, failure)
+            }
+
+            pub(crate) fn fetch_add(&self, val: usize, order: Ordering) -> usize {
+                self.0.fetch_add(val, order)
+            }
+
+            pub(crate) fn fetch_or(&self, val: usize, order: Ordering) -> usize {
+                self.0.fetch_or(val, order)
+            }
+        }
+    }
+}
+
+#[cfg(not(loom))]
+pub(crate) use std::thread;
+
+#[cfg(loom)]
+pub(crate) use loom::cell;
+#[cfg(loom)]
+pub(crate) use loom::sync;
+#[cfg(loom)]
+pub(crate) use loom::thread;
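The closure-based `with`/`with_mut` shape exists because loom's `UnsafeCell` tracks accesses
through closures rather than handing out raw pointers, so the queue code must be written against
that API on both flavors. A self-contained sketch of the std flavor, mirroring the wrapper above:

```rust
// Stand-in for the crate-private wrapper in variant.rs (std flavor);
// loom::cell::UnsafeCell exposes the same `with`/`with_mut` shape.
struct UnsafeCell<T>(std::cell::UnsafeCell<T>);

impl<T> UnsafeCell<T> {
    const fn new(data: T) -> Self {
        Self(std::cell::UnsafeCell::new(data))
    }

    fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
        f(self.0.get())
    }

    fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
        f(self.0.get())
    }
}

fn main() {
    let cell = UnsafeCell::new(21);
    // Accesses stay inside closures, so the same call sites compile
    // against both the std wrapper and loom::cell::UnsafeCell.
    cell.with_mut(|p| unsafe { *p *= 2 });
    assert_eq!(cell.with(|p| unsafe { *p }), 42);
}
```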
diff --git a/tests/loom_queue.rs b/tests/loom_queue.rs
new file mode 100644
index 0000000..435286c
--- /dev/null
+++ b/tests/loom_queue.rs
@@ -0,0 +1,121 @@
+#![cfg(loom)]
+
+use lf_queue::Queue;
+use loom::thread;
+
+// When using the `--cfg loom` flag, the node size is 4 (i.e., a container capacity of 3). The
+// tests below use an item count of 5 to exercise both node addition and removal.
+//
+// Run all tests:
+//
+// RUSTFLAGS="--cfg loom" cargo test --package lf-queue --test loom_queue --release
+//
+// Note that running some of these tests may take a few seconds. Add `LOOM_MAX_PREEMPTIONS=2`
+// (or =3) to the command above to reduce the test complexity and so its duration.
+
+// RUSTFLAGS="--cfg loom" cargo test --package lf-queue --test loom_queue --release -- test_mpsc --exact
+#[test]
+fn test_mpsc() {
+    loom::model(|| {
+        const COUNT: usize = 5;
+        let queue: Queue<usize> = Queue::new();
+
+        let q1 = queue.clone();
+        let th1 = thread::spawn(move || {
+            for i in 0..3 {
+                q1.push(i);
+            }
+        });
+
+        let q2 = queue.clone();
+        let th2 = thread::spawn(move || {
+            for i in 3..5 {
+                q2.push(i);
+            }
+        });
+
+        th1.join().unwrap();
+        th2.join().unwrap();
+
+        for _ in 0..COUNT {
+            assert!(queue.pop().is_some());
+        }
+    });
+}
+
+// RUSTFLAGS="--cfg loom" cargo test --package lf-queue --test loom_queue --release -- test_spmc --exact
+#[test]
+fn test_spmc() {
+    loom::model(|| {
+        const COUNT: usize = 5;
+        let queue: Queue<usize> = Queue::new();
+
+        for i in 0..COUNT {
+            queue.push(i);
+        }
+
+        let mut n = 0;
+
+        let q1 = queue.clone();
+        let th1 = thread::spawn(move || {
+            let mut x = 0;
+            while q1.pop().is_some() {
+                x += 1;
+            }
+
+            x
+        });
+
+        let q2 = queue.clone();
+        let th2 = thread::spawn(move || {
+            let mut x = 0;
+            while q2.pop().is_some() {
+                x += 1;
+            }
+
+            x
+        });
+
+        n += th1.join().unwrap();
+        n += th2.join().unwrap();
+
+        assert_eq!(n, COUNT);
+    });
+}
+
+// RUSTFLAGS="--cfg loom" cargo test --package lf-queue --test loom_queue --release -- test_concurrent_push_and_pop --exact
+#[test]
+fn test_concurrent_push_and_pop() {
+    loom::model(|| {
+        const COUNT: usize = 5;
+        let queue: Queue<usize> = Queue::new();
+
+        let q1 = queue.clone();
+        let th1 = thread::spawn(move || {
+            for i in 0..COUNT {
+                q1.push(i);
+            }
+        });
+
+        let q2 = queue.clone();
+        let th2 = thread::spawn(move || {
+            for _ in 0..COUNT {
+                loop {
+                    if q2.pop().is_some() {
+                        break;
+                    } else {
+                        // The loom scheduler is, by design, not fair. Yielding here tells loom
+                        // that this thread needs another one to be scheduled before it can make
+                        // progress. In our case, some loom executions would otherwise block when
+                        // we reach the end of a node container, which requires the next node to
+                        // be installed before progress can be made.
+                        thread::yield_now()
+                    }
+                }
+            }
+        });
+
+        th1.join().unwrap();
+        th2.join().unwrap();
+    });
+}
diff --git a/tests/queue.rs b/tests/queue.rs
new file mode 100644
index 0000000..815312e
--- /dev/null
+++ b/tests/queue.rs
@@ -0,0 +1,131 @@
+use lf_queue::Queue;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::thread;
+
+// cargo test --package lf-queue --test queue -- test_spsc --exact --nocapture
+#[test]
+fn test_spsc() {
+    const COUNT: usize = 7 * 3;
+    let queue: Queue<usize> = Queue::new();
+
+    for i in 0..COUNT {
+        queue.push(i);
+    }
+
+    for i in 0..COUNT {
+        assert_eq!(i, queue.pop().unwrap());
+    }
+
+    assert!(queue.pop().is_none());
+}
+
+// cargo test --package lf-queue --test queue -- test_mpsc --exact --nocapture
+#[test]
+fn test_mpsc() {
+    const COUNT: usize = 1_000;
+    const CONCURRENCY: usize = 4;
+    let queue: Queue<usize> = Queue::new();
+
+    let ths: Vec<_> = (0..CONCURRENCY)
+        .map(|_| {
+            let q = queue.clone();
+            thread::spawn(move || {
+                for i in 0..COUNT {
+                    q.push(i);
+                }
+            })
+        })
+        .collect();
+
+    for th in ths {
+        th.join().unwrap();
+    }
+
+    for _ in 0..COUNT * CONCURRENCY {
+        assert!(queue.pop().is_some());
+    }
+
+    assert!(queue.pop().is_none());
+}
+
+// cargo test --package lf-queue --test queue -- test_spmc --exact --nocapture
+#[test]
+fn test_spmc() {
+    const COUNT: usize = 1_000;
+    const CONCURRENCY: usize = 4;
+    let queue: Queue<usize> = Queue::new();
+
+    for i in 0..COUNT * CONCURRENCY {
+        queue.push(i);
+    }
+
+    let ths: Vec<_> = (0..CONCURRENCY)
+        .map(|_| {
+            let q = queue.clone();
+            thread::spawn(move || {
+                for _ in 0..COUNT {
+                    loop {
+                        if q.pop().is_some() {
+                            break;
+                        }
+                    }
+                }
+            })
+        })
+        .collect();
+
+    for th in ths {
+        th.join().unwrap();
+    }
+
+    assert!(queue.pop().is_none());
+}
+
+// cargo test --package lf-queue --test queue -- test_mpmc --exact --nocapture
+#[test]
+fn test_mpmc() {
+    const COUNT: usize = 1_000;
+    const CONCURRENCY: usize = 4;
+    let queue: Queue<usize> = Queue::new();
+    let items = Arc::new((0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());
+
+    let ths: Vec<_> = (0..CONCURRENCY)
+        .map(|_| {
+            let q = queue.clone();
+            let its = items.clone();
+            thread::spawn(move || {
+                for _ in 0..COUNT {
+                    let n = loop {
+                        if let Some(x) = q.pop() {
+                            break x;
+                        } else {
+                            thread::yield_now();
+                        }
+                    };
+                    its[n].fetch_add(1, Ordering::SeqCst);
+                }
+            })
+        })
+        .chain((0..CONCURRENCY).map(|_| {
+            let q = queue.clone();
+            thread::spawn(move || {
+                for i in 0..COUNT {
+                    q.push(i);
+                }
+            })
+        }))
+        .collect();
+
+    for th in ths {
+        th.join().unwrap();
+    }
+
+    thread::sleep(std::time::Duration::from_millis(10));
+
+    for c in &*items {
+        assert_eq!(c.load(Ordering::SeqCst), CONCURRENCY);
+    }
+
+    assert!(queue.pop().is_none());
+}