Skip to content

Commit

Permalink
std::sync::Mutex using batch semaphore (#155)
Browse files Browse the repository at this point in the history
* stdlib mutex using batch semaphore

* address comments, fix tests

* move fairness out of semaphore state
  • Loading branch information
Aurel300 authored Aug 9, 2024
1 parent a1a88b3 commit 4281c33
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 175 deletions.
85 changes: 69 additions & 16 deletions src/future/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crate::current;
use crate::runtime::execution::ExecutionState;
use crate::runtime::task::{clock::VectorClock, TaskId};
use crate::runtime::thread;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
Expand Down Expand Up @@ -184,17 +185,16 @@ struct BatchSemaphoreState {
// (4) closed ==> waiters.is_empty()
waiters: VecDeque<Arc<Waiter>>,
permits_available: PermitsAvailable,
fairness: Fairness,
// TODO: should there be a clock for the close event?
closed: bool,
}

impl BatchSemaphoreState {
fn acquire_permits(&mut self, num_permits: usize) -> Result<(), TryAcquireError> {
fn acquire_permits(&mut self, num_permits: usize, fairness: Fairness) -> Result<(), TryAcquireError> {
assert!(num_permits > 0);
if self.closed {
Err(TryAcquireError::Closed)
} else if self.waiters.is_empty() || matches!(self.fairness, Fairness::Unfair) {
} else if self.waiters.is_empty() || matches!(fairness, Fairness::Unfair) {
// Permits here can be acquired in one of two scenarios:
// - The waiter queue is empty; nobody else is waiting for permits,
// so if there are enough available, immediately succeed.
Expand Down Expand Up @@ -222,6 +222,7 @@ impl BatchSemaphoreState {
#[derive(Debug)]
pub struct BatchSemaphore {
state: RefCell<BatchSemaphoreState>,
fairness: Fairness,
}

/// Error returned from the [`BatchSemaphore::try_acquire`] function.
Expand Down Expand Up @@ -261,21 +262,19 @@ impl BatchSemaphore {
let state = RefCell::new(BatchSemaphoreState {
waiters: VecDeque::new(),
permits_available: PermitsAvailable::new(num_permits),
fairness,
closed: false,
});
Self { state }
Self { state, fairness }
}

/// Creates a new semaphore with the initial number of permits.
pub const fn const_new(num_permits: usize, fairness: Fairness) -> Self {
let state = RefCell::new(BatchSemaphoreState {
waiters: VecDeque::new(),
permits_available: PermitsAvailable::const_new(num_permits),
fairness,
closed: false,
});
Self { state }
Self { state, fairness }
}

/// Returns the current number of available permits.
Expand Down Expand Up @@ -325,7 +324,7 @@ impl BatchSemaphore {
/// If there aren't enough permits, returns `Err(TryAcquireError::NoPermits)`
pub fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> {
let mut state = self.state.borrow_mut();
state.acquire_permits(num_permits).map_err(|err| {
let res = state.acquire_permits(num_permits, self.fairness).map_err(|err| {
// Conservatively, the requester causally depends on the
// last successful acquire.
// TODO: This is not precise, but `try_acquire` causal dependency
Expand All @@ -343,7 +342,43 @@ impl BatchSemaphore {
s.update_clock(&state.permits_available.last_acquire);
});
err
})
});
drop(state);

// If we won the race for permits of an unfair semaphore, re-block
// other waiting threads that can no longer succeed.
if res.is_ok() {
self.reblock_if_unfair();
}

// We context switch here whether we acquired any permits or not. If
// we have, this is to let other threads fail their `try_acquire`;
// if we have not, we yield so that the current thread can try again
// after other threads have worked.
thread::switch();

res
}

/// Clean-up method used when a thread succeeds in acquiring permits. If
/// the semaphore is unfair, a preceding `release` may have unblocked a
/// number of threads, some of which may no longer be able to succeed with
/// the permits remaining in the semaphore.
fn reblock_if_unfair(&self) {
if self.fairness == Fairness::Unfair {
let state = self.state.borrow_mut();
ExecutionState::with(|s| {
for waiter in &state.waiters {
let available = state.permits_available.available();
if available < waiter.num_permits {
// Block this waiter: it cannot succeed (there are not
// enough permits available); its `poll` would return
// without resolving.
s.get_mut(waiter.task_id).block(false);
}
}
});
}
}

fn enqueue_waiter(&self, waiter: &Arc<Waiter>) {
Expand Down Expand Up @@ -391,12 +426,24 @@ impl BatchSemaphore {
return;
}

let mut state = self.state.borrow_mut();

if ExecutionState::should_stop() {
// In case we are panicking, we release permits, but also clear
// the waiters queue: we should not unblock the threads at this
// point. However, the permits are released such that future
// acquires may succeed, as long as the requesters were not
// blocking on the semaphore at the time of the panic. This is
// used to correctly model lock poisoning.
state.permits_available.release(num_permits, VectorClock::new());
for waiter in &state.waiters {
waiter.is_queued.swap(false, Ordering::SeqCst);
}
state.waiters.clear();
state.closed = true;
return;
}

let mut state = self.state.borrow_mut();

// Permits released into the semaphore reflect the releasing thread's
// clock; future acquires of those permits are causally dependent on
// this event.
Expand All @@ -408,7 +455,7 @@ impl BatchSemaphore {
let me = ExecutionState::me();
trace!(task = ?me, avail = ?state.permits_available, waiters = ?state.waiters, "released {} permits for semaphore {:p}", num_permits, &self.state);

match state.fairness {
match self.fairness {
Fairness::StrictlyFair => {
// in a strictly fair mode we will always pick the first waiter
// in the queue, as long as there are enough permits available
Expand Down Expand Up @@ -469,7 +516,7 @@ impl BatchSemaphore {
drop(state);

// Releasing a semaphore is a yield point
crate::runtime::thread::switch();
thread::switch();
}
}

Expand Down Expand Up @@ -550,7 +597,7 @@ impl Future for Acquire<'_> {
// would always fail, and the waiter should remain suspended until
// the semaphore has explicitly unblocked it and given it permits
// during a `release` call.
let try_to_acquire = match (self.semaphore.state.borrow().fairness, is_queued) {
let try_to_acquire = match (self.semaphore.fairness, is_queued) {
// written this way to mirror the cases described above
(Fairness::Unfair, false) | (Fairness::StrictlyFair, false) | (Fairness::Unfair, true) => true,
(Fairness::StrictlyFair, true) => false,
Expand All @@ -561,7 +608,7 @@ impl Future for Acquire<'_> {
// because in case of `NoPermits`, we do not want to update the
// clock, as this thread will be blocked below.
let mut state = self.semaphore.state.borrow_mut();
let acquire_result = state.acquire_permits(self.waiter.num_permits);
let acquire_result = state.acquire_permits(self.waiter.num_permits, self.semaphore.fairness);
drop(state);

match acquire_result {
Expand All @@ -572,7 +619,13 @@ impl Future for Acquire<'_> {
self.waiter.has_permits.store(true, Ordering::SeqCst);
self.completed = true;
trace!("Acquire::poll for waiter {:?} that got permits", self.waiter);
crate::runtime::thread::switch();

// If the semaphore is unfair, re-block other waiting
// threads that can no longer succeed.
self.semaphore.reblock_if_unfair();

// Yield so other threads can fail a `try_acquire`.
thread::switch();
Poll::Ready(Ok(()))
}
Err(TryAcquireError::NoPermits) => {
Expand Down
2 changes: 0 additions & 2 deletions src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ pub fn block_on<F: Future>(future: F) -> F::Output {
let waker = ExecutionState::with(|state| state.current_mut().waker());
let cx = &mut Context::from_waker(&waker);

thread::switch();

loop {
match future.as_mut().poll(cx) {
Poll::Ready(result) => break result,
Expand Down
Loading

0 comments on commit 4281c33

Please sign in to comment.