Skip to content

Commit a7bd329

Browse files
committed
Add a new executor module to run functions in thread pools
1 parent 7f31df8 commit a7bd329

File tree

7 files changed

+387
-0
lines changed

7 files changed

+387
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ zerocopy = { version = "0.8.24", features = ["derive"] }
1818
parking_lot = { version = "0.12.4", features = ["send_guard"] }
1919
fxhash = "0.2.1"
2020
static_assertions = "1.1.0"
21+
rayon = "1.10.0"
2122

2223
[dev-dependencies]
2324
criterion = "0.6.0"

src/executor/futures.rs

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
use crate::executor::Wait;
2+
use std::{
3+
fmt,
4+
sync::{Arc, OnceLock},
5+
};
6+
7+
#[derive(Debug)]
8+
enum FutureStatus<T> {
9+
Completed(T),
10+
Poisoned,
11+
}
12+
13+
use FutureStatus::*;
14+
15+
/// A placeholder for a value that will be computed at a later time.
16+
///
17+
/// `Future`s are the result of running functions in separate threads using
18+
/// [`Executor::spawn`](crate::executor::Executor::spawn): calling `spawn()` in fact returns
19+
/// immediately, even though the function will complete at a later time. The `Future` returned by
20+
/// `spawn()` allows retrieving the result of the function once it completes.
21+
///
22+
/// # Poisoning
23+
///
24+
/// A `Future` may be in a "poisoned" status if the execution of the function that produced it
25+
/// failed with a panic.
26+
pub struct Future<T> {
27+
cell: Arc<OnceLock<FutureStatus<T>>>,
28+
}
29+
30+
impl<T> Future<T> {
31+
#[inline]
32+
#[must_use]
33+
pub(super) fn pending() -> Self {
34+
Self { cell: Arc::new(OnceLock::new()) }
35+
}
36+
37+
/// Creates a new `Future` that is already completed with the given value.
38+
#[inline]
39+
#[must_use]
40+
pub fn ready(value: T) -> Self {
41+
let this = Self::pending();
42+
this.complete(value);
43+
this
44+
}
45+
46+
#[inline]
47+
pub(super) fn complete(&self, value: T) {
48+
self.try_complete(value).unwrap_or_else(|err| panic!("{err}"))
49+
}
50+
51+
#[inline]
52+
pub(super) fn try_complete(&self, value: T) -> Result<(), ReadyError> {
53+
self.cell.set(Completed(value)).map_err(|_| ReadyError)
54+
}
55+
56+
// There's no `poison()` method simply because it's not used internally.
57+
58+
#[inline]
59+
pub(super) fn try_poison(&self) -> Result<(), ReadyError> {
60+
self.cell.set(Poisoned).map_err(|_| ReadyError)
61+
}
62+
63+
/// Returns the value of the `Future`, or `None` if this `Future` was not completed yet.
64+
///
65+
/// # Panics
66+
///
67+
/// If the `Future` is poisoned. See [`Future::try_get()`] for a non-panicking version of this
68+
/// method.
69+
#[inline]
70+
#[must_use]
71+
pub fn get(&self) -> Option<&T> {
72+
self.try_get().map(|result| result.expect("Future is poisoned"))
73+
}
74+
75+
/// Returns the value of the `Future`, `None` if this `Future` was not completed yet, or an
76+
/// error if this `Future` is poisoned.
77+
#[inline]
78+
#[must_use]
79+
pub fn try_get(&self) -> Option<Result<&T, PoisonError>> {
80+
match self.cell.get() {
81+
None => None,
82+
Some(Completed(ref value)) => Some(Ok(value)),
83+
Some(Poisoned) => Some(Err(PoisonError)),
84+
}
85+
}
86+
}
87+
88+
impl<T> Wait for Future<T> {
89+
type Output = T;
90+
91+
#[inline]
92+
fn wait(&self) -> &Self::Output {
93+
match self.cell.wait() {
94+
Completed(value) => value,
95+
Poisoned => panic!("{PoisonError}"),
96+
}
97+
}
98+
}
99+
100+
impl<T> Clone for Future<T> {
101+
fn clone(&self) -> Self {
102+
Self { cell: Arc::clone(&self.cell) }
103+
}
104+
}
105+
106+
impl<T: fmt::Debug> fmt::Debug for Future<T> {
107+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108+
struct Pending;
109+
110+
impl fmt::Debug for Pending {
111+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112+
f.write_str("<pending>")
113+
}
114+
}
115+
116+
struct Poisoned;
117+
118+
impl fmt::Debug for Poisoned {
119+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120+
f.write_str("<poisoned>")
121+
}
122+
}
123+
124+
f.debug_tuple("Future")
125+
.field(match self.try_get() {
126+
None => &Pending,
127+
Some(Ok(value)) => value,
128+
Some(Err(PoisonError)) => &Poisoned,
129+
})
130+
.finish()
131+
}
132+
}
133+
134+
#[derive(Clone, PartialEq, Eq, Debug)]
135+
pub(super) struct ReadyError;
136+
137+
impl fmt::Display for ReadyError {
138+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139+
f.write_str("attempted to complete or poison the Future twice")
140+
}
141+
}
142+
143+
impl std::error::Error for ReadyError {}
144+
145+
#[derive(Clone, PartialEq, Eq, Debug)]
146+
pub struct PoisonError;
147+
148+
impl fmt::Display for PoisonError {
149+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150+
f.write_str("execution of the closure for this Future resulted in a panic")
151+
}
152+
}
153+
154+
impl std::error::Error for PoisonError {}
155+
156+
#[cfg(test)]
157+
mod tests {
158+
use super::*;
159+
use std::{panic, sync::Barrier, thread, time::Duration};
160+
161+
#[test]
162+
fn pending_to_completed() {
163+
let f = Future::<u32>::pending();
164+
165+
assert_eq!(f.get(), None);
166+
assert_eq!(f.try_get(), None);
167+
168+
f.complete(123);
169+
170+
assert_eq!(f.get(), Some(&123));
171+
assert_eq!(f.try_get(), Some(Ok(&123)));
172+
}
173+
174+
#[test]
175+
fn pending_to_poisoned() {
176+
let f = Future::<u32>::pending();
177+
178+
assert_eq!(f.get(), None);
179+
assert_eq!(f.try_get(), None);
180+
181+
f.try_poison().expect("poison failed");
182+
183+
panic::catch_unwind(|| f.get()).expect_err("get() should have panicked");
184+
assert_eq!(f.try_get(), Some(Err(PoisonError)));
185+
}
186+
187+
#[test]
188+
fn wait() {
189+
let f = Future::<u32>::pending();
190+
let g = f.clone();
191+
let barrier = Barrier::new(2);
192+
193+
thread::scope(|s| {
194+
s.spawn(|| {
195+
barrier.wait();
196+
thread::sleep(Duration::from_secs(1));
197+
g.complete(123);
198+
});
199+
200+
assert_eq!(f.get(), None);
201+
assert_eq!(f.try_get(), None);
202+
203+
barrier.wait();
204+
205+
assert_eq!(f.wait(), &123);
206+
assert_eq!(f.get(), Some(&123));
207+
assert_eq!(f.try_get(), Some(Ok(&123)));
208+
209+
// Waiting twice or more should return the same value
210+
assert_eq!(f.wait(), &123);
211+
});
212+
}
213+
}

