Skip to content

Commit

Permalink
synchronize use of owned guards
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev committed Apr 17, 2024
1 parent 6edfaf6 commit d9b127b
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 123 deletions.
134 changes: 78 additions & 56 deletions src/guard.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::fmt;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::sync::atomic::{AtomicPtr, Ordering};

use crate::tls::Thread;
Expand All @@ -11,11 +10,8 @@ use crate::{AsLink, Collector, Link};
/// This trait provides common functionality implemented by [`LocalGuard`],
/// [`OwnedGuard`], and [`UnprotectedGuard`].
///
/// All guards implement `Clone` using reference counting. A guard is dropped
/// and the protection of any values is lost after the last reference is dropped.
///
/// See [the guide](crate::guide#starting-operations) for an introduction to using guards.
pub trait Guard: Clone {
pub trait Guard {
/// Refreshes the guard.
///
/// Calling this method is similar to dropping and immediately
Expand Down Expand Up @@ -103,8 +99,18 @@ pub struct LocalGuard<'a> {
impl LocalGuard<'_> {
pub(crate) fn enter(collector: &Collector) -> LocalGuard<'_> {
let thread = Thread::current();
// safety: only called on the current thread
unsafe { collector.raw.enter(thread) };
// safety: `thread` is the current thread
let reservation = unsafe { collector.raw.reservation(thread) };

// calls to `enter` may be reentrant, so we need to keep track of the number
// of active guards for the current thread
let guards = reservation.guards.get();
reservation.guards.set(guards + 1);

if guards == 0 {
// safety: only called on the current thread which is currently inactive
unsafe { collector.raw.enter(reservation) };
}

LocalGuard {
thread,
Expand All @@ -119,28 +125,33 @@ impl Guard for LocalGuard<'_> {
#[inline]
fn protect<T: AsLink>(&self, ptr: &AtomicPtr<T>, ordering: Ordering) -> *mut T {
// safety: self.thread is the current thread
unsafe { self.collector.raw.protect(ptr, ordering, self.thread) }
unsafe { self.collector.raw.protect_local(ptr, ordering, self.thread) }
}

/// Retires a value, running `reclaim` when no threads hold a reference to it.
unsafe fn defer_retire<T: AsLink>(&self, ptr: *mut T, reclaim: unsafe fn(*mut Link)) {
debug_assert!(!ptr.is_null(), "attempted to retire null pointer");

// safety: - self.thread is the current thread
// - validity of the pointer is guaranteed by the caller
// - the validity of the pointer is guaranteed by the caller
unsafe { self.collector.raw.add(ptr, reclaim, self.thread) }
}

/// Refreshes the guard.
fn refresh(&mut self) {
// safety: we have &mut self, and self.thread is the current thread
unsafe { self.collector.raw.refresh(self.thread) }
// safety: self.thread is the current thread
let reservation = unsafe { self.collector.raw.reservation(self.thread) };
let guards = reservation.guards.get();

if guards == 1 {
// safety: we have a unique reference to the last active guard
unsafe { self.collector.raw.refresh(reservation) }
}
}

/// Flush any retired values in the local batch.
fn flush(&self) {
// note that this does not actually retire any values, it just attempts
// to add the batch to any active reservations lists (including ours)
// safety: self.thread is the current thread
unsafe { self.collector.raw.try_retire_batch(self.thread) }
}

Expand All @@ -155,26 +166,19 @@ impl Guard for LocalGuard<'_> {
}
}

impl Clone for LocalGuard<'_> {
fn clone(&self) -> Self {
// this will just increment the guard reference count
// safety: self.thread is the current thread
unsafe { self.collector.raw.enter(self.thread) };

LocalGuard {
thread: self.thread,
collector: self.collector,
_unsend: PhantomData,
}
}
}

impl Drop for LocalGuard<'_> {
fn drop(&mut self) {
// this will mark the thread inactive if this is the last active guard
// on this thread
// safety: self.thread is the current thread
unsafe { self.collector.raw.leave(self.thread) };
let reservation = unsafe { self.collector.raw.reservation(self.thread) };

// decrement the active guard count
let guards = reservation.guards.get();
reservation.guards.set(guards - 1);

if guards == 1 {
// safety: we have a unique reference to the last active guard
unsafe { self.collector.raw.leave(reservation) };
}
}
}

