add flush operation
ibraheemdev committed Mar 8, 2024
1 parent e2f76de commit 19c9365
Showing 3 changed files with 50 additions and 49 deletions.
45 changes: 24 additions & 21 deletions src/collector.rs
@@ -62,7 +62,7 @@ impl Collector {
/// before reclamation is attempted.
///
/// Retired values are added to thread-local *batches*
- /// before completing their actual retirement. After
+ /// before starting the reclamation process. After
/// `batch_size` is hit, values are moved to separate
/// *retirement lists*, where reference counting kicks
/// in and batches are eventually reclaimed.
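The batching described above is controlled by the collector's `batch_size` setting. As a rough usage sketch (assuming the crate's builder-style constructor on `Collector`):

    use seize::Collector;

    // a smaller batch starts reclamation sooner, at some throughput cost
    let collector = Collector::new().batch_size(32);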
@@ -131,7 +131,6 @@ impl Collector {

Guard {
collector: self,
- should_retire: UnsafeCell::new(false),
_a: PhantomData,
}
}
@@ -168,12 +167,10 @@ impl Collector {
pub unsafe fn retire<T>(&self, ptr: *mut Linked<T>, reclaim: unsafe fn(Link)) {
debug_assert!(!ptr.is_null(), "attempted to retire null pointer");

- unsafe {
- let (should_retire, batch) = self.raw.add(ptr, reclaim);
- if should_retire {
- self.raw.retire(batch);
- }
- }
+ // note that `add` does not reclaim the pointer immediately if the
+ // current thread is active; instead it adds the pointer to its
+ // reclamation list, but we don't guarantee that publicly.
+ unsafe { self.raw.add(ptr, reclaim) }
}

/// Returns true if both references point to the same collector.
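For context, a typical retirement with this API might look like the sketch below; `link_boxed` and `reclaim::boxed` are assumed here to be the crate's allocation and reclamation helpers:

    use seize::{reclaim, Collector};

    let collector = Collector::new();

    // allocate a value linked to the collector
    let ptr = collector.link_boxed(42_u64);

    // ... unlink `ptr` from the data structure so no new readers can find it ...

    // safety: `ptr` is unreachable, so no new references to it can be created
    unsafe { collector.retire(ptr, reclaim::boxed::<u64>) };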
@@ -225,7 +222,6 @@ impl fmt::Debug for Collector {
/// See [`Collector::enter`] for details.
pub struct Guard<'a> {
collector: *const Collector,
- should_retire: UnsafeCell::new(false),
_a: PhantomData<&'a Collector>,
}

@@ -248,7 +244,6 @@ impl Guard<'_> {
pub const unsafe fn unprotected() -> Guard<'static> {
Guard {
collector: ptr::null(),
- should_retire: UnsafeCell::new(false),
_a: PhantomData,
}
}
@@ -281,10 +276,7 @@ impl Guard<'_> {
return unsafe { (reclaim)(Link { node: ptr as _ }) };
}

- unsafe {
- let (should_retire, _) = (*self.collector).raw.add(ptr, reclaim);
- *self.should_retire.get() |= should_retire;
- }
+ unsafe { (*self.collector).raw.add(ptr, reclaim) }
}

/// Get a reference to the collector this guard was created from.
@@ -321,6 +313,23 @@ impl Guard<'_> {

unsafe { (*self.collector).raw.refresh() }
}

+ /// Flush any retired values in the local batch.
+ ///
+ /// This method flushes any values from the current thread's local
+ /// batch, starting the reclamation process. Note that no memory
+ /// can be reclaimed while this guard is active, but calling `flush`
+ /// may allow memory to be reclaimed more quickly after the guard is
+ /// dropped.
+ ///
+ /// See [`Collector::batch_size`] for details about batching.
+ pub fn flush(&self) {
+ if self.collector.is_null() {
+ return;
+ }
+
+ unsafe { (*self.collector).raw.try_retire_batch() }
+ }
}

impl Drop for Guard<'_> {
@@ -329,13 +338,7 @@ impl Drop for Guard<'_> {
return;
}

- unsafe {
- (*self.collector).raw.leave();
-
- if *self.should_retire.get() {
- (*self.collector).raw.retire_batch();
- }
- }
+ unsafe { (*self.collector).raw.leave() }
}
}
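The new `flush` method added in this commit is aimed at long-lived guards. A minimal sketch of how it might be used (the loop body is hypothetical):

    use seize::Collector;

    let collector = Collector::new();
    let guard = collector.enter();

    for _ in 0..1_000 {
        // ... lock-free operations that retire values ...

        // nothing is reclaimed while `guard` is active, but flushing the
        // local batch allows memory to be reclaimed sooner once it drops
        guard.flush();
    }

    drop(guard);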

52 changes: 25 additions & 27 deletions src/raw.rs
@@ -179,18 +179,12 @@ impl Collector {
}
}

- // Add a node to the retirement batch.
- //
- // Returns `true` if the batch size has been reached and the batch should be retired.
+ // Add a node to the retirement batch, retiring the batch if `batch_size` is reached.
//
// # Safety
//
// `ptr` is a valid pointer.
- pub unsafe fn add<T>(
- &self,
- ptr: *mut Linked<T>,
- reclaim: unsafe fn(Link),
- ) -> (bool, &mut Batch) {
+ pub unsafe fn add<T>(&self, ptr: *mut Linked<T>, reclaim: unsafe fn(Link)) {
// safety: batches are only accessed by the current thread
let batch = unsafe { &mut *self.batches.get_or(Default::default).get() };

@@ -232,25 +226,23 @@ impl Collector {
batch.head = node;
batch.size += 1;

- (batch.size % self.batch_size == 0, batch)
+ // attempt to retire the batch if we have enough nodes
+ if batch.size % self.batch_size == 0 {
+ self.try_retire(batch);
+ }
}

- // Attempt to retire nodes in the current thread's batch
- //
- // # Safety
- //
- // The batch must contain at least one node
- pub unsafe fn retire_batch(&self) {
- // safety: guaranteed by caller
- unsafe { self.retire(&mut *self.batches.get_or(Default::default).get()) }
+ // Attempt to retire nodes in the current thread's batch.
+ pub fn try_retire_batch(&self) {
+ // safety: batches are only accessed by the current thread
+ unsafe { self.try_retire(&mut *self.batches.get_or(Default::default).get()) }
}

- // Attempt to retire nodes in this batch
- //
- // # Safety
- //
- // The batch must contain at least one node
- pub unsafe fn retire(&self, batch: &mut Batch) {
+ // Attempt to retire nodes in this batch.
+ //
+ // Note that if a guard on the current thread is active, the batch will
+ // also be added to its reservation list for deferred reclamation.
+ pub fn try_retire(&self, batch: &mut Batch) {
// establish a total order between the retirement of nodes in this batch and stores
// marking a thread as active (or active in an epoch):
// - if the store comes first, we will see that the thread is active
@@ -267,15 +259,16 @@
// relaxed: the fence above already ensures that we see any threads that might
// have access to any pointers in this batch. any other threads that were created
// after it will see their new values.
- if batch.size <= self.reservations.threads.load(Ordering::Relaxed) {
+ if batch.head.is_null() || batch.size <= self.reservations.threads.load(Ordering::Relaxed) {
return;
}

- // safety: caller guarantees that the batch is not empty
+ // safety: we made sure the batch is not empty
unsafe { (*batch.tail).batch_link = batch.head }

// safety: TAIL nodes always have `min_epoch` initialized
let min_epoch = unsafe { (*batch.tail).reservation.min_epoch };
+ let current_reservation = self.reservations.get_or(Default::default);

let mut last = batch.head;

@@ -293,14 +286,19 @@

// if this thread's epoch is behind the earliest birth epoch in this batch
// we can skip it, as there is no way it could have accessed any of the pointers
- // in this batch
+ // in this batch. we make sure never to skip the current thread even if its
+ // epoch is behind, because it may still have access to the pointer (it is
+ // the thread that allocated it). the current thread is only skipped if
+ // there is no active guard.
//
// relaxed: if the epoch is behind there is nothing to synchronize with, and
// we already ensured we will see its relevant epoch with the seqcst fence
// above
//
// if epoch tracking is disabled this is always false (0 < 0)
- if reservation.epoch.load(Ordering::Relaxed) < min_epoch {
+ if !ptr::eq(reservation, current_reservation)
+ && reservation.epoch.load(Ordering::Relaxed) < min_epoch
+ {
continue;
}

Expand Down Expand Up @@ -386,7 +384,7 @@ impl Collector {
// ensure any access of the data in the list happens-before we free the list
atomic::fence(Ordering::Acquire);

- // safety: The reference count is 0, meaning that either no threads were active,
+ // safety: the reference count is 0, meaning that either no threads were active,
// or they have all already decremented the count
unsafe { Collector::free_list(batch.tail) }
}
2 changes: 1 addition & 1 deletion src/tls/mod.rs
@@ -210,7 +210,7 @@ where
let entry = unsafe { &*bucket.add(self.index) };
self.index += 1;
if entry.present.load(Ordering::Acquire) {
- return Some(unsafe { &*(*entry.value.get()).as_ptr() });
+ return Some(unsafe { (*entry.value.get()).assume_init_ref() });
}
}
}
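For reference, the old and new forms in this hunk are equivalent; `assume_init_ref` is the dedicated helper for this, stable since Rust 1.55:

    use std::mem::MaybeUninit;

    let value: MaybeUninit<u32> = MaybeUninit::new(7);

    // manual raw-pointer dereference (the old form)
    let by_ptr: &u32 = unsafe { &*value.as_ptr() };

    // dedicated helper (the new form)
    let by_ref: &u32 = unsafe { value.assume_init_ref() };

    assert_eq!(by_ptr, by_ref);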