Skip to content

Commit

Permalink
Fixes a bug in the implementation of BatchSemaphore that caused deadl…
Browse files Browse the repository at this point in the history
…ocks in the presence of tasks that gave up waiting for requested permits, under the policy of strict fairness.

---------

Co-authored-by: Sarek Høverstad Skotåm <[email protected]>
  • Loading branch information
2 people authored and Rajeev Joshi committed Dec 5, 2024
1 parent 4915ff6 commit 0a4f0bb
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 106 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ serde_json = { version = "1.0", optional = true }
criterion = { version = "0.4.0", features = ["html_reports"] }
futures = "0.3.15"
proptest = "1.0.0"
proptest-derive = "0.5.0"
regex = "1.5.5"
tempfile = "3.2.0"
test-log = { version = "0.2.8", default-features = false, features = ["trace"] }
Expand Down
102 changes: 60 additions & 42 deletions src/future/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,49 @@ impl BatchSemaphoreState {
Err(TryAcquireError::NoPermits)
}
}

fn unblock_waiters_from_front(&mut self) {
while let Some(front) = self.waiters.front() {
if front.num_permits <= self.permits_available.available() {
let waiter = self.waiters.pop_front().unwrap();

crate::annotations::record_semaphore_acquire_unblocked(
self.id.unwrap(),
waiter.task_id,
waiter.num_permits,
);

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = self
.permits_available
.acquire(waiter.num_permits, waiter.clock.clone())
.unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
// The acquiry is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
});
let mut maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.take() {
waker.wake();
}
} else {
return;
}
}
}
}

/// Counting semaphore
Expand Down Expand Up @@ -447,6 +490,20 @@ impl BatchSemaphore {

state.waiters.remove(index).unwrap();
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));

match self.fairness {
Fairness::StrictlyFair => {
if index == 0 {
// If the semaphore is strictly fair, and we removed the first waiter, check if its
// removal unblocks remaining waiters. This can happen in the following situation:
// - the semahore has 1 permit available
// - there are 2 waiters W1 and W2 where W1 wants 2 permits, and W2 wants 1 permit
// - if W1 gives up and drops out, we want to ensure W2 is granted the semaphore
state.unblock_waiters_from_front();
}
}
Fairness::Unfair => {}
}
}

/// Acquire the specified number of permits (async API)
Expand Down Expand Up @@ -501,48 +558,9 @@ impl BatchSemaphore {

match self.fairness {
Fairness::StrictlyFair => {
// in a strictly fair mode we will always pick the first waiter
// in the queue, as long as there are enough permits available
while let Some(front) = state.waiters.front() {
if front.num_permits <= state.permits_available.available() {
let waiter = state.waiters.pop_front().unwrap();

crate::annotations::record_semaphore_acquire_unblocked(
state.id.unwrap(),
waiter.task_id,
waiter.num_permits,
);

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = state
.permits_available
.acquire(waiter.num_permits, waiter.clock.clone())
.unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
// The acquiry is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
});
let mut maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.take() {
waker.wake();
}
} else {
break;
}
}
// in a strictly fair mode we will grant permits to waiters from the front
// of the queue, as long as there are enough permits available
state.unblock_waiters_from_front();
}
Fairness::Unfair => {
// in an unfair mode, we will unblock all the waiters for which
Expand Down
159 changes: 95 additions & 64 deletions tests/future/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,52 +512,63 @@ fn bugged_cleanup_would_cause_deadlock() {
)
}