src/executor/inline.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use crate::executor::{Executor, Future};
2+
3+
/// A dummy executor that executes all functions in the same thread.
4+
#[derive(Copy, Clone, Debug)]
5+
pub struct Inline;
6+
7+
impl Executor for Inline {
8+
#[inline]
9+
fn defer<F, T>(&self, f: F) -> Future<T>
10+
where
11+
F: FnOnce() -> T + Send + 'static,
12+
T: Send + Sync + 'static,
13+
{
14+
Future::ready(f())
15+
}
16+
}
17+
18+
#[cfg(test)]
19+
mod tests {
20+
use super::*;
21+
use crate::executor::Wait;
22+
23+
#[test]
24+
fn defer() {
25+
let inline = Inline;
26+
let future = inline.defer(|| 123);
27+
assert_eq!(future.wait(), &123);
28+
}
29+
}

src/executor/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
//! Concurrent execution.
2+
//!
3+
//! This module provides structures and traits to run functions in separate threads and obtain
4+
//! their results later on. The implementation is currently based on the popular [`rayon`] crate.
5+
//! This module also provides a dummy implementation called [`Inline`] that does not spawn any
6+
//! actual thread but executes everything serially.
7+
//!
8+
//! # Examples
9+
//!
10+
//! ```
11+
//! use triedb::executor::{threadpool, Executor, Wait};
12+
//!
13+
//! // Create a thread pool
14+
//! let pool = threadpool::builder().build().unwrap();
15+
//!
16+
//! // Run some closures in the background.
17+
//! let future1 = pool.defer(|| 1 + 1);
18+
//! let future2 = pool.defer(|| 2 + 2);
19+
//!
20+
//! // Wait for the closures to return a result.
21+
//! assert_eq!(future1.wait(), &2);
22+
//! assert_eq!(future2.wait(), &4);
23+
//! ```
24+
25+
mod futures;
26+
mod inline;
27+
mod traits;
28+
29+
pub mod threadpool;
30+
31+
pub use futures::{Future, PoisonError};
32+
pub use inline::Inline;
33+
pub use traits::{Executor, Wait};

