Commit 18531b0

use batch entries for reservation lists
ibraheemdev committed Apr 16, 2024
1 parent 0abb62c commit 18531b0
Showing 1 changed file with 81 additions and 72 deletions.
src/raw.rs: 153 changes (81 additions, 72 deletions)
@@ -60,10 +60,7 @@ impl Collector {
None => 0,
};

- Node {
-     reservation: ReservationNode { birth_epoch },
-     batch: ptr::null_mut(),
- }
+ Node { birth_epoch }
}

// Mark the current thread as active.
@@ -85,7 +82,7 @@ impl Collector {
//
// seqcst: establish a total order between this store and the fence in `retire`
// - if our store comes first, the thread retiring will see that we are active
- // - if the fence comes first, we will see the new values of any pointers being
+ // - if the fence comes first, we will see the new values of any objects being
// retired by that thread (all pointer loads are also seqcst and thus participate
// in the total order)
reservation.head.store(ptr::null_mut(), Ordering::SeqCst);
@@ -123,7 +120,7 @@ impl Collector {
// to reservation.head and reservation.epoch for details
// - acquire the birth epoch of the pointer. we need to record at least
// that epoch below to let other threads know we have access to this pointer
- // (TODO: this requires pointers to be stored with release ordering, which is
+ // (TODO: this requires objects to be stored with release ordering, which is
// not documented)
let ptr = ptr.load(Ordering::SeqCst);

@@ -140,7 +137,7 @@ impl Collector {
// seqcst: establish a total order between this store and the fence in `retire`
// - if our store comes first, the thread retiring will see that we are active in
// the current epoch
- // - if the fence comes first, we will see the new values of any pointers being
+ // - if the fence comes first, we will see the new values of any objects being
// retired by that thread (all pointer loads are also seqcst and thus participate
// in the total order)
reservation.epoch.store(current_epoch, Ordering::SeqCst);
@@ -164,11 +161,11 @@ impl Collector {
// are dropped
if guards == 1 {
// release: exit the critical section
- // acquire: acquire any new reservation nodes
- let head = reservation.head.swap(Node::INACTIVE, Ordering::AcqRel);
+ // acquire: acquire any new entries
+ let head = reservation.head.swap(Entry::INACTIVE, Ordering::AcqRel);

- if head != Node::INACTIVE {
-     // decrement the reference counts of any nodes that were added
+ if head != Entry::INACTIVE {
+     // decrement the reference counts of any entries that were added
unsafe { Collector::traverse(head) }
}
}
@@ -187,12 +184,12 @@ impl Collector {
// thread are dropped
if guards == 1 {
// release: exit the critical section
- // acquire: acquire any new reservation nodes and the values of any pointers
+ // acquire: acquire any new entries and the values of any objects
// that were retired
let head = reservation.head.swap(ptr::null_mut(), Ordering::AcqRel);

- if head != Node::INACTIVE {
-     // decrement the reference counts of any nodes that were added
+ if head != Entry::INACTIVE {
+     // decrement the reference counts of any entries that were added
unsafe { Collector::traverse(head) }
}
}
@@ -219,26 +216,30 @@ impl Collector {
let batch = unsafe { local_batch.0.as_mut() };

// `ptr` is guaranteed to be a valid pointer that can be cast to a node (`T: AsLink`)
- let node = UnsafeCell::raw_get(ptr.cast::<UnsafeCell<Node>>());
-
+ //
// any other thread with a reference to the pointer only has a shared
// reference to the UnsafeCell<Node>, which is allowed to alias. the caller
// guarantees that the same pointer is not retired twice, so we can safely write
- // to the node here
- unsafe { (*node).batch = local_batch.0.as_ptr() }
+ // to the node through this pointer.
+ let node = UnsafeCell::raw_get(ptr.cast::<UnsafeCell<Node>>());

// if a thread is active in the minimum birth era, it has access to at least one
// of the nodes in the batch and must be tracked.
//
// if epoch tracking is disabled this will always be false (0 > 0).
- let birth_epoch = unsafe { (*node).reservation.birth_epoch };
+ let birth_epoch = unsafe { (*node).birth_epoch };
if batch.min_epoch > birth_epoch {
batch.min_epoch = birth_epoch;
}

- batch.entries.push(Entry { node, reclaim });
+ // create an entry for this node
+ batch.entries.push(Entry {
+     node,
+     reclaim,
+     batch: local_batch.0.as_ptr(),
+ });

- // attempt to retire the batch if we have enough nodes
+ // attempt to retire the batch if we have enough entries
if batch.entries.len() % self.batch_size == 0 {
unsafe { self.try_retire(local_batch, thread) }
}
@@ -270,21 +271,23 @@ impl Collector {
// establish a total order between the retirement of nodes in this batch and stores
// marking a thread as active (or active in an epoch):
// - if the store comes first, we will see that the thread is active
- // - if this fence comes first, the thread will see the new values of any pointers
+ // - if this fence comes first, the thread will see the new values of any objects
// in this batch.
//
// this fence also establishes a synchronizes-with relationship with the fence run when a thread is created:
- // - if our fence comes first, they will see the new values of any pointers in this batch
+ // - if our fence comes first, they will see the new values of any objects in this batch
// - if their fence comes first, we will see the new thread
atomic::fence(Ordering::SeqCst);

- // safety: local batch pointers are always valid until reclamation
+ // safety: local batch pointers are always valid until reclamation.
+ // if the batch ends up being retired then this pointer is stable
+ let batch_entries = unsafe { local_batch.0.as_mut().entries.as_mut_ptr() };
let batch = unsafe { local_batch.0.as_ref() };

- // if there are not enough nodes in this batch for active threads, we have to try again later
+ // if there are not enough entries in this batch for active threads, we have to try again later
//
// relaxed: the fence above already ensures that we see any threads that might
- // have access to any pointers in this batch. any other threads that were created
+ // have access to any objects in this batch. any other threads that were created
// after it will see their new values.
if batch.entries.len() <= self.reservations.threads.load(Ordering::Relaxed) {
return;
@@ -296,9 +299,9 @@ impl Collector {
// record all active threads
//
// we need to do this in a separate step before actually retiring to
- // make sure we have enough reservation nodes, as the number of threads can grow
+ // make sure we have enough entries, as the number of threads can grow
for reservation in self.reservations.iter() {
- // if we don't have enough nodes to insert into the reservation lists
+ // if we don't have enough entries to insert into the reservation lists
// of all active threads, try again later
let Some(entry) = batch.entries.get(marked) else {
return;
@@ -307,12 +310,12 @@ impl Collector {
// if this thread is inactive, we can skip it
//
// relaxed: see the acquire fence below
- if reservation.head.load(Ordering::Relaxed) == Node::INACTIVE {
+ if reservation.head.load(Ordering::Relaxed) == Entry::INACTIVE {
continue;
}

// if this thread's epoch is behind the earliest birth epoch in this batch
- // we can skip it, as there is no way it could have accessed any of the pointers
+ // we can skip it, as there is no way it could have accessed any of the objects
// in this batch. we make sure never to skip the current thread even if its epoch
// is behind because it may still have access to the pointer (because it's the
// thread that allocated it). the current thread is only skipped if there is no
@@ -331,56 +334,62 @@ impl Collector {

// temporarily store this thread's list in a node in our batch
//
- // safety: we checked that this is not the last node in the batch list above,
- // and all nodes in a batch are valid
- unsafe { (*entry.node).reservation.head = &reservation.head }
+ // safety: all nodes in a batch are valid, and this batch has not yet been
+ // shared with other threads
+ unsafe { (*entry.node).head = &reservation.head }
marked += 1;
}

// for any inactive threads we skipped above, synchronize with `leave` to ensure
// any accesses happen-before we retire. we ensured with the seqcst fence above
// that the next time the thread becomes active it will see the new values of any
- // pointers in this batch.
+ // objects in this batch.
atomic::fence(Ordering::Acquire);

// add the batch to all active threads' reservation lists
let mut active = 0;
for i in 0..marked {
- let curr = batch.entries[i].node;
+ let curr = &batch.entries[i];
+ let curr_ptr = unsafe { batch_entries.add(i) };

- // safety: all nodes in the batch are valid, and we just initialized `reservation.head`
+ // safety: all nodes in the batch are valid, and we just initialized `head`
// for all `marked` nodes in the loop above
- let head = unsafe { &*(*curr).reservation.head };
+ let head = unsafe { &*(*curr.node).head };

// acquire:
// - if the thread became inactive, synchronize with `leave` to ensure any accesses
// happen-before we retire
- // - if the thread is active, acquire any reservation nodes added by a concurrent call
+ // - if the thread is active, acquire any entries added by a concurrent call
// to `retire`
let mut prev = head.load(Ordering::Acquire);

loop {
// the thread became inactive, skip it
//
// as long as the thread became inactive at some point after we verified it was
- // active, it can no longer access any pointers in this batch. the next time it
- // becomes active it will load the new pointer values due to the seqcst fence above
- if prev == Node::INACTIVE {
+ // active, it can no longer access any objects in this batch. the next time it
+ // becomes active it will load the new object values due to the seqcst fence above
+ if prev == Entry::INACTIVE {
break;
}

- // relaxed: acq/rel synchronization is provided by `head`
- unsafe { (*curr).reservation.next.store(prev, Ordering::Relaxed) }
+ // link this node to the reservation list
+ unsafe { *(*curr.node).next = AtomicPtr::new(prev) }

- // release: release the new reservation nodes
- match head.compare_exchange_weak(prev, curr, Ordering::Release, Ordering::Relaxed) {
+ // release: release the entries in this batch
+ match head.compare_exchange_weak(
+     prev,
+     curr_ptr,
+     Ordering::Release,
+     Ordering::Relaxed,
+ ) {
Ok(_) => {
active += 1;
break;
}
// lost the race to another thread, retry
Err(found) => {
- // acquire the new reservation loads
+ // acquire the new entries
atomic::fence(Ordering::Acquire);
prev = found;
continue;
@@ -408,12 +417,12 @@ impl Collector {
*local_batch = LocalBatch::new(self.batch_size).value.into_inner();
}

- // Traverse the reservation list, decrementing the refernce count of each batch.
+ // Traverse the reservation list, decrementing the reference count of each batch.
//
// # Safety
//
// `list` must be a valid reservation list
- unsafe fn traverse(mut list: *mut Node) {
+ unsafe fn traverse(mut list: *mut Entry) {
loop {
let curr = list;

@@ -423,8 +432,8 @@ impl Collector {

// safety: `curr` is a valid link in the list
//
- // relaxed: any reservation nodes were acquired when we loaded `head`
- list = unsafe { (*curr).reservation.next.load(Ordering::Relaxed) };
+ // relaxed: any entries were acquired when we loaded `head`
+ list = unsafe { (*(*curr).node).next.load(Ordering::Relaxed) };
let batch = unsafe { (*curr).batch };

// safety: batch pointers are valid for reads until they are freed
@@ -471,36 +480,26 @@ impl Drop for Collector {

// A node attached to every allocated object.
//
- // Every node holds a pointer to it's batch, as well as a pointer
- // to the next batch for a given thread after it is retired.
- pub struct Node {
-     batch: *mut Batch,
-     reservation: ReservationNode,
- }
-
+ // Nodes keep track of their birth epoch, as well as thread-local
+ // reservation lists.
#[repr(C)]
- union ReservationNode {
+ pub union Node {
// Before retiring: the epoch this node was created in
birth_epoch: u64,
- // While retiring: temporary location for an active reservation list
- head: *const AtomicPtr<Node>,
+ // While retiring: temporary location for an active reservation list.
+ head: *const AtomicPtr<Entry>,
// After retiring: next node in the thread's reservation list
- next: ManuallyDrop<AtomicPtr<Node>>,
+ next: ManuallyDrop<AtomicPtr<Entry>>,
}

- impl Node {
-     // Represents an inactive thread
-     //
-     // While null indicates an empty list, INACTIVE indicates the thread has no active
-     // guards and is not accessing any nodes.
-     pub const INACTIVE: *mut Node = -1_isize as usize as _;
- }

- // A per-thread reservation list
+ // A per-thread reservation list.
+ //
+ // Reservation lists are lists of retired entries, where
+ // each entry represents a batch.
#[repr(C)]
struct Reservation {
// The head of the list
- head: AtomicPtr<Node>,
+ head: AtomicPtr<Entry>,
// The epoch this thread last accessed a pointer in
epoch: AtomicU64,
// the number of active guards for this thread
@@ -510,7 +509,7 @@ struct Reservation {
impl Default for Reservation {
fn default() -> Self {
Reservation {
- head: AtomicPtr::new(Node::INACTIVE),
+ head: AtomicPtr::new(Entry::INACTIVE),
epoch: AtomicU64::new(0),
guards: Cell::new(0),
}
@@ -533,6 +532,16 @@ struct Batch {
struct Entry {
node: *mut Node,
reclaim: unsafe fn(*mut Link),
+ // the batch this node is a part of.
+ batch: *mut Batch,
}

+ impl Entry {
+     // Represents an inactive thread.
+     //
+     // While null indicates an empty list, INACTIVE indicates the thread has no active
+     // guards and is not accessing any objects.
+     pub const INACTIVE: *mut Entry = -1_isize as usize as _;
+ }

pub struct LocalBatch(NonNull<Batch>);
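For orientation, here is a standalone sketch of the scheme this diff moves to: reservation lists become linked lists of batch entries, a sentinel head value distinguishes an inactive thread from an empty list, and traversing a list drops one reference per entry's batch. This is not the crate's code; the names (Batch, Entry, active, push, traverse) and the memory orderings are simplified assumptions, and the link pointer is placed directly in the entry here, whereas the commit keeps it in the node's union.

use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

// A batch of retired objects, kept alive by a reference count
// (one reference per reservation list it was pushed onto).
struct Batch {
    active: AtomicUsize,
}

// A reservation-list entry. Each entry points back at its batch;
// for simplicity the link to the next entry lives in the entry itself.
struct Entry {
    next: AtomicPtr<Entry>,
    batch: *mut Batch,
}

// Sentinel head value for a thread with no active guards. Null means
// "active but empty list"; INACTIVE means "not accessing anything".
const INACTIVE: *mut Entry = usize::MAX as *mut Entry;

// Try to push an entry onto a thread's reservation list, failing if
// the thread is (or becomes) inactive.
fn push(head: &AtomicPtr<Entry>, entry: *mut Entry) -> bool {
    let mut prev = head.load(Ordering::Acquire);
    loop {
        // an inactive thread cannot be holding any object in the batch
        if prev == INACTIVE {
            return false;
        }
        // link the entry in front of the current head
        unsafe { (*entry).next.store(prev, Ordering::Relaxed) };
        match head.compare_exchange_weak(prev, entry, Ordering::Release, Ordering::Acquire) {
            Ok(_) => return true,
            // lost a race with the owning thread or another retirer; retry
            Err(found) => prev = found,
        }
    }
}

// Walk a reservation list, dropping one reference per entry's batch;
// a batch whose count reaches zero could be reclaimed at that point.
unsafe fn traverse(mut list: *mut Entry) {
    while !list.is_null() {
        let entry = list;
        list = (*entry).next.load(Ordering::Relaxed);
        let batch = (*entry).batch;
        if (*batch).active.fetch_sub(1, Ordering::AcqRel) == 1 {
            // last reference: the batch's objects can be freed here
        }
    }
}

The design choice is visible in the Entry struct: moving the batch pointer out of Node and into Entry lets a reservation list be traversed, and batch reference counts dropped, without consulting the node union, whose fields are repurposed over the object's lifecycle (birth_epoch before retiring, head while retiring, next after retiring).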