// This test exercises the following scenario where SEM is a BatchSemaphore with N permits.
// 1. Initially the semaphore has 0 permits
// 2. Task T1 tries to acquire 1 permit, gets added as the first Waiter
// 3. Task T2 tries to acquire 1 permit, gets added as the second Waiter
// 4. Task T0 releases 1 permit
// 5. Task T1 drops its Acquire handle without calling poll()
// This test exercises scenarios to ensure that the BatchSemaphore behaves correctly in the presence
// of tasks that drop an `Acquire` guard without waiting for the semaphore to become available.
//
// At this point, T2 should be woken up and should get the permit.
// Unfortunately, in an earlier version of BatchSemaphore, this was not happening because
// the Drop handler for Acquire was only returning permits to the semaphore, but not
// waking up the next waiter in the queue.
// The general idea is that there are 3 types of tasks: `EarlyDrop`, `Hold` and `Release` tasks
// (determined by the `Behavior` enum).
// 1. The semaphore initially has 0 permits.
// 2. The main task spawns a set of tasks, specifying the behavior and the number of permits each task should request
// 3. Each task polls the semaphore once when it is created, in order to get added as a Waiter.
// 4. The main task releases N semaphores, which are sufficient to ensure that all tasks complete (see below)
// 5. Each task then proceeds according to its defined `behavior`:
// `EarlyDrop` tasks drop their Acquire guards (and are removed from the waiters queue)
// `Hold` tasks wait to acquire their permits, and then terminate (without releasing any permits)
// `Release` tasks wait to acquire their permits, and then release their permits and terminate
//
// The tests below exercise both the specific scenario above (with 1 permit and two tasks), and more
// general scenarios involving multiple tasks, of which a random subset drop the Acquire guards early.
mod early_acquire_drop_test {
// The value of N is computed as
// (sum of permits requested by the Hold tasks) + (max over the permits requested by the Release and EarlyDrop tasks)
mod early_acquire_drop_tests {
use super::*;
use futures::{
future::join_all,
task::{Context, Poll, Waker},
Future,
};
use pin_project::pin_project;
use proptest::proptest;
use proptest::prelude::*;
use proptest_derive::Arbitrary;
use shuttle::{
check_random,
sync::mpsc::{channel, Sender},
};
use std::pin::Pin;
use test_log::test;

#[derive(Arbitrary, Clone, Copy, Debug)]
enum Behavior {
EarlyDrop, // Task drops before future completes
Release, // Task releases permits it acquires
Hold, // Task holds permits it acquires
}

#[pin_project]
struct Task {
poll_count: usize, // how many times the Future has been polled
early_drop: bool, // whether to drop the Acquire handle after polling it once
tx: Sender<Waker>, // channel for informing the main task
poll_count: usize, // how many times the Future has been polled
behavior: Behavior, // how this task should behave
requested_permits: usize, // how many permits this Task requests
tx: Sender<Waker>, // channel for informing the main task that this task is added as a Waiter
#[pin]
acquire: Acquire<'static>,
}

impl Task {
fn new(early_drop: bool, tx: Sender<Waker>, sem: &'static BatchSemaphore) -> Self {
fn new(behavior: Behavior, requested_permits: usize, tx: Sender<Waker>, sem: &'static BatchSemaphore) -> Self {
Self {
poll_count: 0,
early_drop,
behavior,
requested_permits,
tx,
acquire: sem.acquire(1),
acquire: sem.acquire(requested_permits),
}
}
}
Expand All @@ -575,52 +586,43 @@ mod early_acquire_drop_test {
this.tx.send(cx.waker().clone()).unwrap(); // Notify main task
*this.poll_count += 1;
Poll::Pending
} else if *this.early_drop {
} else if matches!(*this.behavior, Behavior::EarlyDrop) {
// Since this is an early drop, we got 0 permits
Poll::Ready(0)
} else {
// If not early dropping, wait until the inner Acquire handle successfully gets
// a permit. When successful, return 1 permit.
this.acquire.as_mut().poll(cx).map(|_| 1)
// a permit. When successful, return the number of permits acquired.
this.acquire.as_mut().poll(cx).map(|_| *this.requested_permits)
}
}
}

// High-level sketch of the test:
// S1. Initialize the semaphore with no permits
// S2. Spawn a set of tasks, each randomly decides whether or not to drop early
// Each task creates an Acquire handle and polls it once (to get into the waiter queue)
// The task then notifies the main task (by sending a message on an mpsc channel)
// S3. The main task waits for messages from all the spawned tasks (so it knows each is a waiter)
// S4. The main task releases N permits on the BatchSemaphore, and wakes up all the tasks
// S5. At this point, each task either drops its Acquire handle, or tries to acquire the BatchSemaphore
// by polling it until it acquires a permit.
fn dropped_acquire_must_release(num_permits: usize, early_drop: Vec<bool>) {
shuttle::lazy_static! {
// S1. Initialize the semaphore with no permits
static ref SEM: BatchSemaphore = BatchSemaphore::new(0, Fairness::StrictlyFair);
}

fn dropped_acquire_must_release(sem: &'static BatchSemaphore, task_config: Vec<(Behavior, usize)>) {
future::block_on(async move {
let mut wakers = vec![];
let mut handles = vec![];

// S2. Main task spawns a set of tasks; the `early_drop` vector of booleans determines
// which tasks will drop the `Acquire` after polling it exactly once
for early_drop in early_drop {
let mut total_held = 0usize;
let mut max_requested = 0usize;

for (behavior, requested_permits) in task_config {
let (tx, rx) = channel();
let task: Task = Task::new(early_drop, tx, &SEM);
match behavior {
Behavior::Hold => total_held += requested_permits,
_ => max_requested = std::cmp::max(max_requested, requested_permits),
}
handles.push(future::spawn(async move {
let task: Task = Task::new(behavior, requested_permits, tx, sem);
let p = task.await;
// Note: tasks doing an early drop will return p=0, and release(0) is a no-op
SEM.release(p);
if matches!(behavior, Behavior::Release) {
sem.release(p);
}
}));
// S3. Main task waits for message from spawned task indicating it has polled once
wakers.push(rx.recv().unwrap());
}

// S4. Main task releases N permits and wakes up all tasks
SEM.release(num_permits);
sem.release(total_held + max_requested);
for w in wakers.into_iter() {
w.wake();
}
Expand All @@ -629,26 +631,55 @@ mod early_acquire_drop_test {
});
}

// The minimal test case (generated by the proptest below) is with 1 permit and 2 tasks, where Task1 does
// an early drop, and Task2 does not early drop. This test checks that scenario exhaustively using check_dfs.
#[test]
fn dropped_acquire_must_release_exhaustive() {
check_dfs(|| dropped_acquire_must_release(1, vec![true, false]), None);
}
macro_rules! sem_tests {
($mod_name:ident, $fairness:expr) => {
mod $mod_name {
use super::*;

// This test checks scenarios where the main task releases multiple permits and there are several tasks, any
// subset of which may do an early drop of their Acquire handle.
#[test_log::test]
fn dropped_acquire_must_release_exhaustive() {
shuttle::lazy_static! {
static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
}
check_dfs(
|| dropped_acquire_must_release(&SEM, vec![(Behavior::EarlyDrop, 1), (Behavior::Release, 1)]),
None,
);
}

const MAX_PERMITS: usize = 8;
const MAX_TASKS: usize = 7;
#[test_log::test]
fn dropped_acquire_must_release_deadlock() {
shuttle::lazy_static! {
static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
}
check_dfs(
|| dropped_acquire_must_release(&SEM, vec![(Behavior::Hold, 1), (Behavior::EarlyDrop, 2), (Behavior::Release, 1)]),
None,
);
}

proptest! {
#[test]
fn dropped_acquire_must_release_random(num_permits in 1..MAX_PERMITS, early_drop in proptest::collection::vec(proptest::arbitrary::any::<bool>(), 1..MAX_TASKS)) {
check_random(
move || dropped_acquire_must_release(num_permits, early_drop.clone()),
10_000,
);
const MAX_REQUESTED_PERMITS: usize = 3;
const MAX_TASKS: usize = 7;

proptest! {
#[test_log::test]
fn dropped_acquire_must_release_random(behavior in proptest::collection::vec((proptest::arbitrary::any::<Behavior>(), 1..=MAX_REQUESTED_PERMITS), 1..=MAX_TASKS)) {
check_random(
move || {
shuttle::lazy_static! {
static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
}
dropped_acquire_must_release(&SEM, behavior.clone())
},
10_000,
);
}
}
}
}
}

sem_tests!(unfair, Fairness::Unfair);

sem_tests!(fair, Fairness::StrictlyFair);
}

0 comments on commit 0a4f0bb

Please sign in to comment.