Skip to content

Commit

Permalink
add unfair mode to batch semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
Aurel300 committed Jul 23, 2024
1 parent ca5dec7 commit 2b9cc6a
Show file tree
Hide file tree
Showing 2 changed files with 354 additions and 159 deletions.
217 changes: 150 additions & 67 deletions src/future/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,21 @@ impl PermitsAvailable {
}
}

/// Fairness mode for the semaphore. Determines which threads are woken when
/// permits are released.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Fairness {
/// The semaphore is strictly fair, so earlier requesters always get
/// priority over later ones.
StrictlyFair,

/// The semaphore makes no guarantees about fairness. In particular,
/// a waiter can be starved by other threads.
Unfair,
}

/// A counting semaphore which permits waiting on multiple permits at once,
/// and supports both asychronous and synchronous blocking operations.
///
/// The semaphore is strictly fair, so earlier requesters always get priority
/// over later ones.
///
/// TODO: Provide an option to support weaker models for fairness.
#[derive(Debug)]
struct BatchSemaphoreState {
// Key invariants:
Expand All @@ -176,6 +184,7 @@ struct BatchSemaphoreState {
// (4) closed ==> waiters.is_empty()
waiters: VecDeque<Arc<Waiter>>,
permits_available: PermitsAvailable,
fairness: Fairness,
// TODO: should there be a clock for the close event?
closed: bool,
}
Expand All @@ -185,8 +194,15 @@ impl BatchSemaphoreState {
assert!(num_permits > 0);
if self.closed {
Err(TryAcquireError::Closed)
} else if self.waiters.is_empty() {
// No one is waiting: try to acquire permits
} else if self.waiters.is_empty() || matches!(self.fairness, Fairness::Unfair) {
// Permits here can be acquired in one of two scenarios:
// - The waiter queue is empty; nobody else is waiting for permits,
// so if there are enough available, immediately succeed.
// - The semaphore is operating in an unfair mode; the current
// thread is either requesting permits for the first time, or it
// was woken and selected by the scheduler. In either case, the
// thread may succeed, as long as there are enough permits.

let clock = self.permits_available.acquire(num_permits, current::clock())?;

// If successful, the acquiry is causally dependent on the event
Expand Down Expand Up @@ -241,20 +257,22 @@ impl std::error::Error for AcquireError {}

impl BatchSemaphore {
/// Creates a new semaphore with the initial number of permits.
pub fn new(num_permits: usize) -> Self {
pub fn new(num_permits: usize, fairness: Fairness) -> Self {
let state = RefCell::new(BatchSemaphoreState {
waiters: VecDeque::new(),
permits_available: PermitsAvailable::new(num_permits),
fairness,
closed: false,
});
Self { state }
}

/// Creates a new semaphore with the initial number of permits.
pub const fn const_new(num_permits: usize) -> Self {
pub const fn const_new(num_permits: usize, fairness: Fairness) -> Self {
let state = RefCell::new(BatchSemaphoreState {
waiters: VecDeque::new(),
permits_available: PermitsAvailable::const_new(num_permits),
fairness,
closed: false,
});
Self { state }
Expand Down Expand Up @@ -390,38 +408,62 @@ impl BatchSemaphore {
let me = ExecutionState::me();
trace!(task = ?me, avail = ?state.permits_available, waiters = ?state.waiters, "released {} permits for semaphore {:p}", num_permits, &self.state);

while let Some(front) = state.waiters.front() {
if front.num_permits <= state.permits_available.available() {
let waiter = state.waiters.pop_front().unwrap();

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = state
.permits_available
.acquire(waiter.num_permits, waiter.clock.clone())
.unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
// The acquiry is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
});
let mut maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.take() {
waker.wake();
match state.fairness {
Fairness::StrictlyFair => {
// in a strictly fair mode we will always pick the first waiter
// in the queue, as long as there are enough permits available
while let Some(front) = state.waiters.front() {
if front.num_permits <= state.permits_available.available() {
let waiter = state.waiters.pop_front().unwrap();

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = state
.permits_available
.acquire(waiter.num_permits, waiter.clock.clone())
.unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
// The acquiry is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
});
let mut maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.take() {
waker.wake();
}
} else {
break;
}
}
}
Fairness::Unfair => {
// in an unfair mode, we will unblock all the waiters for which
// there are enough permits available, then let them race
let num_available = state.permits_available.available();
for waiter in &mut state.waiters {
if waiter.num_permits <= num_available {
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
task.unblock();
});
let maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.as_ref() {
waker.wake_by_ref();
}
}
}
} else {
break;
}
}
drop(state);
Expand All @@ -440,7 +482,7 @@ unsafe impl Sync for BatchSemaphore {}

impl Default for BatchSemaphore {
fn default() -> Self {
Self::new(Default::default())
Self::new(Default::default(), Fairness::StrictlyFair)
}
}

Expand Down Expand Up @@ -479,34 +521,75 @@ impl Future for Acquire<'_> {
self.completed = true;
trace!("Acquire::poll for waiter {:?} with closed", self.waiter);
Poll::Ready(Err(AcquireError::closed()))
} else if self.waiter.is_queued.load(Ordering::SeqCst) {
trace!("Acquire::poll for waiter {:?} already queued", self.waiter);
assert!(self.waiter.waker.lock().unwrap().is_some());
Poll::Pending
} else {
// We access the semaphore state directly instead of using the
// public `try_acquire`, because in case of `NoPermits`, we do not
// want to update the clock, as this thread will be blocked below.
let mut state = self.semaphore.state.borrow_mut();
let acquire_result = state.acquire_permits(self.waiter.num_permits);
drop(state);
match acquire_result {
Ok(()) => {
assert!(!self.waiter.is_queued.load(Ordering::SeqCst));
self.waiter.has_permits.store(true, Ordering::SeqCst);
self.completed = true;
trace!("Acquire::poll for waiter {:?} that got permits", self.waiter);
crate::runtime::thread::switch();
Poll::Ready(Ok(()))
let is_queued = self.waiter.is_queued.load(Ordering::SeqCst);
trace!("Acquire::poll for waiter {:?}; is queued: {is_queued:?}", self.waiter);

// Sanity check: there should be a waker if the waiter is in
// the queue. Also true for unfair semaphores, which wake by ref.
assert_eq!(is_queued, self.waiter.waker.lock().unwrap().is_some());

// Should the waiter try to acquire permits here? Four cases:
// 1. unfair semaphore, waiter not yet enqueued;
// 2. fair semaphore, waiter not yet enqueued;
// 3. unfair semaphore, waiter already enqueued.
// 4. fair semaphore, waiter already enqueued;
//
// 1. and 2. are similar: the future was polled for the first time,
// so the waiter will try to acquire some permits. If successful,
// the waiter need not be enqueued, and the future is resolved.
// Otherwise, the waiter is added to the queue.
//
// 3. is slightly different: the future was polled, even though the
// waiter was already in the queue. This can happen either because
// the semaphore just received some permits and woke the waiter up,
// or because the future itself was polled manually. Either way,
// the semaphore is queried.
//
// 4. is a case where we do not try to acquire permits. The request
// would always fail, and the waiter should remain suspended until
// the semaphore has explicitly unblocked it and given it permits
// during a `release` call.
let try_to_acquire = match (self.semaphore.state.borrow().fairness, is_queued) {
// written this way to mirror the cases described above
(Fairness::Unfair, false) | (Fairness::StrictlyFair, false) | (Fairness::Unfair, true) => true,
(Fairness::StrictlyFair, true) => false,
};

if try_to_acquire {
// Access the semaphore state directly instead of `try_acquire`,
// because in case of `NoPermits`, we do not want to update the
// clock, as this thread will be blocked below.
let mut state = self.semaphore.state.borrow_mut();
let acquire_result = state.acquire_permits(self.waiter.num_permits);
drop(state);

match acquire_result {
Ok(()) => {
if is_queued {
self.semaphore.remove_waiter(&self.waiter);
}
self.waiter.has_permits.store(true, Ordering::SeqCst);
self.completed = true;
trace!("Acquire::poll for waiter {:?} that got permits", self.waiter);
crate::runtime::thread::switch();
Poll::Ready(Ok(()))
}
Err(TryAcquireError::NoPermits) => {
let mut maybe_waker = self.waiter.waker.lock().unwrap();
*maybe_waker = Some(cx.waker().clone());
if !is_queued {
self.semaphore.enqueue_waiter(&self.waiter);
self.waiter.is_queued.store(true, Ordering::SeqCst);
}
trace!("Acquire::poll for waiter {:?} that is enqueued", self.waiter);
Poll::Pending
}
Err(TryAcquireError::Closed) => unreachable!(),
}
Err(TryAcquireError::NoPermits) => {
let mut maybe_waker = self.waiter.waker.lock().unwrap();
*maybe_waker = Some(cx.waker().clone());
self.semaphore.enqueue_waiter(&self.waiter);
trace!("Acquire::poll for waiter {:?} that is enqueued", self.waiter);
Poll::Pending
}
Err(TryAcquireError::Closed) => unreachable!(),
} else {
// No progress made, future is still pending.
Poll::Pending
}
}
}
Expand Down
Loading

0 comments on commit 2b9cc6a

Please sign in to comment.