Expand All @@ -194,49 +198,64 @@ impl fmt::Debug for LocalGuard<'_> {
/// destroy, so should be avoided if cross-thread usage is not required.
///
/// Most of the functionality provided by this type is through the [`Guard`] trait.
#[derive(Clone)]
pub struct OwnedGuard<'a>(ManuallyDrop<LocalGuard<'a>>);

// This is sound because an `OwnedGuard` owns its thread
// slot, so is not tied to any thread-locals.
//
// Note: cannot be Sync because retire takes &self and does
// not synchronize across threads.
pub struct OwnedGuard<'a> {
collector: &'a Collector,
// the current thread
thread: Thread,
}

// safety: `OwnedGuard` owns its thread slot, so is not tied to any thread-locals
unsafe impl Send for OwnedGuard<'_> {}
unsafe impl Sync for OwnedGuard<'_> {}

impl OwnedGuard<'_> {
pub(crate) fn enter(collector: &Collector) -> OwnedGuard<'_> {
OwnedGuard(ManuallyDrop::new(LocalGuard {
collector,
// safety: while this is not the current thread, it is stable
// and never accessed concurrently
thread: Thread::create(),
_unsend: PhantomData,
}))
// create a thread slot that will last for the lifetime of this guard
let thread = Thread::create();

// safety: we have ownership of `thread`
unsafe { collector.raw.enter(collector.raw.reservation(thread)) };

OwnedGuard { collector, thread }
}
}

impl Guard for OwnedGuard<'_> {
/// Protects the load of an atomic pointer.
#[inline]
fn protect<T: AsLink>(&self, ptr: &AtomicPtr<T>, ordering: Ordering) -> *mut T {
self.0.protect(ptr, ordering)
self.collector.raw.protect(ptr, ordering, self.thread)
}

/// Retires a value, running `reclaim` when no threads hold a reference to it.
unsafe fn defer_retire<T: AsLink>(&self, ptr: *mut T, reclaim: unsafe fn(*mut Link)) {
// safety: guaranteed by caller
unsafe { self.0.defer_retire(ptr, reclaim) }
// safety: we only access the reservation with the lock
let reservation = unsafe { self.collector.raw.reservation(self.thread) };
let _lock = reservation.lock.lock().unwrap();
// safety: - we hold the lock and so have unique access to the batch
// - the validity of the pointer is guaranteed by the caller
unsafe { self.collector.raw.add(ptr, reclaim, self.thread) }
}

/// Refreshes the guard.
fn refresh(&mut self) {
self.0.refresh()
// safety: we have &mut self and ownership of the thread
unsafe {
self.collector
.raw
.refresh(self.collector.raw.reservation(self.thread))
}
}

/// Flush any retired values in the local batch.
fn flush(&self) {
self.0.flush()
// safety: we only access the reservation with the lock
let reservation = unsafe { self.collector.raw.reservation(self.thread) };
let _lock = reservation.lock.lock().unwrap();
// note that this does not actually retire any values, it just attempts
// to add the batch to any active reservations lists (including ours)
// safety: we hold the lock and so have unique access to the batch
unsafe { self.collector.raw.try_retire_batch(self.thread) }
}

/// Returns a numeric identifier for the current thread.
Expand All @@ -250,17 +269,20 @@ impl Guard for OwnedGuard<'_> {

/// Returns `true` if this guard belongs to the given collector.
fn belongs_to(&self, collector: &Collector) -> bool {
self.0.belongs_to(collector)
Collector::ptr_eq(self.collector, collector)
}
}

impl Drop for OwnedGuard<'_> {
fn drop(&mut self) {
if unsafe { self.0.collector.raw.leave(self.0.thread) } {
// this was the last reference to the guard. we are now inactive
// and can free the thread slot
self.0.thread.free();
}
// safety: we have ownership of `thread`
let reservation = unsafe { self.collector.raw.reservation(self.thread) };

// safety: self.thread is the current thread
unsafe { self.collector.raw.leave(reservation) };

// we are now inactive and can free the thread slot
self.thread.free();
}
}

Expand Down
Loading

0 comments on commit d9b127b

Please sign in to comment.