Skip to content

Commit

Permalink
Add support for 'abort'ing shuttle::future::JoinHandle's
Browse files Browse the repository at this point in the history
  • Loading branch information
sarsko committed Jun 26, 2024
1 parent 9076a44 commit 1a8f665
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 7 deletions.
14 changes: 12 additions & 2 deletions src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,22 @@ impl<T> Default for JoinHandleInner<T> {
}

impl<T> JoinHandle<T> {
/// Detach the task associated with the handle.
fn detach(&self) {
ExecutionState::try_with(|state| {
if !state.is_finished() {
let task = state.get_mut(self.task_id);
task.detach();
}
});
}

/// Abort the task associated with the handle.
pub fn abort(&self) {
ExecutionState::try_with(|state| {
if !state.is_finished() {
let task = state.get_mut(self.task_id);
task.detach();
task.abort();
}
});
}
Expand Down Expand Up @@ -96,7 +106,7 @@ impl Error for JoinError {}

impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
self.abort();
self.detach();
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ impl Task {
self.detached = true;
}

pub(crate) fn abort(&mut self) {
self.finish();
let mut continuation = self.continuation.borrow_mut();
continuation.wipe();
}

pub(crate) fn waker(&self) -> Waker {
self.waker.clone()
}
Expand Down Expand Up @@ -373,8 +379,10 @@ impl Task {
self.park_state.blocked_in_park = false;
}

// TODO: Investigate whether we should move `wipe` here. (I think the correct scheme is to have it
// toggleable by the instantiator of the `Task` — those modelling async `JoinHandle`s should
// clean eagerly, thos modelling sync `JoinHandle`s should not.)
pub(crate) fn finish(&mut self) {
assert!(self.state != TaskState::Finished);
self.state = TaskState::Finished;
}

Expand Down
18 changes: 14 additions & 4 deletions src/runtime/thread/continuation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,22 @@ pub(crate) struct PooledContinuation {
queue: Rc<RefCell<VecDeque<Continuation>>>,
}

impl PooledContinuation {
pub fn wipe(&mut self) {
match self.continuation.take() {
Some(c) => {
if c.reusable() {
self.queue.borrow_mut().push_back(c);
}
}
None => {}
}
}
}

impl Drop for PooledContinuation {
fn drop(&mut self) {
let c = self.continuation.take().unwrap();
if c.reusable() {
self.queue.borrow_mut().push_back(c);
}
self.wipe()
}
}

Expand Down
72 changes: 72 additions & 0 deletions tests/future/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,78 @@ fn async_counter() {
});
}

// We need a way to hold the `MutexGuard`, which is `!Send`, across an `await`.
struct WrappedMutexGuard<'a> {
#[allow(unused)]
guard: shuttle::sync::MutexGuard<'a, ()>,
}

unsafe impl<'a> Send for WrappedMutexGuard<'a> {}

async fn acquire_and_loop(mutex: Arc<Mutex<()>>) {
let _g = WrappedMutexGuard {
guard: mutex.lock().unwrap(),
};
loop {
future::yield_now().await;
}
}

// The idea is to acquire a mutex, abort the JoinHandle, then acquire the Mutex.
// This should succeed, because `JoinHandle::abort()` should free the Mutex.
#[test]
fn abort_frees_mutex() {
check_random(
|| {
let mutex = Arc::new(Mutex::new(()));
let jh = future::spawn(acquire_and_loop(mutex.clone()));

jh.abort(); // this unblocks

let _g = mutex.lock();
},
1000,
);
}

// The idea is to acquire a mutex, drop the JoinHandle, then acquire the Mutex.
// This should fail, because `drop`ping the JoinHandle just detaches it, meaning
// it keeps holding the Mutex.
#[test]
#[should_panic(expected = "exceeded max_steps bound")]
fn drop_join_handle_deadlocks() {
check_random(
|| {
let mutex = Arc::new(Mutex::new(()));
let jh = future::spawn(acquire_and_loop(mutex.clone()));

drop(jh);

let _g = mutex.lock();
},
1000,
);
}

// The idea is to acquire a mutex, forget the JoinHandle, then acquire the Mutex.
// This should fail, because `forget`ting the JoinHandle doesn't cause it to release
// the Mutex.
#[test]
#[should_panic(expected = "exceeded max_steps bound")]
fn forget_join_handle_deadlocks() {
check_random(
|| {
let mutex = Arc::new(Mutex::new(()));
let jh = future::spawn(acquire_and_loop(mutex.clone()));

std::mem::forget(jh);

let _g = mutex.lock();
},
1000,
);
}

#[test]
fn async_counter_random() {
check_random(async_counter, 5000)
Expand Down

0 comments on commit 1a8f665

Please sign in to comment.