src/executor/threadpool.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use crate::executor::{Executor, Future};
2+
use rayon::{ThreadPool, ThreadPoolBuilder};
3+
use std::thread;
4+
5+
/// A wrapper around [`ThreadPoolBuilder::new()`] which sets some default values to make the
6+
/// resulting [`ThreadPool`] play nicely with the [`Executor`] trait.
7+
pub fn builder() -> ThreadPoolBuilder {
8+
ThreadPoolBuilder::new()
9+
.thread_name(|num| format!("executor-thread-{num:03}"))
10+
// The default behavior for `rayon` is to abort in case of panic, causing the whole program
11+
// to crash. We instead want to catch individual panics and poison the relevant `Future`
12+
// when those occur.
13+
//
14+
// This panic hanlder does nothing, so that the abort behavior is suppressed. We don't need
15+
// to explicitly print an error message or the backtrace because this will be already taken
16+
// care of.
17+
.panic_handler(|_| {})
18+
}
19+
20+
impl Executor for ThreadPool {
21+
fn defer<F, T>(&self, f: F) -> Future<T>
22+
where
23+
F: FnOnce() -> T + Send + 'static,
24+
T: Send + Sync + 'static,
25+
{
26+
let sender_future = Future::pending();
27+
let receiver_future = sender_future.clone();
28+
29+
self.spawn(move || {
30+
// Create the guard panic first, then run the closure. The guard will be dropped at the
31+
// end of this scope. If the function succeeds, the guard won't do anything; if it
32+
// panics, the guard's `Drop` implementation will poison the future.
33+
let _guard = PanicGuard { future: &sender_future };
34+
sender_future.complete(f())
35+
});
36+
37+
receiver_future
38+
}
39+
}
40+
41+
/// A "guard" to detect if this thread panics, and poison the `Future` in that case.
42+
#[derive(Debug)]
43+
struct PanicGuard<'a, T> {
44+
future: &'a Future<T>,
45+
}
46+
47+
impl<'a, T> Drop for PanicGuard<'a, T> {
48+
fn drop(&mut self) {
49+
if thread::panicking() {
50+
// Unlikely, but if the future was already set, just ignore the error from
51+
// `try_poison()` and carry on
52+
let _ = self.future.try_poison();
53+
}
54+
}
55+
}
56+
57+
#[cfg(test)]
58+
mod tests {
59+
use super::*;
60+
use crate::executor::{PoisonError, Wait};
61+
use std::panic;
62+
63+
#[test]
64+
fn defer() {
65+
let pool = builder().build().expect("building thread pool failed");
66+
let future = pool.defer(|| 123);
67+
assert_eq!(future.wait(), &123);
68+
}
69+
70+
#[test]
71+
fn poisoning() {
72+
let pool = builder().build().expect("building thread pool failed");
73+
let future = pool.defer(|| panic!("something went wrong"));
74+
panic::catch_unwind(|| future.wait()).expect_err("wait() was expected to panic");
75+
assert_eq!(future.try_get(), Some(Err(PoisonError)));
76+
}
77+
}

src/executor/traits.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use crate::executor::Future;
2+
3+
/// Trait for objects that can be awaited until they reach their final state.
4+
///
5+
/// The main structure that implements this trait is [`Future`], but also any structure that wraps
6+
/// a `Future` may implement this trait.
7+
pub trait Wait {
8+
type Output;
9+
10+
/// Blocks execution until the final state is reached.
11+
fn wait(&self) -> &Self::Output;
12+
}
13+
14+
/// Trait for objects that can run functions concurrently.
15+
pub trait Executor {
16+
/// Runs the given closure `f` in a separate thread, and returns a [`Future`] that can be used
17+
/// to obtain the result of `f` once its execution is complete.
18+
fn defer<F, T>(&self, f: F) -> Future<T>
19+
where
20+
F: FnOnce() -> T + Send + 'static,
21+
T: Send + Sync + 'static;
22+
}
23+
24+
impl<E: Executor> Executor for &E {
25+
#[inline]
26+
fn defer<F, T>(&self, f: F) -> Future<T>
27+
where
28+
F: FnOnce() -> T + Send + 'static,
29+
T: Send + Sync + 'static,
30+
{
31+
(**self).defer(f)
32+
}
33+
}

0 commit comments

Comments
 (0)