test(ebr, wvwwvwwv#133): add loom-based tests

shamilsan committed Mar 26, 2024
1 parent 7340ce1 commit 17b5507
Showing 10 changed files with 178 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/ebr.rs
@@ -30,6 +30,7 @@ pub use tag::Tag;

mod collector;
mod ref_counted;
mod sync;

/// Suspends the garbage collector of the current thread.
///
22 changes: 21 additions & 1 deletion src/ebr/atomic_owned.rs
@@ -1,9 +1,9 @@
use super::ref_counted::RefCounted;
+use super::sync::atomic::AtomicPtr;
use super::{Guard, Owned, Ptr, Tag};
use std::mem::forget;
use std::panic::UnwindSafe;
use std::ptr::{null_mut, NonNull};
-use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{self, Relaxed};

/// [`AtomicOwned`] owns the underlying instance, and allows users to perform atomic operations
@@ -54,6 +54,7 @@ impl<T> AtomicOwned<T> {
/// ```
#[inline]
#[must_use]
#[cfg(not(loom))]
pub const fn from(owned: Owned<T>) -> Self {
let ptr = owned.get_underlying_ptr();
forget(owned);
@@ -62,6 +63,16 @@ impl<T> AtomicOwned<T> {
}
}

#[cfg(loom)]
#[allow(missing_docs)]
pub fn from(owned: Owned<T>) -> Self {
let ptr = owned.get_underlying_ptr();
forget(owned);
Self {
instance_ptr: AtomicPtr::new(ptr),
}
}

/// Creates a null [`AtomicOwned`].
///
/// # Examples
@@ -73,12 +84,21 @@ impl<T> AtomicOwned<T> {
/// ```
#[inline]
#[must_use]
#[cfg(not(loom))]
pub const fn null() -> Self {
Self {
instance_ptr: AtomicPtr::new(null_mut()),
}
}

#[cfg(loom)]
#[allow(missing_docs)]
pub fn null() -> Self {
Self {
instance_ptr: AtomicPtr::new(null_mut()),
}
}

/// Returns `true` if the [`AtomicOwned`] is null.
///
/// # Examples
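
Note the shape of these changes: every `const fn` constructor gains a non-const `#[cfg(loom)]` twin, because loom's instrumented atomics cannot be constructed in a `const` context. The same duplication appears in atomic_shared.rs and ref_counted.rs below. A minimal, self-contained sketch of the pattern (the `Counter` type is hypothetical, not part of the commit):

// Hypothetical `Counter` illustrating the cfg(loom) duplication pattern:
// std's AtomicUsize::new is `const`, loom's is not, so the `const fn`
// constructor is compiled only into non-loom builds.
#[cfg(loom)]
use loom::sync::atomic::AtomicUsize;
#[cfg(not(loom))]
use std::sync::atomic::AtomicUsize;

pub struct Counter(AtomicUsize);

impl Counter {
    #[cfg(not(loom))]
    pub const fn new() -> Self {
        Counter(AtomicUsize::new(0))
    }

    #[cfg(loom)]
    pub fn new() -> Self {
        Counter(AtomicUsize::new(0))
    }
}
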
22 changes: 21 additions & 1 deletion src/ebr/atomic_shared.rs
@@ -1,9 +1,9 @@
use super::ref_counted::RefCounted;
+use super::sync::atomic::AtomicPtr;
use super::{Guard, Ptr, Shared, Tag};
use std::mem::forget;
use std::panic::UnwindSafe;
use std::ptr::{null_mut, NonNull};
-use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{self, Acquire, Relaxed};

/// [`AtomicShared`] owns the underlying instance, and allows users to perform atomic operations
@@ -54,6 +54,7 @@ impl<T> AtomicShared<T> {
/// ```
#[inline]
#[must_use]
#[cfg(not(loom))]
pub const fn from(shared: Shared<T>) -> Self {
let ptr = shared.get_underlying_ptr();
forget(shared);
@@ -62,6 +63,16 @@ impl<T> AtomicShared<T> {
}
}

#[cfg(loom)]
#[allow(missing_docs)]
pub fn from(shared: Shared<T>) -> Self {
let ptr = shared.get_underlying_ptr();
forget(shared);
Self {
instance_ptr: AtomicPtr::new(ptr),
}
}

/// Creates a null [`AtomicShared`].
///
/// # Examples
@@ -73,12 +84,21 @@ impl<T> AtomicShared<T> {
/// ```
#[inline]
#[must_use]
#[cfg(not(loom))]
pub const fn null() -> Self {
Self {
instance_ptr: AtomicPtr::new(null_mut()),
}
}

#[cfg(loom)]
#[allow(missing_docs)]
pub fn null() -> Self {
Self {
instance_ptr: AtomicPtr::new(null_mut()),
}
}

/// Returns `true` if the [`AtomicShared`] is null.
///
/// # Examples
59 changes: 40 additions & 19 deletions src/ebr/collector.rs
@@ -1,9 +1,9 @@
+use super::sync::atomic::{fence, AtomicPtr, AtomicU8};
use super::{Collectible, Guard, Tag};
use crate::exit_guard::ExitGuard;
use std::panic;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
-use std::sync::atomic::{fence, AtomicPtr, AtomicU8};

/// [`Collector`] is a garbage collector that reclaims thread-locally unreachable instances
/// when they are globally unreachable.
@@ -44,7 +44,7 @@ impl Collector {
if self.num_readers == 0 {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, Self::INACTIVE);
self.num_readers = 1;
-let new_epoch = EPOCH.load(Relaxed);
+let new_epoch = epoch().load(Relaxed);
if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
// This special optimization is excerpted from
// [`crossbeam_epoch`](https://docs.rs/crossbeam-epoch/).
@@ -81,7 +81,7 @@ impl Collector {

if self.num_readers == 1 {
if self.next_epoch_update == 0 {
-if self.has_garbage || Tag::into_tag(GLOBAL_ANCHOR.load(Relaxed)) != Tag::First {
+if self.has_garbage || Tag::into_tag(global_anchor().load(Relaxed)) != Tag::First {
self.try_scan();
}
self.next_epoch_update = if self.has_garbage {
@@ -197,7 +197,7 @@ impl Collector {
link: None,
});
let ptr = Box::into_raw(boxed);
-let mut current = GLOBAL_ANCHOR.load(Relaxed);
+let mut current = global_anchor().load(Relaxed);
loop {
unsafe {
(*ptr).next_link = Tag::unset_tag(current).cast_mut();
@@ -206,7 +206,8 @@
// It keeps the tag intact.
let tag = Tag::into_tag(current);
let new = Tag::update_tag(ptr, tag).cast_mut();
-if let Err(actual) = GLOBAL_ANCHOR.compare_exchange_weak(current, new, Release, Relaxed)
+if let Err(actual) =
+    global_anchor().compare_exchange_weak(current, new, Release, Relaxed)
{
current = actual;
} else {
@@ -223,7 +224,7 @@

// Only one thread that acquires the anchor lock is allowed to scan the thread-local
// collectors.
-let lock_result = GLOBAL_ANCHOR
+let lock_result = global_anchor()
.fetch_update(Acquire, Acquire, |p| {
let tag = Tag::into_tag(p);
if tag == Tag::First || tag == Tag::Both {
@@ -234,7 +235,7 @@
})
.map(|p| Tag::unset_tag(p).cast_mut());
if let Ok(mut collector_ptr) = lock_result {
-let _guard = ExitGuard::new(&GLOBAL_ANCHOR, |a| {
+let _guard = ExitGuard::new(global_anchor(), |a| {
// Unlock the anchor.
loop {
let result = a.fetch_update(Release, Relaxed, |p| {
@@ -263,7 +264,7 @@
// The collector is obsolete.
let reclaimable = unsafe { prev_collector_ptr.as_mut() }.map_or_else(
|| {
-GLOBAL_ANCHOR
+global_anchor()
.fetch_update(Release, Relaxed, |p| {
let tag = Tag::into_tag(p);
debug_assert!(tag == Tag::First || tag == Tag::Both);
@@ -306,7 +307,7 @@
1 => 2,
_ => 0,
};
-EPOCH.store(next_epoch, Relaxed);
+epoch().store(next_epoch, Relaxed);
}
}
}
@@ -350,7 +351,7 @@ impl Drop for CollectorAnchor {
/// Marks `ANCHOR` that there is a potentially unreachable `Collector`.
fn mark_scan_enforced() {
// `Tag::Second` indicates that there is a garbage `Collector`.
-let _result = GLOBAL_ANCHOR.fetch_update(Release, Relaxed, |p| {
+let _result = global_anchor().fetch_update(Release, Relaxed, |p| {
let new_tag = match Tag::into_tag(p) {
Tag::None => Tag::Second,
Tag::First => Tag::Both,
@@ -364,9 +365,9 @@ fn try_drop_local_collector() {
let collector_ptr = LOCAL_COLLECTOR.with(|local_collector| local_collector.load(Relaxed));
if let Some(collector) = unsafe { collector_ptr.as_mut() } {
if collector.next_link.is_null() {
-let anchor_ptr = GLOBAL_ANCHOR.load(Relaxed);
+let anchor_ptr = global_anchor().load(Relaxed);
if ptr::eq(collector_ptr, anchor_ptr)
-&& GLOBAL_ANCHOR
+&& global_anchor()
.compare_exchange(anchor_ptr, ptr::null_mut(), Relaxed, Relaxed)
.is_ok()
{
@@ -390,11 +391,31 @@ thread_local! {
static LOCAL_COLLECTOR: AtomicPtr<Collector> = AtomicPtr::default();
}

-/// The global epoch.
-///
-/// The global epoch can be 0, 1, or 2; a difference between a thread's local announcement
-/// and the global epoch is treated as an epoch change for that thread.
-static EPOCH: AtomicU8 = AtomicU8::new(0);
+#[cfg(not(loom))]
+fn epoch() -> &'static AtomicU8 {
+/// The global epoch.
+///
+/// The global epoch can be 0, 1, or 2; a difference between a thread's local announcement
+/// and the global epoch is treated as an epoch change for that thread.
+static EPOCH: AtomicU8 = AtomicU8::new(0);
+&EPOCH
+}
+
+#[cfg(loom)]
+fn epoch() -> &'static AtomicU8 {
+static EPOCH: std::sync::OnceLock<AtomicU8> = std::sync::OnceLock::new();
+EPOCH.get_or_init(|| AtomicU8::new(0))
+}

-/// The global anchor for thread-local instances of [`Collector`].
-static GLOBAL_ANCHOR: AtomicPtr<Collector> = AtomicPtr::new(ptr::null_mut());
+#[cfg(not(loom))]
+fn global_anchor() -> &'static AtomicPtr<Collector> {
+/// The global anchor for thread-local instances of [`Collector`].
+static GLOBAL_ANCHOR: AtomicPtr<Collector> = AtomicPtr::new(ptr::null_mut());
+&GLOBAL_ANCHOR
+}
+
+#[cfg(loom)]
+fn global_anchor() -> &'static AtomicPtr<Collector> {
+static GLOBAL_ANCHOR: std::sync::OnceLock<AtomicPtr<Collector>> = std::sync::OnceLock::new();
+GLOBAL_ANCHOR.get_or_init(|| AtomicPtr::new(ptr::null_mut()))
+}
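
The two process-wide statics become accessor functions for the same reason as the constructors above: loom's atomics have no `const` constructors, so they cannot initialize a plain `static` and are instead created lazily on first access through `std::sync::OnceLock` (loom's own `lazy_static!`, re-exported by the new `sync` module below, would be an alternative for per-model globals). A reduced sketch of the lazy-accessor pattern, with a hypothetical `global_counter`:

use std::sync::atomic::AtomicU64;
use std::sync::OnceLock;

// Hypothetical global: a value that cannot be built in a `const` context
// is wrapped in a OnceLock and initialized on first access instead.
fn global_counter() -> &'static AtomicU64 {
    static CELL: OnceLock<AtomicU64> = OnceLock::new();
    CELL.get_or_init(|| AtomicU64::new(0))
}
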
36 changes: 35 additions & 1 deletion src/ebr/ref_counted.rs
@@ -1,8 +1,8 @@
+use super::sync::atomic::AtomicUsize;
use super::Collectible;
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::ptr::NonNull;
-use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{self, Relaxed};

/// [`RefCounted`] stores an instance of type `T`, and a union of a link to the next
@@ -15,24 +15,42 @@ pub(super) struct RefCounted<T> {
impl<T> RefCounted<T> {
/// Creates a new [`RefCounted`] that allows ownership sharing.
#[inline]
#[cfg(not(loom))]
pub(super) const fn new_shared(t: T) -> Self {
Self {
instance: t,
next_or_refcnt: LinkOrRefCnt::new_shared(),
}
}

#[cfg(loom)]
pub(super) fn new_shared(t: T) -> Self {
Self {
instance: t,
next_or_refcnt: LinkOrRefCnt::new_shared(),
}
}

/// Creates a new [`RefCounted`] that disallows reference counting.
///
/// The reference counter field is never used until the instance is retired.
#[inline]
#[cfg(not(loom))]
pub(super) const fn new_unique(t: T) -> Self {
Self {
instance: t,
next_or_refcnt: LinkOrRefCnt::new_unique(),
}
}

#[cfg(loom)]
pub(super) fn new_unique(t: T) -> Self {
Self {
instance: t,
next_or_refcnt: LinkOrRefCnt::new_unique(),
}
}

/// Tries to add a strong reference to the underlying instance.
///
/// `order` must be as strong as `Acquire` for the caller to correctly validate the newest
@@ -151,16 +169,32 @@ pub(super) union LinkOrRefCnt {

impl LinkOrRefCnt {
#[inline]
#[cfg(not(loom))]
const fn new_shared() -> Self {
LinkOrRefCnt {
refcnt: ManuallyDrop::new((AtomicUsize::new(1), 0)),
}
}

#[cfg(loom)]
fn new_shared() -> Self {
LinkOrRefCnt {
refcnt: ManuallyDrop::new((AtomicUsize::new(1), 0)),
}
}

#[inline]
#[cfg(not(loom))]
const fn new_unique() -> Self {
LinkOrRefCnt {
refcnt: ManuallyDrop::new((AtomicUsize::new(0), 0)),
}
}

#[cfg(loom)]
fn new_unique() -> Self {
LinkOrRefCnt {
refcnt: ManuallyDrop::new((AtomicUsize::new(0), 0)),
}
}
}
30 changes: 30 additions & 0 deletions src/ebr/sync.rs
@@ -0,0 +1,30 @@
#![allow(unused_imports)]
pub(crate) use self::inner::*;

#[cfg(all(test, loom))]
mod inner {
pub(crate) mod atomic {
pub use loom::sync::atomic::*;
pub use std::sync::atomic::Ordering;

// FIXME: loom does not support compiler_fence at the moment.
// https://github.com/tokio-rs/loom/issues/117
// We use `fence` as a stand-in for `compiler_fence` for the time being.
// This may miss some races, since `fence` is stronger than `compiler_fence`,
// but it is the best we can do for now.
pub(crate) use self::fence as compiler_fence;
}
pub(crate) use loom::{
cell::UnsafeCell, hint, lazy_static, sync::Mutex, thread::yield_now, thread_local,
};
}

#[cfg(not(all(loom, test)))]
mod inner {
pub(crate) use std::{
cell::UnsafeCell,
sync::{atomic, Mutex},
thread::yield_now,
thread_local,
};
}
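
This shim is the usual way to make a lock-free crate model-checkable: the rest of the crate imports its atomics through `super::sync::atomic`, so regular builds resolve to `std` and loom builds resolve to loom's instrumented types. The two gates `#[cfg(all(test, loom))]` and `#[cfg(not(all(loom, test)))]` are logical complements, so exactly one `inner` module exists in every configuration. Because `loom` is a plain cfg flag rather than a Cargo feature, the loom configuration is typically enabled through RUSTFLAGS, e.g.:

RUSTFLAGS="--cfg loom" cargo test --release
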
1 change: 1 addition & 0 deletions src/tests.rs
@@ -1,3 +1,4 @@
mod correctness;
mod loom;
mod model;
mod performance;
8 changes: 8 additions & 0 deletions src/tests/loom.rs
@@ -0,0 +1,8 @@
#[cfg(all(test, loom))]
mod loom_tests {
#[test]
fn it_works() {}

#[test]
fn treiber_stack() {}
}
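
Both test bodies are empty stubs in this commit. A sketch of what a filled-in loom test could look like, using only loom's public API (the `concurrent_increment` example is illustrative and not part of the commit):

#[cfg(all(test, loom))]
mod example {
    use loom::sync::atomic::{AtomicUsize, Ordering::SeqCst};
    use loom::sync::Arc;
    use loom::thread;

    #[test]
    fn concurrent_increment() {
        // loom::model runs the closure under every feasible interleaving
        // of the spawned threads.
        loom::model(|| {
            let counter = Arc::new(AtomicUsize::new(0));
            let other = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                other.fetch_add(1, SeqCst);
            });
            counter.fetch_add(1, SeqCst);
            handle.join().unwrap();
            assert_eq!(counter.load(SeqCst), 2);
        });
    }
}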