reduce per-node memory usage
ibraheemdev committed Aug 2, 2023
1 parent 084159d commit e501c9b
Showing 3 changed files with 101 additions and 130 deletions.
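
The change at a glance: the per-node `batch_link` and `reclaim` fields and the `BatchNode` union move into a single heap allocation shared by the whole batch (`BatchAlloc`), so each retired node carries two words instead of four. Below is a rough, standalone sketch of the before/after layouts, using stand-in types rather than the crate's real `Link` and `ReservationNode`; sizes are for a typical 64-bit target.

// stand-ins for the crate's types; every field here is pointer-sized
use std::mem::{size_of, ManuallyDrop};
use std::sync::atomic::AtomicUsize;

struct Link; // placeholder for the real link type
union ReservationNode {
    head: *mut (),
    birth_epoch: u64,
}

// before: four words per retired node
struct NodeBefore {
    batch_link: *mut NodeBefore,
    reclaim: unsafe fn(Link),
    batch: BatchUnion,
    reservation: ReservationNode,
}
union BatchUnion {
    next: *mut NodeBefore,
    ref_count: ManuallyDrop<AtomicUsize>,
}

// after: the link, drop glue, and reference count live once per batch
struct BatchAlloc; // entries, min_epoch, ref_count (see the diff below)
struct NodeAfter {
    batch: *mut BatchAlloc,
    reservation: ReservationNode,
}

fn main() {
    println!("Node before: {} bytes", size_of::<NodeBefore>()); // 32
    println!("Node after:  {} bytes", size_of::<NodeAfter>()); // 16
}
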
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -1,4 +1,4 @@
#![deny(unsafe_op_in_unsafe_fn)]
// #![deny(unsafe_op_in_unsafe_fn)]
#![doc = include_str!("../README.md")]

mod collector;
223 changes: 94 additions & 129 deletions src/raw.rs
@@ -3,9 +3,9 @@ use crate::utils::CachePadded;
use crate::{Link, Linked};

use std::cell::{Cell, UnsafeCell};
use std::mem::{self, ManuallyDrop};
use std::mem::ManuallyDrop;
use std::num::NonZeroU64;
use std::ptr;
use std::ptr::{self, NonNull};
use std::sync::atomic::{self, AtomicPtr, AtomicU64, AtomicUsize, Ordering};

// Fast, lock-free, robust concurrent memory reclamation.
@@ -61,12 +61,8 @@ impl Collector {
};

Node {
reclaim: mem::drop,
batch_link: ptr::null_mut(),
batch: ptr::null_mut(),
reservation: ReservationNode { birth_epoch },
batch: BatchNode {
ref_count: ManuallyDrop::new(AtomicUsize::new(0)),
},
}
}

@@ -187,7 +183,7 @@ impl Collector {
reclaim: unsafe fn(Link),
) -> (bool, &mut Batch) {
// safety: batches are only accessed by the current thread
let batch = unsafe { &mut *self.batches.get_or(Default::default).get() };
let batch = unsafe { &mut *self.batches.get_or(|| Batch::new(self.batch_size)).get() };

let node = UnsafeCell::raw_get(ptr::addr_of_mut!((*ptr).node));

@@ -197,38 +193,24 @@
// reference to the UnsafeCell<Node>, which is allowed to alias. the caller
// guarantees that the same pointer is not retired twice, so we can safely write
// to the node here
unsafe { (*node).reclaim = reclaim }
unsafe { (*node).batch = batch.ptr.as_ptr() }

// add the node to the list
if batch.head.is_null() {
batch.tail = node;
// implicit node.batch.ref_count = 0
} else {
// the TAIL node stores the minimum epoch of the batch. if a thread is active
// in the minimum birth era, it has access to at least one of the nodes in the
// batch and must be tracked.
//
// if epoch tracking is disabled this will always be false (0 > 0).
unsafe {
if (*batch.tail).reservation.min_epoch > (*node).reservation.birth_epoch {
(*batch.tail).reservation.min_epoch = (*node).reservation.birth_epoch;
}
}

// safety: same as the write to `node` above
unsafe {
// the batch link of a SLOT node points to the tail
(*node).batch_link = batch.tail;

// insert this node into the batch
(*node).batch.next = batch.head;
}
// the TAIL node stores the minimum epoch of the batch. if a thread is active
// in the minimum birth era, it has access to at least one of the nodes in the
// batch and must be tracked.
//
// if epoch tracking is disabled this will always be false (0 > 0).
if batch.ptr.as_ref().min_epoch > (*node).reservation.birth_epoch {
batch.ptr.as_mut().min_epoch = (*node).reservation.birth_epoch;
}

batch.head = node;
batch.size += 1;
// add the node to the list
batch.ptr.as_mut().entries.push(Entry { node, reclaim });

(batch.size % self.batch_size == 0, batch)
(
batch.ptr.as_mut().entries.len() % self.batch_size == 0,
batch,
)
}
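
A quick aside on the epoch check just above: the batch records only the minimum birth epoch of its nodes, because a thread whose reserved epoch is older than that minimum cannot have observed any pointer in the batch. A tiny standalone illustration with hypothetical helpers (not the crate's API):

// hypothetical helpers illustrating the min-epoch reasoning above
fn batch_min_epoch(birth_epochs: &[u64]) -> u64 {
    // the batch only needs to remember the smallest birth epoch
    birth_epochs.iter().copied().min().unwrap_or(0)
}

fn must_track(reservation_epoch: u64, min_epoch: u64) -> bool {
    // a thread reserved before the oldest node in the batch existed cannot
    // hold a reference to it; with epoch tracking disabled both values are
    // 0, `0 < 0` is false, and every active thread is tracked
    !(reservation_epoch < min_epoch)
}

fn main() {
    let min = batch_min_epoch(&[4, 7, 5]);
    assert_eq!(min, 4);
    assert!(!must_track(3, min)); // skipped when scanning reservations
    assert!(must_track(6, min)); // needs a slot in its reservation list
}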

// Attempt to retire nodes in the current thread's batch
@@ -237,8 +219,10 @@
//
// The batch must contain at least one node
pub unsafe fn retire_batch(&self) {
// safety: guaranteed by caller
unsafe { self.retire(&mut *self.batches.get_or(Default::default).get()) }
// safety: batches are only accessed by the current thread
let batch = unsafe { &mut *self.batches.get_or(|| Batch::new(self.batch_size)).get() };
// safety: upheld by caller
unsafe { self.retire(batch) }
}

// Attempt to retire nodes in this batch
@@ -263,23 +247,20 @@
// relaxed: the fence above already ensures that we see any threads that might
// have access to any pointers in this batch. any other threads that were created
// after it will see their new values.
if batch.size <= self.reservations.threads.load(Ordering::Relaxed) {
if batch.ptr.as_ref().entries.len() <= self.reservations.threads.load(Ordering::Relaxed) {
return;
}

// safety: caller guarantees that the batch is not empty
unsafe { (*batch.tail).batch_link = batch.head }

// safety: TAIL nodes always have `min_epoch` initialized
let min_epoch = unsafe { (*batch.tail).reservation.min_epoch };

let mut last = batch.head;
let mut marked = 0;

// record all active threads
//
// we need to do this in a separate step before actually retiring to
// make sure we have enough reservation nodes, as the number of threads can grow
dbg!(self.reservations.threads.load(Ordering::Relaxed));
for reservation in self.reservations.iter() {
let node = batch.ptr.as_ref().entries[marked].node;

// if this thread is inactive, we can skip it
//
// acquire: if the thread is inactive, synchronize with `leave`
@@ -293,13 +274,13 @@
// have accessed any of the pointers in this batch
//
// if epoch tracking is disabled this is always false (0 < 0)
if reservation.epoch.load(Ordering::Acquire) < min_epoch {
if reservation.epoch.load(Ordering::Acquire) < batch.ptr.as_ref().min_epoch {
continue;
}

// we don't have enough nodes to insert into the reservation lists
// of all active threads, try again later
if last == batch.tail {
if marked == batch.ptr.as_ref().entries.len() - 1 {
return;
}

@@ -308,8 +289,8 @@
// safety: we checked that this is not the last node in the batch
// list above, and all nodes in a batch are valid
unsafe {
(*last).reservation.head = &reservation.head;
last = (*last).batch.next;
(*node).reservation.head = &reservation.head;
marked += 1;
}
}

@@ -320,9 +301,11 @@

// add the batch to all active threads' reservation lists
let mut active = 0;
let mut curr = batch.head;
let mut i = 0;

while i < marked {
let curr = batch.ptr.as_ref().entries[i].node;

while curr != last {
// safety: all nodes in the batch are valid, and we just initialized
// `reservation.head` for all nodes until `last` in the loop above
let head = unsafe { &*(*curr).reservation.head };
@@ -365,15 +348,15 @@ impl Collector {
}
}

curr = unsafe { (*curr).batch.next };
i += 1;
}

// safety: the TAIL node stores the reference count
let ref_count = unsafe { &(*batch.tail).batch.ref_count };

// release: if we don't free the list, release any modifications
// of the data to the thread that will
if ref_count
if batch
.ptr
.as_ref()
.ref_count
.fetch_add(active, Ordering::Release)
.wrapping_add(active)
== 0
@@ -384,12 +367,11 @@

// safety: The reference count is 0, meaning that either no threads
// were active, or they have all already decremented the count
unsafe { Collector::free_list(batch.tail) }
unsafe { Collector::free_list(batch.ptr.as_ptr()) }
}

// reset the batch
batch.head = ptr::null_mut();
batch.size = 0;
batch.ptr = BatchAlloc::alloc(self.batch_size);
}

// Traverse the reservation list, decrementing the
Expand All @@ -410,7 +392,7 @@ impl Collector {
//
// relaxed: any reservation nodes were acquired when we loaded `head`
list = unsafe { (*curr).reservation.next.load(Ordering::Relaxed) };
let tail = unsafe { (*curr).batch_link };
let batch = unsafe { (*curr).batch };

// safety: TAIL nodes store the reference count of the batch
//
@@ -423,13 +405,13 @@
unsafe {
// release: if we don't free the list, release any modifications of
// the data to the thread that will
if (*tail).batch.ref_count.fetch_sub(1, Ordering::Release) == 1 {
if (*batch).ref_count.fetch_sub(1, Ordering::Release) == 1 {
// we are freeing the list, acquire any modifications of the data
// released by the threads that decremented the count
atomic::fence(Ordering::Acquire);

// safety: we have the last reference to the batch
Collector::free_list(tail)
Collector::free_list(batch)
}
}
}
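
The two halves of this protocol (the `fetch_add(active, Release)` in `retire` and the `fetch_sub(1, Release)` here) are the usual release/acquire reference-count handoff: every decrement is a release, and the thread that sees the count reach zero issues an acquire fence before freeing. A minimal standalone sketch of just that handoff, with a plain boxed struct standing in for the batch (it ignores the wrapping/negative-count trick the real code uses):

use std::sync::atomic::{fence, AtomicUsize, Ordering};

struct Shared {
    ref_count: AtomicUsize,
    data: i32,
}

// drop one reference; the last holder frees the allocation
//
// safety: `shared` must point to a live Box<Shared> and the count must be accurate
unsafe fn release(shared: *mut Shared) {
    unsafe {
        // release: publish this thread's accesses to whichever thread frees it
        if (*shared).ref_count.fetch_sub(1, Ordering::Release) == 1 {
            // acquire: observe everything released by the other decrements
            fence(Ordering::Acquire);
            drop(Box::from_raw(shared));
        }
    }
}

fn main() {
    let shared = Box::into_raw(Box::new(Shared {
        ref_count: AtomicUsize::new(2),
        data: 42,
    }));
    unsafe {
        assert_eq!((*shared).data, 42);
        release(shared); // 2 -> 1
        release(shared); // 1 -> 0, frees the Box
    }
}
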
@@ -439,28 +421,14 @@
//
// # Safety
//
// `list` must be the last reference to the TAIL node of the batch.
//
// The reference count must be zero
unsafe fn free_list(list: *mut Node) {
// safety: `list` is a valid pointer
let mut list = unsafe { (*list).batch_link };

loop {
let node = list;

unsafe {
list = (*node).batch.next;
((*node).reclaim)(Link { node });
}

// if `node` is the TAIL node, then `node.batch.next` will interpret the
// 0 in `node.batch.ref_count` as a null pointer, indicating that we have
// freed the last node in the list
if list.is_null() {
break;
}
// Must have unique reference to the batch.
unsafe fn free_list(batch: *mut BatchAlloc) {
// safety: unique reference
for entry in (*batch).entries.iter_mut() {
(entry.reclaim)(Link { node: entry.node });
}

BatchAlloc::free(batch);
}
}

@@ -469,24 +437,8 @@ impl Drop for Collector {
for batch in self.batches.iter() {
// safety: We have &mut self
let batch = unsafe { &mut *batch.get() };

if !batch.head.is_null() {
// safety: batch.head is not null, meaning that `batch.tail` is valid
unsafe {
// `free_list` expects the batch link to point to the head of the list
//
// usually this is done in `retire`
(*batch.tail).batch_link = batch.head;

// `free_list` expects the tail node's link to be null. usually this is
// implied by the reference count field in the union being zero, but that
// might not be the case here, so we have to set it manually
(*batch.tail).batch.next = ptr::null_mut();
}

// safety: We have &mut self
unsafe { Collector::free_list(batch.tail) }
}
// safety: We have &mut self
unsafe { Collector::free_list(batch.ptr.as_ptr()) }
}
}
}
@@ -497,13 +449,7 @@ impl Drop for Collector {
// - TAIL: the first node added to the batch (tail of the list), holds the reference count
// - SLOT: everyone else
pub struct Node {
// TAIL: pointer to the head of the list
// SLOT: pointer to TAIL
batch_link: *mut Node,
// User provided drop glue
reclaim: unsafe fn(Link),
// unions for different phases of a node's lifetime
batch: BatchNode,
batch: *mut BatchAlloc,
reservation: ReservationNode,
}

@@ -519,14 +465,6 @@ union ReservationNode {
birth_epoch: u64,
}

#[repr(C)]
union BatchNode {
// SLOT: next node in the batch
next: *mut Node,
// TAIL: reference count of the batch
ref_count: ManuallyDrop<AtomicUsize>,
}

impl Node {
// Represents an inactive thread
//
@@ -558,22 +496,49 @@ impl Default for Reservation {

// A batch of nodes waiting to be retired
pub struct Batch {
// Head of the batch
head: *mut Node,
// Tail of the batch (TAIL node)
tail: *mut Node,
// The number of nodes in this batch
size: usize,
ptr: NonNull<BatchAlloc>,
}

impl Default for Batch {
fn default() -> Self {
Batch {
head: ptr::null_mut(),
tail: ptr::null_mut(),
size: 0,
impl Batch {
fn new(capacity: usize) -> UnsafeCell<CachePadded<Self>> {
let ptr = unsafe {
NonNull::new_unchecked(Box::into_raw(Box::new(BatchAlloc {
entries: Vec::with_capacity(capacity),
min_epoch: 0,
ref_count: AtomicUsize::new(0),
})))
};

UnsafeCell::new(CachePadded::new(Batch { ptr }))
}
}

// todo: flatten this allocation
struct BatchAlloc {
entries: Vec<Entry>,
min_epoch: u64,
ref_count: AtomicUsize,
}

impl BatchAlloc {
fn alloc(capacity: usize) -> NonNull<BatchAlloc> {
unsafe {
NonNull::new_unchecked(Box::into_raw(Box::new(BatchAlloc {
entries: Vec::with_capacity(capacity),
min_epoch: 0,
ref_count: AtomicUsize::new(0),
})))
}
}

unsafe fn free(ptr: *mut BatchAlloc) {
unsafe { drop(Box::from_raw(ptr)) }
}
}

struct Entry {
node: *mut Node,
reclaim: unsafe fn(Link),
}

unsafe impl Send for Batch {}
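
`BatchAlloc::alloc` and `BatchAlloc::free` above are a plain `Box` round trip hidden behind a `NonNull`. In isolation the ownership pattern looks like the sketch below, with a hypothetical `Payload` standing in for the per-batch state: `Box::into_raw` never returns null, so `NonNull::new_unchecked` is sound, and `Box::from_raw` later reconstitutes the box so the `Vec` of entries is dropped normally.

use std::ptr::NonNull;

// hypothetical stand-in for the per-batch state
struct Payload {
    entries: Vec<usize>,
}

fn alloc(capacity: usize) -> NonNull<Payload> {
    let boxed = Box::new(Payload {
        entries: Vec::with_capacity(capacity),
    });
    // Box::into_raw never returns null, so this is sound; ownership is now
    // tracked manually through the raw pointer
    unsafe { NonNull::new_unchecked(Box::into_raw(boxed)) }
}

// safety: `ptr` must have come from `alloc` and must not have been freed
unsafe fn free(ptr: *mut Payload) {
    // rebuild the Box so its destructor (and the Vec's) runs
    unsafe { drop(Box::from_raw(ptr)) }
}

fn main() {
    let batch = alloc(64);
    unsafe {
        (*batch.as_ptr()).entries.push(1);
        free(batch.as_ptr());
    }
}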