From 2b9cc6ae1b5c8d003ab632700a56b770bc3741b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aurel=20B=C3=ADl=C3=BD?= Date: Thu, 18 Jul 2024 14:57:25 -0700 Subject: [PATCH] add unfair mode to batch semaphore --- src/future/batch_semaphore.rs | 217 +++++++++++++++-------- tests/future/batch_semaphore.rs | 296 ++++++++++++++++++++++---------- 2 files changed, 354 insertions(+), 159 deletions(-) diff --git a/src/future/batch_semaphore.rs b/src/future/batch_semaphore.rs index 85da0de..5f188d3 100644 --- a/src/future/batch_semaphore.rs +++ b/src/future/batch_semaphore.rs @@ -151,13 +151,21 @@ impl PermitsAvailable { } } +/// Fairness mode for the semaphore. Determines which threads are woken when +/// permits are released. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Fairness { + /// The semaphore is strictly fair, so earlier requesters always get + /// priority over later ones. + StrictlyFair, + + /// The semaphore makes no guarantees about fairness. In particular, + /// a waiter can be starved by other threads. + Unfair, +} + /// A counting semaphore which permits waiting on multiple permits at once, /// and supports both asychronous and synchronous blocking operations. -/// -/// The semaphore is strictly fair, so earlier requesters always get priority -/// over later ones. -/// -/// TODO: Provide an option to support weaker models for fairness. #[derive(Debug)] struct BatchSemaphoreState { // Key invariants: @@ -176,6 +184,7 @@ struct BatchSemaphoreState { // (4) closed ==> waiters.is_empty() waiters: VecDeque>, permits_available: PermitsAvailable, + fairness: Fairness, // TODO: should there be a clock for the close event? closed: bool, } @@ -185,8 +194,15 @@ impl BatchSemaphoreState { assert!(num_permits > 0); if self.closed { Err(TryAcquireError::Closed) - } else if self.waiters.is_empty() { - // No one is waiting: try to acquire permits + } else if self.waiters.is_empty() || matches!(self.fairness, Fairness::Unfair) { + // Permits here can be acquired in one of two scenarios: + // - The waiter queue is empty; nobody else is waiting for permits, + // so if there are enough available, immediately succeed. + // - The semaphore is operating in an unfair mode; the current + // thread is either requesting permits for the first time, or it + // was woken and selected by the scheduler. In either case, the + // thread may succeed, as long as there are enough permits. + let clock = self.permits_available.acquire(num_permits, current::clock())?; // If successful, the acquiry is causally dependent on the event @@ -241,20 +257,22 @@ impl std::error::Error for AcquireError {} impl BatchSemaphore { /// Creates a new semaphore with the initial number of permits. - pub fn new(num_permits: usize) -> Self { + pub fn new(num_permits: usize, fairness: Fairness) -> Self { let state = RefCell::new(BatchSemaphoreState { waiters: VecDeque::new(), permits_available: PermitsAvailable::new(num_permits), + fairness, closed: false, }); Self { state } } /// Creates a new semaphore with the initial number of permits. - pub const fn const_new(num_permits: usize) -> Self { + pub const fn const_new(num_permits: usize, fairness: Fairness) -> Self { let state = RefCell::new(BatchSemaphoreState { waiters: VecDeque::new(), permits_available: PermitsAvailable::const_new(num_permits), + fairness, closed: false, }); Self { state } @@ -390,38 +408,62 @@ impl BatchSemaphore { let me = ExecutionState::me(); trace!(task = ?me, avail = ?state.permits_available, waiters = ?state.waiters, "released {} permits for semaphore {:p}", num_permits, &self.state); - while let Some(front) = state.waiters.front() { - if front.num_permits <= state.permits_available.available() { - let waiter = state.waiters.pop_front().unwrap(); - - // The clock we pass into the semaphore is the clock of the - // waiter, corresponding to the point at which the waiter was - // enqueued. The clock we get in return corresponds to the - // join of the clocks of the acquired permits, used to update - // the waiter's clock to causally depend on the release events. - let clock = state - .permits_available - .acquire(waiter.num_permits, waiter.clock.clone()) - .unwrap(); - trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter); - - // Update waiter state as it is no longer in the queue - assert!(waiter.is_queued.swap(false, Ordering::SeqCst)); - assert!(!waiter.has_permits.swap(true, Ordering::SeqCst)); - ExecutionState::with(|s| { - let task = s.get_mut(waiter.task_id); - assert!(!task.finished()); - // The acquiry is causally dependent on the event - // which released the acquired permits. - task.clock.update(&clock); - task.unblock(); - }); - let mut maybe_waker = waiter.waker.lock().unwrap(); - if let Some(waker) = maybe_waker.take() { - waker.wake(); + match state.fairness { + Fairness::StrictlyFair => { + // in a strictly fair mode we will always pick the first waiter + // in the queue, as long as there are enough permits available + while let Some(front) = state.waiters.front() { + if front.num_permits <= state.permits_available.available() { + let waiter = state.waiters.pop_front().unwrap(); + + // The clock we pass into the semaphore is the clock of the + // waiter, corresponding to the point at which the waiter was + // enqueued. The clock we get in return corresponds to the + // join of the clocks of the acquired permits, used to update + // the waiter's clock to causally depend on the release events. + let clock = state + .permits_available + .acquire(waiter.num_permits, waiter.clock.clone()) + .unwrap(); + trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter); + + // Update waiter state as it is no longer in the queue + assert!(waiter.is_queued.swap(false, Ordering::SeqCst)); + assert!(!waiter.has_permits.swap(true, Ordering::SeqCst)); + ExecutionState::with(|s| { + let task = s.get_mut(waiter.task_id); + assert!(!task.finished()); + // The acquiry is causally dependent on the event + // which released the acquired permits. + task.clock.update(&clock); + task.unblock(); + }); + let mut maybe_waker = waiter.waker.lock().unwrap(); + if let Some(waker) = maybe_waker.take() { + waker.wake(); + } + } else { + break; + } + } + } + Fairness::Unfair => { + // in an unfair mode, we will unblock all the waiters for which + // there are enough permits available, then let them race + let num_available = state.permits_available.available(); + for waiter in &mut state.waiters { + if waiter.num_permits <= num_available { + ExecutionState::with(|s| { + let task = s.get_mut(waiter.task_id); + assert!(!task.finished()); + task.unblock(); + }); + let maybe_waker = waiter.waker.lock().unwrap(); + if let Some(waker) = maybe_waker.as_ref() { + waker.wake_by_ref(); + } + } } - } else { - break; } } drop(state); @@ -440,7 +482,7 @@ unsafe impl Sync for BatchSemaphore {} impl Default for BatchSemaphore { fn default() -> Self { - Self::new(Default::default()) + Self::new(Default::default(), Fairness::StrictlyFair) } } @@ -479,34 +521,75 @@ impl Future for Acquire<'_> { self.completed = true; trace!("Acquire::poll for waiter {:?} with closed", self.waiter); Poll::Ready(Err(AcquireError::closed())) - } else if self.waiter.is_queued.load(Ordering::SeqCst) { - trace!("Acquire::poll for waiter {:?} already queued", self.waiter); - assert!(self.waiter.waker.lock().unwrap().is_some()); - Poll::Pending } else { - // We access the semaphore state directly instead of using the - // public `try_acquire`, because in case of `NoPermits`, we do not - // want to update the clock, as this thread will be blocked below. - let mut state = self.semaphore.state.borrow_mut(); - let acquire_result = state.acquire_permits(self.waiter.num_permits); - drop(state); - match acquire_result { - Ok(()) => { - assert!(!self.waiter.is_queued.load(Ordering::SeqCst)); - self.waiter.has_permits.store(true, Ordering::SeqCst); - self.completed = true; - trace!("Acquire::poll for waiter {:?} that got permits", self.waiter); - crate::runtime::thread::switch(); - Poll::Ready(Ok(())) + let is_queued = self.waiter.is_queued.load(Ordering::SeqCst); + trace!("Acquire::poll for waiter {:?}; is queued: {is_queued:?}", self.waiter); + + // Sanity check: there should be a waker if the waiter is in + // the queue. Also true for unfair semaphores, which wake by ref. + assert_eq!(is_queued, self.waiter.waker.lock().unwrap().is_some()); + + // Should the waiter try to acquire permits here? Four cases: + // 1. unfair semaphore, waiter not yet enqueued; + // 2. fair semaphore, waiter not yet enqueued; + // 3. unfair semaphore, waiter already enqueued. + // 4. fair semaphore, waiter already enqueued; + // + // 1. and 2. are similar: the future was polled for the first time, + // so the waiter will try to acquire some permits. If successful, + // the waiter need not be enqueued, and the future is resolved. + // Otherwise, the waiter is added to the queue. + // + // 3. is slightly different: the future was polled, even though the + // waiter was already in the queue. This can happen either because + // the semaphore just received some permits and woke the waiter up, + // or because the future itself was polled manually. Either way, + // the semaphore is queried. + // + // 4. is a case where we do not try to acquire permits. The request + // would always fail, and the waiter should remain suspended until + // the semaphore has explicitly unblocked it and given it permits + // during a `release` call. + let try_to_acquire = match (self.semaphore.state.borrow().fairness, is_queued) { + // written this way to mirror the cases described above + (Fairness::Unfair, false) | (Fairness::StrictlyFair, false) | (Fairness::Unfair, true) => true, + (Fairness::StrictlyFair, true) => false, + }; + + if try_to_acquire { + // Access the semaphore state directly instead of `try_acquire`, + // because in case of `NoPermits`, we do not want to update the + // clock, as this thread will be blocked below. + let mut state = self.semaphore.state.borrow_mut(); + let acquire_result = state.acquire_permits(self.waiter.num_permits); + drop(state); + + match acquire_result { + Ok(()) => { + if is_queued { + self.semaphore.remove_waiter(&self.waiter); + } + self.waiter.has_permits.store(true, Ordering::SeqCst); + self.completed = true; + trace!("Acquire::poll for waiter {:?} that got permits", self.waiter); + crate::runtime::thread::switch(); + Poll::Ready(Ok(())) + } + Err(TryAcquireError::NoPermits) => { + let mut maybe_waker = self.waiter.waker.lock().unwrap(); + *maybe_waker = Some(cx.waker().clone()); + if !is_queued { + self.semaphore.enqueue_waiter(&self.waiter); + self.waiter.is_queued.store(true, Ordering::SeqCst); + } + trace!("Acquire::poll for waiter {:?} that is enqueued", self.waiter); + Poll::Pending + } + Err(TryAcquireError::Closed) => unreachable!(), } - Err(TryAcquireError::NoPermits) => { - let mut maybe_waker = self.waiter.waker.lock().unwrap(); - *maybe_waker = Some(cx.waker().clone()); - self.semaphore.enqueue_waiter(&self.waiter); - trace!("Acquire::poll for waiter {:?} that is enqueued", self.waiter); - Poll::Pending - } - Err(TryAcquireError::Closed) => unreachable!(), + } else { + // No progress made, future is still pending. + Poll::Pending } } } diff --git a/tests/future/batch_semaphore.rs b/tests/future/batch_semaphore.rs index b722772..0bc7c65 100644 --- a/tests/future/batch_semaphore.rs +++ b/tests/future/batch_semaphore.rs @@ -2,7 +2,7 @@ use crate::basic::clocks::{check_clock, me}; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use shuttle::future::{self, batch_semaphore::*}; -use shuttle::{check_dfs, current, thread}; +use shuttle::{check_dfs, check_random, current, thread}; use std::collections::HashSet; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -13,7 +13,7 @@ use test_log::test; fn batch_semaphore_basic() { check_dfs( || { - let s = BatchSemaphore::new(3); + let s = BatchSemaphore::new(3, Fairness::StrictlyFair); future::spawn(async move { s.acquire(2).await.unwrap(); @@ -28,111 +28,223 @@ fn batch_semaphore_basic() { ); } +/// Checks the behavior of an unfair batch semaphore is unfair: if there are +/// two threads blocked on the same semaphore, releasing permits may unblock +/// them in any order. #[test] -fn batch_semaphore_clock_1() { - check_dfs( - || { - let s = Arc::new(BatchSemaphore::new(0)); +fn batch_semaphore_unfair() { + let observed_values = Arc::new(std::sync::Mutex::new(HashSet::new())); + let observed_values_clone = Arc::clone(&observed_values); - let s2 = s.clone(); - thread::spawn(move || { - assert_eq!(me(), 1); - s2.release(1); - }); - thread::spawn(move || { - assert_eq!(me(), 2); - check_clock(|i, c| (i != 1) || (c == 0)); - s.acquire_blocking(1).unwrap(); - // after the acquire, we are causally dependent on task 1 - check_clock(|i, c| (i != 1) || (c > 0)); - }); + check_random( + move || { + let semaphore = Arc::new(BatchSemaphore::new(0, Fairness::Unfair)); + + // Here we use a stdlib mutex to avoid introducing yield points. + // It is used to record in which order the threads were enqueued + // into the semaphore's waiters list, because `thread::spawn` is + // a yield point and as such the enqueing can happen in either + // order. + let order1 = Arc::new(std::sync::Mutex::new(vec![])); + let order2 = Arc::new(std::sync::Mutex::new(vec![])); + let threads = (0..3) + .map(|tid| { + let semaphore = semaphore.clone(); + let order1 = order1.clone(); + let order2 = order2.clone(); + thread::spawn(move || { + // once the ID is pushed to the vector here and + // observed in the busy loop below, the thread is + // assumed to be blocked, because there is no yield + // point between the push and the acquire + order1.lock().unwrap().push(tid); // stdlib mutex + let val = [2, 1, 1][tid]; + semaphore.acquire_blocking(val).unwrap(); // shuttle semaphore + + // after unblock, record which thread acquired how many + order2.lock().unwrap().push((tid, val)); // stdlib mutex + }) + }) + .collect::>(); + + // wait until all threads are blocked on the semaphore + while order1.lock().unwrap().len() < 3 { + thread::yield_now(); + } + + // record the order in which they enqueued for the semaphore + let order1_after_enqueued = order1.lock().unwrap().clone(); + + // release 2 permits, which either unblocks thread 0 (which needs + // 2 permits), or both thread 1 and 2 (both of which need 1 permit) + semaphore.release(2); + + // wait until the threads unblock and finish + while order2.lock().unwrap().iter().map(|(_tid, val)| val).sum::() < 2 { + thread::yield_now(); + } + + // record the order in which they were woken + let order2_after_release = order2.lock().unwrap().clone(); + + // clean up: release 2 more permits to unblock any remaining + // threads, then join all threads + semaphore.release(2); + for thread in threads { + thread.join().unwrap(); + } + + observed_values_clone + .lock() + .unwrap() + .insert((order1_after_enqueued, order2_after_release)); }, - None, + 1000, // should be enough to find all permutations + ); + + // We expect to see 18 (= 6 * 3) different outcomes: + // - the three threads may block on the semaphore in any order (6), + // - once the permits are released, then either both are consumed by + // thread 0, or they are consumed threads 1 and 2 (3). + let observed_values = Arc::try_unwrap(observed_values).unwrap().into_inner().unwrap(); + assert_eq!( + observed_values, + HashSet::from([ + (vec![0, 1, 2], vec![(0, 2)]), + (vec![0, 1, 2], vec![(1, 1), (2, 1)]), + (vec![0, 1, 2], vec![(2, 1), (1, 1)]), + (vec![0, 2, 1], vec![(0, 2)]), + (vec![0, 2, 1], vec![(1, 1), (2, 1)]), + (vec![0, 2, 1], vec![(2, 1), (1, 1)]), + (vec![1, 0, 2], vec![(0, 2)]), + (vec![1, 0, 2], vec![(1, 1), (2, 1)]), + (vec![1, 0, 2], vec![(2, 1), (1, 1)]), + (vec![1, 2, 0], vec![(0, 2)]), + (vec![1, 2, 0], vec![(1, 1), (2, 1)]), + (vec![1, 2, 0], vec![(2, 1), (1, 1)]), + (vec![2, 1, 0], vec![(0, 2)]), + (vec![2, 1, 0], vec![(1, 1), (2, 1)]), + (vec![2, 1, 0], vec![(2, 1), (1, 1)]), + (vec![2, 0, 1], vec![(0, 2)]), + (vec![2, 0, 1], vec![(1, 1), (2, 1)]), + (vec![2, 0, 1], vec![(2, 1), (1, 1)]), + ]) ); } #[test] -fn batch_semaphore_clock_2() { - check_dfs( - || { - let s = Arc::new(BatchSemaphore::new(0)); +fn batch_semaphore_clock_1() { + for fairness in [Fairness::StrictlyFair, Fairness::Unfair] { + check_dfs( + move || { + let s = Arc::new(BatchSemaphore::new(0, fairness)); - for i in 1..=2 { let s2 = s.clone(); thread::spawn(move || { - assert_eq!(me(), i); + assert_eq!(me(), 1); s2.release(1); }); - } + thread::spawn(move || { + assert_eq!(me(), 2); + check_clock(|i, c| (i != 1) || (c == 0)); + s.acquire_blocking(1).unwrap(); + // after the acquire, we are causally dependent on task 1 + check_clock(|i, c| (i != 1) || (c > 0)); + }); + }, + None, + ); + } +} - thread::spawn(move || { - assert_eq!(me(), 3); - check_clock(|i, c| (c > 0) == (i == 0)); - // acquire 2: unblocked once both of the threads finished - s.acquire_blocking(2).unwrap(); - // after the acquire, we are causally dependent on both tasks - check_clock(|i, c| (i == 3) || (c > 0)); - }); - }, - None, - ); +#[test] +fn batch_semaphore_clock_2() { + for fairness in [Fairness::StrictlyFair, Fairness::Unfair] { + check_dfs( + move || { + let s = Arc::new(BatchSemaphore::new(0, fairness)); + + for i in 1..=2 { + let s2 = s.clone(); + thread::spawn(move || { + assert_eq!(me(), i); + s2.release(1); + }); + } + + thread::spawn(move || { + assert_eq!(me(), 3); + check_clock(|i, c| (c > 0) == (i == 0)); + // acquire 2: unblocked once both of the threads finished + s.acquire_blocking(2).unwrap(); + // after the acquire, we are causally dependent on both tasks + check_clock(|i, c| (i == 3) || (c > 0)); + }); + }, + None, + ); + } } #[test] fn batch_semaphore_clock_3() { - check_dfs( - || { - let s = Arc::new(BatchSemaphore::new(0)); + for fairness in [Fairness::StrictlyFair, Fairness::Unfair] { + check_dfs( + move || { + let s = Arc::new(BatchSemaphore::new(0, fairness)); + + for i in 1..=2 { + let s2 = s.clone(); + thread::spawn(move || { + assert_eq!(me(), i); + s2.release(1); + }); + } - for i in 1..=2 { - let s2 = s.clone(); thread::spawn(move || { - assert_eq!(me(), i); - s2.release(1); + assert_eq!(me(), 3); + check_clock(|i, c| (c > 0) == (i == 0)); + // acquire 1: unblocked once either of the threads finished + s.acquire_blocking(1).unwrap(); + // after the acquire, we are causally dependent on exactly one of the two tasks + let clock = current::clock(); + assert!((clock[1] > 0 && clock[2] == 0) || (clock[1] == 0 && clock[2] > 0)); }); - } - - thread::spawn(move || { - assert_eq!(me(), 3); - check_clock(|i, c| (c > 0) == (i == 0)); - // acquire 1: unblocked once either of the threads finished - s.acquire_blocking(1).unwrap(); - // after the acquire, we are causally dependent on exactly one of the two tasks - let clock = current::clock(); - assert!((clock[1] > 0 && clock[2] == 0) || (clock[1] == 0 && clock[2] > 0)); - }); - }, - None, - ); + }, + None, + ); + } } #[test] fn batch_semaphore_clock_4() { - check_dfs( - || { - let s = Arc::new(BatchSemaphore::new(1)); - - for tid in 1..=2 { - let other_tid = 2 - tid; - let s2 = s.clone(); - thread::spawn(move || { - assert_eq!(me(), tid); - match s2.try_acquire(1) { - Ok(()) => { - // we won the race, no causal dependence on another thread - check_clock(|i, c| (c > 0) == (i == 0 || i == tid)); + for fairness in [Fairness::StrictlyFair, Fairness::Unfair] { + check_dfs( + move || { + let s = Arc::new(BatchSemaphore::new(1, fairness)); + + for tid in 1..=2 { + let other_tid = 2 - tid; + let s2 = s.clone(); + thread::spawn(move || { + assert_eq!(me(), tid); + match s2.try_acquire(1) { + Ok(()) => { + // we won the race, no causal dependence on another thread + check_clock(|i, c| (c > 0) == (i == 0 || i == tid)); + } + Err(TryAcquireError::NoPermits) => { + // we lost the race, so we causally depend on the other thread + check_clock(|i, c| !(i == 0 || i == other_tid) || (c > 0)); + } + Err(TryAcquireError::Closed) => unreachable!(), } - Err(TryAcquireError::NoPermits) => { - // we lost the race, so we causally depend on the other thread - check_clock(|i, c| !(i == 0 || i == other_tid) || (c > 0)); - } - Err(TryAcquireError::Closed) => unreachable!(), - } - }); - } - }, - None, - ); + }); + } + }, + None, + ); + } } /// Shows a case in which causality tracking in the batch semaphore is @@ -170,8 +282,8 @@ fn batch_semaphore_clock_imprecise() { // Create a semaphore with `num_permits` permits and spawn a bunch of tasks that each // try to grab a bunch of permits. Task i sets the i'th bit in a shared atomic counter. // Afterwards, we'll see which combinations were allowable over a full dfs run. -async fn semtest(num_permits: usize, counts: Vec, states: &Arc>>) { - let s = Arc::new(BatchSemaphore::new(num_permits)); +async fn semtest(num_permits: usize, counts: Vec, states: &Arc>>, mode: Fairness) { + let s = Arc::new(BatchSemaphore::new(num_permits, mode)); let r = Arc::new(AtomicUsize::new(0)); let mut handles = vec![]; for (i, &c) in counts.iter().enumerate() { @@ -201,7 +313,7 @@ fn batch_semaphore_test_1() { move || { let states2 = states2.clone(); future::block_on(async move { - semtest(5, vec![3, 3, 3], &states2).await; + semtest(5, vec![3, 3, 3], &states2, Fairness::StrictlyFair).await; }); }, None, @@ -219,7 +331,7 @@ fn batch_semaphore_test_2() { move || { let states2 = states2.clone(); future::block_on(async move { - semtest(5, vec![3, 3, 2], &states2).await; + semtest(5, vec![3, 3, 2], &states2, Fairness::StrictlyFair).await; }); }, None, @@ -237,7 +349,7 @@ fn batch_semaphore_signal() { // Use a semaphore for signaling check_dfs( move || { - let sem = Arc::new(BatchSemaphore::new(0)); + let sem = Arc::new(BatchSemaphore::new(0, Fairness::StrictlyFair)); let sem2 = sem.clone(); let r = Arc::new(AtomicUsize::new(0)); let r2 = r.clone(); @@ -264,8 +376,8 @@ fn batch_semaphore_close_acquire() { check_dfs( || { future::block_on(async { - let tx = Arc::new(BatchSemaphore::new(1)); - let rx = Arc::new(BatchSemaphore::new(0)); + let tx = Arc::new(BatchSemaphore::new(1, Fairness::StrictlyFair)); + let rx = Arc::new(BatchSemaphore::new(0, Fairness::StrictlyFair)); let tx2 = tx.clone(); let rx2 = rx.clone(); @@ -302,7 +414,7 @@ fn batch_semaphore_drop_sender() { check_dfs( || { future::block_on(async { - let sem = Arc::new(BatchSemaphore::new(0)); + let sem = Arc::new(BatchSemaphore::new(0, Fairness::StrictlyFair)); let sender = Sender { sem: sem.clone() }; future::spawn(async move { @@ -360,7 +472,7 @@ fn bugged_cleanup_would_cause_deadlock() { check_dfs( || { - let sem = Arc::new(BatchSemaphore::new(1)); + let sem = Arc::new(BatchSemaphore::new(1, Fairness::StrictlyFair)); let sem2 = sem.clone(); future::block_on(async move { @@ -486,7 +598,7 @@ mod early_acquire_drop_test { fn dropped_acquire_must_release(num_permits: usize, early_drop: Vec) { shuttle::lazy_static! { // S1. Initialize the semaphore with no permits - static ref SEM: BatchSemaphore = BatchSemaphore::new(0); + static ref SEM: BatchSemaphore = BatchSemaphore::new(0, Fairness::StrictlyFair); } future::block_on(async move {