diff --git a/Cargo.toml b/Cargo.toml
index 329e7f5..3831b59 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,7 @@ serde_json = { version = "1.0", optional = true }
 criterion = { version = "0.4.0", features = ["html_reports"] }
 futures = "0.3.15"
 proptest = "1.0.0"
+proptest-derive = "0.5.0"
 regex = "1.5.5"
 tempfile = "3.2.0"
 test-log = { version = "0.2.8", default-features = false, features = ["trace"] }
diff --git a/src/future/batch_semaphore.rs b/src/future/batch_semaphore.rs
index 2078b08..947e6f7 100644
--- a/src/future/batch_semaphore.rs
+++ b/src/future/batch_semaphore.rs
@@ -238,6 +238,51 @@ impl BatchSemaphoreState {
             Err(TryAcquireError::NoPermits)
         }
     }
+
+    fn unblock_waiters_from_front(&mut self) {
+        // in a strictly fair mode we will always pick the first waiter
+        // in the queue, as long as there are enough permits available
+        while let Some(front) = self.waiters.front() {
+            if front.num_permits <= self.permits_available.available() {
+                let waiter = self.waiters.pop_front().unwrap();
+
+                crate::annotations::record_semaphore_acquire_unblocked(
+                    self.id.unwrap(),
+                    waiter.task_id,
+                    waiter.num_permits,
+                );
+
+                // The clock we pass into the semaphore is the clock of the
+                // waiter, corresponding to the point at which the waiter was
+                // enqueued. The clock we get in return corresponds to the
+                // join of the clocks of the acquired permits, used to update
+                // the waiter's clock to causally depend on the release events.
+                let clock = self
+                    .permits_available
+                    .acquire(waiter.num_permits, waiter.clock.clone())
+                    .unwrap();
+                trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);
+
+                // Update waiter state as it is no longer in the queue
+                assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
+                assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
+                ExecutionState::with(|s| {
+                    let task = s.get_mut(waiter.task_id);
+                    assert!(!task.finished());
+                    // The acquisition is causally dependent on the event
+                    // which released the acquired permits.
+                    task.clock.update(&clock);
+                    task.unblock();
+                });
+                let mut maybe_waker = waiter.waker.lock().unwrap();
+                if let Some(waker) = maybe_waker.take() {
+                    waker.wake();
+                }
+            } else {
+                return;
+            }
+        }
+    }
 }
 
 /// Counting semaphore
@@ -447,6 +492,15 @@ impl BatchSemaphore {
         state.waiters.remove(index).unwrap();
         assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
+
+        if matches!(self.fairness, Fairness::StrictlyFair) && index == 0 {
+            // If the semaphore is strictly fair, and we removed the first waiter, check if its
+            // removal unblocks remaining waiters. This can happen in the following situation:
+            // - the semaphore has 2 permits available
+            // - there are 3 waiters W1, W2, W3, where W1 wants 3 permits and W2, W3 want 1 permit each
+            // - if W1 is removed (because it drops out), we want to ensure W2 and W3 are granted their permits
+            state.unblock_waiters_from_front();
+        }
     }
 
     /// Acquire the specified number of permits (async API)
@@ -501,48 +555,9 @@
         match self.fairness {
             Fairness::StrictlyFair => {
-                // in a strictly fair mode we will always pick the first waiter
-                // in the queue, as long as there are enough permits available
-                while let Some(front) = state.waiters.front() {
-                    if front.num_permits <= state.permits_available.available() {
-                        let waiter = state.waiters.pop_front().unwrap();
-
-                        crate::annotations::record_semaphore_acquire_unblocked(
-                            state.id.unwrap(),
-                            waiter.task_id,
-                            waiter.num_permits,
-                        );
-
-                        // The clock we pass into the semaphore is the clock of the
-                        // waiter, corresponding to the point at which the waiter was
-                        // enqueued. The clock we get in return corresponds to the
-                        // join of the clocks of the acquired permits, used to update
-                        // the waiter's clock to causally depend on the release events.
-                        let clock = state
-                            .permits_available
-                            .acquire(waiter.num_permits, waiter.clock.clone())
-                            .unwrap();
-                        trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);
-
-                        // Update waiter state as it is no longer in the queue
-                        assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
-                        assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
-                        ExecutionState::with(|s| {
-                            let task = s.get_mut(waiter.task_id);
-                            assert!(!task.finished());
-                            // The acquiry is causally dependent on the event
-                            // which released the acquired permits.
-                            task.clock.update(&clock);
-                            task.unblock();
-                        });
-                        let mut maybe_waker = waiter.waker.lock().unwrap();
-                        if let Some(waker) = maybe_waker.take() {
-                            waker.wake();
-                        }
-                    } else {
-                        break;
-                    }
-                }
+                // in a strictly fair mode we will grant permits to waiters from the front
+                // of the queue, as long as there are enough permits available
+                state.unblock_waiters_from_front();
             }
             Fairness::Unfair => {
                 // in an unfair mode, we will unblock all the waiters for which
diff --git a/tests/future/batch_semaphore.rs b/tests/future/batch_semaphore.rs
index 0bc7c65..c9b1bec 100644
--- a/tests/future/batch_semaphore.rs
+++ b/tests/future/batch_semaphore.rs
@@ -512,21 +512,23 @@ fn bugged_cleanup_would_cause_deadlock() {
     )
 }
 
-// This test exercises the following scenario where SEM is a BatchSemaphore with N permits.
-// 1. Initially the semaphore has 0 permits
-// 2. Task T1 tries to acquire 1 permit, gets added as the first Waiter
-// 3. Task T2 tries to acquire 1 permit, gets added as the second Waiter
-// 4. Task T0 releases 1 permit
-// 5. Task T1 drops its Acquire handle without calling poll()
+// This test exercises scenarios to ensure that the BatchSemaphore behaves correctly in the presence
+// of tasks that drop an `Acquire` guard without waiting for the semaphore to become available.
 //
-// At this point, T2 should be woken up and should get the permit.
-// Unfortunately, in an earlier version of BatchSemaphore, this was not happening because
-// the Drop handler for Acquire was only returning permits to the semaphore, but not
-// waking up the next waiter in the queue.
+// The general idea is that there are three types of tasks: `EarlyDrop`, `Hold` and `Release`
+// (determined by the `Behavior` enum).
+// S1. The semaphore initially has 0 permits.
+// S2. The main task spawns a set of tasks, specifying each task's behavior and the number of permits it should request.
+// S3. Each task polls the semaphore once when it is created, in order to get added as a Waiter.
+// S4. The main task releases N permits, which is sufficient to ensure that all tasks complete (see below).
+// S5. Each task then proceeds according to its defined `behavior`:
+//     `EarlyDrop` tasks drop their Acquire guards (and are removed from the waiters queue)
+//     `Hold` tasks wait to acquire their permits, and then terminate (without releasing any permits)
+//     `Release` tasks wait to acquire their permits, and then release their permits and terminate
 //
-// The tests below exercise both the specific scenario above (with 1 permit and two tasks), and more
-// general scenarios involving multiple tasks, of which a random subset drop the Acquire guards early.
-mod early_acquire_drop_test {
+// The value of N is computed as
+// (sum of permits requested by the Hold tasks) + (max over the permits requested by the Release and EarlyDrop tasks)
+mod early_acquire_drop_tests {
     use super::*;
     use futures::{
         future::join_all,
@@ -534,30 +536,39 @@ mod early_acquire_drop_test {
         Future,
     };
     use pin_project::pin_project;
-    use proptest::proptest;
+    use proptest::prelude::*;
+    use proptest_derive::Arbitrary;
     use shuttle::{
         check_random,
         sync::mpsc::{channel, Sender},
     };
     use std::pin::Pin;
-    use test_log::test;
+
+    #[derive(Arbitrary, Clone, Copy, Debug)]
+    enum Behavior {
+        EarlyDrop, // Task drops before future completes
+        Release,   // Task releases permits it acquires
+        Hold,      // Task holds permits it acquires
+    }
 
     #[pin_project]
     struct Task {
-        poll_count: usize, // how many times the Future has been polled
-        early_drop: bool,  // whether to drop the Acquire handle after polling it once
-        tx: Sender<Waker>, // channel for informing the main task
+        poll_count: usize,        // how many times the Future has been polled
+        behavior: Behavior,       // how this task should behave
+        requested_permits: usize, // how many permits this Task requests
+        tx: Sender<Waker>,        // channel for informing the main task that this task is added as a Waiter
         #[pin]
         acquire: Acquire<'static>,
     }
 
     impl Task {
-        fn new(early_drop: bool, tx: Sender<Waker>, sem: &'static BatchSemaphore) -> Self {
+        fn new(behavior: Behavior, requested_permits: usize, tx: Sender<Waker>, sem: &'static BatchSemaphore) -> Self {
             Self {
                 poll_count: 0,
-                early_drop,
+                behavior,
+                requested_permits,
                 tx,
-                acquire: sem.acquire(1),
+                acquire: sem.acquire(requested_permits),
             }
         }
     }
@@ -575,52 +586,43 @@ mod early_acquire_drop_test {
                 this.tx.send(cx.waker().clone()).unwrap(); // Notify main task
                 *this.poll_count += 1;
                 Poll::Pending
-            } else if *this.early_drop {
+            } else if matches!(*this.behavior, Behavior::EarlyDrop) {
                 // Since this is an early drop, we got 0 permits
                 Poll::Ready(0)
             } else {
                 // If not early dropping, wait until the inner Acquire handle successfully gets
-                // a permit. When successful, return 1 permit.
-                this.acquire.as_mut().poll(cx).map(|_| 1)
+                // a permit. When successful, return the number of permits acquired.
+                this.acquire.as_mut().poll(cx).map(|_| *this.requested_permits)
             }
         }
     }
 
-    // High-level sketch of the test:
-    // S1. Initialize the semaphore with no permits
-    // S2. Spawn a set of tasks, each randomly decides whether or not to drop early
-    //     Each task creates an Acquire handle and polls it once (to get into the waiter queue)
-    //     The task then notifies the main task (by sending a message on an mpsc channel)
-    // S3. The main task waits for messages from all the spawned tasks (so it knows each is a waiter)
-    // S4. The main task releases N permits on the BatchSemaphore, and wakes up all the tasks
-    // S5. At this point, each task either drops its Acquire handle, or tries to acquire the BatchSemaphore
-    //     by polling it until it acquires a permit.
-    fn dropped_acquire_must_release(num_permits: usize, early_drop: Vec<bool>) {
-        shuttle::lazy_static! {
-            // S1. Initialize the semaphore with no permits
-            static ref SEM: BatchSemaphore = BatchSemaphore::new(0, Fairness::StrictlyFair);
-        }
-
+    fn dropped_acquire_must_release(sem: &'static BatchSemaphore, task_config: Vec<(Behavior, usize)>) {
         future::block_on(async move {
             let mut wakers = vec![];
             let mut handles = vec![];
 
-            // S2. Main task spawns a set of tasks; the `early_drop` vector of booleans determines
-            //     which tasks will drop the `Acquire` after polling it exactly once
-            for early_drop in early_drop {
+            let mut total_held = 0usize;
+            let mut max_requested = 0usize;
+
+            for (behavior, requested_permits) in task_config {
                 let (tx, rx) = channel();
-                let task: Task = Task::new(early_drop, tx, &SEM);
+                match behavior {
+                    Behavior::Hold => total_held += requested_permits,
+                    _ => max_requested = std::cmp::max(max_requested, requested_permits),
+                }
                 handles.push(future::spawn(async move {
+                    let task: Task = Task::new(behavior, requested_permits, tx, sem);
                     let p = task.await;
                     // Note: tasks doing an early drop will return p=0, and release(0) is a no-op
-                    SEM.release(p);
+                    if matches!(behavior, Behavior::Release) {
+                        sem.release(p);
+                    }
                 }));
-                // S3. Main task waits for message from spawned task indicating it has polled once
                 wakers.push(rx.recv().unwrap());
             }
 
-            // S4. Main task releases N permits and wakes up all tasks
-            SEM.release(num_permits);
+            sem.release(total_held + max_requested);
             for w in wakers.into_iter() {
                 w.wake();
             }
@@ -629,26 +631,55 @@
         });
     }
 
-    // The minimal test case (generated by the proptest below) is with 1 permit and 2 tasks, where Task1 does
-    // an early drop, and Task2 does not early drop. This test checks that scenario exhaustively using check_dfs.
-    #[test]
-    fn dropped_acquire_must_release_exhaustive() {
-        check_dfs(|| dropped_acquire_must_release(1, vec![true, false]), None);
-    }
+    macro_rules! sem_tests {
+        ($mod_name:ident, $fairness:expr) => {
+            mod $mod_name {
+                use super::*;
 
-    // This test checks scenarios where the main task releases multiple permits and there are several tasks, any
-    // subset of which may do an early drop of their Acquire handle.
+                #[test_log::test]
+                fn dropped_acquire_must_release_exhaustive() {
+                    shuttle::lazy_static! {
+                        static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
+                    }
+                    check_dfs(
+                        || dropped_acquire_must_release(&SEM, vec![(Behavior::EarlyDrop, 1), (Behavior::Release, 1)]),
+                        None,
+                    );
+                }
 
-    const MAX_PERMITS: usize = 8;
-    const MAX_TASKS: usize = 7;
+                #[test_log::test]
+                fn dropped_acquire_must_release_deadlock() {
+                    shuttle::lazy_static! {
+                        static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
+                    }
+                    check_dfs(
+                        || dropped_acquire_must_release(&SEM, vec![(Behavior::Hold, 1), (Behavior::EarlyDrop, 2), (Behavior::Release, 1)]),
+                        None,
+                    );
+                }
-    proptest! {
-        #[test]
-        fn dropped_acquire_must_release_random(num_permits in 1..MAX_PERMITS, early_drop in proptest::collection::vec(proptest::arbitrary::any::<bool>(), 1..MAX_TASKS)) {
-            check_random(
-                move || dropped_acquire_must_release(num_permits, early_drop.clone()),
-                10_000,
-            );
+                const MAX_REQUESTED_PERMITS: usize = 3;
+                const MAX_TASKS: usize = 7;
+
+                proptest! {
+                    #[test_log::test]
+                    fn dropped_acquire_must_release_random(task_config in proptest::collection::vec((proptest::arbitrary::any::<Behavior>(), 1..=MAX_REQUESTED_PERMITS), 1..=MAX_TASKS)) {
+                        check_random(
+                            move || {
+                                shuttle::lazy_static! {
+                                    static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
+                                }
+                                dropped_acquire_must_release(&SEM, task_config.clone())
+                            },
+                            10_000,
+                        );
+                    }
+                }
+            }
         }
     }
+
+    sem_tests!(unfair, Fairness::Unfair);
+
+    sem_tests!(fair, Fairness::StrictlyFair);
 }
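To make the W1/W2/W3 scenario in the new cleanup comment concrete, here is a minimal sketch of how it could be written as a standalone regression test. It is illustrative only, not part of the diff: the function name is hypothetical, it assumes it lives inside the test module above (so `BatchSemaphore`, `Fairness`, `check_dfs`, and shuttle's `future` are in scope), it ignores the output of awaiting `Acquire`, and it relies on the strictly fair semantics stated in the comment (later waiters queue behind the head even when enough permits exist for them). The `pin_mut!`/`poll!` macros come from the `futures` dev-dependency already listed in Cargo.toml.

```rust
// Hypothetical sketch (not part of this change): dropping the head waiter
// must hand its would-be permits to the waiters queued behind it.
fn head_waiter_drop_unblocks_rest() {
    shuttle::lazy_static! {
        // Two permits available from the start.
        static ref SEM: BatchSemaphore = BatchSemaphore::new(2, Fairness::StrictlyFair);
    }
    check_dfs(
        || {
            future::block_on(async {
                // W2 and W3 each want 1 permit; create them first so they
                // outlive the scope in which W1 is dropped.
                let w2 = SEM.acquire(1);
                let w3 = SEM.acquire(1);
                futures::pin_mut!(w2, w3);
                {
                    // W1 wants 3 permits, more than are available, so its
                    // first poll parks it at the head of the waiter queue.
                    let w1 = SEM.acquire(3);
                    futures::pin_mut!(w1);
                    assert!(futures::poll!(w1.as_mut()).is_pending());
                    // In strictly fair mode W2 and W3 queue up behind W1,
                    // even though 2 permits would be enough for both.
                    assert!(futures::poll!(w2.as_mut()).is_pending());
                    assert!(futures::poll!(w3.as_mut()).is_pending());
                } // W1 is dropped here, triggering the `index == 0` cleanup path.
                // With `unblock_waiters_from_front`, W2 and W3 now complete;
                // without it, this would deadlock.
                let _ = w2.await;
                let _ = w3.await;
            });
        },
        None,
    );
}
```

The `dropped_acquire_must_release_deadlock` case in `sem_tests!` exercises the same shape through the `Task` machinery; the sketch above just isolates the queue-head drop in a single task.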