From 7d9791a32d1c4c4797b715e3b952646e44a83bd2 Mon Sep 17 00:00:00 2001 From: jlizen Date: Fri, 10 Jan 2025 00:33:01 +0000 Subject: [PATCH 1/2] bundle execute() metadata into a context object, prune future module and shift to owned inner future in closure while offloading work --- README.md | 33 +- src/executor/custom.rs | 21 +- src/future.rs | 886 +++++++++++++++++++++++++++++ src/future/first.rs | 419 -------------- src/future/mod.rs | 283 --------- src/future/with.rs | 777 ------------------------- src/lib.rs | 28 +- tests/custom_simple.rs | 14 +- tests/custom_strategy.rs | 12 +- tests/execute_directly_default.rs | 26 +- tests/execute_directly_strategy.rs | 9 +- tests/future.rs | 117 ++++ tests/offload_first.rs | 52 -- tests/offload_with.rs | 80 --- tests/spawn_blocking_strategy.rs | 12 +- 15 files changed, 1109 insertions(+), 1660 deletions(-) create mode 100644 src/future.rs delete mode 100644 src/future/first.rs delete mode 100644 src/future/mod.rs delete mode 100644 src/future/with.rs create mode 100644 tests/future.rs delete mode 100644 tests/offload_first.rs delete mode 100644 tests/offload_with.rs diff --git a/README.md b/README.md index db53d7a..9d511a4 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,13 @@ fn sync_work(input: String)-> u8 { pub async fn a_future_that_has_blocking_sync_work() -> u8 { // relies on application-specified strategy for translating execute into a future that won't // block the current worker thread - vacation::execute(move || { sync_work("foo".to_string()) }, vacation::ChanceOfBlocking::High, "example.operation").await.unwrap() + vacation::execute( + move || { sync_work("foo".to_string()) }, + vacation::ExecuteContext { + chance_of_blocking: vacation::ChanceOfBlocking::High, + namespace: "example.operation" + } + ).await.unwrap() } ``` @@ -50,15 +56,16 @@ want to enable the `future` feature flag: vacation = { version = "0.1", features = ["future"] } ``` -This enables the [`future::FutureBuilder`] api along with the two types of `Vacation` futures it can generate: -- [`future::OffloadFirst`] - delegate work to vacation, and then process the results into an inner future and poll the inner future to completion -- [`future::OffloadWithFuture`] - poll the inner future, while also using a callback to retrieve any vacation work and polling it alongisde the inner future +This enables the [`future::FutureBuilder`] api which a generates [`future::OffloadWith`] wrapper future. On poll, +this wrapper drives the inner future, while checking if there is work available to offload to vacation. If there is, +it drives that work instead, deferring further polling of the inner future until the offloaded work is complete. ## Usage - Application owners Application authors will need to add this library as a a direct dependency in order to customize the execution strategy beyond the default no-op. -Application authors can also call [`execute()`] if there are application-layer compute-heavy segments in futures. +Application authors can also call [`execute()`] if there are application-layer compute-heavy segments in your futures that you +want to delegate to vacation. ### Simple example @@ -77,11 +84,17 @@ async fn main() { vacation::install_tokio_strategy().unwrap(); // if wanting to delegate work to vacation: - let vacation_future = vacation::execute(|| { - // represents compute heavy work - std::thread::sleep(std::time::Duration::from_millis(500)); - 5 - }, vacation::ChanceOfBlocking::High, "example.operation"); + let vacation_future = vacation::execute( + || { + // represents compute heavy work + std::thread::sleep(std::time::Duration::from_millis(500)); + 5 + }, + vacation::ExecuteContext { + chance_of_blocking: vacation::ChanceOfBlocking::High, + namespace: "example.operation" + } + ); assert_eq!(vacation_future.await.unwrap(), 5); # } diff --git a/src/executor/custom.rs b/src/executor/custom.rs index a769b28..28774df 100644 --- a/src/executor/custom.rs +++ b/src/executor/custom.rs @@ -1,22 +1,19 @@ use std::future::Future; -use crate::{concurrency_limit::ConcurrencyLimit, error::Error, ChanceOfBlocking}; +use crate::{concurrency_limit::ConcurrencyLimit, error::Error, ExecuteContext}; /// The input for the custom closure pub struct CustomClosureInput { /// the actual work to execute, your custom closure must run this pub work: Box, - /// caller-specified likehood of blocking, for customizing strategies - pub chance_of_blocking: ChanceOfBlocking, - /// caller-specified operatino namespace, for customizing strategies - pub namespace: &'static str, + /// caller-specified metadata that allows fine tuning strategies + pub context: ExecuteContext, } impl std::fmt::Debug for CustomClosureInput { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("CustomClosureInput") - .field("chance_of_blocking", &self.chance_of_blocking) - .field("namespace", &self.namespace) + .field("context", &self.context) .finish() } } @@ -49,12 +46,7 @@ impl Custom { // the compiler correctly is pointing out that the custom closure isn't guaranteed to call f. // but, we leave that to the implementer to guarantee since we are limited by working with static signatures #[allow(unused_variables)] - pub(crate) async fn execute( - &self, - f: F, - chance_of_blocking: ChanceOfBlocking, - namespace: &'static str, - ) -> Result + pub(crate) async fn execute(&self, f: F, context: ExecuteContext) -> Result where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -74,8 +66,7 @@ impl Custom { let input = CustomClosureInput { work: wrapped_input_closure, - chance_of_blocking, - namespace, + context, }; Box::into_pin((self.closure)(input)) diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..8082db8 --- /dev/null +++ b/src/future.rs @@ -0,0 +1,886 @@ +//! # Future module +//! +//! This module provides a lower-level wrapper for manually implemented futures. It is intended for library +//! authors that already have a hand-implemented future, that needs to delegate work. +//! +//! Such a use case cannot simply call [`vacation::execute(_)`] and await the returned future +//! because they are already in a sync context. Instead, the vacation future needs to be driven +//! across the individual polls within the custom future's current await handling. +//! +//! The entrypoint for this api is [`vacation::future::builder()`], which allows constructing an [`OffloadWith`] +//! to wrap your custom inner future. +//! +//! This wrapper future processes occasional work offloaded from the inner future, while driving the inner +//! future if no offloaded work is active. Offloaded work is retrieved via a 'pull' model, where, +//! after the inner future returns `Poll::Pending`, a [`get_offload()`] closure is called that has +//! owned access to the inner future. That closure can return any work that needs to be offloaded, +//! structured as an async future that invokes `vacation::execute()` and processes any results. +//! +//! **Note that `OffloadWith` does NOT poll the inner future while offloaded work is active, so there is a deadlock +//! risk if the offloaded work depends on the inner future making progress to resolve.** +//! +//! ## Q&A +//! +//! ### This seems complicated, why not just call vacation from inside my custom future? +//! There are many ways to structure a custom future to efficiently use `vacation`. This is designed +//! to be more of a 'bolt on' utility that requires minimal code changes inside your inner future - +//! mostly just adding a hook with which to get work. +//! +//! If you are writing a custom future with `vacation` in mind, it probably is simplest to store the state +//! of any offloaded vacation work directly in your future, and poll it as part of your `Future` implementation. +//! That way you can simply initialize that work whenever you reach a point that needs it, poll it once, and either +//! return `Poll::Pending` (if you need to wait on that work completing) or carry on (if you can make other progress +//! while the offloaded work is ongoing). +//! +//! ### I only need to send work to vacation once, in order to construct my inner future. What should I do? +//! +//! This can be handled very simply with [`futures_util::then()`]. Simply call `vacation::execute()` first, +//! and then chain `then()` to construct the inner future. +//! +//! Here is an example: +//! +//! ``` +//! use futures_util::FutureExt; +//! +//! let vacation_future = vacation::execute( +//! || { +//! std::thread::sleep(std::time::Duration::from_millis(50)); +//! 5 +//! }, +//! vacation::ExecuteContext { +//! chance_of_blocking: vacation::ChanceOfBlocking::High, +//! namespace: "sample_initialize", +//! } +//! ); +//! +//! let future = vacation_future.then(|res| async move { +//! match res { +//! Err(error) => Err("there was a vacation error"), +//! // placeholder for your custom future +//! Ok(res) => Ok(tokio::time::sleep(std::time::Duration::from_millis(res)).await), +//! } +//! }); +//! +//! ``` +//! +//! ### This seems suspicious, what if the inner future has a waker fire while it is being suppressed +//! due to ongoing offloaded work? +//! +//! We defensively poll the inner future anytime the offloaded work completes. This means that if any +//! waker had fired while that work was ongoing, the inner future will be polled again. +//! +//! Essentially, our guarantee is, every time the wrapper future is polled, either the inner future is polled +//! directly (if no offloaded work), it is polled after vacation work (if vacation work is completing), +//! or there is at minimum a vacation waker registered - which, again, will result in the inner future +//! being polled when that work completes. +//! +//! This results in potentially redundant polls, but no lost polls. +//! +//! ### Why do I need an owned inner future in my `get_offload()` closure? It seems cumbersome. +//! A couple reasons: +//! +//! First, this allows the API to accept just a single closure, which can both delegate work to vacation, +//! which might need to be sent across threads, and then post-process the work with mutable access +//! to the inner future. We can't simply pass in a &mut reference because this library targets +//! pre-async-closure Rust versions, so it would insist that the inner future reference have a 'static lifetime, +//! which it doesn't. You could instead split into two closures, one of which generates the offloaded work, and the +//! second of which post-processes by taking as inputs the offloaded work output and `&mut inner_future`. However, +//! this was pretty unwieldy. +//! +//! Second, this makes it simpler to model the case where the offloaded work needs some state from the inner future +//! that the inner future can't operate without. If it wasn't owned, the inner future would need to add a new variant +//! to its internal state, which represents the case where it is missing state, and then panic or return `Poll::Pending` +//! if it is polled while in that state. +//! +//! If you have a use case that wants to continue polling the inner future, while also driving any offloaded work +//! (or multiple pieces of offloaded work), please cut a GitHub issue! Certainly we can add this functionality in +//! if consumers would use it. +//! +//! [`futures_util::then()`]: https://docs.rs/futures-util/latest/futures_util/future/trait.FutureExt.html#method.then +//! [`vacation::future::builder()`]: crate::future::builder() +//! [`get_offload()`]: crate::future::FutureBuilder::get_offload() + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; + +pin_project! { +/// Wrapper future that processes occasional work offloaded from the inner future, while driving the inner +/// future if no offloaded work is active. +/// +/// **Note that `OffloadWith` does NOT poll the inner future while offloaded work is active, so there is a deadlock +/// risk if the offloaded work depends on the inner future making progress to resolve.** +/// +/// The order of execution is: +/// - poll any active offloaded vacation work +/// - when it completes, immediately poll the inner future +/// - poll inner future if no active offloaded work +/// - if inner future is pending, check poll to see if there is any vacation work to get_offload +/// - if there is, poll it once and then update the future to continue polling it +/// +/// Returning an error in any vacation handling (getting work, or if the offloaded work resolves to an error) +/// will abort the inner future and bubble up. But, you can also discard errors to make these +/// calls infallible. +/// +/// # Examples +/// +/// ``` +/// # // this is hideous to include everywhere but works around https://github.com/rust-lang/rust/issues/67295 +/// # // and keeps examples concise to the reader +/// # use std::{ +/// # future::Future, +/// # pin::Pin, +/// # task::{Context, Poll}, +/// # }; +/// # +/// # #[derive(Debug)] +/// # pub struct SampleFuture; +/// # +/// # #[derive(Debug, PartialEq)] +/// # pub enum SampleFutureResponse { +/// # Success, +/// # InnerError, +/// # VacationError(&'static str), +/// # } +/// # +/// # impl SampleFuture { +/// # pub fn new() -> Self { +/// # Self +/// # } +/// # +/// # pub fn get_offload_work( +/// # &mut self, +/// # ) -> Result Result<(), &'static str>>, &'static str> { +/// # Ok(Some(|| Ok(()))) +/// # } +/// # +/// # pub fn post_process_offload_work(&self, _input: ()) {} +/// # } +/// # +/// # impl Future for SampleFuture { +/// # type Output = SampleFutureResponse; +/// # +/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { +/// # Poll::Ready(SampleFutureResponse::Success) +/// # } +/// # } +/// +/// # #[tokio::main] +/// # async fn main() { +/// let future = vacation::future::builder() +/// // the inner future, you might need to `Box::pin` it if it is !Unpin +/// .future(SampleFuture::new()) +/// .get_offload(move |mut inner_fut: SampleFuture| { +/// // this method is directly exposed on the inner future by the library author +/// let maybe_work = match inner_fut.get_offload_work() { +/// Ok(maybe_work) => maybe_work, +/// // any bubbled up errors must match the inner future output type +/// Err(err) => return Err(SampleFutureResponse::VacationError(err)) +/// }; +/// +/// match maybe_work { +/// // if no work, return the owned inner future back out again +/// None => Ok(vacation::future::OffloadWork::NoWork(inner_fut)), +/// // if there is work, return an async closure that calls `vacation::execute()` +/// // and processes any results (if post-processing is needed) +/// Some(work) => Ok(vacation::future::OffloadWork::HasWork( +/// Box::new(async move { +/// match vacation::execute(work, vacation::ExecuteContext { +/// chance_of_blocking: vacation::ChanceOfBlocking::High, +/// namespace: "sample-future" +/// }).await { +/// Ok(work_res) => { +/// match work_res { +/// Ok(work_output) => { +/// inner_fut.post_process_offload_work(work_output); +/// // on offload work success, you must return the inner future +/// Ok(inner_fut) +/// }, +/// Err(work_err) => Err(SampleFutureResponse::VacationError(work_err)) +/// } +/// }, +/// Err(_vacation_executor_err) => Err(SampleFutureResponse::VacationError("executor_error")) +/// } +/// }) +/// )) +/// } +/// }) +/// .build(); +/// # } +/// ``` +pub struct OffloadWith { + inner: OffloadWithInner, + get_offload: GetOffload, +} +} + +enum OffloadWithInner { + InnerFut(InnerFut), + OffloadActive(Pin> + Send>>), + // used within a poll only, to transition between states as needed + UpdatingState, +} + +/// The successful result of the [`get_offload()`] call. This contains either: +/// - Work to be offloaded, passed as an async closure that resolves to +/// either an error or the owner inner future (which calls `vacation::execute()`) inside and handles +/// the result +/// - No work, with the owned inner future +/// +/// [`get_offload()`]: crate::future::FutureBuilder::get_offload() +pub enum OffloadWork { + /// Work to be offloaded, passed as an async closure that resolves to + /// either an error or the owner inner future (which calls `vacation::execute()`) inside and handles + /// the result + HasWork(Box> + Send>), + /// No work needs offloading, so this returns the owned inner future + NoWork(InnerFut), +} +impl std::fmt::Debug for OffloadWork { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::HasWork(_) => f.debug_tuple("HasWork").finish(), + Self::NoWork(_) => f.debug_tuple("NoWork").finish(), + } + } +} + +/// Needs a call to [`future()`] +/// +/// [`future()`]: crate::future::FutureBuilder::future() +#[derive(Debug)] +pub struct NeedsInnerFuture; + +/// Already has called [`future()`] +/// +/// [`future()`]: crate::future::FutureBuilder::future() +pub struct HasInnerFuture(InnerFut); +impl std::fmt::Debug for HasInnerFuture { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("HasInnerFuture").finish() + } +} + +/// Needs a call to [`get_offload()`] +/// +/// [`get_offload()`]: crate::future::FutureBuilder::get_offload() +#[derive(Debug)] +pub struct NeedsGetOffload; +/// Has already called [`get_offload()`] +/// +/// [`get_offload()`]: crate::future::FutureBuilder::get_offload() +pub struct HasGetOffload(GetOffload); +impl std::fmt::Debug for HasGetOffload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("HasGetOffload").finish() + } +} + +/// A builder struct with which to construct [`OffloadWith`] future. +/// +/// Start with [`vacation::future::builder()`]. +/// +/// Note that the wrapper future does NOT poll the inner future while offloaded work is active, so there is a deadlock +/// risk if the offloaded work depends on the inner future making progress to resolve. +/// +/// The order of execution is: +/// - poll any active offloaded vacation work +/// - when it completes, immediately poll the inner future +/// - poll inner future if no active offloaded work +/// - if inner future is pending, check poll to see if there is any vacation work to get_offload +/// - if there is, poll it once and then update the future to continue polling it +/// +/// Returning an error in any vacation handling (getting work, or if the offloaded work resolves to an error) +/// will abort the inner future and bubble up. But, you can also discard errors to make these +/// calls infallible. +/// +/// # Examples +/// +/// ``` +/// # // this is hideous to include everywhere but works around https://github.com/rust-lang/rust/issues/67295 +/// # // and keeps examples concise to the reader +/// # use std::{ +/// # future::Future, +/// # pin::Pin, +/// # task::{Context, Poll}, +/// # }; +/// # +/// # #[derive(Debug)] +/// # pub struct SampleFuture; +/// # +/// # #[derive(Debug, PartialEq)] +/// # pub enum SampleFutureResponse { +/// # Success, +/// # InnerError, +/// # VacationError(&'static str), +/// # } +/// # +/// # impl SampleFuture { +/// # pub fn new() -> Self { +/// # Self +/// # } +/// # +/// # pub fn get_offload_work( +/// # &mut self, +/// # ) -> Result Result<(), &'static str>>, &'static str> { +/// # Ok(Some(|| Ok(()))) +/// # } +/// # +/// # pub fn post_process_offload_work(&self, _input: ()) {} +/// # } +/// # +/// # impl Future for SampleFuture { +/// # type Output = SampleFutureResponse; +/// # +/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { +/// # Poll::Ready(SampleFutureResponse::Success) +/// # } +/// # } +/// +/// # #[tokio::main] +/// # async fn main() { +/// let future = vacation::future::builder() +/// // the inner future, you might need to `Box::pin` it if it is !Unpin +/// .future(SampleFuture::new()) +/// .get_offload(move |mut inner_fut: SampleFuture| { +/// // this method is directly exposed on the inner future by the library author +/// let maybe_work = match inner_fut.get_offload_work() { +/// Ok(maybe_work) => maybe_work, +/// // any bubbled up errors must match the inner future output type +/// Err(err) => return Err(SampleFutureResponse::VacationError(err)) +/// }; +/// +/// match maybe_work { +/// // if no work, return the owned inner future back out again +/// None => Ok(vacation::future::OffloadWork::NoWork(inner_fut)), +/// // if there is work, return an async closure that calls `vacation::execute()` +/// // and processes any results (if post-processing is needed) +/// Some(work) => Ok(vacation::future::OffloadWork::HasWork( +/// Box::new(async move { +/// match vacation::execute(work, vacation::ExecuteContext { +/// chance_of_blocking: vacation::ChanceOfBlocking::High, +/// namespace: "sample-future" +/// }).await { +/// Ok(work_res) => { +/// match work_res { +/// Ok(work_output) => { +/// inner_fut.post_process_offload_work(work_output); +/// // on offload work success, you must return the inner future +/// Ok(inner_fut) +/// }, +/// Err(work_err) => Err(SampleFutureResponse::VacationError(work_err)) +/// } +/// }, +/// Err(_vacation_executor_err) => Err(SampleFutureResponse::VacationError("executor_error")) +/// } +/// }) +/// )) +/// } +/// }) +/// .build(); +/// # } +/// ``` +/// +/// [`OffloadWith`]: crate::future::OffloadWith +/// [`vacation::future::builder()`]: crate::future::builder() +#[derive(Debug)] +pub struct FutureBuilder { + inner_fut: InnerFut, + get_offload: GetOffload, +} + +/// Get a builder that constructs an[`OffloadWith`] wrapper future +/// +/// [`OffloadWith`]: crate::future::OffloadWith +#[must_use = "doesn't do anything unless built"] +pub fn builder() -> FutureBuilder { + FutureBuilder { + inner_fut: NeedsInnerFuture, + get_offload: NeedsGetOffload, + } +} + +impl FutureBuilder { + #[must_use = "doesn't do anything unless built"] + /// Accepts an inner future to wrap with [`OffloadWith`] + /// + /// Note that this future will be polled only when there is NOT + /// offload work active. If offload work completes, it will be immediately + /// polled again. + pub fn future( + self, + future: InnerFut, + ) -> FutureBuilder, NeedsGetOffload> + where + InnerFut: Future + Unpin, + { + FutureBuilder::, NeedsGetOffload> { + inner_fut: HasInnerFuture(future), + get_offload: self.get_offload, + } + } +} + +impl FutureBuilder, NeedsGetOffload> { + #[must_use = "doesn't do anything unless built"] + /// A closure accepting an owned inner future, that returns any work to be offloaded, + /// or the inner future if no offload work is needed. + /// + /// Work to be offload should be returned as an async closure that resolves back + /// to the inner future, or an error. Use [`vacation::execute()`] inside the closure future + /// and process its results as needed. + /// + /// Note that the wrapper future does NOT poll the inner future while offloaded work is active, so there is a deadlock + /// risk if the offloaded work depends on the inner future making progress to resolve. + /// + /// The order of execution is: + /// - poll any active offloaded vacation work + /// - when it completes, immediately poll the inner future + /// - poll inner future if no active offloaded work + /// - if inner future is pending, check poll to see if there is any vacation work to get_offload + /// - if there is, poll it once and then update the future to continue polling it + /// + /// Returning an error in any vacation handling (getting work, incorporating the results of work) + /// will abort the inner future and bubble up. But, you can also discard errors to make these + /// calls infallible. + /// + /// # Examples + /// + /// ``` + /// # // this is hideous to include everywhere but works around https://github.com/rust-lang/rust/issues/67295 + /// # // and keeps examples concise to the reader + /// # use std::{ + /// # future::Future, + /// # pin::Pin, + /// # task::{Context, Poll}, + /// # }; + /// # + /// # #[derive(Debug)] + /// # pub struct SampleFuture; + /// # + /// # #[derive(Debug, PartialEq)] + /// # pub enum SampleFutureResponse { + /// # Success, + /// # InnerError, + /// # VacationError(&'static str), + /// # } + /// # + /// # impl SampleFuture { + /// # pub fn new() -> Self { + /// # Self + /// # } + /// # + /// # pub fn get_offload_work( + /// # &mut self, + /// # ) -> Result Result<(), &'static str>>, &'static str> { + /// # Ok(Some(|| Ok(()))) + /// # } + /// # + /// # pub fn post_process_offload_work(&self, _input: ()) {} + /// # } + /// # + /// # impl Future for SampleFuture { + /// # type Output = SampleFutureResponse; + /// # + /// # fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + /// # Poll::Ready(SampleFutureResponse::Success) + /// # } + /// # } + /// + /// # #[tokio::main] + /// # async fn main() { + /// let future = vacation::future::builder() + /// // the inner future, you might need to `Box::pin` it if it is !Unpin + /// .future(SampleFuture::new()) + /// .get_offload(move |mut inner_fut: SampleFuture| { + /// // this method is directly exposed on the inner future by the library author + /// let maybe_work = match inner_fut.get_offload_work() { + /// Ok(maybe_work) => maybe_work, + /// // any bubbled up errors must match the inner future output type + /// Err(err) => return Err(SampleFutureResponse::VacationError(err)) + /// }; + /// + /// match maybe_work { + /// // if no work, return the owned inner future back out again + /// None => Ok(vacation::future::OffloadWork::NoWork(inner_fut)), + /// // if there is work, return an async closure that calls `vacation::execute()` + /// // and processes any results (if post-processing is needed) + /// Some(work) => Ok(vacation::future::OffloadWork::HasWork( + /// Box::new(async move { + /// match vacation::execute(work, vacation::ExecuteContext { + /// chance_of_blocking: vacation::ChanceOfBlocking::High, + /// namespace: "sample-future" + /// }).await { + /// Ok(work_res) => { + /// match work_res { + /// Ok(work_output) => { + /// inner_fut.post_process_offload_work(work_output); + /// // on offload work success, you must return the inner future + /// Ok(inner_fut) + /// }, + /// Err(work_err) => Err(SampleFutureResponse::VacationError(work_err)) + /// } + /// }, + /// Err(_vacation_executor_err) => Err(SampleFutureResponse::VacationError("executor_error")) + /// } + /// }) + /// )) + /// } + /// }) + /// .build(); + /// # } + /// ``` + /// + /// [`vacation::execute()`]: crate::execute() + pub fn get_offload( + self, + get_offload: GetOffload, + ) -> FutureBuilder, HasGetOffload> + where + GetOffload: Fn(InnerFut) -> Result, InnerFut::Output>, + InnerFut: Future + Unpin, + { + FutureBuilder::, HasGetOffload> { + inner_fut: self.inner_fut, + get_offload: HasGetOffload(get_offload), + } + } +} + +impl + FutureBuilder, HasGetOffload> +{ + #[must_use = "doesn't do anything unless polled"] + /// Finish building [`OffloadWith`] + pub fn build(self) -> OffloadWith { + OffloadWith { + inner: OffloadWithInner::InnerFut(self.inner_fut.0), + get_offload: self.get_offload.0, + } + } +} + +impl Future for OffloadWith +where + InnerFut: Future + Unpin, + GetOffload: Fn(InnerFut) -> Result, InnerFut::Output>, +{ + type Output = InnerFut::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // first poll the inner future or any outstanding work + let inner_fut = match std::mem::replace(&mut this.inner, OffloadWithInner::UpdatingState) { + OffloadWithInner::UpdatingState => { + panic!("unexpected state while polling get_offload with future") + } + OffloadWithInner::InnerFut(mut inner_fut) => match Pin::new(&mut inner_fut).poll(cx) { + Poll::Ready(res) => return Poll::Ready(res), + Poll::Pending => inner_fut, + }, + OffloadWithInner::OffloadActive(mut offload_fut) => { + let mut inner_fut = match offload_fut.as_mut().poll(cx) { + // get_offload work still ongoing, reconstruct state and bail + Poll::Pending => { + this.inner = OffloadWithInner::OffloadActive(offload_fut); + return Poll::Pending; + } + // get_offload work complete + Poll::Ready(offload_res) => match offload_res { + Ok(inner_fut) => inner_fut, + // bubble up error in get_offload + Err(res) => return Poll::Ready(res), + }, + }; + // if get_offload future is done and successful, + // poll our inner future again in case it didn't set wakers since it was relying on + // vacation work completing + match Pin::new(&mut inner_fut).poll(cx) { + Poll::Ready(res) => return Poll::Ready(res), + Poll::Pending => inner_fut, + } + } + }; + + match (this.get_offload)(inner_fut) { + Ok(get_offload_res) => match get_offload_res { + OffloadWork::HasWork(get_offload) => { + let mut get_offload = Box::into_pin(get_offload); + // poll get_offload work once to kick it off + match get_offload.as_mut().poll(cx) { + // get_offload work didn't actually need to sleep + Poll::Ready(offload_res) => match offload_res { + // successfully completed get_offload work, poll our inner future + // again in case it didn't set wakers since it was relying on + // vacation work completing + Ok(mut inner_fut) => match Pin::new(&mut inner_fut).poll(cx) { + Poll::Ready(res) => Poll::Ready(res), + Poll::Pending => { + this.inner = OffloadWithInner::InnerFut(inner_fut); + Poll::Pending + } + }, + // get_offload work failed, bubble up any error + Err(err) => Poll::Ready(err), + }, + // get_offload work still ongoing, store in state + Poll::Pending => { + this.inner = OffloadWithInner::OffloadActive(get_offload); + Poll::Pending + } + } + } + // get get_offload work didn't return any work, reconstruct inner future state + OffloadWork::NoWork(inner_fut) => { + this.inner = OffloadWithInner::InnerFut(inner_fut); + Poll::Pending + } + }, + // bubble up error while getting get_offload work + Err(err) => Poll::Ready(err), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + }; + + #[derive(Debug, PartialEq)] + enum TestFutureResponse { + Success { + poll_count: usize, + sync_work_remaining: usize, + }, + InnerError { + poll_count: usize, + sync_work_remaining: usize, + }, + VacationGetError, + VacationOffloadWorkError, + VacationExecutorError, + } + + struct TestFuture { + async_work: usize, + sync_work: usize, + poll_count: usize, + error_after_n_polls: Option, + } + + impl TestFuture { + fn new(async_work: usize, sync_work: usize) -> Self { + Self { + async_work, + sync_work, + error_after_n_polls: None, + poll_count: 0, + } + } + + fn new_with_err_after( + async_work: usize, + sync_work: usize, + polls_before_err: usize, + ) -> Self { + Self { + async_work, + sync_work, + error_after_n_polls: Some(polls_before_err), + poll_count: 0, + } + } + + fn get_sync_work(&mut self) -> Option<()> { + if self.sync_work > 0 { + self.sync_work -= 1; + Some(()) + } else { + None + } + } + } + + impl Future for TestFuture { + type Output = TestFutureResponse; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.poll_count += 1; + + if let Some(poll_count) = self.error_after_n_polls { + if poll_count < self.poll_count { + return Poll::Ready(TestFutureResponse::InnerError { + poll_count: self.poll_count, + sync_work_remaining: self.sync_work, + }); + } + } + + self.async_work -= 1; + match self.async_work { + remaining if remaining > 0 => { + // tell executor to immediately poll us again, + // which passes through our vacation future as well + cx.waker().wake_by_ref(); + Poll::Pending + } + _ => Poll::Ready(TestFutureResponse::Success { + poll_count: self.poll_count, + sync_work_remaining: self.sync_work, + }), + } + } + } + + /// Wraps a [`TestFuture`] in vacation handling + /// to send to a mpsc channel each stage's call, or + /// return an error if that channel is dropped. + fn vacation_fut( + inner_fut: TestFuture, + get_offload_err: bool, + offload_work_err: bool, + ) -> OffloadWith< + TestFuture, + impl Fn(TestFuture) -> Result, TestFutureResponse>, + > { + crate::future::builder() + .future(inner_fut) + .get_offload(move |mut inner_fut| { + if get_offload_err { + return Err(TestFutureResponse::VacationGetError); + } + + match inner_fut.get_sync_work() { + None => Ok(OffloadWork::NoWork(inner_fut)), + Some(_) => Ok(OffloadWork::HasWork(Box::new(async move { + match crate::execute( + move || { + if offload_work_err { + Err(TestFutureResponse::VacationOffloadWorkError) + } else { + Ok(()) + } + }, + crate::ExecuteContext { + chance_of_blocking: crate::ChanceOfBlocking::High, + namespace: "test.operation", + }, + ) + .await + { + Ok(res) => match res { + Ok(_) => Ok(inner_fut), + Err(_) => Err(TestFutureResponse::VacationOffloadWorkError), + }, + Err(_) => Err(TestFutureResponse::VacationExecutorError), + } + }))), + } + }) + .build() + } + + #[tokio::test] + async fn no_offload() { + let inner_fut = TestFuture::new(3, 0); + + let future = vacation_fut( + inner_fut, false, true, // this error is not reachable since we don't have work + ); + + let res = future.await; + + assert_eq!( + res, + TestFutureResponse::Success { + poll_count: 3, + sync_work_remaining: 0 + } + ); + } + + #[tokio::test] + async fn with_offload() { + let inner_fut = TestFuture::new(3, 1); + + let future = vacation_fut(inner_fut, false, false); + + let res = future.await; + + assert_eq!( + res, + TestFutureResponse::Success { + poll_count: 3, + sync_work_remaining: 0 + } + ); + } + + #[tokio::test] + async fn with_offload_remaining_after_inner() { + let inner_fut = TestFuture::new(2, 3); + + let future = vacation_fut(inner_fut, false, false); + + let res = future.await; + + assert_eq!( + res, + TestFutureResponse::Success { + poll_count: 2, + // poll inner -> get work -> poll inner -> done, with 2 sync remaining + sync_work_remaining: 2 + } + ); + } + + #[tokio::test] + async fn with_get_offload_err() { + let inner_fut = TestFuture::new(2, 3); + + let future = vacation_fut(inner_fut, true, false); + + let res = future.await; + + assert_eq!(res, TestFutureResponse::VacationGetError); + } + #[tokio::test] + async fn with_offload_work_err() { + let inner_fut = TestFuture::new(2, 3); + + let future = vacation_fut(inner_fut, false, true); + + let res = future.await; + + assert_eq!(res, TestFutureResponse::VacationOffloadWorkError); + } + + #[tokio::test] + async fn delayed_inner_err() { + let inner_fut = TestFuture::new_with_err_after(2, 1, 1); + + let future = vacation_fut(inner_fut, false, false); + + let res = future.await; + + assert_eq!( + res, + // inner poll -> get + run work -> inner poll w/ error + TestFutureResponse::InnerError { + poll_count: 2, + sync_work_remaining: 0 + } + ); + } +} diff --git a/src/future/first.rs b/src/future/first.rs deleted file mode 100644 index 9166732..0000000 --- a/src/future/first.rs +++ /dev/null @@ -1,419 +0,0 @@ -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use pin_project_lite::pin_project; - -use super::{ - FutureBuilder, HasIncorporateFn, NeedsIncorporateFn, NeedsOffload, NoInnerFuture, - OffloadFirstStrat, WhileWaitingMode, -}; - -pin_project! { -/// Initialize the inner future after executing a single piece of work with vacation. -/// That work should output return inputs necessary to construct the inner future. -/// -/// Continue polling the inner future as a pass-through after initialization. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// let future = vacation::future::builder() -/// .offload_first( -/// vacation::future::OffloadFirst::builder() -/// .offload_future(vacation::execute( -/// // the work to offload -/// || std::thread::sleep(std::time::Duration::from_millis(100)), -/// vacation::ChanceOfBlocking::High, -/// "test.operation" -/// )) -/// // accepts the result of offloaded work, and returns an erorr -/// // or an inner future -/// .incorporate_fn(|res| { -/// // vacation work returned an executor error -/// if let Err(err) = res { -/// return Err(false); -/// } -/// Ok(Box::pin(async move { -/// tokio::time::sleep(std::time::Duration::from_millis(50)).await; -/// true -/// })) -/// }) -/// ) -/// .build(); -/// -/// assert_eq!(future.await, true) -/// # } -/// ``` -pub struct OffloadFirstFuture { - inner: Inner -} -} - -enum Inner { - OffloadInProgress { - offload_fut: Pin> + Send>>, - incorporate: Incorporate, - }, - InnerFuture { - inner_fut: InnerFut, - }, -} - -/// The incorporate function passed via [`incorporate_fn()`]. -/// -/// Takes output of vacation work and generates inner future -/// -/// [`incorporate_fn()`]: crate::future::first::OffloadFirst::incorporate_fn() -#[derive(Debug)] -pub enum Incorporate { - /// Vacation work still pending - Incorporate(IncorporateFn), - /// Transitional state used while resolving - Consumed, -} - -/// A sub-builder for the work to be offloaded via [`OffloadFirstFuture`] -/// -/// Use [`OffloadFirst::builder()`] to construct it. -/// -/// [`OffloadFirst::builder()`]: crate::future::first::OffloadFirst::builder() -/// [`OffloadFirstFuture`]: crate::future::first::OffloadFirstFuture -#[derive(Debug)] -pub struct OffloadFirst { - offload_future: OffloadFuture, - incorporate_fn: IncorporateFn, -} - -/// Needs input work to execute via vacation -#[derive(Debug)] -pub struct NeedsOffloadFuture; -/// Has work ready to execute via vacation -pub struct HassOffloadFuture( - Pin> + Send>>, -); - -impl std::fmt::Debug for HassOffloadFuture { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_tuple("HassOffloadFuture").finish() - } -} - -impl OffloadFirst { - /// A sub-builder for the work to be offloaded via [`OffloadFirstFuture`] - /// - /// ## Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// let sub_builder = vacation::future::OffloadFirst::builder() - /// .offload_future(vacation::execute( - /// // the work to offload - /// || std::thread::sleep(std::time::Duration::from_millis(100)), - /// vacation::ChanceOfBlocking::High, - /// "test.operation" - /// )) - /// // accepts the result of offloaded work, and returns an erorr - /// // or an inner future - /// .incorporate_fn(|res: Result<(), vacation::Error>| { - /// // vacation work returned an executor error - /// if let Err(err) = res { - /// return Err(false); - /// } - /// Ok(Box::pin(async move { - /// tokio::time::sleep(std::time::Duration::from_millis(50)).await; - /// true - /// })) - /// }); - /// # } - /// ``` - /// [`OffloadFirstFuture`]: crate::future::first::OffloadFirstFuture - #[must_use = "doesn't do anything unless built"] - pub fn builder() -> OffloadFirst { - OffloadFirst { - offload_future: NeedsOffloadFuture, - incorporate_fn: NeedsIncorporateFn, - } - } -} - -impl OffloadFirst { - /// The future to offload via vacation. Results of this are processed - /// via the input to [`incorporate_fn()`] - /// - /// [`incorporate_fn()`]: crate::future::first::OffloadFirst::incorporate_fn() - #[must_use = "doesn't do anything unless built"] - pub fn offload_future( - self, - offload_future: impl Future> + Send + 'static, - ) -> OffloadFirst, IncorporateFn> - where - OffloadResult: Send + 'static, - { - OffloadFirst { - offload_future: HassOffloadFuture(Box::pin(offload_future)), - incorporate_fn: self.incorporate_fn, - } - } -} - -impl OffloadFirst { - /// Process the output of the offloaded work, along with any other state - /// owned by this closure, and generate the inner future. - #[must_use = "doesn't do anything unless built"] - pub fn incorporate_fn( - self, - incorporate_fn: IncorporateFn, - ) -> OffloadFirst> - where - IncorporateFn: - FnOnce(Result) -> Result, - OffloadResult: Send + 'static, - InnerFut: Future + Unpin, - { - OffloadFirst { - offload_future: self.offload_future, - incorporate_fn: HasIncorporateFn(incorporate_fn), - } - } -} - -impl - FutureBuilder -{ - /// Initialize the inner future after executing a single piece of work with vacation. - /// That work should output return inputs necessary to construct the inner future. - /// - /// Continue polling the inner future as a pass-through after initialization. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// let future = vacation::future::builder() - /// .offload_first( - /// vacation::future::OffloadFirst::builder() - /// .offload_future(vacation::execute( - /// // the work to offload - /// || std::thread::sleep(std::time::Duration::from_millis(100)), - /// vacation::ChanceOfBlocking::High, - /// "test.operation" - /// )) - /// // accepts the result of offloaded work, and returns an erorr - /// // or an inner future - /// .incorporate_fn(|res| { - /// // vacation work returned an executor error - /// if let Err(err) = res { - /// return Err(false); - /// } - /// Ok(Box::pin(async move { - /// tokio::time::sleep(std::time::Duration::from_millis(50)).await; - /// true - /// })) - /// }) - /// ) - /// .build(); - /// - /// assert_eq!(future.await, true) - /// # } - /// ``` - #[must_use = "doesn't do anything unless built"] - pub fn offload_first( - self, - offload: OffloadFirst, HasIncorporateFn>, - ) -> FutureBuilder< - OffloadFirstStrat, - NoInnerFuture, - OffloadFirst, HasIncorporateFn>, - WhileWaitingMode, - > - where - IncorporateFn: - FnOnce(Result) -> Result, - OffloadResult: Send + 'static, - InnerFut: Future + Unpin, - { - FutureBuilder::< - OffloadFirstStrat, - NoInnerFuture, - OffloadFirst, HasIncorporateFn>, - WhileWaitingMode, - > { - strategy: OffloadFirstStrat, - inner_fut: NoInnerFuture, - offload, - while_waiting: WhileWaitingMode::SuppressInnerPoll, - } - } -} - -impl - FutureBuilder< - OffloadFirstStrat, - NoInnerFuture, - OffloadFirst, HasIncorporateFn>, - WhileWaitingMode, - > -{ - #[must_use = "doesn't do anything unless polled"] - /// Finish building [`OffloadFirstFuture`] - pub fn build(self) -> OffloadFirstFuture - where - IncorporateFn: - FnOnce(Result) -> Result, - OffloadResult: Send + 'static, - InnerFut: Future + Unpin, - { - let inner: Inner = Inner::OffloadInProgress { - offload_fut: self.offload.offload_future.0, - incorporate: Incorporate::Incorporate(self.offload.incorporate_fn.0), - }; - - OffloadFirstFuture { inner } - } -} - -impl Future - for OffloadFirstFuture -where - InnerFut: Future + Unpin, - OffloadResult: Send + 'static, - IncorporateFn: - FnOnce(Result) -> Result, -{ - type Output = InnerFut::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - match &mut this.inner { - // we've already finished our startup vacation work, just poll inner - Inner::InnerFuture { ref mut inner_fut } => Pin::new(inner_fut).as_mut().poll(cx), - // vacation startup work still ongoing - Inner::OffloadInProgress { - ref mut offload_fut, - ref mut incorporate, - } => { - // drive vacation work to completion - let res = ready!(Pin::new(offload_fut).as_mut().poll(cx)); - - // consume the incorporate - let incorporate = match std::mem::replace(incorporate, Incorporate::Consumed) { - Incorporate::Incorporate(incorporate) => incorporate, - Incorporate::Consumed => { - panic!("offload first work can only finish and be incorporated once") - } - }; - - // incorporate offloaded work result into inner future - match (incorporate)(res) { - Ok(mut inner_fut) => { - // poll inner future once - match Pin::new(&mut inner_fut).poll(cx) { - // inner future is done, return it - Poll::Ready(res) => Poll::Ready(res), - Poll::Pending => { - // otherwise the inner future already registered its wakers this past poll - this.inner = Inner::InnerFuture { inner_fut }; - Poll::Pending - } - } - } - // if the incorporate fn returns an error, bubble it up - Err(err) => Poll::Ready(err), - } - } - } - } -} - -// these tests all use the default executor which is `ExecuteDirectly` -#[cfg(test)] -mod test { - use crate::{ - future::test::{TestFuture, TestFutureResponse}, - ChanceOfBlocking, - }; - - use super::*; - - #[tokio::test] - async fn it_works() { - let mut inner_fut = TestFuture::new(3); - - let future = crate::future::builder() - .offload_first( - OffloadFirst::builder() - .offload_future(crate::execute( - move || false, - ChanceOfBlocking::High, - "test.operation", - )) - .incorporate_fn(move |res| { - Ok(Box::pin(async move { - if res.unwrap() { - inner_fut.error_after(0); - } - inner_fut.await - })) - }), - ) - .build(); - - let res = future.await; - // polled inner 3 times since we had 3 inner work units - assert_eq!(res, TestFutureResponse::Success(3)); - } - - #[tokio::test] - async fn inner_fut_fails() { - let mut inner_fut = TestFuture::new(3); - - let future = crate::future::builder() - .offload_first( - OffloadFirst::builder() - .offload_future(crate::execute( - move || true, - ChanceOfBlocking::High, - "test.operation", - )) - .incorporate_fn(move |res| { - Ok(Box::pin(async move { - if res.unwrap() { - inner_fut.error_after(0); - } - inner_fut.await - })) - }), - ) - .build(); - - let res = future.await; - // first poll fails - assert_eq!(res, TestFutureResponse::InnerError(1)); - } - - #[tokio::test] - async fn vacation_fails() { - let future = crate::future::builder() - .offload_first( - OffloadFirst::builder() - .offload_future(crate::execute( - || {}, - ChanceOfBlocking::High, - "test.operation", - )) - .incorporate_fn(move |_res: Result<(), crate::Error>| { - Err(TestFutureResponse::VacationIncorporateError) - }), - ) - .build::(); - - let res = future.await; - assert_eq!(res, TestFutureResponse::VacationIncorporateError); - } -} diff --git a/src/future/mod.rs b/src/future/mod.rs deleted file mode 100644 index f5c7dba..0000000 --- a/src/future/mod.rs +++ /dev/null @@ -1,283 +0,0 @@ -/// Contains [`OffloadFirstFuture`], used to resolve one round of compute-heavy work with vacation, and then construct and poll an inner future -/// -/// [`OffloadFirstFuture`]: crate::future::first::OffloadFirstFuture -pub mod first; -/// Contains [`OffloadWithFuture`], for retrieving and running vacation work alongside polling an inner future -/// -/// [`OffloadWithFuture`]: crate::future::with::OffloadWithFuture -pub mod with; - -pub use first::{OffloadFirst, OffloadFirstFuture}; -pub use with::{OffloadWith, OffloadWithFuture}; - -/// Needs a call to [`future()`] (for [`OffloadWithFuture`]) -/// or [`offload_first()`] (for [`OffloadFirstFuture`]) -/// -/// [`future()`]: crate::future::FutureBuilder::future() -/// [`offload_first()`]: crate::future::FutureBuilder::offload_first() -/// [`OffloadWithFuture`]: crate::future::with::OffloadWithFuture -/// [`OffloadFirstFuture`]: crate::future::first::OffloadFirstFuture -#[derive(Debug)] -pub struct NeedsStrategy; -/// Builder will construct [`OffloadFirstFuture`] -/// -/// [`OffloadFirstFuture`]: crate::future::first::OffloadFirstFuture -#[derive(Debug)] -pub struct OffloadFirstStrat; -/// Builder will construct [`OffloadWithFuture`] -/// -/// [`OffloadWithFuture`]: crate::future::with::OffloadWithFuture -#[derive(Debug)] -pub struct OffloadWithFutureStrat; - -/// Needs a call to [`future()`] (for [`OffloadWithFuture`]) -/// or [`offload_first()`] (for [`OffloadFirstFuture`]) -/// -/// [`future()`]: crate::future::FutureBuilder::future() -/// [`offload_first()`]: crate::future::FutureBuilder::offload_first() -/// [`OffloadWithFuture`]: crate::future::with::OffloadWithFuture -/// [`OffloadFirstFuture`]: crate::future::first::OffloadFirstFuture -#[derive(Debug)] -pub struct NeedsInnerFuture; -/// Builder will construct [`OffloadFirstFuture`], which does not wrap a future -/// directly. -/// -/// [`OffloadFirstFuture`]: crate::future::first::OffloadFirstFuture - -#[derive(Debug)] -pub struct NoInnerFuture; - -/// Builder needs a call to [`offload_with()`] or [`offload_first()`] -/// -/// [`offload_with()`]: crate::future::FutureBuilder::offload_with() -/// [`offload_first()`]: crate::future::FutureBuilder::offload_first() -#[derive(Debug)] -pub struct NeedsOffload; - -/// Sub-builder needs a call to [`OffloadWith::incorporate_fn()`] -/// or [`OffloadFirst::incorporate_fn()`] -/// -/// [`OffloadWith::incorporate_fn()`]: crate::future::with::OffloadWith::incorporate_fn() -/// [`OffloadFirst::incorporate_fn()`]: crate::future::first::OffloadFirst::incorporate_fn() -#[derive(Debug)] -pub struct NeedsIncorporateFn; -/// Sub-builder has an incorporate function loaded -#[derive(Debug)] -pub struct HasIncorporateFn(IncorporateFn); - -/// Builder needs to know whether it should poll the inner future -/// while offloaded vacation work is ongoing, or wait until that offloaded -/// work completes. -/// -/// Use [`FutureBuilder::while_waiting_for_offload()`] to specify. -/// -/// [`FutureBuilder::while_waiting_for_offload()`]: crate::future::FutureBuilder::while_waiting_for_offload() -#[derive(Debug)] -pub struct NeedsWhileWaiting; - -/// A builder struct with which to construct [`OffloadFirst`] or [`OffloadWithFuture`] futures. -/// -/// Start with [`vacation::future::builder()`]. -/// -/// [`OffloadFirst`]: crate::future::first::OffloadFirst -/// [`OffloadWithFuture`]: crate::future::with::OffloadWithFuture -/// [`vacation::future::builder()`]: crate::future::builder() -#[derive(Debug)] -pub struct FutureBuilder { - strategy: Strategy, - inner_fut: InnerFut, - offload: Offload, - while_waiting: WhileWaiting, -} - -/// The mode to use when offload work is active in vacation. -/// -/// Either skip polling the inner until vacation work is complete, -/// or continue polling the inner future alongside -/// the vacation future. -/// -/// If using [`WhileWaitingMode::SuppressInnerPoll`], note that there is a deadlock risk of the offloaded -/// work completion relies on the inner future making progress. -#[derive(PartialEq, Debug)] -pub enum WhileWaitingMode { - /// Skip polling the inner future as long as there is vacation work outstanding - /// - /// Note that there is a deadlock risk of the offloaded - /// work completion relies on the inner future making progress. - SuppressInnerPoll, - /// Continue polling the inner future while there is vacation work active. - PassThroughInnerPoll, -} - -/// Get a builder that constructs a [`OffloadFirstFuture`] or [`OffloadWithFuture`] wrapper future -/// -/// # Examples -/// -/// ## OffloadFirstFuture -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// let future = vacation::future::builder() -/// .offload_first( -/// vacation::future::OffloadFirst::builder() -/// .offload_future(vacation::execute( -/// // the work to offload -/// || std::thread::sleep(std::time::Duration::from_millis(100)), -/// vacation::ChanceOfBlocking::High, -/// "test.operation" -/// )) -/// // accepts the result of offloaded work, and returns an erorr -/// // or an inner future -/// .incorporate_fn(|res| { -/// // vacation work returned an executor error -/// if let Err(err) = res { -/// return Err(false); -/// } -/// Ok(Box::pin(async move { -/// tokio::time::sleep(std::time::Duration::from_millis(50)).await; -/// true -/// })) -/// }) -/// ) -/// .build(); -/// -/// assert_eq!(future.await, true) -/// # } -/// ``` -/// -/// ## OffloadWith -/// -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// let future = vacation::future::builder() -/// // the wrapped future -/// .future(Box::pin( async move { -/// tokio::time::sleep(std::time::Duration::from_millis(50)).await; -/// true -/// })) -/// .offload_with( -/// vacation::future::OffloadWith::builder() -/// // take a mutable reference to inner future, -/// // and retrieve any work to offload -/// .get_offload_fn(|_inner_fut| { -/// // it could be conditional, but here it's always returning work -/// Ok(Some(Box::pin(vacation::execute( -/// || std::thread::sleep(std::time::Duration::from_millis(50)), -/// vacation::ChanceOfBlocking::High, -/// "test.operation" -/// )))) -/// }) -/// // called with the results of the offloaded work and the inner future, -/// // use to convert errors or do any post-processing -/// .incorporate_fn(|_inner_fut, res| { -/// println!("work complete: {res:#?}"); -/// Ok(()) -/// }) -/// ) -/// // could also suppress the inner poll -/// .while_waiting_for_offload(vacation::future::WhileWaitingMode::PassThroughInnerPoll) -/// .build(); -/// -/// assert_eq!(future.await, true) -/// # } -/// ``` -/// -/// [`OffloadFirstFuture`]: crate::future::first::OffloadFirstFuture -/// [`OffloadWithFuture`]: crate::future::with::OffloadWithFuture -#[must_use = "doesn't do anything unless built"] -pub fn builder() -> FutureBuilder -{ - FutureBuilder { - strategy: NeedsStrategy, - inner_fut: NeedsInnerFuture, - offload: NeedsOffload, - while_waiting: NeedsWhileWaiting, - } -} - -#[cfg(test)] -pub(crate) mod test { - use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - }; - - #[derive(PartialEq)] - pub(crate) enum TestFutureResponse { - Success(usize), - InnerError(usize), - VacationGetError, - VacationWorkError, - VacationIncorporateError, - ExecutorError, - } - - impl std::fmt::Debug for TestFutureResponse { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Success(arg0) => f.debug_tuple("Success").field(arg0).finish(), - Self::InnerError(arg0) => f.debug_tuple("InnerError").field(arg0).finish(), - Self::VacationGetError => write!(f, "VacationGetError"), - Self::VacationWorkError => write!(f, "VacationWorkError"), - Self::VacationIncorporateError => write!(f, "VacationIncorporateError"), - Self::ExecutorError => write!(f, "ExecutorError"), - } - } - } - - pub(crate) struct TestFuture { - work: usize, - poll_count: usize, - error_after_n_polls: Option, - } - - impl TestFuture { - pub(crate) fn new(work: usize) -> Self { - Self { - work, - error_after_n_polls: None, - poll_count: 0, - } - } - - pub(crate) fn new_with_err(work: usize, poll_count: usize) -> Self { - Self { - work, - error_after_n_polls: Some(poll_count), - poll_count: 0, - } - } - - /// 0 means immediately - pub(crate) fn error_after(&mut self, poll_count: usize) { - self.error_after_n_polls = Some(poll_count) - } - } - - impl Future for TestFuture { - type Output = TestFutureResponse; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.poll_count += 1; - - if let Some(poll_count) = self.error_after_n_polls { - if poll_count < self.poll_count { - return Poll::Ready(TestFutureResponse::InnerError(self.poll_count)); - } - } - - self.work -= 1; - match self.work { - remaining if remaining > 0 => { - // tell executor to immediately poll us again, - // which passes through our vacation future as well - cx.waker().wake_by_ref(); - Poll::Pending - } - _ => Poll::Ready(TestFutureResponse::Success(self.poll_count)), - } - } - } -} diff --git a/src/future/with.rs b/src/future/with.rs deleted file mode 100644 index f1aa932..0000000 --- a/src/future/with.rs +++ /dev/null @@ -1,777 +0,0 @@ -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -use pin_project_lite::pin_project; - -use super::{ - FutureBuilder, HasIncorporateFn, NeedsIncorporateFn, NeedsInnerFuture, NeedsOffload, - NeedsStrategy, NeedsWhileWaiting, OffloadWithFutureStrat, WhileWaitingMode, -}; - -pin_project! { -/// Wrapper future that processes occasional work offloaded from the inner future, while driving the inner future. -/// -/// The order of execution is: -/// - poll any active vacation work -/// - if work completes, the incorporate fn to resolve it (includes mutable pointer to inner task + result of vacation) -/// - poll inner future (if either [`WhileWaitingMode::PassThroughInnerPoll`], or no active offloaded work -/// - poll to see if there is any vacation work to prepare (if no work is already active) -/// - if there is, poll it once and then call the incorporate fn with its results if complete -/// -/// Returning an error in any vacation handling (getting work, incorporating the results of work) -/// will abort the inner future and bubble up. But, you can also discard errors to make these -/// calls infallible. -/// -/// # Examples -/// -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// let future = vacation::future::builder() -/// // the wrapped future -/// .future(Box::pin( async move { -/// tokio::time::sleep(std::time::Duration::from_millis(50)).await; -/// true -/// })) -/// .offload_with( -/// vacation::future::OffloadWith::builder() -/// // take a mutable reference to inner future, -/// // and retrieve any work to offload -/// .get_offload_fn(|_inner_fut| { -/// // it could be conditional, but here it's always returning work -/// Ok(Some(Box::pin(vacation::execute( -/// || std::thread::sleep(std::time::Duration::from_millis(50)), -/// vacation::ChanceOfBlocking::High, -/// "test.operation" -/// )))) -/// }) -/// // called with the results of the offloaded work and the inner future, -/// // use to convert errors or do any post-processing -/// .incorporate_fn(|_inner_fut, res| { -/// println!("work complete: {res:#?}"); -/// Ok(()) -/// }) -/// ) -/// // could also suppress the inner poll -/// .while_waiting_for_offload(vacation::future::WhileWaitingMode::PassThroughInnerPoll) -/// .build(); -/// -/// assert_eq!(future.await, true) -/// # } -/// ``` -/// [`WhileWaitingMode::PassThroughInnerPoll`]: crate::future::WhileWaitingMode::PassThroughInnerPoll -pub struct OffloadWithFuture { - inner_fut: InnerFut, - get_offload_fn: GetOffloadFn, - incorporate_fn: IncorporateFn, - offload_fut: Option> + Send>>>, - while_waiting: WhileWaitingMode, -} -} - -#[derive(Debug)] -/// A sub-builder for work to be offloaded via [`OffloadWithFuture`] -/// -/// # Examples -/// -/// ``` -/// # type InnerFut = Box>>>; -/// # #[tokio::main] -/// # async fn run() { -/// let builder = vacation::future::OffloadWith::builder() -/// // take a mutable reference to inner future, -/// // and retrieve any work to offload -/// .get_offload_fn(|_inner_fut: &mut InnerFut| { -/// // it could be conditional, but here it's always returning work -/// Ok(Some(Box::pin(vacation::execute( -/// || std::thread::sleep(std::time::Duration::from_millis(50)), -/// vacation::ChanceOfBlocking::High, -/// "test.operation" -/// )))) -/// }) -/// .incorporate_fn(|_inner_fut: &mut InnerFut, res: Result| { -/// println!("work complete: {res:#?}"); -/// Ok(()) -/// }); -/// # } -/// ``` -pub struct OffloadWith { - get_offload_fn: GetOffloadFn, - incorporate_fn: IncorporateFn, -} - -// this is the builder impl that splits into together or initialize builders -impl FutureBuilder { - #[must_use = "doesn't do anything unless built"] - /// Accepts an inner future to wrap with [`OffloadWithFuture`] - /// - /// The order of execution is: - /// - poll any active vacation work - /// - if work completes, the incorporate fn to resolve it (includes mutable pointer to inner task + result of vacation) - /// - poll inner future (if either [`WhileWaitingMode::PassThroughInnerPoll`], or no active offloaded work - /// - poll to see if there is any vacation work to prepare (if no work is already active) - /// - if there is, poll it once and then call the incorporate fn with its results if complete - /// - /// Returning an error in any vacation handling (getting work, incorporating the results of work) - /// will abort the inner future and bubble up. But, you can also discard errors to make these - /// calls infallible. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// let future = vacation::future::builder() - /// // the wrapped future - /// .future(Box::pin( async move { - /// tokio::time::sleep(std::time::Duration::from_millis(50)).await; - /// true - /// })) - /// .offload_with( - /// vacation::future::OffloadWith::builder() - /// // take a mutable reference to inner future, - /// // and retrieve any work to offload - /// .get_offload_fn(|_inner_fut| { - /// // it could be conditional, but here it's always returning work - /// Ok(Some(Box::pin(vacation::execute( - /// || std::thread::sleep(std::time::Duration::from_millis(50)), - /// vacation::ChanceOfBlocking::High, - /// "test.operation" - /// )))) - /// }) - /// // called with the results of the offloaded work and the inner future, - /// // use to convert errors or do any post-processing - /// .incorporate_fn(|_inner_fut, res| { - /// println!("work complete: {res:#?}"); - /// Ok(()) - /// }) - /// ) - /// // could also suppress the inner poll - /// .while_waiting_for_offload(vacation::future::WhileWaitingMode::PassThroughInnerPoll) - /// .build(); - /// - /// assert_eq!(future.await, true) - /// # } - /// ``` - /// [`WhileWaitingMode::PassThroughInnerPoll`]: crate::future::WhileWaitingMode::PassThroughInnerPoll - /// [`OffloadWith`]: crate::future::with::OffloadWithFuture - pub fn future( - self, - inner_fut: InnerFut, - ) -> FutureBuilder - where - InnerFut: Future + Unpin, - { - FutureBuilder:: { - strategy: OffloadWithFutureStrat, - inner_fut, - offload: NeedsOffload, - while_waiting: NeedsWhileWaiting, - } - } -} - -/// The sub-builder still needs a call to [`get_offload_fn()`] -/// -/// [`get_offload_fn()`]: crate::future::with::OffloadWith::get_offload_fn() -#[derive(Debug)] -pub struct NeedsGetOffloadFn; -/// Sub-builder has a get offload function loaded -#[derive(Debug)] -pub struct HasGetOffloadFn(GetOffloadFn); - -impl OffloadWith { - /// docs - #[must_use = "doesn't do anything unless built"] - pub fn builder() -> OffloadWith { - OffloadWith { - get_offload_fn: NeedsGetOffloadFn, - incorporate_fn: NeedsIncorporateFn, - } - } -} - -impl OffloadWith { - /// A function to call after polling the inner future, to see - /// if any work is available. Returns the offloaded work future, - /// already passed into [`vacation::execute()`]. - /// - /// This will generally require that you wrap the work in `Box::new(Box::pin())`. - /// - /// # Examples - /// ``` - /// # type InnerFut = Box>>>; - /// # #[tokio::main] - /// # async fn run() { - /// let builder = vacation::future::OffloadWith::builder() - /// // take a mutable reference to inner future, - /// // and retrieve any work to offload - /// .get_offload_fn(|_inner_fut: &mut InnerFut| { - /// // it could be conditional, but here it's always returning work - /// Ok(Some(Box::pin(vacation::execute( - /// || std::thread::sleep(std::time::Duration::from_millis(50)), - /// vacation::ChanceOfBlocking::High, - /// "test.operation" - /// )))) - /// }); - /// # } - /// ``` - /// - /// [`vacation::execute()`]: crate::execute() - #[must_use = "doesn't do anything unless built"] - pub fn get_offload_fn( - self, - get_offload_fn: GetOffloadFn, - ) -> OffloadWith, IncorporateFn> - where - GetOffloadFn: Fn( - &mut InnerFut, - ) -> Result< - Option> + Send>>>, - InnerFut::Output, - >, - OffloadResult: Send + 'static, - InnerFut: Future + Unpin, - { - OffloadWith { - get_offload_fn: HasGetOffloadFn(get_offload_fn), - incorporate_fn: self.incorporate_fn, - } - } -} - -impl OffloadWith { - /// A function to call with the result of the offloaded work, - /// to handle executor errors and do any post processing. - /// - /// Errors returned by this function will be bubbled up, aborting - /// the inner future. - /// - /// # Examples: - /// - /// ``` - /// # type InnerFut = Box>>>; - /// # #[tokio::main] - /// # async fn run() { - /// let builder = vacation::future::OffloadWith::builder() - /// // take a mutable reference to inner future, - /// // and retrieve any work to offload - /// .get_offload_fn(|_inner_fut: &mut InnerFut| { - /// // it could be conditional, but here it's always returning work - /// Ok(Some(Box::pin(vacation::execute( - /// || std::thread::sleep(std::time::Duration::from_millis(50)), - /// vacation::ChanceOfBlocking::High, - /// "test.operation" - /// )))) - /// }) - /// .incorporate_fn(|_inner_fut: &mut InnerFut, res: Result| { - /// println!("work complete: {res:#?}"); - /// Ok(()) - /// }); - /// # } - /// ``` - #[must_use = "doesn't do anything unless built"] - pub fn incorporate_fn( - self, - incorporate_fn: IncorporateFn, - ) -> OffloadWith> - where - IncorporateFn: - Fn(&mut InnerFut, Result) -> Result<(), InnerFut::Output>, - OffloadResult: Send + 'static, - InnerFut: Future + Unpin, - { - OffloadWith { - get_offload_fn: self.get_offload_fn, - incorporate_fn: HasIncorporateFn(incorporate_fn), - } - } -} - -impl - FutureBuilder -{ - /// Accepts an [`OffloadWith`] builder with a get offload function loaded, as well - /// as an optional incorporator function - #[must_use = "doesn't do anything unless built"] - pub fn offload_with( - self, - offload: OffloadWith, IncorporateFn>, - ) -> FutureBuilder< - OffloadWithFutureStrat, - InnerFut, - OffloadWith, IncorporateFn>, - WhileWaiting, - > - where - InnerFut: Future + Unpin, - { - FutureBuilder::< - OffloadWithFutureStrat, - InnerFut, - OffloadWith, IncorporateFn>, - WhileWaiting, - > { - strategy: self.strategy, - inner_fut: self.inner_fut, - offload, - while_waiting: self.while_waiting, - } - } -} - -impl - FutureBuilder -{ - /// Specify behavior while vacation work is being actively driven. - /// - /// Either skip polling the inner until vacation work is complete, - /// or continue polling the inner future alongside - /// the vacation future. - /// - /// If using [`WhileWaitingMode::SuppressInnerPoll`], note that there is a deadlock risk of the offloaded - /// work completion relies on the inner future making progress. - #[must_use = "doesn't do anything unless built"] - pub fn while_waiting_for_offload( - self, - while_waiting: WhileWaitingMode, - ) -> FutureBuilder { - FutureBuilder:: { - strategy: self.strategy, - inner_fut: self.inner_fut, - offload: self.offload, - while_waiting, - } - } -} - -impl - FutureBuilder< - OffloadWithFutureStrat, - InnerFut, - OffloadWith, HasIncorporateFn>, - WhileWaitingMode, - > -{ - #[must_use = "doesn't do anything unless polled"] - /// Finish building [`OffloadWithFuture`] - pub fn build( - self, - ) -> OffloadWithFuture { - OffloadWithFuture { - inner_fut: self.inner_fut, - get_offload_fn: self.offload.get_offload_fn.0, - incorporate_fn: self.offload.incorporate_fn.0, - offload_fut: None, - while_waiting: self.while_waiting, - } - } -} - -impl - OffloadWithFuture -{ - /// Helper fn to poll the offloaded future and call its incorporator as needed - /// - /// Updates self to store the future again if it's not complete - fn poll_offloaded_work( - &mut self, - mut offload_fut: Pin> + Send>>, - cx: &mut Context<'_>, - ) -> Poll> - where - IncorporateFn: - Fn(&mut InnerFut, Result) -> Result<(), InnerFut::Output>, - { - if let Poll::Ready(res) = Pin::new(&mut offload_fut).poll(cx) { - // if work completes, call the incorporate function with it - match (self.incorporate_fn)(&mut self.inner_fut, res) { - Ok(()) => Poll::Ready(Ok(())), - // bubble any errors up - Err(err) => Poll::Ready(Err(err)), - } - } else { - self.offload_fut = Some(offload_fut); - Poll::Pending - } - } -} - -impl Future - for OffloadWithFuture -where - InnerFut: Future + Unpin, - GetOffloadFn: Fn( - &mut InnerFut, - ) -> Result< - Option> + Send>>>, - InnerFut::Output, - >, - OffloadResult: Send + 'static, - IncorporateFn: - Fn(&mut InnerFut, Result) -> Result<(), InnerFut::Output>, -{ - type Output = InnerFut::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - // first execute any vacation work - if let Some(offload_fut) = this.offload_fut.take() { - match this.poll_offloaded_work(offload_fut, cx) { - // if we are done, clean up state - Poll::Ready(Ok(())) => this.offload_fut = None, - // bubble up any errors - Poll::Ready(Err(err)) => return Poll::Ready(err), - _ => (), - } - } - - if this.offload_fut.is_none() - || this.while_waiting == WhileWaitingMode::PassThroughInnerPoll - { - if let Poll::Ready(res) = Pin::new(&mut this.inner_fut).poll(cx) { - // if inner is ready, we drop any active vacation work (at least on this task's side) and return inner's result - return Poll::Ready(res); - } - } - - // we only prepare new vacation work if there is none loaded already - // TODO: consider allowing a vec of multiple work units - if this.offload_fut.is_none() { - match (this.get_offload_fn)(&mut this.inner_fut) { - // bubble up any errors - Err(err) => return Poll::Ready(err), - Ok(Some(offload_fut)) => { - // poll the work once and bubble up any errors - // poll_offloaded_work updates the state for us if the offloaded future is still pending - if let Poll::Ready(Err(err)) = this.poll_offloaded_work(offload_fut, cx) { - return Poll::Ready(err); - } - } - // no work = do nothing - _ => (), - } - } - - Poll::Pending - } -} - -// these tests all use the default executor which is `ExecuteDirectly` -#[cfg(test)] -mod test { - use std::{future::Future, time::Duration}; - - use tokio::sync::mpsc::Sender; - - use crate::{ - future::{ - test::{TestFuture, TestFutureResponse}, - WhileWaitingMode, - }, - ChanceOfBlocking, - }; - - use super::*; - - /// Wraps a [`TestFuture`] in vacation handling - /// to send to a mpsc channel each stage's call, or - /// return an error if that channel is dropped. - fn vacation_fut( - inner_fut: TestFuture, - get_tx: Sender<()>, - work_tx: Sender<()>, - incorporate_tx: Sender<()>, - while_waiting: WhileWaitingMode, - ) -> OffloadWithFuture< - TestFuture, - impl Fn( - &mut TestFuture, - ) -> Result< - Option< - Pin< - Box< - dyn Future, crate::Error>> - + Send, - >, - >, - >, - TestFutureResponse, - >, - Result<(), TestFutureResponse>, - impl Fn( - &mut TestFuture, - Result, crate::Error>, - ) -> Result<(), TestFutureResponse>, - > { - crate::future::builder() - .future(inner_fut) - .offload_with( - OffloadWith::builder() - .get_offload_fn(move |_| { - if get_tx.clone().try_send(()).is_err() { - return Err(TestFutureResponse::VacationGetError); - } - - let tx = work_tx.clone(); - let closure = move || { - if tx.try_send(()).is_err() { - return Err(TestFutureResponse::VacationWorkError); - } - Ok(()) - }; - - let offload_fut = crate::execute(closure, ChanceOfBlocking::High, "test.operation"); - - Ok(Some(Box::pin(offload_fut))) - }) - .incorporate_fn( - move |_inner: &mut TestFuture, - res: Result, crate::Error>| { - if res.is_err() { - return Err(TestFutureResponse::ExecutorError); - } - - if res.unwrap().is_err() { - return Err(TestFutureResponse::VacationWorkError); - } - - if incorporate_tx.clone().try_send(()).is_err() { - return Err(TestFutureResponse::VacationIncorporateError); - } - - Ok(()) - } - ) - ) - .while_waiting_for_offload(while_waiting) - .build() - } - - #[tokio::test] - async fn it_works() { - let inner_fut = TestFuture::new(3); - - let (get_tx, get_rx) = tokio::sync::mpsc::channel::<()>(5); - let (work_tx, work_rx) = tokio::sync::mpsc::channel::<()>(5); - let (incorporate_tx, incorporate_rx) = tokio::sync::mpsc::channel::<()>(5); - - let future = vacation_fut( - inner_fut, - get_tx, - work_tx, - incorporate_tx, - WhileWaitingMode::PassThroughInnerPoll, - ); - - let res = future.await; - - // gets work on first poll, - // executes work on second poll + gets more work - // executes work on third poll, then finishes inner future and returns before getting more work - assert_eq!(res, TestFutureResponse::Success(3)); - assert_eq!(2, get_rx.len()); - assert_eq!(2, work_rx.len()); - assert_eq!(2, incorporate_rx.len()); - } - - #[tokio::test] - async fn inner_error() { - let inner_fut = TestFuture::new_with_err(3, 0); - - let (get_tx, get_rx) = tokio::sync::mpsc::channel::<()>(5); - let (work_tx, work_rx) = tokio::sync::mpsc::channel::<()>(5); - let (incorporate_tx, incorporate_rx) = tokio::sync::mpsc::channel::<()>(5); - - let future = vacation_fut( - inner_fut, - get_tx, - work_tx, - incorporate_tx, - WhileWaitingMode::PassThroughInnerPoll, - ); - - let res = future.await; - // - - // first poll fails, no work picked up or executed - assert_eq!(res, TestFutureResponse::InnerError(1)); - assert_eq!(0, get_rx.len()); - assert_eq!(0, work_rx.len()); - assert_eq!(0, incorporate_rx.len()); - } - - #[tokio::test] - async fn delayed_inner_error() { - let inner_fut = TestFuture::new_with_err(3, 1); - - let (get_tx, get_rx) = tokio::sync::mpsc::channel::<()>(5); - let (work_tx, work_rx) = tokio::sync::mpsc::channel::<()>(5); - let (incorporate_tx, incorporate_rx) = tokio::sync::mpsc::channel::<()>(5); - - let future = vacation_fut( - inner_fut, - get_tx, - work_tx, - incorporate_tx, - WhileWaitingMode::PassThroughInnerPoll, - ); - - let res = future.await; - - // first poll succeeds, new work from get - // vacation work executes and resolves, then inner poll fails, no new get - assert_eq!(res, TestFutureResponse::InnerError(2)); - assert_eq!(1, get_rx.len()); - assert_eq!(1, work_rx.len()); - assert_eq!(1, incorporate_rx.len()); - } - - #[tokio::test] - async fn incorporate_err() { - let inner_fut: TestFuture = TestFuture::new(3); - - let (get_tx, get_rx) = tokio::sync::mpsc::channel::<()>(5); - let (work_tx, work_rx) = tokio::sync::mpsc::channel::<()>(5); - let (incorporate_tx, incorporate_rx) = tokio::sync::mpsc::channel::<()>(5); - - let future = vacation_fut( - inner_fut, - get_tx, - work_tx, - incorporate_tx, - WhileWaitingMode::PassThroughInnerPoll, - ); - - std::mem::drop(incorporate_rx); - - let res = future.await; - - // first poll succeeds and gathers new work - // execute work succeeds, then resolve fails - assert_eq!(res, TestFutureResponse::VacationIncorporateError); - assert_eq!(1, get_rx.len()); - assert_eq!(1, work_rx.len()); - } - - #[tokio::test] - async fn work_err() { - let inner_fut: TestFuture = TestFuture::new(2); - - let (get_tx, get_rx) = tokio::sync::mpsc::channel::<()>(5); - let (work_tx, work_rx) = tokio::sync::mpsc::channel::<()>(5); - let (incorporate_tx, incorporate_rx) = tokio::sync::mpsc::channel::<()>(5); - - let future = vacation_fut( - inner_fut, - get_tx, - work_tx, - incorporate_tx, - WhileWaitingMode::PassThroughInnerPoll, - ); - - std::mem::drop(work_rx); - - let res = future.await; - - // first poll succeeds and gathers new work - // then execute fails - assert_eq!(res, TestFutureResponse::VacationWorkError); - assert_eq!(1, get_rx.len()); - assert_eq!(0, incorporate_rx.len()); - } - - #[tokio::test] - async fn work_err_but_inner_ready() { - let inner_fut: TestFuture = TestFuture::new(1); - - let (get_tx, get_rx) = tokio::sync::mpsc::channel::<()>(5); - let (work_tx, work_rx) = tokio::sync::mpsc::channel::<()>(5); - let (incorporate_tx, incorporate_rx) = tokio::sync::mpsc::channel::<()>(5); - - let future = vacation_fut( - inner_fut, - get_tx, - work_tx, - incorporate_tx, - WhileWaitingMode::PassThroughInnerPoll, - ); - - std::mem::drop(work_rx); - - let res = future.await; - - // first poll succeeds so we never get or execute the failing work - assert_eq!(res, TestFutureResponse::Success(1)); - assert_eq!(0, get_rx.len()); - assert_eq!(0, incorporate_rx.len()); - } - - #[tokio::test] - async fn get_err() { - let inner_fut: TestFuture = TestFuture::new(2); - - let (get_tx, get_rx) = tokio::sync::mpsc::channel::<()>(5); - let (work_tx, work_rx) = tokio::sync::mpsc::channel::<()>(5); - let (incorporate_tx, incorporate_rx) = tokio::sync::mpsc::channel::<()>(5); - - let future = vacation_fut( - inner_fut, - get_tx, - work_tx, - incorporate_tx, - WhileWaitingMode::PassThroughInnerPoll, - ); - - std::mem::drop(get_rx); - - let res = future.await; - - // first poll succeeds and then get work fails - assert_eq!(res, TestFutureResponse::VacationGetError); - assert_eq!(0, incorporate_rx.len()); - assert_eq!(0, work_rx.len()); - } - - // we test suppressing the inner poll in the integ tests, since we can't block - // and return pending without spawn_blocking strategy - - #[tokio::test] - async fn supress_inner_poll_works() { - // 2 units of work means one poll, then we get work, and then next poll we finish... - // except that we suppress inner poll, so vacation finishes first despite being slower - let inner_fut = TestFuture::new(3); - - let future = crate::future::builder() - .future(inner_fut) - .offload_with( - OffloadWith::builder() - .get_offload_fn(|_: &mut TestFuture| { - Ok(Some(Box::pin(crate::execute( - || std::thread::sleep(Duration::from_millis(50)), - ChanceOfBlocking::High, - "test.operation", - )))) - }) - .incorporate_fn(|_: &mut TestFuture, _: Result<(), crate::Error>| { - Err(TestFutureResponse::VacationIncorporateError) - }), - ) - .while_waiting_for_offload(WhileWaitingMode::SuppressInnerPoll) - .build::<()>(); - - let res = future.await; - - // if the inner future completed, we would have success, but instead we delay polling and return - // an error after the slower vacation work finishes - - // gets work on first poll, - // executes work on second poll + gets more work - // executes work on third poll, then finishes inner future and returns before getting more work - assert_eq!(res, TestFutureResponse::VacationIncorporateError); - } -} diff --git a/src/lib.rs b/src/lib.rs index 00da4a9..f3e057e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,6 @@ mod concurrency_limit; mod error; mod executor; #[cfg(feature = "future")] -/// Vacation future implementations for libraries that are manually implementing futures pub mod future; pub use error::Error; @@ -87,8 +86,19 @@ pub fn init() -> ExecutorBuilder { } } +/// A context object to be passed into [`execute()`] containing +/// metadata that allows the caller to fine tune strategies +#[derive(Debug, Clone, Copy)] +pub struct ExecuteContext { + /// The chance of blocking a future for a significant amount of time with this work + pub chance_of_blocking: ChanceOfBlocking, + /// A namespace for this operation + pub namespace: &'static str, +} + /// Likelihood of the provided closure blocking for a significant period of time. /// Will eventually be used to customize strategies with more granularity. +#[non_exhaustive] #[derive(Debug, Clone, Copy)] pub enum ChanceOfBlocking { /// Very likely to block, use primary sync strategy @@ -115,17 +125,19 @@ pub enum ChanceOfBlocking { /// 5 /// }; /// -/// let res = vacation::execute(closure, vacation::ChanceOfBlocking::High, "example.operation").await.unwrap(); +/// let res = vacation::execute( +/// closure, +/// vacation::ExecuteContext { +/// chance_of_blocking: vacation::ChanceOfBlocking::High, +/// namespace: "example.operation" +/// } +/// ).await.unwrap(); /// assert_eq!(res, 5); /// # } /// /// ``` /// -pub async fn execute( - f: F, - chance_of_blocking: ChanceOfBlocking, - namespace: &'static str, -) -> Result +pub async fn execute(f: F, context: ExecuteContext) -> Result where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -135,7 +147,7 @@ where match executor { Executor::ExecuteDirectly(executor) => executor.execute(f).await, - Executor::Custom(executor) => executor.execute(f, chance_of_blocking, namespace).await, + Executor::Custom(executor) => executor.execute(f, context).await, #[cfg(feature = "tokio")] Executor::SpawnBlocking(executor) => executor.execute(f).await, } diff --git a/tests/custom_simple.rs b/tests/custom_simple.rs index 3633089..a348aec 100644 --- a/tests/custom_simple.rs +++ b/tests/custom_simple.rs @@ -2,7 +2,7 @@ use std::time::Duration; use vacation::{ execute, global_strategy, init, ChanceOfBlocking, CustomClosureInput, CustomClosureOutput, - ExecutorStrategy, GlobalStrategy, + ExecuteContext, ExecutorStrategy, GlobalStrategy, }; #[tokio::test] @@ -21,9 +21,15 @@ async fn custom_simple() { 5 }; - let res = execute(closure, ChanceOfBlocking::High, "test.operation") - .await - .unwrap(); + let res = execute( + closure, + ExecuteContext { + chance_of_blocking: ChanceOfBlocking::High, + namespace: "test.operation", + }, + ) + .await + .unwrap(); assert_eq!(res, 5); assert_eq!( diff --git a/tests/custom_strategy.rs b/tests/custom_strategy.rs index 472a8d1..aa2cef8 100644 --- a/tests/custom_strategy.rs +++ b/tests/custom_strategy.rs @@ -3,8 +3,8 @@ use std::{sync::OnceLock, time::Duration}; use futures_util::future::join_all; use rayon::ThreadPool; use vacation::{ - execute, init, ChanceOfBlocking, CustomClosureInput, CustomClosureOutput, ExecutorStrategy, - GlobalStrategy, + execute, init, ChanceOfBlocking, CustomClosureInput, CustomClosureOutput, ExecuteContext, + ExecutorStrategy, GlobalStrategy, }; static THREADPOOL: OnceLock = OnceLock::new(); @@ -42,7 +42,13 @@ async fn custom_concurrency() { // note that we also are racing against concurrency from other tests in this module for _ in 0..6 { - futures.push(execute(closure, ChanceOfBlocking::High, "test.operation")); + futures.push(execute( + closure, + ExecuteContext { + chance_of_blocking: ChanceOfBlocking::High, + namespace: "test.operation", + }, + )); } let res = join_all(futures).await; diff --git a/tests/execute_directly_default.rs b/tests/execute_directly_default.rs index a90cd92..3bcb060 100644 --- a/tests/execute_directly_default.rs +++ b/tests/execute_directly_default.rs @@ -1,3 +1,5 @@ +use vacation::ExecuteContext; + #[tokio::test] async fn default_to_execute_directly() { use std::time::Duration; @@ -11,9 +13,15 @@ async fn default_to_execute_directly() { 5 }; - let res = execute(closure, ChanceOfBlocking::High, "test.operation") - .await - .unwrap(); + let res = execute( + closure, + ExecuteContext { + chance_of_blocking: ChanceOfBlocking::High, + namespace: "test.operation", + }, + ) + .await + .unwrap(); assert_eq!(res, 5); assert_eq!( @@ -22,9 +30,15 @@ async fn default_to_execute_directly() { ); // make sure we can continue to call it without failures due to repeat initialization - let res = execute(closure, ChanceOfBlocking::High, "test.operation") - .await - .unwrap(); + let res = execute( + closure, + ExecuteContext { + chance_of_blocking: ChanceOfBlocking::High, + namespace: "test.operation", + }, + ) + .await + .unwrap(); assert_eq!(res, 5); assert_eq!( diff --git a/tests/execute_directly_strategy.rs b/tests/execute_directly_strategy.rs index c9ada15..56e6408 100644 --- a/tests/execute_directly_strategy.rs +++ b/tests/execute_directly_strategy.rs @@ -32,7 +32,14 @@ async fn execute_directly() { // we need to spawn tasks since otherwise we'll just block the current worker thread let future = async move { tokio::task::spawn(async move { - execute(closure, ChanceOfBlocking::High, "test.operation").await + execute( + closure, + vacation::ExecuteContext { + chance_of_blocking: ChanceOfBlocking::High, + namespace: "test.operation", + }, + ) + .await }) .await }; diff --git a/tests/future.rs b/tests/future.rs new file mode 100644 index 0000000..9ddff57 --- /dev/null +++ b/tests/future.rs @@ -0,0 +1,117 @@ +#[cfg(all(feature = "tokio", feature = "future"))] +mod test { + use std::time::{Duration, Instant}; + + use futures_util::future::join_all; + + #[tokio::test] + async fn offload_with() { + vacation::init() + .max_concurrency(3) + .spawn_blocking() + .install() + .unwrap(); + + let start = Instant::now(); + + let mut futures = Vec::new(); + + for _ in 0..6 { + let inner_fut = Box::pin(async move { + // make sure we don't finish on first poll, and then this will be complete by the + // time offloaded work finishes + tokio::time::sleep(Duration::from_millis(2)).await; + true + }); + let future = vacation::future::builder() + .future(inner_fut) + .get_offload(|inner_fut| { + // we just always return work in this case, we get more granular with unit tests + // in reality you probably would want to call a method on the future or check a buffer + // or something + Ok(vacation::future::OffloadWork::HasWork(Box::new( + async move { + match vacation::execute( + || std::thread::sleep(Duration::from_millis(20)), + vacation::ExecuteContext { + chance_of_blocking: vacation::ChanceOfBlocking::High, + namespace: "test.operation", + }, + ) + .await + { + Ok(_) => Ok(inner_fut), + Err(_) => Err(false), + } + }, + ))) + }) + .build(); + + futures.push(future); + } + let res = join_all(futures).await; + assert!(res.iter().all(|res| *res)); + + let elapsed_millis = start.elapsed().as_millis(); + assert!( + elapsed_millis < 60, + "futures did not run concurrently with each other" + ); + assert!(elapsed_millis > 30, "futures exceeded max concurrency"); + + // now let's test the case where the inner yields, and then vacation fails, and bubbles up an error + // this demonstrates that we don't poll the inner again while outer is polling, since it would + // otherwise finish first and succeed + let future = vacation::future::builder() + .future(Box::pin(async { + tokio::time::sleep(Duration::from_millis(20)).await; + true + })) + .get_offload(|_inner_fut| { + // we just always return work in this case, we get more granular with unit tests + // in reality you probably would want to call a method on the future or check a buffer + // or something + Ok(vacation::future::OffloadWork::HasWork(Box::new( + async move { + match vacation::execute( + || std::thread::sleep(Duration::from_millis(50)), + vacation::ExecuteContext { + chance_of_blocking: vacation::ChanceOfBlocking::High, + namespace: "test.operation", + }, + ) + .await + { + // we still return an error, so the offloaded work always fails + Ok(_) => Err(false), + Err(_) => Err(false), + } + }, + ))) + }) + .build(); + + let res = future.await; + // if the inner future completed, we would have success, but instead we delay polling and return + // an error after the slower vacation work finishes + assert_eq!(res, false); + + // and a case where there is no work to be done + let future = vacation::future::builder() + .future(Box::pin(async { + tokio::time::sleep(Duration::from_millis(20)).await; + true + })) + .get_offload(|inner_fut| { + // we just never work in this case, we get more granular with unit tests + // in reality you probably would want to call a method on the future or check a buffer + // or something + Ok(vacation::future::OffloadWork::NoWork(inner_fut)) + }) + .build(); + + let res = future.await; + assert_eq!(res, true); + } +} diff --git a/tests/offload_first.rs b/tests/offload_first.rs deleted file mode 100644 index 1dc892f..0000000 --- a/tests/offload_first.rs +++ /dev/null @@ -1,52 +0,0 @@ -#[cfg(all(feature = "tokio", feature = "future"))] -mod test { - use std::time::{Duration, Instant}; - - use futures_util::future::join_all; - - #[tokio::test] - async fn offload_first() { - vacation::init() - .max_concurrency(3) - .spawn_blocking() - .install() - .unwrap(); - - let start = Instant::now(); - - let mut futures = Vec::new(); - - for _ in 0..6 { - let future = vacation::future::builder() - .offload_first( - vacation::future::OffloadFirst::builder() - .offload_future(vacation::execute( - || { - std::thread::sleep(Duration::from_millis(10)); - true - }, - vacation::ChanceOfBlocking::High, - "test.operation", - )) - .incorporate_fn(|res| { - Ok(Box::pin(async move { - tokio::time::sleep(Duration::from_millis(10)).await; - res - })) - }), - ) - .build(); - - futures.push(future); - } - let res = join_all(futures).await; - assert!(res.into_iter().all(|res| res.is_ok() && res.unwrap())); - - let elapsed_millis = start.elapsed().as_millis(); - assert!( - elapsed_millis < 60, - "futures did not run concurrently with each other" - ); - assert!(elapsed_millis > 20, "futures exceeded max concurrency"); - } -} diff --git a/tests/offload_with.rs b/tests/offload_with.rs deleted file mode 100644 index 8a39909..0000000 --- a/tests/offload_with.rs +++ /dev/null @@ -1,80 +0,0 @@ -#[cfg(all(feature = "tokio", feature = "future"))] -mod test { - use std::time::{Duration, Instant}; - - use futures_util::future::join_all; - - #[tokio::test] - async fn offload_with() { - vacation::init() - .max_concurrency(3) - .spawn_blocking() - .install() - .unwrap(); - - let start = Instant::now(); - - let mut futures = Vec::new(); - - for _ in 0..6 { - let inner_future = Box::pin(async move { - tokio::time::sleep(Duration::from_millis(20)).await; - true - }); - let future = vacation::future::builder() - .future(inner_future) - .offload_with( - vacation::future::OffloadWith::builder() - .get_offload_fn(|_inner_fut| { - Ok(Some(Box::pin(vacation::execute( - || std::thread::sleep(Duration::from_millis(20)), - vacation::ChanceOfBlocking::High, - "test.operation", - )))) - }) - .incorporate_fn(|_, _| Ok(())), - ) - .while_waiting_for_offload(vacation::future::WhileWaitingMode::PassThroughInnerPoll) - .build(); - - futures.push(future); - } - let res = join_all(futures).await; - assert!(res.iter().all(|res| *res)); - - let elapsed_millis = start.elapsed().as_millis(); - assert!( - elapsed_millis < 60, - "inner fut did not run concurrently with work, and/or futures did not run concurrently with each other" - ); - assert!(elapsed_millis > 20, "futures exceeded max concurrency"); - - let future = vacation::future::builder() - .future(Box::pin(async { - tokio::time::sleep(Duration::from_millis(20)).await; - Ok::(true) - })) - .offload_with( - vacation::future::OffloadWith::builder() - .get_offload_fn(|_| { - Ok(Some(Box::pin(vacation::execute( - || std::thread::sleep(Duration::from_millis(50)), - vacation::ChanceOfBlocking::High, - "test.operation", - )))) - }) - .incorporate_fn(|_, _: Result<(), vacation::Error>| Err(Err("foo"))), - ) - .while_waiting_for_offload(vacation::future::WhileWaitingMode::SuppressInnerPoll) - .build::<()>(); - - let res = future.await; - // if the inner future completed, we would have success, but instead we delay polling and return - // an error after the slower vacation work finishes - - // gets work on first poll, - // executes work on second poll + gets more work - // executes work on third poll, then finishes inner future and returns before getting more work - assert_eq!(res, Err("foo")); - } -} diff --git a/tests/spawn_blocking_strategy.rs b/tests/spawn_blocking_strategy.rs index 5b9142a..3f84f77 100644 --- a/tests/spawn_blocking_strategy.rs +++ b/tests/spawn_blocking_strategy.rs @@ -32,8 +32,16 @@ mod test { // note that we also are racing against concurrency from other tests in this module for _ in 0..6 { - let future = - async move { execute(closure, ChanceOfBlocking::High, "test.operation").await }; + let future = async move { + execute( + closure, + vacation::ExecuteContext { + chance_of_blocking: ChanceOfBlocking::High, + namespace: "test.operation", + }, + ) + .await + }; futures.push(future); } From 966bb149d5064be0ab124a38dbbfc810d57e1d63 Mon Sep 17 00:00:00 2001 From: jlizen Date: Mon, 13 Jan 2025 16:12:13 +0000 Subject: [PATCH 2/2] remove namespace from `ExecuteContext` and make `non_exhaustive` with ::new(), refactor OffloadWith future impl for readability --- README.md | 10 +- src/future.rs | 259 +++++++++++++++-------------- src/lib.rs | 27 +-- tests/custom_simple.rs | 12 +- tests/custom_strategy.rs | 5 +- tests/execute_directly_default.rs | 24 +-- tests/execute_directly_strategy.rs | 12 +- tests/future.rs | 10 +- tests/spawn_blocking_strategy.rs | 9 +- 9 files changed, 167 insertions(+), 201 deletions(-) diff --git a/README.md b/README.md index 9d511a4..dd87837 100644 --- a/README.md +++ b/README.md @@ -39,10 +39,7 @@ pub async fn a_future_that_has_blocking_sync_work() -> u8 { // block the current worker thread vacation::execute( move || { sync_work("foo".to_string()) }, - vacation::ExecuteContext { - chance_of_blocking: vacation::ChanceOfBlocking::High, - namespace: "example.operation" - } + vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent) ).await.unwrap() } ``` @@ -90,10 +87,7 @@ async fn main() { std::thread::sleep(std::time::Duration::from_millis(500)); 5 }, - vacation::ExecuteContext { - chance_of_blocking: vacation::ChanceOfBlocking::High, - namespace: "example.operation" - } + vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent) ); assert_eq!(vacation_future.await.unwrap(), 5); diff --git a/src/future.rs b/src/future.rs index 8082db8..9675911 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,101 +1,96 @@ -//! # Future module -//! +//! # Utilities for Composing Futures +//! //! This module provides a lower-level wrapper for manually implemented futures. It is intended for library -//! authors that already have a hand-implemented future, that needs to delegate work. -//! +//! authors that already have a hand-implemented future, that needs to offload compute-heavy work to vacation. +//! //! Such a use case cannot simply call [`vacation::execute(_)`] and await the returned future -//! because they are already in a sync context. Instead, the vacation future needs to be driven +//! because they are already in a sync context. Instead, the offloaded work needs to be driven //! across the individual polls within the custom future's current await handling. -//! +//! //! The entrypoint for this api is [`vacation::future::builder()`], which allows constructing an [`OffloadWith`] //! to wrap your custom inner future. -//! +//! //! This wrapper future processes occasional work offloaded from the inner future, while driving the inner //! future if no offloaded work is active. Offloaded work is retrieved via a 'pull' model, where, //! after the inner future returns `Poll::Pending`, a [`get_offload()`] closure is called that has -//! owned access to the inner future. That closure can return any work that needs to be offloaded, +//! owned access to the inner future. That closure can return any work that needs to be offloaded, //! structured as an async future that invokes `vacation::execute()` and processes any results. -//! +//! //! **Note that `OffloadWith` does NOT poll the inner future while offloaded work is active, so there is a deadlock //! risk if the offloaded work depends on the inner future making progress to resolve.** -//! +//! //! ## Q&A -//! +//! //! ### This seems complicated, why not just call vacation from inside my custom future? //! There are many ways to structure a custom future to efficiently use `vacation`. This is designed -//! to be more of a 'bolt on' utility that requires minimal code changes inside your inner future - -//! mostly just adding a hook with which to get work. -//! +//! to be more of a 'bolt on' utility that requires minimal code changes inside your inner future - +//! mostly just adding a hook with which to get work. +//! //! If you are writing a custom future with `vacation` in mind, it probably is simplest to store the state -//! of any offloaded vacation work directly in your future, and poll it as part of your `Future` implementation. +//! of any offloaded work directly in your future, and poll it as part of your `Future` implementation. //! That way you can simply initialize that work whenever you reach a point that needs it, poll it once, and either //! return `Poll::Pending` (if you need to wait on that work completing) or carry on (if you can make other progress //! while the offloaded work is ongoing). -//! +//! //! ### I only need to send work to vacation once, in order to construct my inner future. What should I do? -//! //! This can be handled very simply with [`futures_util::then()`]. Simply call `vacation::execute()` first, //! and then chain `then()` to construct the inner future. -//! +//! //! Here is an example: -//! +//! //! ``` //! use futures_util::FutureExt; -//! +//! //! let vacation_future = vacation::execute( //! || { +//! // stand-in for compute-heavy work //! std::thread::sleep(std::time::Duration::from_millis(50)); //! 5 //! }, -//! vacation::ExecuteContext { -//! chance_of_blocking: vacation::ChanceOfBlocking::High, -//! namespace: "sample_initialize", -//! } +//! vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent) //! ); -//! +//! //! let future = vacation_future.then(|res| async move { //! match res { //! Err(error) => Err("there was a vacation error"), -//! // placeholder for your custom future +//! // stand-in for your custom future //! Ok(res) => Ok(tokio::time::sleep(std::time::Duration::from_millis(res)).await), //! } //! }); -//! +//! //! ``` -//! +//! //! ### This seems suspicious, what if the inner future has a waker fire while it is being suppressed //! due to ongoing offloaded work? -//! //! We defensively poll the inner future anytime the offloaded work completes. This means that if any -//! waker had fired while that work was ongoing, the inner future will be polled again. -//! +//! waker had fired while that offloaded work was ongoing, the inner future will definitely be polled again. +//! //! Essentially, our guarantee is, every time the wrapper future is polled, either the inner future is polled -//! directly (if no offloaded work), it is polled after vacation work (if vacation work is completing), -//! or there is at minimum a vacation waker registered - which, again, will result in the inner future +//! directly (if no offloaded work), it is immediately polled after offloaded work (if offloaded work just completed), +//! or there is at minimum a waker registered for the offloaded work- which, again, will result in the inner future //! being polled when that work completes. -//! +//! //! This results in potentially redundant polls, but no lost polls. -//! +//! //! ### Why do I need an owned inner future in my `get_offload()` closure? It seems cumbersome. //! A couple reasons: -//! -//! First, this allows the API to accept just a single closure, which can both delegate work to vacation, +//! +//! First, this allows the API to accept just a single closure, which can both offload work to vacation, //! which might need to be sent across threads, and then post-process the work with mutable access //! to the inner future. We can't simply pass in a &mut reference because this library targets //! pre-async-closure Rust versions, so it would insist that the inner future reference have a 'static lifetime, -//! which it doesn't. You could instead split into two closures, one of which generates the offloaded work, and the -//! second of which post-processes by taking as inputs the offloaded work output and `&mut inner_future`. However, -//! this was pretty unwieldy. -//! +//! which it doesn't. +//! //! Second, this makes it simpler to model the case where the offloaded work needs some state from the inner future //! that the inner future can't operate without. If it wasn't owned, the inner future would need to add a new variant //! to its internal state, which represents the case where it is missing state, and then panic or return `Poll::Pending` //! if it is polled while in that state. -//! +//! //! If you have a use case that wants to continue polling the inner future, while also driving any offloaded work //! (or multiple pieces of offloaded work), please cut a GitHub issue! Certainly we can add this functionality in -//! if consumers would use it. -//! +//! if consumers would use it. It just would be a bit more complicated API, so we're trying to avoid adding complexity +//! if it wouldn't be used. +//! //! [`futures_util::then()`]: https://docs.rs/futures-util/latest/futures_util/future/trait.FutureExt.html#method.then //! [`vacation::future::builder()`]: crate::future::builder() //! [`get_offload()`]: crate::future::FutureBuilder::get_offload() @@ -103,7 +98,7 @@ use std::{ future::Future, pin::Pin, - task::{Context, Poll}, + task::{ready, Context, Poll}, }; use pin_project_lite::pin_project; @@ -189,10 +184,7 @@ pin_project! { /// // and processes any results (if post-processing is needed) /// Some(work) => Ok(vacation::future::OffloadWork::HasWork( /// Box::new(async move { -/// match vacation::execute(work, vacation::ExecuteContext { -/// chance_of_blocking: vacation::ChanceOfBlocking::High, -/// namespace: "sample-future" -/// }).await { +/// match vacation::execute(work, vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent)).await { /// Ok(work_res) => { /// match work_res { /// Ok(work_output) => { @@ -361,10 +353,7 @@ impl std::fmt::Debug for HasGetOffload { /// // and processes any results (if post-processing is needed) /// Some(work) => Ok(vacation::future::OffloadWork::HasWork( /// Box::new(async move { -/// match vacation::execute(work, vacation::ExecuteContext { -/// chance_of_blocking: vacation::ChanceOfBlocking::High, -/// namespace: "sample-future" -/// }).await { +/// match vacation::execute(work, vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent)).await { /// Ok(work_res) => { /// match work_res { /// Ok(work_output) => { @@ -511,10 +500,7 @@ impl FutureBuilder, NeedsGetOffload> /// // and processes any results (if post-processing is needed) /// Some(work) => Ok(vacation::future::OffloadWork::HasWork( /// Box::new(async move { - /// match vacation::execute(work, vacation::ExecuteContext { - /// chance_of_blocking: vacation::ChanceOfBlocking::High, - /// namespace: "sample-future" - /// }).await { + /// match vacation::execute(work, vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent)).await { /// Ok(work_res) => { /// match work_res { /// Ok(work_output) => { @@ -564,6 +550,44 @@ impl } } +impl OffloadWith { + /// Polls any offloaded work that is present and return any error. + /// If work completes successfully, also poll the inner future and update inner state. + fn poll_offload_and_then_inner( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll + where + InnerFut: Future + Unpin, + { + if let OffloadWithInner::OffloadActive(ref mut offload_future) = self.as_mut().inner { + let offload_res = ready!(offload_future.as_mut().poll(cx)); + match offload_res { + Ok(inner_fut) => { + self.inner = OffloadWithInner::InnerFut(inner_fut); + self.poll_inner(cx) + } + Err(err) => Poll::Ready(err), + } + } else { + Poll::Pending + } + } + + /// Poll any inner future that is present + fn poll_inner(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll + where + InnerFut: Future + Unpin, + { + let this = self.project(); + if let OffloadWithInner::InnerFut(inner_fut) = this.inner { + Pin::new(inner_fut).as_mut().poll(cx) + } else { + Poll::Pending + } + } +} + impl Future for OffloadWith where InnerFut: Future + Unpin, @@ -571,78 +595,60 @@ where { type Output = InnerFut::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - // first poll the inner future or any outstanding work - let inner_fut = match std::mem::replace(&mut this.inner, OffloadWithInner::UpdatingState) { + // Flow: + // + // First, drive the stored future, one of: + // a) - Poll inner future + // b) - Poll offload work -> if done, poll inner future + // + // Then: + // 1. See if we have offload work active + // 2. If yes, we are done, return Poll::Pending + // 2. If no, split apart inner to get owned inner future + // 3. Send inner future into get_offload() closure + // 4. Put inner back together with returned offload work or inner future + // 5. Poll any new offload work (and inner future, if offload compltes) + // + // Poll::Ready(..) from the inner future, or Poll::Ready(Err<>) from any + // offload work handling, will return Poll::Ready. Else Poll::Pending. + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // drive our stored future, inner or offloaded work + if let Poll::Ready(inner_res) = match self.inner { + OffloadWithInner::InnerFut(_) => self.as_mut().poll_inner(cx), + OffloadWithInner::OffloadActive(_) => self.as_mut().poll_offload_and_then_inner(cx), OffloadWithInner::UpdatingState => { - panic!("unexpected state while polling get_offload with future") - } - OffloadWithInner::InnerFut(mut inner_fut) => match Pin::new(&mut inner_fut).poll(cx) { - Poll::Ready(res) => return Poll::Ready(res), - Poll::Pending => inner_fut, - }, - OffloadWithInner::OffloadActive(mut offload_fut) => { - let mut inner_fut = match offload_fut.as_mut().poll(cx) { - // get_offload work still ongoing, reconstruct state and bail - Poll::Pending => { - this.inner = OffloadWithInner::OffloadActive(offload_fut); - return Poll::Pending; - } - // get_offload work complete - Poll::Ready(offload_res) => match offload_res { - Ok(inner_fut) => inner_fut, - // bubble up error in get_offload - Err(res) => return Poll::Ready(res), - }, - }; - // if get_offload future is done and successful, - // poll our inner future again in case it didn't set wakers since it was relying on - // vacation work completing - match Pin::new(&mut inner_fut).poll(cx) { - Poll::Ready(res) => return Poll::Ready(res), - Poll::Pending => inner_fut, - } + // we only set this state in the subsequent block, where we unconditionally set a different + // state or return Poll::Ready + unreachable!("unexpected state while polling OffloadWith") } - }; - - match (this.get_offload)(inner_fut) { - Ok(get_offload_res) => match get_offload_res { - OffloadWork::HasWork(get_offload) => { - let mut get_offload = Box::into_pin(get_offload); - // poll get_offload work once to kick it off - match get_offload.as_mut().poll(cx) { - // get_offload work didn't actually need to sleep - Poll::Ready(offload_res) => match offload_res { - // successfully completed get_offload work, poll our inner future - // again in case it didn't set wakers since it was relying on - // vacation work completing - Ok(mut inner_fut) => match Pin::new(&mut inner_fut).poll(cx) { - Poll::Ready(res) => Poll::Ready(res), - Poll::Pending => { - this.inner = OffloadWithInner::InnerFut(inner_fut); - Poll::Pending - } - }, - // get_offload work failed, bubble up any error - Err(err) => Poll::Ready(err), - }, - // get_offload work still ongoing, store in state - Poll::Pending => { - this.inner = OffloadWithInner::OffloadActive(get_offload); + } { + return Poll::Ready(inner_res); + } + + // if we don't already have offloaded work, see if there is new work. + if matches!(self.inner, OffloadWithInner::InnerFut(_)) { + match std::mem::replace(&mut self.inner, OffloadWithInner::UpdatingState) { + OffloadWithInner::InnerFut(inner_fut) => match (self.get_offload)(inner_fut) { + Ok(offload_work) => match offload_work { + OffloadWork::HasWork(work) => { + self.inner = OffloadWithInner::OffloadActive(Box::into_pin(work)); + self.poll_offload_and_then_inner(cx) + } + OffloadWork::NoWork(inner_fut) => { + self.inner = OffloadWithInner::InnerFut(inner_fut); Poll::Pending } - } - } - // get get_offload work didn't return any work, reconstruct inner future state - OffloadWork::NoWork(inner_fut) => { - this.inner = OffloadWithInner::InnerFut(inner_fut); - Poll::Pending + }, + Err(err) => Poll::Ready(err), + }, + _ => { + // we match to confirm we have an inner future immediately above + unreachable!("unexpected state while polling OffloadWith") } - }, - // bubble up error while getting get_offload work - Err(err) => Poll::Ready(err), + } + } else { + // we already have offload work active, don't look for more + Poll::Pending } } } @@ -772,10 +778,7 @@ mod test { Ok(()) } }, - crate::ExecuteContext { - chance_of_blocking: crate::ChanceOfBlocking::High, - namespace: "test.operation", - }, + crate::ExecuteContext::new(crate::ChanceOfBlocking::Frequent), ) .await { diff --git a/src/lib.rs b/src/lib.rs index f3e057e..6652f08 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,21 +88,32 @@ pub fn init() -> ExecutorBuilder { /// A context object to be passed into [`execute()`] containing /// metadata that allows the caller to fine tune strategies +#[non_exhaustive] #[derive(Debug, Clone, Copy)] pub struct ExecuteContext { /// The chance of blocking a future for a significant amount of time with this work pub chance_of_blocking: ChanceOfBlocking, - /// A namespace for this operation - pub namespace: &'static str, } -/// Likelihood of the provided closure blocking for a significant period of time. +impl ExecuteContext { + /// Create a new execute context, to use with [`execute()`], with + /// a given chance of blocking + pub fn new(chance_of_blocking: ChanceOfBlocking) -> Self { + Self { chance_of_blocking } + } +} + +/// Likelihood of the provided closure blocking for a significant period of time (~50μs+). /// Will eventually be used to customize strategies with more granularity. #[non_exhaustive] #[derive(Debug, Clone, Copy)] pub enum ChanceOfBlocking { - /// Very likely to block, use primary sync strategy - High, + /// Very likely to block for significant amount of time(~50μs+), use primary strategy + Frequent, + /// Blocks some of the time for ~50μs+ but not always, + /// potentially use an optimistic strategy that doesn't + /// send across threads + Occasional, } /// Send a sync closure to the configured or default global compute-heavy executor, and wait on its output. @@ -127,11 +138,7 @@ pub enum ChanceOfBlocking { /// /// let res = vacation::execute( /// closure, -/// vacation::ExecuteContext { -/// chance_of_blocking: vacation::ChanceOfBlocking::High, -/// namespace: "example.operation" -/// } -/// ).await.unwrap(); +/// vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent)).await.unwrap(); /// assert_eq!(res, 5); /// # } /// diff --git a/tests/custom_simple.rs b/tests/custom_simple.rs index a348aec..6b91562 100644 --- a/tests/custom_simple.rs +++ b/tests/custom_simple.rs @@ -21,15 +21,9 @@ async fn custom_simple() { 5 }; - let res = execute( - closure, - ExecuteContext { - chance_of_blocking: ChanceOfBlocking::High, - namespace: "test.operation", - }, - ) - .await - .unwrap(); + let res = execute(closure, ExecuteContext::new(ChanceOfBlocking::Frequent)) + .await + .unwrap(); assert_eq!(res, 5); assert_eq!( diff --git a/tests/custom_strategy.rs b/tests/custom_strategy.rs index aa2cef8..eaa8149 100644 --- a/tests/custom_strategy.rs +++ b/tests/custom_strategy.rs @@ -44,10 +44,7 @@ async fn custom_concurrency() { for _ in 0..6 { futures.push(execute( closure, - ExecuteContext { - chance_of_blocking: ChanceOfBlocking::High, - namespace: "test.operation", - }, + ExecuteContext::new(ChanceOfBlocking::Frequent), )); } diff --git a/tests/execute_directly_default.rs b/tests/execute_directly_default.rs index 3bcb060..6039b5d 100644 --- a/tests/execute_directly_default.rs +++ b/tests/execute_directly_default.rs @@ -13,15 +13,9 @@ async fn default_to_execute_directly() { 5 }; - let res = execute( - closure, - ExecuteContext { - chance_of_blocking: ChanceOfBlocking::High, - namespace: "test.operation", - }, - ) - .await - .unwrap(); + let res = execute(closure, ExecuteContext::new(ChanceOfBlocking::Frequent)) + .await + .unwrap(); assert_eq!(res, 5); assert_eq!( @@ -30,15 +24,9 @@ async fn default_to_execute_directly() { ); // make sure we can continue to call it without failures due to repeat initialization - let res = execute( - closure, - ExecuteContext { - chance_of_blocking: ChanceOfBlocking::High, - namespace: "test.operation", - }, - ) - .await - .unwrap(); + let res = execute(closure, ExecuteContext::new(ChanceOfBlocking::Frequent)) + .await + .unwrap(); assert_eq!(res, 5); assert_eq!( diff --git a/tests/execute_directly_strategy.rs b/tests/execute_directly_strategy.rs index 56e6408..17a33bb 100644 --- a/tests/execute_directly_strategy.rs +++ b/tests/execute_directly_strategy.rs @@ -2,7 +2,8 @@ use std::time::Duration; use futures_util::future::join_all; use vacation::{ - execute, global_strategy, init, ChanceOfBlocking, ExecutorStrategy, GlobalStrategy, + execute, global_strategy, init, ChanceOfBlocking, ExecuteContext, ExecutorStrategy, + GlobalStrategy, }; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -32,14 +33,7 @@ async fn execute_directly() { // we need to spawn tasks since otherwise we'll just block the current worker thread let future = async move { tokio::task::spawn(async move { - execute( - closure, - vacation::ExecuteContext { - chance_of_blocking: ChanceOfBlocking::High, - namespace: "test.operation", - }, - ) - .await + execute(closure, ExecuteContext::new(ChanceOfBlocking::Frequent)).await }) .await }; diff --git a/tests/future.rs b/tests/future.rs index 9ddff57..9526fbd 100644 --- a/tests/future.rs +++ b/tests/future.rs @@ -33,10 +33,7 @@ mod test { async move { match vacation::execute( || std::thread::sleep(Duration::from_millis(20)), - vacation::ExecuteContext { - chance_of_blocking: vacation::ChanceOfBlocking::High, - namespace: "test.operation", - }, + vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent), ) .await { @@ -76,10 +73,7 @@ mod test { async move { match vacation::execute( || std::thread::sleep(Duration::from_millis(50)), - vacation::ExecuteContext { - chance_of_blocking: vacation::ChanceOfBlocking::High, - namespace: "test.operation", - }, + vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent), ) .await { diff --git a/tests/spawn_blocking_strategy.rs b/tests/spawn_blocking_strategy.rs index 3f84f77..824807b 100644 --- a/tests/spawn_blocking_strategy.rs +++ b/tests/spawn_blocking_strategy.rs @@ -4,9 +4,7 @@ mod test { use futures_util::future::join_all; - use vacation::{ - execute, global_strategy, init, ChanceOfBlocking, ExecutorStrategy, GlobalStrategy, - }; + use vacation::{execute, global_strategy, init, ExecutorStrategy, GlobalStrategy}; #[tokio::test] async fn spawn_blocking() { @@ -35,10 +33,7 @@ mod test { let future = async move { execute( closure, - vacation::ExecuteContext { - chance_of_blocking: ChanceOfBlocking::High, - namespace: "test.operation", - }, + vacation::ExecuteContext::new(vacation::ChanceOfBlocking::Frequent), ) .await };