forked from lunatic-solutions/async-wormhole
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlib.rs
207 lines (192 loc) · 7.46 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
//! async-wormhole allows you to call `.await` async calls across non-async functions, like extern "C" or JIT
//! generated code.
//!
//! ## Motivation
//!
//! Sometimes, when running inside an async environment you need to call into JIT generated code (e.g. wasm)
//! and .await from there. Because the JIT code is not available at compile time, the Rust compiler can't
//! do their "create a state machine" magic. In the end you can't have `.await` statements in non-async
//! functions.
//!
//! This library creates a special stack for executing the JIT code, so it's possible to suspend it at any
//! point of the execution. Once you pass it a closure inside [AsyncWormhole::new](struct.AsyncWormhole.html#method.new)
//! you will get back a future that you can `.await` on. The passed in closure is going to be executed on a
//! new stack.
//!
//! ## Example
//!
//! ```rust
//! use async_wormhole::{AsyncWormhole, AsyncYielder};
//! use switcheroo::stack::*;
//!
//! // non-async function
//! #[allow(improper_ctypes_definitions)]
//! extern "C" fn non_async(mut yielder: AsyncYielder<u32>) -> u32 {
//! // Suspend the runtime until async value is ready.
//! // Can contain .await calls.
//! yielder.async_suspend(async { 42 })
//! }
//!
//! fn main() {
//! let stack = EightMbStack::new().unwrap();
//! let task = AsyncWormhole::<_, _, fn()>::new(stack, |yielder| {
//! let result = non_async(yielder);
//! assert_eq!(result, 42);
//! 64
//! })
//! .unwrap();
//!
//! let outside = futures::executor::block_on(task);
//! assert_eq!(outside, 64);
//! }
//! ```
use switcheroo::Generator;
use switcheroo::Yielder;
use std::cell::Cell;
use std::future::Future;
use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
pub use switcheroo::stack;
/// AsyncWormhole represents a Future that uses a generator with a separate stack to execute a closure.
///
/// It has the capability to .await on other Futures in the closure using the received
/// [AsyncYielder](struct.AsyncYielder). Once all Futures have been awaited on AsyncWormhole will resolve
/// to the return value of the provided closure.
///
/// For dealing with thread local storage
/// [AsyncWormhole::set_pre_post_poll](struct.AsyncWormhole.html#method.set_pre_post_poll) is provided.
///
/// Every time an executor polls AsyncWormhole, the `pre_post_poll` function will be called and every time
/// AsyncWormhole returns `Poll::Pending`, `pre_post_poll` will be called again. Between this two calls we
/// have a guarantee that the executor will not be able to move the execution to another thread, and we
/// can use this guarantee to our advantage in specific scenarios.
pub struct AsyncWormhole<'a, Stack, Output, P>
where
Stack: stack::Stack + Send,
P: FnMut() + Send,
{
generator: Option<Cell<Generator<'a, Waker, Option<Output>, Stack>>>,
pre_post_poll: Option<P>,
}
impl<'a, Stack, Output, P> AsyncWormhole<'a, Stack, Output, P>
where
Stack: stack::Stack + Send,
P: FnMut() + Send,
{
/// Returns a new AsyncWormhole, using the passed `stack` to execute the closure `f` on.
/// The closure will not be executed right away, only if you pass AsyncWormhole to an
/// async executor (.await on it)
pub fn new<F>(stack: Stack, f: F) -> Result<Self, Error>
where
F: FnOnce(AsyncYielder<Output>) -> Output + 'a + Send,
{
let generator = Generator::new(stack, |yielder, waker| {
let async_yielder = AsyncYielder::new(yielder, waker);
let finished = Some(f(async_yielder));
yielder.suspend(finished);
});
Ok(Self {
generator: Some(Cell::new(generator)),
pre_post_poll: None,
})
}
/// Every time the executor polls `AsyncWormhole` we may end up on another thread, here we can set a function
/// that swaps some thread local storage and a context that can travel with `AsyncWormhole` between threads.
pub fn set_pre_post_poll(&mut self, f: P) {
self.pre_post_poll = Some(f);
}
/// Get the stack from the internal generator.
pub fn stack(mut self) -> Stack {
let generator = self.generator.take().unwrap().into_inner();
// If the generator didn't finish yet, the stack is going to be unwinded on drop().
// Fire a last pre_post_poll before this happens.
if generator.started() && !generator.finished() {
if let Some(pre_post_poll) = &mut self.pre_post_poll {
pre_post_poll();
}
}
generator.stack()
}
}
impl<'a, Stack, Output, P> Future for AsyncWormhole<'a, Stack, Output, P>
where
Stack: stack::Stack + Unpin + Send,
P: FnMut() + Unpin + Send,
{
type Output = Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// If pre_post_poll is provided execute it before entering separate stack
if let Some(pre_post_poll) = &mut self.pre_post_poll {
pre_post_poll()
}
match self
.generator
.as_mut()
.unwrap()
.get_mut()
.resume(cx.waker().clone())
{
// If we call the future after it completed it will always return Poll::Pending.
// But polling a completed future is either way undefined behaviour.
None | Some(None) => {
// If pre_post_poll is provided execute it before returning a Poll::Pending
if let Some(pre_post_poll) = &mut self.pre_post_poll {
pre_post_poll()
}
Poll::Pending
}
Some(Some(out)) => {
// Poll one last time to finish the generator
self.generator
.as_mut()
.unwrap()
.get_mut()
.resume(cx.waker().clone());
Poll::Ready(out)
}
}
}
}
impl<'a, Stack, Output, P> Drop for AsyncWormhole<'a, Stack, Output, P>
where
Stack: stack::Stack + Send,
P: FnMut() + Send,
{
fn drop(&mut self) {
// Dropping a generator can cause an unwind and execute code inside of the separate context.
// In this regard it's similar to a `poll` call and we need to fire pre and post poll hooks.
// Note, that we **don't** do a last `post_poll` call once the generator is dropped.
if let Some(pre_post_poll) = &mut self.pre_post_poll {
if let Some(generator) = self.generator.as_mut() {
if generator.get_mut().started() && !generator.get_mut().finished() {
pre_post_poll()
}
}
}
}
}
#[derive(Clone)]
pub struct AsyncYielder<'a, Output> {
yielder: &'a Yielder<Waker, Option<Output>>,
waker: Waker,
}
impl<'a, Output> AsyncYielder<'a, Output> {
pub(crate) fn new(yielder: &'a Yielder<Waker, Option<Output>>, waker: Waker) -> Self {
Self { yielder, waker }
}
/// Takes an `impl Future` and awaits it, returning the value from it once ready.
pub fn async_suspend<Fut, R>(&mut self, mut future: Fut) -> R
where
Fut: Future<Output = R>,
{
let mut future = unsafe { Pin::new_unchecked(&mut future) };
loop {
let mut cx = Context::from_waker(&mut self.waker);
self.waker = match future.as_mut().poll(&mut cx) {
Poll::Pending => self.yielder.suspend(None),
Poll::Ready(result) => return result,
};
}
}
}