From 97f05b0ec07e472efd2c743656cbf10b0b863af9 Mon Sep 17 00:00:00 2001 From: alandefreitas Date: Fri, 11 Mar 2022 00:02:21 -0300 Subject: [PATCH] Support variant direct storage and inline operation state --- .github/workflows/build.yml | 2 +- docs/adaptors/continuations.md | 18 +- .../adaptor/detail/make_ready_future.hpp | 62 + .../adaptor/detail/unwrap_and_continue.hpp | 85 +- include/futures/adaptor/make_ready_future.hpp | 33 +- .../futures/detail/container/small_vector.hpp | 16 +- .../detail/utility/aligned_storage_for.hpp | 8 +- include/futures/detail/utility/byte.hpp | 6 +- include/futures/detail/utility/empty_base.hpp | 1 + .../futures/detail/utility/maybe_atomic.hpp | 214 +++ .../futures/detail/utility/maybe_copyable.hpp | 3 +- include/futures/executor/default_executor.hpp | 7 +- include/futures/futures/await.hpp | 2 +- include/futures/futures/basic_future.hpp | 1189 +++++++++++------ .../futures/detail/future_launcher.hpp | 30 +- .../futures/futures/detail/future_state.hpp | 864 ++++++++++++ .../futures/detail/operation_state.hpp | 839 ++++++++---- .../detail/operation_state_storage.hpp | 147 +- .../detail/traits/append_future_option.hpp | 2 +- include/futures/futures/future_error.hpp | 20 +- include/futures/futures/is_ready.hpp | 2 +- 21 files changed, 2770 insertions(+), 780 deletions(-) create mode 100644 include/futures/adaptor/detail/make_ready_future.hpp create mode 100644 include/futures/detail/utility/maybe_atomic.hpp create mode 100644 include/futures/futures/detail/future_state.hpp diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 51d459161..9e1fb87e9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -94,7 +94,7 @@ jobs: - name: Archive Single-Header as is uses: kittaakos/upload-artifact-as-is@v0 with: - path: build/source/single-header/futures/futures.h + path: build/include/single-header/futures/futures.hpp - name: Archive Installer Packages as is uses: kittaakos/upload-artifact-as-is@v0 with: diff --git a/docs/adaptors/continuations.md b/docs/adaptors/continuations.md index 05d607a42..5c003f020 100644 --- a/docs/adaptors/continuations.md +++ b/docs/adaptors/continuations.md @@ -197,18 +197,18 @@ The following table describes all unwrapping functions by their priority: | Future output | Continuation input | Inputs | |---------------------------------------------------------------|------------------------------------------------|--------| -| `future` | `future` | 1 | -| `future` | `` | 0 | -| `future` | `T` | 1 | +| `future` | `future` | 1 | +| `future` | `` | 0 | +| `future` | `R` | 1 | | `future, future, ...>>` | `future`, `future` ... | N | | `future, future, ...>>` | `T1`, `T2` ... | N | -| `future>>` | `vector` | 1 | +| `future>>` | `vector` | 1 | | `future, future, ...>>>` | `size_t`, `tuple, future, ...>` | 2 | | `future, future, ...>>>` | `size_t`, `future`, `future`, ... | N + 1 | -| `future, future, ...>>>` | `future` | 1 | -| `future>>>` | `future` | 1 | -| `future, future, ...>>>` | `T` | 1 | -| `future>>>` | `T` | 1 | +| `future, future, ...>>>` | `future` | 1 | +| `future>>>` | `future` | 1 | +| `future, future, ...>>>` | `R` | 1 | +| `future>>>` | `R` | 1 | Note that types are very important here. Whenever the continuation has the same number of arguments for the same future output, a template function or a lambda using `auto` would be ambiguous. @@ -219,7 +219,7 @@ In this case, the continuation function will attempt to use the unwrapping with be `cfuture`. However, this is not always possible if the unwrapping overloads are ambiguous enough. The continuation with the highest priority is always the safer and usually more verbose continuation. This means a -template continuation will usually unwrap to `future` over `T` continuation input variants. On the other hand, this +template continuation will usually unwrap to `future` over `R` continuation input variants. On the other hand, this is also useful since the most verbose continuation patterns are the ones that could benefit the most from `auto`. ## Return type unwrapping diff --git a/include/futures/adaptor/detail/make_ready_future.hpp b/include/futures/adaptor/detail/make_ready_future.hpp new file mode 100644 index 000000000..ad82dff8c --- /dev/null +++ b/include/futures/adaptor/detail/make_ready_future.hpp @@ -0,0 +1,62 @@ +// +// Copyright (c) 2021 alandefreitas (alandefreitas@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. +// https://www.boost.org/LICENSE_1_0.txt +// + +#ifndef FUTURES_ADAPTOR_DETAIL_MAKE_READY_FUTURE_HPP +#define FUTURES_ADAPTOR_DETAIL_MAKE_READY_FUTURE_HPP + +#include +#include +#include +#include + +namespace futures::detail { + struct make_ready_future_impl + { + template + basic_future, future_options<>> + make_ready_future(T &&value) { + basic_future, future_options<>> result( + std::forward(value)); + return result; + } + + template + basic_future> + make_ready_future(std::reference_wrapper value) { + promise> p; + basic_future> result = p.get_future(); + p.set_value(value); + return result; + } + + basic_future> + make_ready_future() { + promise> p; + basic_future> result = p.get_future(); + p.set_value(); + return result; + } + + template + basic_future> + make_exceptional_future(std::exception_ptr ex) { + promise> p; + p.set_exception(ex); + return p.get_future(); + } + + template + basic_future> + make_exceptional_future(E ex) { + promise> p; + p.set_exception(std::make_exception_ptr(ex)); + return p.get_future(); + } + }; +} // namespace futures::detail + +#endif // FUTURES_ADAPTOR_DETAIL_MAKE_READY_FUTURE_HPP diff --git a/include/futures/adaptor/detail/unwrap_and_continue.hpp b/include/futures/adaptor/detail/unwrap_and_continue.hpp index 086a1efcb..051729381 100644 --- a/include/futures/adaptor/detail/unwrap_and_continue.hpp +++ b/include/futures/adaptor/detail/unwrap_and_continue.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -207,20 +208,24 @@ namespace futures::detail { std::tuple, unwrapped_elements>>; if constexpr (tuple_explode_unwrap) { - return transform_and_apply( - continuation, - [](auto &&el) { + auto future_to_value = [](auto &&el) { if constexpr (!is_future_v>) { return el; } else { return el.get(); } - }, + }; + auto prefix_as_tuple = std::make_tuple( + std::forward(prefix_args)...); + auto futures_tuple = before_future.get(); + // transform each tuple with future_to_value + return transform_and_apply( + continuation, + future_to_value, std::tuple_cat( - std::make_tuple( - std::forward(prefix_args)...), - before_future.get())); + prefix_as_tuple, + std::move(futures_tuple))); } else { detail::throw_exception( "Continuation unwrapping not possible"); @@ -537,26 +542,24 @@ namespace futures::detail { template struct continuation_traits_helper { - // The return type of unwrap and continue function + // The possible return types of unwrap and continue function using unwrap_result = result_of_unwrap_t; - using unwrap_result_with_token_prefix = result_of_unwrap_with_token_t; // Whether the continuation expects a token static constexpr bool is_valid_without_stop_token = !std::is_same_v; - static constexpr bool is_valid_with_stop_token = !std::is_same_v< unwrap_result_with_token_prefix, unwrapping_failure_t>; - // Whether the continuation is valid + // Whether the continuation is valid at all static constexpr bool is_valid = is_valid_without_stop_token || is_valid_with_stop_token; - // The result type of unwrap and continue for the valid version, with or - // without token + // The result type of unwrap and continue for the valid overload + // (with or without the token) using next_value_type = std::conditional_t< is_valid_with_stop_token, unwrap_result_with_token_prefix, @@ -565,7 +568,7 @@ namespace futures::detail { // Stop token for the continuation function constexpr static bool expects_stop_token = is_valid_with_stop_token; - // Check if the stop token should be inherited from previous future + // Check if the stop token can be inherited from to next future constexpr static bool previous_future_has_stop_token = has_stop_token_v< Future>; constexpr static bool previous_future_is_shared = is_shared_future_v< @@ -574,33 +577,45 @@ namespace futures::detail { = previous_future_has_stop_token && (!previous_future_is_shared); // Continuation future should have stop token + // note: this is separate from `expects_stop_token` because (in the + // future), the continuation might reuse the stop source without + // actually containing a function that expects the token. constexpr static bool after_has_stop_token = expects_stop_token; - // The result type of unwrap and continue for the valid version, with or - // without token - using base_future_options = std::conditional_t< + // The result type of unwrap and continue for the valid unwrap overload + // (with or without token) + + // Next needs to inherit the constructor from previous future + // Next needs continuation source if previous is eager + using next_maybe_continuable_future_options = std::conditional_t< !is_always_deferred_v, future_options, continuable_opt>, future_options>>; - using eager_future_options = conditional_append_future_option_t< - after_has_stop_token, - stoppable_opt, - base_future_options>; - - using maybe_deferred_future_options = conditional_append_future_option_t< - is_always_deferred_v, - always_deferred_opt, - eager_future_options>; - - using maybe_function_type_future_options = conditional_append_future_option_t< - is_always_deferred_v, - deferred_function_opt>, - maybe_deferred_future_options>; - - // The result type of unwrap and continue for the valid version, with or - // without token - using next_future_options = maybe_function_type_future_options; + // Next is stoppable if we identified the function expects a token + using next_maybe_stoppable_future_options + = conditional_append_future_option_t< + after_has_stop_token, + stoppable_opt, + next_maybe_continuable_future_options>; + + // Next needs the always_deferred_opt if it's deferred + using next_maybe_deferred_future_options + = conditional_append_future_option_t< + is_always_deferred_v, + always_deferred_opt, + next_maybe_stoppable_future_options>; + + // Next needs the continuation function type if it's deferred + using next_maybe_function_type_future_options + = conditional_append_future_option_t< + is_always_deferred_v, + deferred_function_opt< + detail::unwrap_and_continue_task>, + next_maybe_deferred_future_options>; + + // The result options type of unwrap and continue + using next_future_options = next_maybe_function_type_future_options; }; template diff --git a/include/futures/adaptor/make_ready_future.hpp b/include/futures/adaptor/make_ready_future.hpp index 621458984..913735e5a 100644 --- a/include/futures/adaptor/make_ready_future.hpp +++ b/include/futures/adaptor/make_ready_future.hpp @@ -8,10 +8,7 @@ #ifndef FUTURES_ADAPTOR_MAKE_READY_FUTURE_HPP #define FUTURES_ADAPTOR_MAKE_READY_FUTURE_HPP -#include -#include -#include -#include +#include namespace futures { /** @addtogroup adaptors Adaptors @@ -26,10 +23,8 @@ namespace futures { template basic_future, future_options<>> make_ready_future(T &&value) { - promise, future_options<>> p; - basic_future, future_options<>> result = p.get_future(); - p.set_value(std::forward(value)); - return result; + return detail::make_ready_future_impl{}.template make_ready_future( + std::forward(value)); } /// Make a placeholder future object that is ready from a reference @@ -40,10 +35,8 @@ namespace futures { template basic_future> make_ready_future(std::reference_wrapper value) { - promise> p; - basic_future> result = p.get_future(); - p.set_value(value); - return result; + return detail::make_ready_future_impl{}.template make_ready_future( + value); } /// Make a placeholder void future object that is ready @@ -53,10 +46,7 @@ namespace futures { /// @return A future associated with the shared state that is created. inline basic_future> make_ready_future() { - promise> p; - basic_future> result = p.get_future(); - p.set_value(); - return result; + return detail::make_ready_future_impl{}.make_ready_future(); } /// Make a placeholder future object that is ready with an exception @@ -69,9 +59,8 @@ namespace futures { template basic_future> make_exceptional_future(std::exception_ptr ex) { - promise> p; - p.set_exception(ex); - return p.get_future(); + return detail::make_ready_future_impl{} + .template make_exceptional_future(ex); } /// Make a placeholder future object that is ready with from any @@ -84,10 +73,10 @@ namespace futures { template basic_future> make_exceptional_future(E ex) { - promise> p; - p.set_exception(std::make_exception_ptr(ex)); - return p.get_future(); + return detail::make_ready_future_impl{} + .template make_exceptional_future(ex); } + /** @} */ } // namespace futures diff --git a/include/futures/detail/container/small_vector.hpp b/include/futures/detail/container/small_vector.hpp index a321889e4..084d19348 100644 --- a/include/futures/detail/container/small_vector.hpp +++ b/include/futures/detail/container/small_vector.hpp @@ -69,7 +69,7 @@ namespace futures::detail { template < class T, size_t N - = std::max(std::size_t(5), (sizeof(T *) + sizeof(size_t)) / sizeof(T)), + = (std::max)(std::size_t(5), (sizeof(T *) + sizeof(size_t)) / sizeof(T)), class Allocator = std::allocator, class AllowHeap = std::true_type, class SizeType = size_t, @@ -1426,7 +1426,7 @@ namespace futures::detail { const auto old_capacity = capacity(); // Set the initial capacity if (old_capacity == 0) { - return std::max(64 / sizeof(value_type), size_type(5)); + return (std::max)(64 / sizeof(value_type), size_type(5)); } // Blocks larger than or equal to 4096 bytes can be expanded in place constexpr size_t min_in_place_expansion = 4096; @@ -1791,8 +1791,8 @@ namespace futures::detail { template < class T, size_t N_INPUT, - size_t N_OUTPUT = std::max( - std::max(std::size_t(5), (sizeof(T *) + sizeof(size_t)) / sizeof(T)), + size_t N_OUTPUT = (std::max)( + (std::max)(std::size_t(5), (sizeof(T *) + sizeof(size_t)) / sizeof(T)), N_INPUT)> constexpr small_vector, N_OUTPUT> to_small_vector(T (&a)[N_INPUT]) { @@ -1804,8 +1804,8 @@ namespace futures::detail { template < class T, size_t N_INPUT, - size_t N_OUTPUT = std::max( - std::max(std::size_t(5), (sizeof(T *) + sizeof(size_t)) / sizeof(T)), + size_t N_OUTPUT = (std::max)( + (std::max)(std::size_t(5), (sizeof(T *) + sizeof(size_t)) / sizeof(T)), N_INPUT)> constexpr small_vector, N_OUTPUT> to_small_vector(T(&&a)[N_INPUT]) { @@ -1815,7 +1815,7 @@ namespace futures::detail { template < class T, size_t N - = std::max((sizeof(std::vector) * 2) / sizeof(T), std::size_t(5)), + = (std::max)((sizeof(std::vector) * 2) / sizeof(T), std::size_t(5)), class Allocator = std::allocator, class SizeType = size_t> using max_size_small_vector @@ -1824,7 +1824,7 @@ namespace futures::detail { template < class T, size_t N - = std::max((sizeof(std::vector) * 2) / sizeof(T), std::size_t(5)), + = (std::max)((sizeof(std::vector) * 2) / sizeof(T), std::size_t(5)), class Allocator = std::allocator, class SizeType = size_t> using inline_small_vector diff --git a/include/futures/detail/utility/aligned_storage_for.hpp b/include/futures/detail/utility/aligned_storage_for.hpp index ff53e9750..7e028fac4 100644 --- a/include/futures/detail/utility/aligned_storage_for.hpp +++ b/include/futures/detail/utility/aligned_storage_for.hpp @@ -3,8 +3,8 @@ // See accompanying file LICENSE // -#ifndef FUTURES_ALIGNED_STORAGE_FOR_H -#define FUTURES_ALIGNED_STORAGE_FOR_H +#ifndef FUTURES_DETAIL_UTILITY_ALIGNED_STORAGE_FOR_HPP +#define FUTURES_DETAIL_UTILITY_ALIGNED_STORAGE_FOR_HPP #include #include @@ -19,7 +19,7 @@ namespace futures::detail { return data_; } - [[nodiscard]] constexpr byte* + [[nodiscard]] constexpr const byte* data() const { return data_; } @@ -34,4 +34,4 @@ namespace futures::detail { }; } // namespace futures::detail -#endif // FUTURES_ALIGNED_STORAGE_FOR_H +#endif // FUTURES_DETAIL_UTILITY_ALIGNED_STORAGE_FOR_HPP diff --git a/include/futures/detail/utility/byte.hpp b/include/futures/detail/utility/byte.hpp index 55a7f6f25..c38046023 100644 --- a/include/futures/detail/utility/byte.hpp +++ b/include/futures/detail/utility/byte.hpp @@ -3,8 +3,8 @@ // See accompanying file LICENSE // -#ifndef FUTURES_BYTE_HPP -#define FUTURES_BYTE_HPP +#ifndef FUTURES_DETAIL_UTILITY_BYTE_HPP +#define FUTURES_DETAIL_UTILITY_BYTE_HPP #include @@ -93,4 +93,4 @@ namespace futures::detail { } // namespace futures::detail -#endif // FUTURES_BYTE_HPP +#endif // FUTURES_DETAIL_UTILITY_BYTE_HPP diff --git a/include/futures/detail/utility/empty_base.hpp b/include/futures/detail/utility/empty_base.hpp index 223e77020..b447e1565 100644 --- a/include/futures/detail/utility/empty_base.hpp +++ b/include/futures/detail/utility/empty_base.hpp @@ -105,6 +105,7 @@ namespace futures::detail { } }; + /// Conditional base class depending on a condition template using conditional_base = maybe_empty, BaseIndex>; diff --git a/include/futures/detail/utility/maybe_atomic.hpp b/include/futures/detail/utility/maybe_atomic.hpp new file mode 100644 index 000000000..54c4eae8d --- /dev/null +++ b/include/futures/detail/utility/maybe_atomic.hpp @@ -0,0 +1,214 @@ +// +// Copyright (c) 2022 alandefreitas (alandefreitas@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. +// https://www.boost.org/LICENSE_1_0.txt +// + +#ifndef FUTURES_DETAIL_UTILITY_MAYBE_ATOMIC_HPP +#define FUTURES_DETAIL_UTILITY_MAYBE_ATOMIC_HPP + +#include + +namespace futures::detail { + + /// A class that holds an element of type T in a conditionally atomic way + /** + * Depending on the type of the operation state, in particular whether its + * task is deferred, the synchronization primitives don't really need + * atomic operations. + * + * This class encapsulates this logic so that real atomic operations can be + * conditionally disabled. Only the operations necessary to atomic + * operations are implemented by this class. + * + * @tparam T Value type + * @tparam Enable Whether atomic operations should be enabled + */ + template + class maybe_atomic; + + template + class maybe_atomic + { + public: + /// Initializes the underlying object with desired. + /** + * The initialization is not atomic. + */ + constexpr maybe_atomic(T desired) noexcept : value_(desired) {} + + /// Atomically obtains the value of the atomic object + /** + * Atomically loads and returns the current value of the atomic + * variable. + * + * Memory is affected according to the value of order. + * + * @param order memory order constraints to enforce + * + * @return The current value of the atomic variable + */ + T + load(std::memory_order order = std::memory_order_seq_cst) + const noexcept { + return value_.load(order); + } + + + /// Atomically replaces the underlying value with desired + /** + * The operation is read-modify-write operation. + * + * Memory is affected according to the value of order. + * + * @param desired value to assign + * @param order memory order constraints to enforce + * + * @return The value of the atomic variable before the call. + */ + T + exchange( + T desired, + std::memory_order order = std::memory_order_seq_cst) noexcept { + return value_.exchange(desired, order); + } + + /// Atomically compares the object representation of *this with that of + /// expected, and if those are bitwise-equal, replaces the former with + /// desired + /** + * In the (2) and (4) versions order is used for both read-modify-write + * and load operations, except that std::memory_order_acquire and + * std::memory_order_relaxed are used for the load operation if + * order == std::memory_order_acq_rel, or + * order == std::memory_order_release respectively. + * + * @param expected value expected to be found in the atomic object. + * @param desired value to store in the object if it is as expected + * @param order the memory synchronization ordering for both operations + * + * @return true if the underlying atomic value was successfully changed, + * false otherwise + */ + bool + compare_exchange_strong( + T& expected, + T desired, + std::memory_order order = std::memory_order_seq_cst) noexcept { + return value_.compare_exchange_strong(expected, desired, order); + } + + private: + std::atomic value_; + }; + + template + class maybe_atomic + { + public: + /// Initializes the underlying object with desired. + /** + * The initialization is not atomic. + */ + constexpr maybe_atomic(T desired) noexcept : value_(desired) {} + + /// Atomically obtains the value of the atomic object + /** + * Atomically loads and returns the current value of the atomic + * variable. + * + * Memory is affected according to the value of order. + * + * @param order memory order constraints to enforce + * + * @return The current value of the atomic variable + */ + T + load(std::memory_order order = std::memory_order_seq_cst) + const noexcept { + (void) order; + return value_; + } + + /// Atomically replaces the underlying value with desired + /** + * The operation is read-modify-write operation. + * + * Memory is affected according to the value of order. + * + * @param desired value to assign + * @param order memory order constraints to enforce + * + * @return The value of the atomic variable before the call. + */ + T + exchange( + T desired, + std::memory_order order = std::memory_order_seq_cst) noexcept { + (void) order; + return std::exchange(value_, desired); + } + + /// Atomically compares the object representation of *this with that of + /// expected, and if those are bitwise-equal, replaces the former with + /// desired + /** + * In the (2) and (4) versions order is used for both read-modify-write + * and load operations, except that std::memory_order_acquire and + * std::memory_order_relaxed are used for the load operation if + * order == std::memory_order_acq_rel, or + * order == std::memory_order_release respectively. + * + * @param expected value expected to be found in the atomic object. + * @param desired value to store in the object if it is as expected + * @param order the memory synchronization ordering for both operations + * + * @return true if the underlying atomic value was successfully changed, + * false otherwise + */ + bool + compare_exchange_strong( + T& expected, + T desired, + std::memory_order order = std::memory_order_seq_cst) noexcept { + (void) order; + bool match_expected = value_ == expected; + expected = value_; + if (match_expected) { + value_ = desired; + return true; + } + return false; + } + + private: + T value_; + }; + + /// Establishes memory synchronization ordering of non-atomic and relaxed + /// atomic accesses, as instructed by order, without an associated atomic + /// operation. + /** + * @tparam Enable Whether the synchronization should be enabled + * @param order The memory order + */ + template + inline void + maybe_atomic_thread_fence(std::memory_order order) noexcept; + + template <> + inline void + maybe_atomic_thread_fence(std::memory_order order) noexcept { + std::atomic_thread_fence(order); + } + + template <> + inline void + maybe_atomic_thread_fence(std::memory_order order) noexcept { + (void) order; + } + +} // namespace futures::detail + +#endif // FUTURES_DETAIL_UTILITY_MAYBE_ATOMIC_HPP diff --git a/include/futures/detail/utility/maybe_copyable.hpp b/include/futures/detail/utility/maybe_copyable.hpp index 311c3ec2a..db60e5f6e 100644 --- a/include/futures/detail/utility/maybe_copyable.hpp +++ b/include/futures/detail/utility/maybe_copyable.hpp @@ -13,14 +13,15 @@ namespace futures::detail { struct maybe_copyable { protected: + maybe_copyable() = default; ~maybe_copyable() = default; }; template <> struct maybe_copyable { + maybe_copyable() = default; maybe_copyable(maybe_copyable const&) = delete; - maybe_copyable& operator=(maybe_copyable const&) = delete; diff --git a/include/futures/executor/default_executor.hpp b/include/futures/executor/default_executor.hpp index a835de785..f714336c2 100644 --- a/include/futures/executor/default_executor.hpp +++ b/include/futures/executor/default_executor.hpp @@ -8,8 +8,8 @@ #ifndef FUTURES_EXECUTOR_DEFAULT_EXECUTOR_HPP #define FUTURES_EXECUTOR_DEFAULT_EXECUTOR_HPP -#include #include +#include namespace futures { /** @addtogroup executors Executors @@ -39,7 +39,7 @@ namespace futures { static std::size_t value = std::thread::hardware_concurrency(); // Always return at least 1 core - return std::max(static_cast(1), value); + return (std::max)(static_cast(1), value); } /// The default execution context for async operations @@ -85,7 +85,8 @@ namespace futures { const std::size_t default_thread_pool_size = FUTURES_DEFAULT_THREAD_POOL_SIZE; #else - const std::size_t default_thread_pool_size = hardware_concurrency(); + const std::size_t default_thread_pool_size = std:: + max(hardware_concurrency(), std::size_t(2)); #endif static asio::thread_pool pool(default_thread_pool_size); return pool; diff --git a/include/futures/futures/await.hpp b/include/futures/futures/await.hpp index 3c6233bbb..5eb9a23b5 100644 --- a/include/futures/futures/await.hpp +++ b/include/futures/futures/await.hpp @@ -34,7 +34,7 @@ namespace futures { * The function also makes the syntax optionally a little closer to * languages such as javascript. * - * @note This function only participates in overload resolutions if all + * @note This function only participates in overload resolution if all * types are futures. * * @tparam Future A future type diff --git a/include/futures/futures/basic_future.hpp b/include/futures/futures/basic_future.hpp index f8fd833ce..574120539 100644 --- a/include/futures/futures/basic_future.hpp +++ b/include/futures/futures/basic_future.hpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include #include @@ -54,6 +54,7 @@ namespace futures { struct async_future_scheduler; struct internal_then_functor; class waiter_for_any; + struct make_ready_future_impl; } // namespace detail template class promise_base; @@ -75,48 +76,49 @@ namespace futures { * * All this behavior is already encapsulated in the @ref async function. * - * @tparam T Type of element - * @tparam Shared `std::true_value` if this future is shared - * @tparam LazyContinuable `std::true_value` if this future supports - * continuations - * @tparam Stoppable `std::true_value` if this future - * contains a stop token + * @tparam R Type of element + * @tparam Options Future value options + * */ - template > - class basic_future - : private detail::maybe_copyable - , private detail::conditional_base + template > + class basic_future : private detail::maybe_copyable { private: - // join base represents the logical boolean indicating if we should - // join at destruction - using join_base = detail:: - conditional_base; - - // the initial join value - typename join_base::value_type - initial_join_value() { - if constexpr (!Options::is_always_detached) { - return true; - } else { - return detail::empty_value; - } - } + /** + * Private types + */ - // remove shared_opt from operation state options + /// The traits for the shared state + /** + * We remove shared_opt from operation state traits because there's + * no difference in their internal implementation. The difference + * is in the future, which needs to choose between inline storage or + * storage through a shared pointer. + */ using operation_state_options = detail:: remove_future_option_t; + static_assert( + !operation_state_options::is_shared, + "The underlying operation state cannot be shared"); - // determine type of inline operation state - template + /// Determine type of inline operation state + /** + * A future that is always deferred needs to store the function type + * beyond the state type. The extended operation state with information + * about the function type is implemented through the deferred operation + * state class, where the function type is not erased. + * + * @tparam is_always_deferred whether the future is always deferred + */ + template struct operation_state_type_impl { using type = std::conditional_t< - inline_op_state, + is_always_deferred, // Type with complete information about the task - detail::deferred_operation_state, + detail::deferred_operation_state, // Type to be shared, task is erased - detail::operation_state>; + detail::operation_state>; }; static constexpr bool inline_op_state = Options::is_always_deferred @@ -125,30 +127,61 @@ namespace futures { using operation_state_type = typename operation_state_type_impl< inline_op_state>::type; - // determine type of the shared state, whenever it needs to be shared - using shared_state_type = detail:: - shared_state; + /// Determine the shared state type, whenever it needs to be shared + using shared_state_type = std::shared_ptr; + + /// Determine the future state type + /** + * The future state is a variant type the might hold no state, inline + * value storage, an operation state, or a shared state. + * + * The most efficient alternative will be chosen according to how + * the future is constructed. + */ + using future_state_type = detail::future_state; + - // the base operation state implementation type, where the notification - // handle lives + /// Determine the base operation state type + /** + * The base operation state implementation type is where the + * notification handle lives, so we can also expose it in the future + * type. + */ using operation_state_base = detail::operation_state_base< Options::is_always_deferred>; - // the type of notification handles in the shared state + /// Determine the type of notification handles + /** + * Notification handles are used to allow external condition variables + * to wait for a future value. + */ using notify_when_ready_handle = typename operation_state_base:: notify_when_ready_handle; - // the type - using unique_or_shared_state = std::conditional_t< - inline_op_state, - operation_state_type, - shared_state_type>; + /// Determine the type of this basic future when shared + using shared_basic_future = basic_future< + R, + detail::append_future_option_t>; + + /** + * @} + */ - // Types allowed to convert the shared state into a new future + /** + * @name Friend types + * + * Future types need to be created through other function available + * for launching futures. These friend types have access to the + * private basic_future constructors to achieve that. + * + * @{ + */ + + // Convert similar future types template friend class basic_future; - // Types allowed to access the shared state constructor + // Share shared state with the future template friend class ::futures::promise_base; @@ -158,196 +191,608 @@ namespace futures { template friend class ::futures::packaged_task; + // Launch futures friend struct detail::async_future_scheduler; + // Launch future continuations friend struct detail::internal_then_functor; + // Append external waiters to future friend class detail::waiter_for_any; - /// Construct from a shared operation state + // Append external waiters to future + friend struct detail::make_ready_future_impl; + /** - * This constructor is private because we need to ensure the launching - * function appropriately sets this std::future handling these traits. + * @} + */ + + /** + * @name Private constructors + * + * Futures should be constructed through friend functions, which will + * create a shared state for the task they launch and the future. + * + * @{ + */ + + /// Construct future from a shared operation state + /** + * This constructor is used by @ref async and other launching functions + * that create eager futures. It represents the traditional type of + * shared state, also available in C++11 std::future. * - * This is mostly used by the async and other launching functions. + * These eager functions initialize the future with a shared state + * because the task needs to know the address of this state in order + * to properly set the value of the operation state. This means these + * functions require memory allocation, which we later optimize with + * appropriate allocators for the shared states. + * + * This constructor is private because we need to ensure the launching + * functions appropriately set the future handling these traits, such + * as continuations. * * @param s Future shared state */ explicit basic_future(const shared_state_type &s) noexcept + : detail::maybe_copyable{}, state_{ s } {} + + /// Construct future from an rvalue shared operation state + /** + * This constructor is used by @ref async and other launching functions + * that create eager futures. It represents the traditional type of + * shared state, also available in C++11 std::future. + * + * These eager functions initialize the future with a shared state + * because the task needs to know the address of this state in order + * to properly set the value of the operation state. This means these + * functions require memory allocation, which we later optimize with + * appropriate allocators for the shared states. + * + * This constructor is private because we need to ensure the launching + * functions appropriately set the future handling these traits, such + * as continuations. + * + * @param s Future shared state + */ + explicit basic_future(shared_state_type &&s) noexcept : detail::maybe_copyable{}, - join_base(initial_join_value()), state_{ std::move(s) } {} + state_{ std::move(s) } {} /// Construct from an inline operation state /** - * This constructor is private because we need to ensure the launching - * function appropriately sets this std::future handling these traits. + * This constructor is mostly used by @ref schedule and other scheduling + * functions that create deferred futures. It represents an operation + * state that is not-shared, which makes it more efficient in terms + * of memory allocations. * - * This is mostly used by the async and other launching functions. + * These deferred functions initialize the future with an inline + * operation state the task will only be launched later and its + * address cannot change once we wait for the future, because the + * calling thread will be blocked and the state address cannot change. * - * @param op Future operation state + * However, there are circumstances where a deferred future will need + * to convert this into a shared state. The first situation is when + * we `wait_for` or `wait_until` on a deferred future. This means the + * address of an inline state could change after we wait for a while, + * thus launching the task, and the future is moved afterwards. A + * second situation where the deferred state might need to be shared + * is, naturally, with shared futures. + * + * This constructor is private because we need to ensure the scheduling + * functions appropriately set the future handling these traits, such + * as continuations. + * + * @param op Future inline operation state */ explicit basic_future(operation_state_type &&op) noexcept : detail::maybe_copyable{}, - join_base(initial_join_value()), state_{ std::move(op) } {} + state_{ std::move(op) } {} + /// Construct from an direct operation state storage + /** + * This constructor is used by @ref make_ready_future and other + * functions that create futures objects holding values that might be + * already known at construction. These future objects are often + * required in algorithms that involve future values to interoperate + * with known values using the same type for both. + * + * It represents an operation state storage that is not-shared, which + * makes there future objects more efficient than others, instead of + * emulating known values through memory allocations. + * + * @param op Future inline operation state + */ + explicit basic_future(detail::operation_state_storage &&op) noexcept + : detail::maybe_copyable{}, + state_{ std::move(op) } {} + + /// Construct future from a variant future operation state + /** + * This constructor is used by any function that might create a future + * from another future, where the variant state is already constructed. + * + * This function might be used by eager and deferred futures. + * + * @param s Future shared state + */ + explicit basic_future(const future_state_type &s) noexcept + : detail::maybe_copyable{}, state_{ s } {} + + /// Construct future from an rvalue variant future operation state + /** + * This constructor is used by any function that might create a future + * from another future, where the variant state is already constructed. + * + * This function might be used by eager and deferred futures. + * + * @param s Future shared state + */ + explicit basic_future(future_state_type &&s) noexcept + : detail::maybe_copyable{}, + state_{ std::move(s) } {} + + /// Construct a ready future from a value_type + /** + * This constructor is used by any function that might create a future + * that's already ready. + * + * This function might be used by eager and deferred futures. + * + * @param s Future value + */ + template < + class T +#ifndef FUTURES_DOXYGEN + , + std::enable_if_t && !std::is_void_v, int> = 0 +#endif + > + explicit basic_future(T &&v) noexcept + : detail::maybe_copyable{}, state_{ + detail::in_place_type_t>{}, + std::forward(v) + } { + } + + /** + * @} + */ public: /// @name Public types /// @{ - using value_type = T; + using value_type = R; /// @} /// @name Constructors /// @{ - /// Constructs the basic_future + /// Destructor + ~basic_future() { + if constexpr (Options::is_stoppable && !Options::is_shared) { + if (valid() && !is_ready()) { + get_stop_source().request_stop(); + } + } + wait_if_last(); + } + + /// Constructor /** * The default constructor creates an invalid future with no shared * state. * - * Null shared state. Properties inherited from base classes. + * After construction, `valid() == false`. */ - basic_future() noexcept - : detail::maybe_copyable{}, - join_base(initial_join_value()), state_{ nullptr } {}; + basic_future() noexcept = default; - /// Copy constructor for shared futures only. + /// Copy constructor /** - * Inherited from base classes. + * Constructs a shared future that refers to the same shared state, + * if any, as other. * * @note The copy constructor only participates in overload resolution - * if `other` is shared - * - * @param other Another future used as source to initialize the shared - * state + * if `basic_future` is shared. */ - basic_future(const basic_future &other) - : detail::maybe_copyable{ other }, - join_base{ other.join_base::get() }, state_{ other.state_ } { - static_assert( - Options::is_shared, - "Copy constructor is only available for shared futures"); - } + basic_future(const basic_future &other) = default; -#ifndef FUTURES_DOXYGEN - private: - // This makes a non-eligible template instances have two copy - // constructors, which makes it not copy constructible according - // to std::copy_constructible_v<...>. - // This is the least intrusive way to achieve a conditional - // copy constructor that fails std::is_copy_constructible before - // C++20. - struct moo - {}; - basic_future( - std::conditional_t, - moo = moo()); - public: -#endif + /// Copy assignment + /** + * Constructs a shared future that refers to the same shared state, + * if any, as other. + * + * @note The copy assignment only participates in overload resolution + * if `basic_future` is shared. + */ + basic_future & + operator=(const basic_future &other) + = default; /// Move constructor. /** - * Inherited from base classes. + * Constructs a basic_future with the operation state of other using + * move semantics. + * + * After construction, other.valid() == false. */ basic_future(basic_future &&other) noexcept : detail::maybe_copyable{}, - join_base{ std::move(other.join_base::get()) }, - state_{ std::move(other.state_) } { - if constexpr (!inline_op_state) { - other.state_.reset(); + state_{ std::move(other.state_) }, + join_{ std::exchange(other.join_, false) } {} + + /// Move assignment. + /** + * Constructs a basic_future with the operation state of other using + * move semantics. + * + * After construction, other.valid() == false. + */ + basic_future & + operator=(basic_future &&other) noexcept { + state_ = std::move(other.state_); + join_ = std::exchange(other.join_, false); + return *this; + } + + /// @} + + /** + * @name Sharing + * @{ + */ + + /// Create another future whose state is shared + /** + * Create a shared variant of the current future object. + * + * If the current type is not shared, the object becomes invalid. + * + * If the current type is already shared, the new object is equivalent + * to a copy. + * + * @return A shared variant of this future + */ + basic_future> + share() { + if (!valid()) { + detail::throw_exception(); } - if constexpr (!Options::is_always_detached) { - other.detach(); + + // Determine type of corresponding shared future + using shared_options = detail:: + append_future_option_t; + using shared_future_t = basic_future; + + // Create future state for the shared future + if constexpr (Options::is_shared) { + shared_future_t other{ state_ }; + other.join_ = join_; + return other; + } else { + shared_future_t other{ std::move(state_) }; + other.join_ = std::exchange(join_, false); + return other; } } - /// Destructor /** - * The shared pointer will take care of decrementing the reference - * counter of the shared state, but we still take care of the special - * options: - * - We let stoppable futures set the stop token and wait. - * - We run the continuations if possible + * @} */ - ~basic_future() { - if constexpr (Options::is_stoppable && !Options::is_shared) { - if (valid() && !is_ready()) { - get_stop_source().request_stop(); + + /** + * @name Getting the result + * @{ + */ + + /// Returns the result + /** + * The get member function waits until the future has a valid result + * and retrieves it. + * + * It effectively calls wait() in order to wait for the result. + * + * The behavior is undefined if `valid()` is false before the call to + * this function. + * + * If the future is unique, any shared state is released and `valid()` + * is `false` after a call to this member function. + * + * - Unique futures: + * - `R` -> return `R` + * - `R&` -> return `R&` + * - `void` -> return `void` + * - Shared futures: + * - `R` -> return `const R&` + * - `R&` -> return `R&` + * - `void` -> return `void` + * + */ + decltype(auto) + get() { + if (!valid()) { + detail::throw_exception(); + } + state_.wait(); + if constexpr (Options::is_shared) { + // state_.get() should handle the return type for us + return state_.get(); + } else { + future_state_type tmp(std::move(state_)); + if constexpr (std::is_reference_v || std::is_void_v) { + return tmp.get(); + } else { + return R(std::move(tmp.get())); } } - wait_if_last(); } - /// Copy assignment for shared futures only. + /// Get exception pointer without throwing an exception /** - * Inherited from base classes. + * If the future does not hold an exception, the exception_ptr is + * nullptr. + * + * This extends future objects so that we can always check if the future + * contains an exception without throwing it. + * + * @return An exception pointer */ - template - basic_future & - operator=(const basic_future &other) { - static_assert( - Options::is_shared, - "Copy assignment is only available for shared futures"); - if (&other == this) { - return *this; + std::exception_ptr + get_exception_ptr() { + if (!valid()) { + detail::throw_exception(); } - wait_if_last(); // If this is the last shared future waiting for - // previous result, we wait - if constexpr (std::is_same_v< - join_base, - typename basic_future::join_base>) { - join_base::get() = other.join_base::get(); - } else if constexpr ( - std::is_same_v) { - join_base::get() = false; + state_.wait(); + return state_.get_exception_ptr(); + } + + /** + * @} + */ + + /** + * @name Future state + * + * Observe the current state of the future value + * + * @{ + */ + + /// Checks if the future refers to a valid operation state + /** + * This is the case only for futures that were not default-constructed + * or moved from until the first time get() or share() is called. If the + * future is shared, its state is not invalidated when get() is called. + * + * If any member function other than the destructor, the move-assignment + * operator, or `valid` is called on a future that does not refer a + * valid operation state, a future_error will be throw to indicate + * `no_state`. + * + * It is valid to move (or copy, for shared futures) from a future + * object for which `valid()` is `false`. + * + * @return true if `*this` refers to a valid operation state + */ + [[nodiscard]] bool + valid() const { + return state_.valid(); + } + + /// Waits for the result to become available + /** + * Blocks until the result becomes available. + * + * `valid() == true` after the call. + * + * A `future_uninitialized` exception is thrown if `valid() == false` + * before the call to this function. + */ + void + wait() const { + if (!valid()) { + detail::throw_exception(); } - state_ = other.state_; // Make it point to the same shared state - other.detach(); // Detach other to ensure it won't block at - // destruction - return *this; + if constexpr (Options::is_always_deferred) { + detail::throw_exception(); + } + state_.wait(); } - /// Move assignment. + /// Waits for the result to become available /** - * Inherited from base classes. + * Blocks until the result becomes available. + * + * `valid() == true` after the call. + * + * A `future_uninitialized` exception is thrown if `valid() == false` + * before the call to this function. */ - template - basic_future & - operator=(basic_future &&other) noexcept { - if (&other == this) { - return *this; + void + wait() { + if (!valid()) { + detail::throw_exception(); } - wait_if_last(); // If this is the last shared future waiting for - // previous result, we wait - if constexpr (std::is_same_v< - join_base, - typename basic_future::join_base>) { - join_base::get() = other.join_base::get(); - } else if constexpr ( - std::is_same_v) { - join_base::get() = false; + state_.wait(); + } + + /// Waits for the result, returns if it is unavailable for duration + /** + * Waits for the result to become available. Blocks until specified + * timeout_duration has elapsed or the result becomes available, + * whichever comes first. The return value identifies the state of the + * result. + * + * If the future is deferred, the operation state might be converted + * into a shared operation state. This ensures that (i) the result will + * be computed only when explicitly requested, and (ii) the address of + * the operation state will not change once the result is requested. + * + * This function may block for longer than `timeout_duration` due to + * scheduling or resource contention delays. + * + * The behavior is undefined if valid() is false before the call to + * this function. + * + * @param timeout_duration maximum duration to block for + * + * @return future status + */ + template + std::future_status + wait_for( + const std::chrono::duration &timeout_duration) const { + if (!valid()) { + detail::throw_exception(); } - state_ = other.state_; // Make it point to the same shared state - other.state_.reset(); - other.detach(); // Detach other to ensure it won't block at - // destruction - return *this; + if constexpr (Options::is_always_deferred) { + return std::future_status::deferred; + } + return state_.wait_for(timeout_duration); + } + + /// Waits for the result, returns if it is unavailable for duration + template + std::future_status + wait_for(const std::chrono::duration &timeout_duration) { + if (!valid()) { + detail::throw_exception(); + } + return state_.wait_for(timeout_duration); + } + + /// Waits for the result, returns if it is unavailable for duration + /** + * Waits for a result to become available. It blocks until specified + * `timeout_time` has been reached or the result becomes available, + * whichever comes first. The return value indicates why `wait_until` + * returned. + * + * If the future is deferred, the operation state might be converted + * into a shared operation state. This ensures that (i) the result will + * be computed only when explicitly requested, and (ii) the address of + * the operation state will not change once the result is requested. + * + * This function may block until after `timeout_time` due to + * scheduling or resource contention delays. + * + * The behavior is undefined if valid() is false before the call to + * this function. + * + * @param timeout_time maximum time point to block until + * + * @return future status + */ + template + std::future_status + wait_until(const std::chrono::time_point &timeout_time) + const { + if (!valid()) { + detail::throw_exception(); + } + return state_.wait_until(timeout_time); + } + + /// Waits for the result, returns if it is unavailable for duration + template + std::future_status + wait_until( + const std::chrono::time_point &timeout_time) { + if (!valid()) { + detail::throw_exception(); + } + if constexpr (Options::is_always_deferred) { + detail::throw_exception(); + } + return state_.wait_until(timeout_time); + } + + /// Checks if the associated operation state is ready. + /** + * Checks if the associated shared state is ready. + * + * The behavior is undefined if valid() is false. + * + * @return `true` if the associated shared state is ready + */ + [[nodiscard]] bool + is_ready() const { + if (!valid()) { + detail::throw_exception( + std::future_errc::no_state); + } + return state_.is_ready(); + } + + /// Tell this future not to join at destruction + /** + * For safety, all futures wait at destruction by default. + * + * This function separates the execution from the future object, + * allowing execution to continue independently. + */ + void + detach() { + join_ = false; } - /// @} - /// Emplace a function to the shared vector of continuations /** - * If the function is ready, this functions uses the given executor - * instead of executing with the previous executor. + * @} + */ + + /** + * @name Continuations + * + * @{ + */ + + /// Attaches a continuation to a future + /** + * Attach the continuation function to this future object. The behavior + * is undefined if this future has no associated operation state + * (i.e., `valid() == false`). + * + * Creates an operation state associated with the future object to be + * returned. + * + * When the shared state currently associated with this future is ready, + * the continuation is called on the specified executor. + * + * Any value returned from the continuation is stored as the result in + * the operation state of the returned future object. Any exception + * propagated from the execution of the continuation is stored as the + * exceptional result in the operation state of the returned future + * object. + * + * A continuation to an eager future is also eager. If this future is + * eager, the continuation is attached to a list of continuations of + * this future. + * + * A continuation to a deferred future is also deferred. If this future + * is deferred, this future is stored as the parent future of the next + * future. + * + * If this future is ready, the continuation is directly launched or + * scheduled in the specified executor. + * + * @note Unlike `std::experimental::future`, when the return type of the + * continuation function is also a future, this function performs no + * implicit unwrapping on the return type with the `get` function. This + * (i) simplifies the development of generic algorithms with futures, + * (ii) makes the executor for the unwrapping task explicit, and (iii) + * allows the user to retrieve the returned type as a future or as + * its unwrapped type. * * @note This function only participates in overload resolution if the * future supports continuations * * @tparam Executor Executor type * @tparam Fn Function type + * * @param ex An executor * @param fn A continuation function + * * @return The continuation future */ template < @@ -363,99 +808,137 @@ namespace futures { > decltype(auto) then(const Executor &ex, Fn &&fn) { + // Throw if invalid if (!valid()) { detail::throw_exception( std::future_errc::no_state); } - // Determine next future options - using traits = detail:: - continuation_traits; - using next_value_type = typename traits::next_value_type; - using next_future_options = typename traits::next_future_options; - using next_future_type - = basic_future; - if constexpr (Options::is_continuable) { - // Create task for continuation future - detail::continuations_source cont_source - = state_->get_continuations_source(); - + // Determine traits for the next future + using traits = detail:: + continuation_traits; + using next_value_type = typename traits::next_value_type; + using next_future_options = typename traits::next_future_options; + using next_future_type + = basic_future; + + // Both futures are eager and continuable + static_assert(!is_always_deferred_v); + static_assert(!is_always_deferred_v); + static_assert(is_continuable_v); + static_assert(is_continuable_v); + + // Store a backup of the continuations source + auto cont_source = get_continuations_source(); + + // Create continuation function + // note: this future is moved into this task + // note: this future being shared allows this to be copy + // constructible detail::unwrap_and_continue_task< std::decay_t, std::decay_t> task{ detail::move_if_not_shared(*this), std::forward(fn) }; - // Create shared state for next future + // Create a shared operation state for next future + // note: we use a shared state because the continuation is also + // eager using operation_state_t = detail:: operation_state; auto state = std::make_shared(ex); next_future_type fut(state); - // Push task to set next state to this continuation list - auto apply_fn = + // Create task to set next future state + // note: this function might become non-copy-constructible + // because it stores the continuation function. + auto set_state_fn = [state = std::move(state), task = std::move(task)]() mutable { state->apply(std::move(task)); }; + using set_state_fn_type = decltype(set_state_fn); - auto fn_shared_ptr = std::make_shared( - std::move(apply_fn)); - auto copyable_handle = [fn_shared_ptr]() { - (*fn_shared_ptr)(); - }; - - cont_source.push(ex, copyable_handle); - + // Attach set_state_fn to this continuation list + if constexpr (std::is_copy_constructible_v) { + cont_source.push(ex, std::move(set_state_fn)); + } else { + // Make the continuation task copyable if we have to + // note: the continuation source uses `std::function` to + // represent continuations. + // note: This could be improved with an implementation of + // `std::move_only_function` to be used by the continuation + // source. + auto fn_shared_ptr = std::make_shared( + std::move(set_state_fn)); + auto copyable_handle = [fn_shared_ptr]() { + (*fn_shared_ptr)(); + }; + cont_source.push(ex, copyable_handle); + } return fut; } else if constexpr (Options::is_always_deferred) { + // Determine traits for the next future + using traits = detail:: + continuation_traits; + using next_value_type = typename traits::next_value_type; + using next_future_options = typename traits::next_future_options; + using next_future_type + = basic_future; + + // Both future types are deferred static_assert(is_always_deferred_v); static_assert(is_always_deferred_v); - // Create task for the continuation future + // Create continuation function + // note: this future is moved into this task + // note: this future is not always shared, in which case + // the operation state is still inline in another address, + // which is OK because the value hasn't been requested detail::unwrap_and_continue_task< std::decay_t, std::decay_t> task{ detail::move_if_not_shared(*this), std::forward(fn) }; - // Create operation state for the next future - if constexpr (!Options::is_shared) { - static_assert(!is_shared_future_v); - static_assert(!is_shared_future_v); - using operation_state_t = detail::deferred_operation_state< + // Create the operation state for the next future + // note: this state is inline because the continuation + // is also deferred + // note: This operation contains the task + using deferred_operation_state_t = detail:: + deferred_operation_state< next_value_type, next_future_options>; - operation_state_t state(ex, std::move(task)); + deferred_operation_state_t state(ex, std::move(task)); - next_future_type fut(std::move(state)); - return fut; - } else { - static_assert(is_shared_future_v); - static_assert(is_shared_future_v); - using shared_state_t = detail:: - operation_state; - auto state = std::make_shared< - shared_state_t>(ex, std::move(task)); - next_future_type fut(state); - return fut; - } + // Move operation state into the new future + // note: this is the new future representing the deferred + // task graph now. It has inline access to the parent + // operation. + next_future_type fut(std::move(state)); + return fut; } } - /// Emplace a function to the shared vector of continuations + /// Attaches a continuation to a future on the same executor /** - * If properly setup (by async), this future holds the result from a - * function that runs these continuations after the main promise is - * fulfilled. However, if this future is already ready, we can just run - * the continuation right away. + * Attach the continuation function to this future object with the + * default executor. + * + * When the shared state currently associated with this future is ready, + * the continuation is called on the same executor as this future. If + * no executor is associated with this future, the default executor + * is used. * * @note This function only participates in overload resolution if the * future supports continuations * - * @return True if the contination was emplaced without the using the - * default executor + * @tparam Fn Function type + * + * @param fn A continuation function + * + * @return The continuation future */ template < class Fn @@ -466,41 +949,58 @@ namespace futures { U && U == (Options::is_continuable || Options::is_always_deferred), int> = 0 #endif - > decltype(auto) then(Fn &&fn) { if constexpr (Options::has_executor) { - return then(get_executor(), std::forward(fn)); + return this->then(get_executor(), std::forward(fn)); } else { - return then(make_default_executor(), std::forward(fn)); + return this + ->then(make_default_executor(), std::forward(fn)); } } - /// Get this future's continuations source + /// Get the current executor for this task /** - * @note This function only participates in overload resolution if the - * future supports continuations + * @note This function only participates in overload resolution + * if future_options has an associated executor * - * @return The continuations source + * @return The executor associated with this future instance */ template < #ifndef FUTURES_DOXYGEN - bool U = Options::is_continuable, - std::enable_if_t = 0 + bool U = Options::has_executor, + std::enable_if_t = 0 #endif > - [[nodiscard]] decltype(auto) - get_continuations_source() const noexcept { - return state_->get_continuations_source(); + const typename Options::executor_t & + get_executor() const { + static_assert(Options::has_executor); + return state_.get_executor(); } - /// Request the future to stop whatever task it's running /** + * @} + */ + + /** + * @name Stop requests + * @{ + */ + + /// Requests execution stop via the shared stop state of the task + /** + * Issues a stop request to the internal stop-state, if it has not yet + * already had stop requested. The task associated to the future + * can use the stop token to identify it should stop running. + * + * The determination is made atomically, and if stop was requested, the + * stop-state is atomically updated to avoid race conditions. + * * @note This function only participates in overload resolution if the * future supports stop tokens * - * @return Whether the request was made + * @return `true` if this invocation made a stop request */ template < #ifndef FUTURES_DOXYGEN @@ -510,11 +1010,14 @@ namespace futures { > bool request_stop() noexcept { - return state_->get_stop_source().request_stop(); + return get_stop_source().request_stop(); } /// Get this future's stop source /** + * Returns a stop_source associated with the same shared stop-state as + * held internally by the future task object. + * * @note This function only participates in overload resolution if the * future supports stop tokens * @@ -528,11 +1031,14 @@ namespace futures { > [[nodiscard]] stop_source get_stop_source() const noexcept { - return state_->get_stop_source(); + return state_.get_stop_source(); } /// Get this future's stop token /** + * Returns a stop_token associated with the same shared stop-state held + * internally by the future task object. + * * @note This function only participates in overload resolution if the * future supports stop tokens * @@ -549,241 +1055,71 @@ namespace futures { return get_stop_source().get_token(); } - /// Wait until all futures have a valid result and retrieves it /** - * The behaviour depends on shared_based. + * @} */ - decltype(auto) - get() { - if (!valid()) { - detail::throw_exception(); - } - if constexpr (Options::is_shared) { - if constexpr (!inline_op_state) { - return state_->get(); - } else { - return state_.get(); - } - } else { - if constexpr (!inline_op_state) { - shared_state_type tmp; - tmp.swap(state_); - if constexpr (std::is_reference_v || std::is_void_v) { - return tmp->get(); - } else { - return T(std::move(tmp->get())); - } - } else { - if constexpr (std::is_reference_v || std::is_void_v) { - return state_.get(); - } else { - return T(std::move(state_.get())); - } - } - } - } - - /// Get exception pointer without throwing exception - /** - * This extends std::future so that we can always check if the future - * threw an exception - */ - std::exception_ptr - get_exception_ptr() { - if (!valid()) { - detail::throw_exception(); - } - state_->wait(); - return state_->get_exception_ptr(); - } + private: + /// @name Private Functions + /// @{ - /// Create another future whose state is shared + /// Get this future's continuations source /** - * Create a shared variant of the current future type. - * If the current type is not shared, the object becomes invalid. - * If the current type is shared, the new object is equivalent to a - * copy. + * @note This function only participates in overload resolution if the + * future supports continuations * - * @return A shared variant of this future - */ - basic_future> - share() { - if (!valid()) { - detail::throw_exception(); - } - using shared_options = detail:: - append_future_option_t; - using shared_future_t = basic_future; - // this op state might be inline - // shared future is never inline - if constexpr (inline_op_state) { - auto shared_state = std::make_shared( - std::move(state_)); - auto erased_shared_state = std::dynamic_pointer_cast< - detail::operation_state>( - shared_state); - shared_future_t res{ std::move(erased_shared_state) }; - res.join_base::get() = std::exchange( - join_base::get(), - Options::is_shared && join_base::get()); - return res; - } else { - shared_future_t res{ - Options::is_shared ? state_ : std::move(state_) - }; - res.join_base::get() = std::exchange( - join_base::get(), - Options::is_shared && join_base::get()); - return res; - } - } - - - /// Checks if the future refers to a shared state - [[nodiscard]] bool - valid() const { - if constexpr (!inline_op_state) { - return nullptr != state_.get(); - } else { - return true; - } - } - - /// Blocks until the result becomes available. - void - wait() const { - if (!valid()) { - detail::throw_exception(); - } - if constexpr (!inline_op_state) { - state_->wait(); - } else { - state_.wait(); - } - } - - /// Waits for the result to become available. - template - std::future_status - wait_for( - const std::chrono::duration &timeout_duration) const { - if (!valid()) { - detail::throw_exception(); - } - return state_->wait_for(timeout_duration); - } - - /// Waits for the result to become available. - template - std::future_status - wait_until(const std::chrono::time_point &timeout_time) - const { - if (!valid()) { - detail::throw_exception(); - } - return state_->wait_until(timeout_time); - } - - /// Checks if the shared state is ready - [[nodiscard]] bool - is_ready() const { - if (!valid()) { - detail::throw_exception( - std::future_errc::no_state); - } - if constexpr (!inline_op_state) { - return state_->is_ready(); - } else { - return state_.is_ready(); - } - } - - /// Tell this future not to join at destruction - /** - * For safety, all futures join at destruction by default + * @return The continuations source */ - void - detach() { - join_base::get() = false; + template < +#ifndef FUTURES_DOXYGEN + bool U = Options::is_continuable, + std::enable_if_t = 0 +#endif + > + [[nodiscard]] decltype(auto) + get_continuations_source() const noexcept { + return state_.get_continuations_source(); } /// Notify this condition variable when the future is ready notify_when_ready_handle notify_when_ready(std::condition_variable_any &cv) { - if constexpr (!inline_op_state) { - if (!state_) { - detail::throw_exception(); - } - return state_->notify_when_ready(cv); - } else { - return state_.notify_when_ready(cv); + if (!valid()) { + detail::throw_exception(); } + return state_.notify_when_ready(cv); } /// Cancel request to notify this condition variable when the /// future is ready void unnotify_when_ready(notify_when_ready_handle h) { - if constexpr (!inline_op_state) { - if (!state_) { - detail::throw_exception(); - } - return state_->unnotify_when_ready(h); - } else { - return state_.unnotify_when_ready(h); - } - } - - /// Get the current executor for this task - /** - * @note This function only participates in overload resolution - * if future_options has an associated executor - * - * @return The executor associated with this future instance - */ - template < -#ifndef FUTURES_DOXYGEN - bool U = Options::has_executor, - std::enable_if_t = 0 -#endif - > - const typename Options::executor_t & - get_executor() const { - static_assert(Options::has_executor); - if constexpr (!inline_op_state) { - return state_->get_executor(); - } else { - return state_.get_executor(); + if (!valid()) { + detail::throw_exception(); } + return state_.unnotify_when_ready(h); } - private: - /// @name Private Functions - /// @{ - /// Get a reference to the mutex in the underlying shared state std::mutex & waiters_mutex() { - if constexpr (!inline_op_state) { - if (!state_) { - detail::throw_exception(); - } - return state_->waiters_mutex(); - } else { - return state_.waiters_mutex(); + if (!valid()) { + detail::throw_exception(); } + return state_.waiters_mutex(); } + /// Wait if this is the last future referring to the operation state void - wait_if_last() const { - if (join_base::get() && valid() && (!is_ready())) { - if constexpr (!Options::is_shared) { + wait_if_last() { + if constexpr (Options::is_shared) { + if (join_ && valid() && !is_ready() && state_.use_count() == 1) + { + wait(); + } + } else { + if (join_ && valid() && !is_ready()) { wait(); - } else /* constexpr */ { - if (1 == state_.use_count()) { - wait(); - } } } } @@ -793,7 +1129,10 @@ namespace futures { /// @name Members /// @{ /// Pointer to shared state - mutable unique_or_shared_state state_{}; + mutable future_state_type state_{}; + + /// Whether this future should join at destruction + bool join_{ !Options::is_always_detached }; /// @} }; diff --git a/include/futures/futures/detail/future_launcher.hpp b/include/futures/futures/detail/future_launcher.hpp index b90ae5218..5a72876bd 100644 --- a/include/futures/futures/detail/future_launcher.hpp +++ b/include/futures/futures/detail/future_launcher.hpp @@ -48,28 +48,36 @@ namespace futures::detail { // Future traits using value_type = launch_result_t; static constexpr bool is_eager = !FutureOptions::is_always_deferred; - - // Create shared state - auto state = make_initial_state( - ex, - std::forward(f), - std::forward(args)...); - basic_future fut( - move_if_not_shared_ptr(state)); - if constexpr (is_eager) { + // Create shared state + auto shared_state = make_initial_state( + ex, + std::forward(f), + std::forward(args)...); + basic_future fut(shared_state); + // Launch task to fulfill the eager promise now asio::post( ex, std::move( - [state = std::move(state), + [state = std::move(shared_state), f = std::forward(f), args = std::make_tuple( std::forward(args)...)]() mutable { state->apply_tuple(std::move(f), std::move(args)); })); + return fut; + } else { + // Create shared state + auto op_state + = make_initial_state( + ex, + std::forward(f), + std::forward(args)...); + basic_future fut( + move_if_not_shared_ptr(op_state)); + return fut; } - return fut; } template diff --git a/include/futures/futures/detail/future_state.hpp b/include/futures/futures/detail/future_state.hpp new file mode 100644 index 000000000..93ea87655 --- /dev/null +++ b/include/futures/futures/detail/future_state.hpp @@ -0,0 +1,864 @@ +// +// Copyright (c) 2022 alandefreitas (alandefreitas@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. +// https://www.boost.org/LICENSE_1_0.txt +// + +#ifndef FUTURES_FUTURES_DETAIL_FUTURE_STATE_HPP +#define FUTURES_FUTURES_DETAIL_FUTURE_STATE_HPP + +#include +#include +#include +#include +#include + +namespace futures::detail { + /** @addtogroup futures Futures + * @{ + */ + /** @addtogroup future-traits Future Traits + * @{ + */ + + /// Disambiguation tags that can be passed to the constructors future_state + template + struct in_place_type_t + { + explicit in_place_type_t() = default; + }; + template + inline constexpr in_place_type_t in_place_type{}; + + /// A variant operation state used in instances of basic_future + /** + * This class models the operation state in the various formats in might + * be found in a future: + * + * - Empty state (i.e. default constructed and moved-from futures) + * - Direct value storage (futures created with make_ready_future) + * - Shared value storage (shared futures created with make_ready_future) + * - Inline operation state (deferred futures - address can't change) + * - Shared operation state (eager and shared futures) + * + * The non-copyable operation states are converted into shared state if + * they ever need to be copied. This implies we only recur to a shared + * operation state when we really need to. This allows us to avoid dynamic + * memory allocations in all other cases. + * + * @tparam R State main type + * @tparam OpState Underlying operation state type + * + */ + template + class future_state + { + private: + /** + * @name Private types + * @{ + */ + + static_assert(is_operation_state_v); + + using empty_t = empty_value_type; + using operation_storage_t = operation_state_storage; + using shared_storage_t = std::shared_ptr; + using operation_state_t = OpState; + using shared_state_t = std::shared_ptr; + + template + using is_future_state_type = std::disjunction< + std::is_same, + std::is_same, + std::is_same, + std::is_same, + std::is_same>; + + template + static constexpr bool is_future_state_type_v = is_future_state_type< + T>::value; + + using aligned_storage_t = aligned_storage_for< + empty_t, + operation_storage_t, + shared_storage_t, + operation_state_t, + shared_state_t>; + + /** + * @} + */ + public: + /** + * @name Public types + * @{ + */ + + /// Type ids for a future state + enum class type_id : uint8_t + { + /// The future state is empty + empty, + /// The future state holds direct value storage + direct_storage, + /// The future state holds shared direct value storage + shared_storage, + /// The future state holds an inline operation state + inline_state, + /// The future state holds a shared operation state + shared_state, + }; + + /** + * @} + */ + public: + /** + * @name Constructors + * @{ + */ + + /// Destructor + ~future_state() { + destroy_impl(); + }; + + /// Constructor + future_state() = default; + + /// Copy Constructor + future_state(const future_state& other) { + copy_impl(other); + } + + /// Copy Constructor + future_state(future_state& other) { + other.share(); + copy_impl(other); + } + + /// Move Constructor + future_state(future_state&& other) noexcept { + move_impl(std::move(other)); + } + + /// Converting Constructor + template < + class T +#ifndef FUTURES_DOXYGEN + , + std::enable_if_t>, int> = 0 +#endif + > + explicit future_state(T&& other) { + using value_type = std::decay_t; + emplace(std::forward(other)); + } + + /// Emplace Constructor + template < + class T, + class... Args +#ifndef FUTURES_DOXYGEN + , + std::enable_if_t>, int> = 0 +#endif + > + explicit future_state(in_place_type_t, Args&&... args) { + using value_type = std::decay_t; + emplace(std::forward(args)...); + } + + /// Copy Assignment + future_state& + operator=(const future_state& other) { + copy_impl(other); + return *this; + } + + /// Move Assignment + future_state& + operator=(future_state&& other) noexcept { + move_impl(std::move(other)); + return *this; + } + + /** + * @} + */ + + /** + * @name Accessors + * + * Variant-like functions + * + * @{ + */ + + /// Returns enumeration value of the type currently held by the variant + [[nodiscard]] constexpr type_id + type() const { + return type_id_; + } + + /// Returns the index of the alternative held by the variant + [[nodiscard]] constexpr std::size_t + index() const { + return static_cast(type()); + } + + /// Check if current variant value is of specified type + template < + class T +#ifndef FUTURES_DOXYGEN + , + std::enable_if_t>, int> = 0 +#endif + > + [[nodiscard]] bool + holds() const { + using value_type = std::decay_t; + if constexpr (is_future_state_type_v) { + return type_id_ == type_id_for(); + } + return false; + } + + /// Check if variant value is empty value + [[nodiscard]] bool + holds_empty() const { + return holds(); + } + + /// Check if variant value is direct storage + [[nodiscard]] bool + holds_storage() const { + return holds(); + } + + /// Check if variant value is shared direct storage + [[nodiscard]] bool + holds_shared_storage() const { + return holds(); + } + + /// Check if variant value is operation state + [[nodiscard]] bool + holds_operation_state() const { + return holds(); + } + + /// Check if variant value is shared state + [[nodiscard]] bool + holds_shared_state() const { + return holds(); + } + + /// Get variant value as specified type + template < + class T +#ifndef FUTURES_DOXYGEN + , + std::enable_if_t>, int> = 0 +#endif + > + T& + get_as() { + using value_type = std::decay_t; + if constexpr (is_future_state_type_v) { + return *reinterpret_cast(data()); + } + detail::throw_exception(); + } + + /// Get constant variant value as specified type + template < + class T +#ifndef FUTURES_DOXYGEN + , + std::enable_if_t>, int> = 0 +#endif + > + const T& + get_as() const { + using value_type = std::decay_t; + using return_type = std::add_const_t; + if constexpr (is_future_state_type_v) { + return *reinterpret_cast(data()); + } + detail::throw_exception(); + } + + /// Get variant value as empty value + empty_t& + get_as_empty() { + return get_as(); + } + + /// Get constant variant value as empty value + [[nodiscard]] const empty_t& + get_as_empty() const { + return get_as(); + } + + /// Get variant value as storage + operation_storage_t& + get_as_storage() { + return get_as(); + } + + /// Get constant variant value as storage + [[nodiscard]] const operation_storage_t& + get_as_storage() const { + return get_as(); + } + + /// Get variant value as shared storage + shared_storage_t& + get_as_shared_storage() { + return get_as(); + } + + /// Get constant variant value as shared storage + [[nodiscard]] const shared_storage_t& + get_as_shared_storage() const { + return get_as(); + } + + /// Get variant value as operation state + operation_state_t& + get_as_operation_state() { + return get_as(); + } + + /// Get constant variant value as operation state + [[nodiscard]] const operation_state_t& + get_as_operation_state() const { + return get_as(); + } + + /// Get variant value as shared state + shared_state_t& + get_as_shared_state() { + return get_as(); + } + + /// Get constant variant value as shared state + [[nodiscard]] const shared_state_t& + get_as_shared_state() const { + return get_as(); + } + + /// Constructs a value in the variant, in place + template < + class T, + class... Args +#ifndef FUTURES_DOXYGEN + , + std::enable_if_t>, int> = 0 +#endif + > + void + emplace(Args&&... args) { + destroy_impl(); + new (data()) T(std::forward(args)...); + type_id_ = type_id_for(); + } + + /// Constructs empty value in the variant, in place + template + void + emplace_empty(Args&&... args) { + return emplace(std::forward(args)...); + } + + /// Constructs direct storage in the variant, in place + template + void + emplace_storage(Args&&... args) { + return emplace(std::forward(args)...); + } + + /// Constructs shared storage in the variant, in place + template + void + emplace_shared_storage(Args&&... args) { + return emplace(std::forward(args)...); + } + + /// Constructs operation state in the variant, in place + template + void + emplace_operation_state(Args&&... args) { + return emplace(std::forward(args)...); + } + + /// Constructs shared state in the variant, in place + template + void + emplace_shared_state(Args&&... args) { + return emplace(std::forward(args)...); + } + + /** + * @} + */ + + /** + * @name Operation state functions + * + * Operation state-like functions. These function are redirected + * to the operation state depending on the type of state this + * object stores. + * + * @{ + */ + + /// Forward get function to proper state type + decltype(auto) + get() { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->get(); + case type_id::inline_state: + return get_as_operation_state().get(); + case type_id::direct_storage: + return get_as_storage().get(); + case type_id::shared_storage: + return get_as_shared_storage()->get(); + case type_id::empty: + detail::throw_exception( + "Operation state is invalid"); + } + detail::throw_exception("Invalid type id"); + } + + /// Forward get_exception_ptr function to proper state type + std::exception_ptr + get_exception_ptr() { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->get_exception_ptr(); + case type_id::inline_state: + return get_as_operation_state().get_exception_ptr(); + case type_id::direct_storage: + case type_id::shared_storage: + case type_id::empty: + return nullptr; + } + detail::throw_exception("Invalid type id"); + } + + [[nodiscard]] bool + valid() const { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state().get() != nullptr; + case type_id::inline_state: + return true; + case type_id::direct_storage: + return true; + case type_id::shared_storage: + return get_as_shared_storage().get() != nullptr; + case type_id::empty: + return false; + } + detail::throw_exception("Invalid type id"); + } + + private: + template + static void + wait_impl( + std::conditional_t& s) { + switch (s.type_id_) { + case type_id::shared_state: + s.get_as_shared_state()->wait(); + return; + case type_id::inline_state: + s.get_as_operation_state().wait(); + return; + case type_id::direct_storage: + case type_id::shared_storage: + case type_id::empty: + return; + } + detail::throw_exception("Invalid type id"); + } + public: + /// Forward wait function to proper state type + void + wait() const { + wait_impl(*this); + } + + /// Forward wait function to proper state type + void + wait() { + wait_impl(*this); + } + + private: + template + static std::future_status + wait_for_impl( + std::conditional_t& s, + const std::chrono::duration& timeout_duration) { + // Ensure the state type is shared + if constexpr (!is_const) { + if (s.type_id_ == type_id::inline_state) { + s.share(); + } + } + switch (s.type_id_) { + case type_id::shared_state: + return s.get_as_shared_state()->wait_for(timeout_duration); + case type_id::direct_storage: + case type_id::shared_storage: + case type_id::empty: + return std::future_status::ready; + case type_id::inline_state: + detail::throw_exception( + "Cannot wait for deferred state with timeout"); + } + detail::throw_exception("Invalid type id"); + } + public: + /// Forward wait_for function to proper state type + template + std::future_status + wait_for( + const std::chrono::duration& timeout_duration) const { + return wait_for_impl(*this, timeout_duration); + } + + template + std::future_status + wait_for(const std::chrono::duration& timeout_duration) { + return wait_for_impl(*this, timeout_duration); + } + + private: + template + static std::future_status + wait_until_impl( + std::conditional_t& s, + const std::chrono::time_point& timeout_time) { + // Ensure the state type is shared + if constexpr (is_const) { + if (s.type_id_ == type_id::inline_state) { + s.share(); + } + } + switch (s.type_id_) { + case type_id::shared_state: + return s.get_as_shared_state()->wait_until(timeout_time); + case type_id::direct_storage: + case type_id::shared_storage: + case type_id::empty: + return std::future_status::ready; + case type_id::inline_state: + detail::throw_exception( + "Cannot wait for deferred state with timeout"); + } + detail::throw_exception("Invalid type id"); + } + public: + /// Forward wait_until function to proper state type + template + std::future_status + wait_until(const std::chrono::time_point& timeout_time) + const { + return wait_until_impl(*this, timeout_time); + } + + /// Forward wait_until function to proper state type + template + std::future_status + wait_until( + const std::chrono::time_point& timeout_time) { + return wait_until_impl(*this, timeout_time); + } + + /// Forward is_ready function to proper state type + [[nodiscard]] bool + is_ready() const { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->is_ready(); + case type_id::inline_state: + return get_as_operation_state().is_ready(); + case type_id::direct_storage: + case type_id::shared_storage: + return true; + case type_id::empty: + return false; + } + detail::throw_exception("Invalid type id"); + } + + /// Forward get_continuations_source function to proper state type + auto + get_continuations_source() { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->get_continuations_source(); + case type_id::inline_state: + return get_as_operation_state().get_continuations_source(); + case type_id::direct_storage: + case type_id::shared_storage: + case type_id::empty: + detail::throw_exception( + "Future non-continuable"); + } + detail::throw_exception("Invalid type id"); + } + + typename operation_state_t::notify_when_ready_handle + notify_when_ready(std::condition_variable_any& cv) { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->notify_when_ready(cv); + case type_id::inline_state: + return get_as_operation_state().notify_when_ready(cv); + case type_id::direct_storage: + case type_id::shared_storage: + case type_id::empty: + cv.notify_all(); + return typename operation_state_t::notify_when_ready_handle{}; + } + detail::throw_exception("Invalid type id"); + } + + void + unnotify_when_ready( + typename operation_state_t::notify_when_ready_handle h) { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->unnotify_when_ready(h); + case type_id::inline_state: + return get_as_operation_state().unnotify_when_ready(h); + case type_id::direct_storage: + case type_id::shared_storage: + case type_id::empty: + (void) h; + } + detail::throw_exception("Invalid type id"); + } + + [[nodiscard]] stop_source + get_stop_source() const noexcept { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->get_stop_source(); + case type_id::inline_state: + return get_as_operation_state().get_stop_source(); + case type_id::direct_storage: + case type_id::shared_storage: + detail::throw_exception( + "Cannot stop a ready future"); + case type_id::empty: + detail::throw_exception("Invalid state"); + } + detail::throw_exception("Invalid type id"); + } + + [[nodiscard]] const typename operation_state_options_t< + OpState>::executor_t& + get_executor() const noexcept { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->get_executor(); + case type_id::inline_state: + return get_as_operation_state().get_executor(); + case type_id::direct_storage: + case type_id::shared_storage: + detail::throw_exception( + "No associated executor to direct storage"); + case type_id::empty: + detail::throw_exception( + "No associated executor to empty state"); + } + detail::throw_exception("Invalid type id"); + } + + std::mutex& + waiters_mutex() { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state()->waiters_mutex(); + case type_id::inline_state: + return get_as_operation_state().waiters_mutex(); + case type_id::direct_storage: + case type_id::shared_storage: + detail::throw_exception( + "No associated executor to direct storage"); + case type_id::empty: + detail::throw_exception( + "No associated executor to empty state"); + } + detail::throw_exception("Invalid type id"); + } + + long + use_count() const noexcept { + switch (type_id_) { + case type_id::shared_state: + return get_as_shared_state().use_count(); + case type_id::shared_storage: + return get_as_shared_storage().use_count(); + case type_id::inline_state: + case type_id::direct_storage: + return 1; + case type_id::empty: + return 0; + } + detail::throw_exception("Invalid type id"); + } + + /// Make sure the future object is shared + void + share() { + if (holds_storage()) { + emplace_shared_storage(std::make_shared( + std::move(get_as_storage()))); + } + if (holds_operation_state()) { + emplace_shared_state(std::make_shared( + std::move(get_as_operation_state()))); + } + } + + /** + * @} + */ + + private: + /** + * @name Helper Functions + * @{ + */ + constexpr byte* + data() { + return data_.data(); + } + + constexpr const byte* + data() const { + return data_.data(); + } + + void + destroy_impl() { + switch (type_id_) { + case type_id::empty: + break; + case type_id::direct_storage: + get_as_storage().~operation_storage_t(); + break; + case type_id::shared_storage: + get_as_shared_storage().~shared_storage_t(); + break; + case type_id::inline_state: + get_as_operation_state().~operation_state_t(); + break; + case type_id::shared_state: + get_as_shared_state().~shared_state_t(); + break; + } + type_id_ = type_id::empty; + } + + void + copy_impl(const future_state& other) { + destroy_impl(); + switch (other.type_id_) { + case type_id::empty: + emplace_empty(); + break; + case type_id::shared_storage: + emplace_shared_storage(other.get_as_shared_storage()); + break; + case type_id::shared_state: + emplace_shared_state(other.get_as_shared_state()); + break; + case type_id::direct_storage: + case type_id::inline_state: + default: + detail::throw_exception( + "No copy constructor"); + } + } + + void + move_impl(future_state&& other) { + destroy_impl(); + switch (other.type_id_) { + case type_id::empty: + emplace_empty(); + break; + case type_id::direct_storage: + emplace_storage(std::move(other.get_as_storage())); + break; + case type_id::shared_storage: + emplace_shared_storage( + std::move(other.get_as_shared_storage())); + break; + case type_id::inline_state: + emplace_operation_state( + std::move(other.get_as_operation_state())); + break; + case type_id::shared_state: + emplace_shared_state(std::move(other.get_as_shared_state())); + break; + } + // destroy the object moved from and set its type back to empty + other.destroy_impl(); + } + + template + static constexpr type_id + type_id_for() { + if constexpr (std::is_same_v) { + return type_id::empty; + } + if constexpr (std::is_same_v) { + return type_id::direct_storage; + } + if constexpr (std::is_same_v) { + return type_id::shared_storage; + } + if constexpr (std::is_same_v) { + return type_id::inline_state; + } + if constexpr (std::is_same_v) { + return type_id::shared_state; + } + detail::throw_exception("Invalid type T"); + } + + /** + * @} + */ + + /** + * @name Members + * @{ + */ + + aligned_storage_t data_; + type_id type_id_{ type_id::empty }; + + /** + * @} + */ + }; + + /** @} */ + /** @} */ +} // namespace futures::detail + + +#endif // FUTURES_FUTURES_DETAIL_FUTURE_STATE_HPP diff --git a/include/futures/futures/detail/operation_state.hpp b/include/futures/futures/detail/operation_state.hpp index 5bf47ac3a..3207bae1d 100644 --- a/include/futures/futures/detail/operation_state.hpp +++ b/include/futures/futures/detail/operation_state.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -24,34 +25,70 @@ namespace futures::detail { /** @addtogroup futures Futures * @{ */ - /// Members functions and objects common to all shared state - /// object types (bool ready and exception ptr) - /// - /// Shared states for asynchronous operations contain an element of a - /// given type and an exception. - /// - /// All objects such as futures and promises have shared states and - /// inherit from this class to synchronize their access to their common - /// shared state. + + /// Operation state common synchronization primitives + /** + * Member functions and objects common to all operation state object types. + * + * Operation states for asynchronous operations contain an element of a + * given type or an exception. + * + * All objects such as futures and promises have operation states and + * inherit from this class to synchronize their access to their common + * operation state. + * + * When we know the operation state is always deferred, we can use some + * optimizations related to their synchronization. In other words, we can + * avoid atomic operations necessary to determine the state of the task + * and its continuations. + * + * @tparam is_always_deferred Whether the state is always deferred + */ template class operation_state_base { private: - /// A list of waiters: condition variables to notify any - /// object waiting for this shared state to be ready + /** + * @name Private types + * @{ + */ + + /// Type used for a list of external waiters + /** + * A list of waiters is a list of references to external condition + * variables we should notify when this operation state is ready + */ using waiter_list = detail::small_vector; + /** + * @} + */ + public: + /** + * @name Public types + * @{ + */ + /// A handle to notify an external context about this state being ready using notify_when_ready_handle = waiter_list::iterator; - /// Virtual shared state data destructor /** - * Virtual to make it inheritable. + * @} + */ + + /** + * @name Constructors + * @{ */ + + /// Virtual operation state data destructor virtual ~operation_state_base() = default; /// Constructor + /** + * Defaults to the initial status for the operation state. + */ operation_state_base() = default; /// Delete copy constructor @@ -67,6 +104,15 @@ namespace futures::detail { */ operation_state_base(const operation_state_base &) = delete; + /// Cannot copy assign the operation state data + /** + * We cannot copy assign the operation state data because this base + * class holds synchronization primitives + */ + operation_state_base & + operator=(const operation_state_base &) + = delete; + /// Move constructor /** * Moving the operation state is only valid before the task is running, @@ -76,69 +122,94 @@ namespace futures::detail { * let other object steal the contents while recreating the condition * variables. * - * @param other + * This is only supposed to be used by deferred futures being shared, + * which means 1) the task hasn't been launched yet, 2) their + * `operation_state_base` is inline and should become shared. Thus, + * this allows us to construct a shared_ptr for the operation state. */ operation_state_base(operation_state_base &&other) noexcept - : except_{ std::move(other.except_) }, + : status_{ other.status_.load(std::memory_order_acquire) }, + except_{ std::move(other.except_) }, external_waiters_(std::move(other.external_waiters_)) { - assert(!is_waiting()); + assert(!other.is_waiting()); other.waiter_.notify_all(); std::unique_lock lk(other.waiters_mutex_); + other.status_.exchange(status::ready, std::memory_order_release); }; - /// Cannot copy assign the shared state data + /// Move assignment /** - * We cannot copy assign the shared state data because this base - * class holds synchronization primitives + * Moving the operation state is only valid before the task is running, + * as it might happen with deferred futures. + * + * At this point, we can ignore the synchronization primitives and + * let other object steal the contents while recreating the condition + * variables. + * + * This is only supposed to be used by deferred futures being shared, + * which means 1) the task hasn't been launched yet, 2) their + * `operation_state_base` is inline and should become shared. Thus, + * this allows us to construct a shared_ptr for the operation state. */ operation_state_base & - operator=(const operation_state_base &) - = delete; + operator=(operation_state_base &&other) noexcept { + status_ = std::move(other.status_); + except_ = std::move(other.except_); + external_waiters_ = std::move(other.external_waiters_); + assert(!is_waiting()); + other.waiter_.notify_all(); + std::unique_lock lk(other.waiters_mutex_); + other.status_.exchange(status::ready, std::memory_order_release); + }; - /// Indicate to the shared state the state is ready /** - * This operation marks the ready_ flags and warns any future - * waiting on it. This overload is meant to be used by derived - * classes that might need to use another mutex for this operation + * @} + */ + + /** + * @name Accessors + * + * The accessor functions are used to determine the state of the + * operation. They will trigger and use whatever synchronization + * primitives are necessary to avoid data races between promises + * and futures. + * + * @{ + */ + + /// Indicate to the operation state the state is ready + /** + * This operation marks the status flags and warns any future + * waiting for them. This overload is meant to be used by derived + * classes that will also set the state of their storage. */ void set_ready() noexcept { - // notify all waiters - auto notify_all = [this]() { + // Set state to ready and notify all waiters + status prev + = status_.exchange(status::ready, std::memory_order_release); + if (prev == status::waiting) { + maybe_atomic_thread_fence( + std::memory_order_acquire); + // notify all waiters (internal and external) auto lk = create_wait_lock(); waiter_.notify_all(); for (auto &&external_waiter: external_waiters_) { external_waiter->notify_all(); } - }; - - // set state to ready and notify all - if constexpr (!is_always_deferred) { - status prev = status_.exchange( - status::ready, - std::memory_order_release); - if (prev == status::waiting) { - std::atomic_thread_fence(std::memory_order_acquire); - notify_all(); - } - } else { - status prev = std::exchange(status_, status::ready); - if (prev == status::waiting) { - notify_all(); - } } } /// Check if operation state is ready bool is_ready() const { - return is_status(status::ready); + return status_.load(std::memory_order_acquire) == status::ready; } /// Check if operation state is waiting bool is_waiting() const { - return is_status(status::waiting); + return status_.load(std::memory_order_acquire) == status::waiting; } /// Check if state is ready with no exception @@ -147,13 +218,15 @@ namespace futures::detail { return is_ready() && !except_; } - /// Set shared state to an exception + /// Set operation state to an exception /** - * This sets the exception value and marks the shared state as - * ready. If we try to set an exception on a shared state that's - * ready, we throw an exception. This overload is meant to be used - * by derived classes that might need to use another mutex for this - * operation. + * This sets the exception value and marks the operation state as + * ready. + * + * If we try to set an exception on a operation state that's ready, we + * throw an exception. + * + * This overload is meant to be used by derived classes. */ void set_exception(std::exception_ptr except) { @@ -166,10 +239,7 @@ namespace futures::detail { set_ready(); } - /// Get the shared state when it's an exception - /** - * This overload uses the default global mutex for synchronization - */ + /// Get the operation state when it's as an exception std::exception_ptr get_exception_ptr() const { if (!is_ready()) { @@ -184,15 +254,14 @@ namespace futures::detail { std::rethrow_exception(get_exception_ptr()); } - /// Indicate to the shared state its owner has been destroyed + /// Indicate to the operation state its owner has been destroyed /** - * This allows us to set an error if the promise has been destroyed - * too early. + * Promise types call this function to allows us to set an error if + * the promise has been destroyed too early. * - * If owner has been destroyed before the shared state is ready, - * this means a promise has been broken and the shared state should - * store an exception. This overload is meant to be used by derived - * classes that might need to use another mutex for this operation + * If the owner has been destroyed before the operation state is ready, + * this means a promise has been broken and the operation state should + * store an exception. */ void signal_promise_destroyed() { @@ -207,52 +276,86 @@ namespace futures::detail { return is_ready() && except_ != nullptr; } - /// Wait for shared state to become ready + /** + * @} + */ + + /** + * @name Waiting + * @{ + */ + + /// Wait for operation state to become ready /** * This function uses the condition variable waiters to wait for - * this shared state to be marked as ready. + * this operation state to be marked as ready. * * Atomic operations are used to ensure we only involve the - * waiting lock if the shared state is not ready yet. + * waiting lock if the operation state is not ready yet. */ void wait() { status expected = status::initial; - compare_exchange_status(expected, status::waiting); + status_.compare_exchange_strong(expected, status::waiting); if (expected != status::ready) { + // if previous state wasn't "ready" + // wait for parent operation to complete wait_for_parent(); + // If previous status was initial if (expected == status::initial) { + // launch task this->post_deferred(); } + // wait for current task auto lk = create_wait_lock(); waiter_.wait(lk, [this]() { return is_ready(); }); } } - /// Wait for the shared state to become ready + /// Wait for operation state to become ready + void + wait() const { + if constexpr (is_always_deferred) { + detail::throw_exception(); + } + status expected = status::initial; + status_.compare_exchange_strong(expected, status::waiting); + if (expected != status::ready) { + // wait for current task + auto lk = create_wait_lock(); + waiter_.wait(lk, [this]() { return is_ready(); }); + } + } + + /// Wait for the operation state to become ready /** * This function uses the condition variable waiters to wait for - * this shared state to be marked as ready for a specified + * this operation state to be marked as ready for a specified * duration. * - * This overload is meant to be used by derived classes - * that might need to use another mutex for this operation. - * * @tparam Rep An arithmetic type representing the number of ticks * @tparam Period A std::ratio representing the tick period + * * @param timeout_duration maximum duration to block for - * @return The state of the shared value + * + * @return The state of the operation value */ template std::future_status wait_for(std::chrono::duration const &timeout_duration) { + // check status status expected = status::initial; - compare_exchange_status(expected, status::waiting); + status_.compare_exchange_strong(expected, status::waiting); + // if not ready if (expected != status::ready) { + // wait for parent task wait_for_parent(); + // if task not launched if (expected == status::initial) { + // launch task this->post_deferred(); } + // lock and wait auto lk = create_wait_lock(); if (waiter_.wait_for(lk, timeout_duration, [this]() { return is_ready(); @@ -265,30 +368,88 @@ namespace futures::detail { return std::future_status::ready; } - /// Wait for the shared state to become ready + /// Wait for the operation state to become ready + template + std::future_status + wait_for( + std::chrono::duration const &timeout_duration) const { + if constexpr (is_always_deferred) { + detail::throw_exception(); + } + // check status + status expected = status::initial; + status_.compare_exchange_strong(expected, status::waiting); + // if not ready + if (expected != status::ready) { + // lock and wait + auto lk = create_wait_lock(); + if (waiter_.wait_for(lk, timeout_duration, [this]() { + return is_ready(); + })) { + return std::future_status::ready; + } else { + return std::future_status::timeout; + } + } + return std::future_status::ready; + } + + /// Wait for the operation state to become ready /** * This function uses the condition variable waiters - * to wait for this shared state to be marked as ready until a - * specified time point. This overload is meant to be used by - * derived classes that might need to use another mutex for this - * operation + * to wait for this operation state to be marked as ready until a + * specified time point. * * @tparam Clock The clock type * @tparam Duration The duration type + * * @param timeout_time maximum time point to block until - * @return The state of the shared value + * + * @return The state of the operation value */ template std::future_status wait_until( std::chrono::time_point const &timeout_time) { + // check status status expected = status::initial; - compare_exchange_status(expected, status::waiting); + status_.compare_exchange_strong(expected, status::waiting); + // if not ready if (expected != status::ready) { + // wait for parent task wait_for_parent(); + // if not launched yet if (expected == status::initial) { + // launch task this->post_deferred(); } + // lock and wait + auto lk = create_wait_lock(); + if (waiter_.wait_until(lk, timeout_time, [this]() { + return is_ready(); + })) { + return std::future_status::ready; + } else { + return std::future_status::timeout; + } + } + return std::future_status::ready; + } + + /// Wait for the operation state to become ready + template + std::future_status + wait_until( + std::chrono::time_point const &timeout_time) const { + if constexpr (is_always_deferred) { + detail::throw_exception(); + } + // check status + status expected = status::initial; + status_.compare_exchange_strong(expected, status::waiting); + // if not ready + if (expected != status::ready) { + // lock and wait auto lk = create_wait_lock(); if (waiter_.wait_until(lk, timeout_time, [this]() { return is_ready(); @@ -301,7 +462,16 @@ namespace futures::detail { return std::future_status::ready; } - /// Include a condition variable in the list of waiters + /** + * @} + */ + + /** + * @name Synchronization + * @{ + */ + + /// Include an external condition variable in the list of waiters /** * These are external waiters we need to notify when the state is * ready. @@ -312,12 +482,17 @@ namespace futures::detail { */ notify_when_ready_handle notify_when_ready(std::condition_variable_any &cv) { + // check status status expected = status::initial; - compare_exchange_status(expected, status::waiting); + status_.compare_exchange_strong(expected, status::waiting); + // wait for parent task + wait_for_parent(); + // if task is not launched if (expected == status::initial) { + // launch this->post_deferred(); } - wait_for_parent(); + // insert waiter in the list of external waiters return external_waiters_.insert(external_waiters_.end(), &cv); } @@ -328,38 +503,51 @@ namespace futures::detail { void unnotify_when_ready(notify_when_ready_handle it) { auto lk = create_wait_lock(); + // erase from external waiters we should notify external_waiters_.erase(it); } /// Post a deferred function virtual void post_deferred() { + // do nothing by default / assume tasks are eager // override in deferred futures only } /// Wait for parent operation virtual void wait_for_parent() { + // do nothing by default / assume tasks are eager // override in deferred futures only } - /// Get a reference to the mutex in the shared state + /// Get a reference to the mutex in the operation state std::mutex & waiters_mutex() { return waiters_mutex_; } - /// Generate unique lock for the shared state - /// - /// This lock can be used for any operations on the state that might - /// need to be protected/ + /// Generate unique lock for the operation state + /** + * This lock can be used for any operations on the state that might + * need to be protected. + */ std::unique_lock create_wait_lock() const { return std::unique_lock{ waiters_mutex_ }; } + /** + * @} + */ + private: - /// The current status of this shared state + /** + * @name Member values + * @{ + */ + + /// The current status of this operation state enum status : uint8_t { /// Nothing happened yet @@ -370,52 +558,22 @@ namespace futures::detail { ready, }; - /// Indicates if the shared state is already set + /// Indicates if the operation state is already set /** - * There are three states possible: initial, waiting, ready - */ - mutable std:: - conditional_t> - status_{ status::initial }; - - /// Check the status of the operation state - bool - is_status(status s) const { - if constexpr (!is_always_deferred) { - return status_.load(std::memory_order_acquire) == s; - } else { - return status_ == s; - } - } - - /// Compare status and exchange if it matches the previous value - /** - * This function: - * 1) replaces the expected status variable with the old status - * 2) replaces status with new_value if old value matches expected value - * 3) return true if the value was changed + * There are three states possible: initial, waiting, ready. + * + * Unlike std::future and related types, only valid transitions are + * allowed and no mutexes are involved unless the user requires this + * to block the calling thread. * - * @param expected Expected status and reference to store the old status - * @param new_value New status value - * @return True if the status has changed + * We don't need atomic operations for changing the status when the + * future is known to be always deferred. */ - bool - compare_exchange_status(status &expected, status new_value) { - if constexpr (!is_always_deferred) { - return status_.compare_exchange_strong(expected, new_value); - } else { - bool match_expected = status_ == expected; - expected = status_; - if (match_expected) { - status_ = new_value; - return true; - } else { - return false; - } - } - } + mutable detail::maybe_atomic status_{ + status::initial + }; - /// Pointer to an exception, when the shared_state fails + /// Pointer to an exception, when the operation_state fails /** * std::exception_ptr does not need to be atomic because * the status variable is guarding it. @@ -424,8 +582,8 @@ namespace futures::detail { /// Condition variable to notify any object waiting for this state /** - * This is the object we use to be able to block until the shared - * state is ready. Although the shared state is lock-free, users + * This is the object we use to be able to block until the operation + * state is ready. Although the operation state is lock-free, users * might still need to wait for results to be ready. One future * thread calls `waiter_.wait(...)` to block while the promise * thread calls `waiter_.notify_all(...)`. In C++20, atomic @@ -453,26 +611,30 @@ namespace futures::detail { * These functions should be used directly by users very often. */ mutable std::mutex waiters_mutex_{}; + + /** + * @} + */ }; /// Operation state with its concrete storage /** - * This class stores the data for a shared state holding an element of + * This class stores the data for a operation state holding an element of * type `R`, which might be a concrete type, a reference, or `void`. * * For most types, the data is stored as uninitialized storage because * this will only need to be initialized when the state becomes ready. - * This ensures the shared state works for all types and avoids + * This ensures the operation state works for all types and avoids * wasting operations on a constructor we might not use. * * However, initialized storage is used for trivial types because * this involves no performance penalty. This also makes states * easier to debug. * - * If the shared state returns a reference, the data is stored + * If the operation state returns a reference, the data is stored * internally as a pointer. * - * A void shared state needs to synchronize waiting, but it does not + * A void operation state needs to synchronize waiting, but it does not * need to store anything. * * The storage also uses empty base optimization, which means it @@ -483,8 +645,6 @@ namespace futures::detail { class operation_state // base state : public operation_state_base - // enable shared pointers - , public std::enable_shared_from_this> // storage for the results , private operation_state_storage // storage for the executor @@ -505,6 +665,11 @@ namespace futures::detail { !Options::is_shared, "The underlying operation state cannot be shared"); + /** + * @name Private types + * @{ + */ + /// Operation state base type (without storage for the value) using operation_state_base_type = operation_state_base< Options::is_always_deferred>; @@ -536,23 +701,26 @@ namespace futures::detail { stop_token, detail::empty_value_type>; + /** + * @} + */ + public: + /** + * @name Constructors + */ + /// Destructor /** - * We might need to destroy the shared object R if the state is - * ready with a value. + * This might destroy the operation state object R if the state is + * ready with a value. This logic is encapsulated into the + * operation state storage. */ ~operation_state() override { - if constexpr (Options::is_continuable) { - // The state might have been destroyed by the executor - // by now. Use another shared reference to the - // continuations. - this->get_continuations_source().request_run(); - } - if (this->succeeded()) { - operation_state_storage::destroy(); + if constexpr (Options::is_stoppable) { + get_stop_source().request_stop(); } - } + }; /// Constructor /** @@ -561,34 +729,49 @@ namespace futures::detail { * This is often invalid because we cannot let it create an empty * executor type. Nonetheless, this constructor is still useful for * allocating pointers. - * */ operation_state() = default; - /// Constructor for state with reference to executor - /** - * The executor allows us to emplace continuations on the - * same executor by default. - */ - explicit operation_state(const executor_type &ex) : executor_base(ex) {} - - /// Deleted copy constructor - operation_state(operation_state const &) = delete; + /// Copy constructor + operation_state(operation_state const &) = default; /// Move constructor operation_state(operation_state &&) noexcept = default; - /// Deleted copy assignment operator + /// Copy assignment operator operation_state & operator=(operation_state const &) - = delete; + = default; /// Move assignment operator operation_state & - operator=(operation_state &&) - = default; + operator=(operation_state &&) noexcept = default; + + /// Constructor for state with reference to executor + /** + * The executor allows us to emplace continuations on the + * same executor by default. + * + */ + explicit operation_state(const executor_type &ex) : executor_base(ex) {} + + /** + * @} + */ - /// Set the value of the shared state + /** + * @name Accessors + * @{ + */ + + /// Set the value of the operation state + /** + * This function will directly construct the value with the specified + * arguments. + * + * @tparam Args Argument types + * @param args Arguments + */ template void set_value(Args &&...args) { @@ -600,6 +783,19 @@ namespace futures::detail { } /// Set value with a callable and an argument list + /** + * Instead of directly setting the value, we can use this function + * to use a callable that will later set the value. + * + * This simplifies an important pattern where we call a task to + * set the operation state instead of replicating it in all launching + * functions, such as @ref async and @ref schedule. + * + * @tparam Fn Function type + * @tparam Args Function argument types + * @param fn Function object + * @param args Function arguments + */ template void apply(Fn &&fn, Args &&...args) { @@ -642,6 +838,19 @@ namespace futures::detail { } /// Set value with a callable and a tuple or arguments + /** + * This function is a version of apply that accepts a tuple of + * arguments instead of a variadic list of arguments. + * + * This is useful in functions that schedule deferred futures. + * In this case, the function arguments need to be stored with the + * callable object. + * + * @tparam Fn Function type + * @tparam Tuple Tuple type + * @param fn Function object + * @param targs Function arguments as tuple + */ template void apply_tuple(Fn &&fn, Tuple &&targs) { @@ -652,12 +861,13 @@ namespace futures::detail { std::tuple_size_v>>{}); } - /// Get the value of the shared state + /// Get the value of the operation state /** - * This function waits for the shared state to become ready and + * This function waits for the operation state to become ready and * returns its value. * - * This function returns `R&` unless this is a shared state to + * This function returns `R&` unless this is a operation state to + * `void`, in which case `std::add_lvalue_reference_t` is also * `void`. * * @return Reference to the state as a reference to R @@ -671,47 +881,136 @@ namespace futures::detail { return operation_state_storage::get(); } + /** + * @} + */ + + /** + * @name Observers + * + * Observe some trait-dependant properties of the operation state. + * + * @{ + */ + /// Get executor associated with the operation + /** + * The executor associated with the operation state is required to + * enable continuations on the default executor. Because these are + * replicas of the executor, they need to be lightweight handles to + * the execution context. + * + * @note This function only participates in overload resolution if the + * operation state traits contain an executor type. + * + * @return Constant reference to the executor. + * + * @note + */ const executor_type & get_executor() const noexcept { static_assert(Options::has_executor); return executor_base::get(); } - /// Create stop token associated with the operation - stop_token - get_stop_token() const noexcept { - static_assert(Options::is_stoppable); - return stop_source_base::get().get_token(); + /// Get reference to continuations source + /** + * The continuation source stores all functions that should + * be executed when the operation state has executed its main + * task. + * + * This operation state understands the main task to be complete when we + * either set its value or the object gets destroyed. + * + * @note This function only participates in overload resolution if the + * operation state traits is continuable. + * + * @return Reference to continuations source + */ + typename continuations_base::value_type & + get_continuations_source() noexcept { + static_assert(Options::is_continuable); + return continuations_base::get(); } - /// Create stop token associated with the operation - stop_source & + /// Get constant reference to continuations source + /** + * The continuation source stores all functions that should + * be executed when the operation state has executed its main + * task. + * + * This operation state understands the main task to be complete when we + * either set its value or the object gets destroyed. + * + * @note This function only participates in overload resolution if the + * operation state traits is continuable. + * + * @return Reference to continuations source + */ + typename continuations_base::value_type & + get_continuations_source() const noexcept { + static_assert(Options::is_continuable); + return continuations_base::get(); + } + + /// Get reference to operation stop source + /** + * The stop source is a shared object associated with the operation + * state to indicate whether its main task should stop. This stop + * source can be shared with other tasks and be user to create a + * stop token, a read-only view of the stop state a task can use to + * determine if it should stop. + * + * @note This function only participates in overload resolution if the + * operation state traits is stoppable. + * + * @return A constant reference to the stop source + */ + const stop_source & get_stop_source() const noexcept { static_assert(Options::is_stoppable); return stop_source_base::get(); } - /// Get operation stop source + /// Get reference to operation stop source + /** + * The stop source is a shared object associated with the operation + * state to indicate whether its main task should stop. This stop + * source can be shared with other tasks and be user to create a + * stop token, a read-only view of the stop state a task can use to + * determine if it should stop. + * + * @note This function only participates in overload resolution if the + * operation state traits is stoppable. + * + * @return Reference to the stop source + */ stop_source & get_stop_source() noexcept { static_assert(Options::is_stoppable); return stop_source_base::get(); } - /// Get continuations source - typename continuations_base::value_type & - get_continuations_source() const noexcept { - static_assert(Options::is_continuable); - return continuations_base::get(); + /// Create stop token associated with the operation state + /** + * A stop token is a read-only view created from the operation stop + * source. This token can be used by tasks to identify whether they + * should stop. + * + * @note This function only participates in overload resolution if the + * operation state traits is stoppable. + * + * @return + */ + [[nodiscard]] stop_token + get_stop_token() const noexcept { + static_assert(Options::is_stoppable); + return stop_source_base::get().get_token(); } - /// Get continuations source - typename continuations_base::value_type & - get_continuations_source() noexcept { - static_assert(Options::is_continuable); - return continuations_base::get(); - } + /** + * @} + */ private: template @@ -726,7 +1025,9 @@ namespace futures::detail { /// A functor that binds function arguments for deferred futures /** * This function binds the function arguments to a function, generating - * a named functor we can use in a deferred shared state. + * a named functor we can use in a deferred shared state. When the function + * has arguments, this means we can only store this callable in the deferred + * shared state instead of storing the function and its arguments directly. * * @tparam Fn Function type * @tparam Args Arguments @@ -769,32 +1070,60 @@ namespace futures::detail { , private maybe_empty { private: - /// Deferred function storage + /** + * @name Private types + */ + + /// Storage for the deferred function + /** + * Deferred operation states need to store their tasks because + * it's not being launched immediatly. + */ using deferred_function_base = maybe_empty; + using deferred_function_type = typename deferred_function_base:: value_type; public: - /// Destructor /** - * We might need to destroy the shared object R if the state is - * ready with a value. + * @name Constructors + * @{ */ + + /// Destructor ~deferred_operation_state() override = default; /// Constructor - /** - * This function will construct the state with storage for R. - */ deferred_operation_state() = default; + /// Copy Constructor + deferred_operation_state(const deferred_operation_state &) = default; + + /// Move Constructor + deferred_operation_state( + deferred_operation_state &&) noexcept = default; + + /// Copy Assignment + deferred_operation_state & + operator=(const deferred_operation_state &) + = default; + + /// Move Assignment + deferred_operation_state & + operator=(deferred_operation_state &&) noexcept = default; + /// Constructor from the deferred function /** - * The function accepts other function and args types so - * (i) we can forward the variables, and (ii) allow compatible - * types to be used here. Most of the time, these will be the - * same types as Fn and Args. + * Although the function type will be the same as + * deferred_function_type most of the time, any function convertible + * deferred_function_type is also accepted. + * + * This enables us to optionally create special deferred future types + * that accept std::function or other types that erase callables. + * In turn, these types can be used in vectors of deferred futures, + * which are often necessary. + * */ template explicit deferred_operation_state( @@ -811,6 +1140,19 @@ namespace futures::detail { * types to be used here. Most of the time, these will be the * same types as Fn and Args. */ + + /// Constructor from the deferred function and its arguments + /** + * Although the function type will be the same as + * deferred_function_type most of the time, any function convertible + * deferred_function_type is also accepted so that deferred futures + * can also erase their function types. + * + * The arguments here will also be stored with the function in a + * bind_deferred_state_args struct, turning this into a callable + * with no arguments. + * + */ template explicit deferred_operation_state( const typename operation_state::executor_type &ex, @@ -821,36 +1163,33 @@ namespace futures::detail { std::forward(f), std::forward(args)...)) {} - /// Deleted copy constructor /** - * The copy constructor does not make sense for shared states as - * they are meant to be shared in all of our use cases with - * promises and futures. - * + * @} */ - deferred_operation_state(deferred_operation_state const &) = delete; - /// Move constructor - deferred_operation_state( - deferred_operation_state &&) noexcept = default; - - /// Deleted copy assignment operator /** - * These functions do not make sense for shared states as they are - * meant to be shared. + * @name Shared state functions + * @{ */ - deferred_operation_state & - operator=(deferred_operation_state const &) - = delete; - /// Move assignment operator + /// Get the current value from this operation state /** - * These functions do not make sense for shared states as they are - * meant to be shared. + * We explicitly define that to use the overload from + * `operation_state` because empty base functions + * also have their overloads of `get`. */ - deferred_operation_state & - operator=(deferred_operation_state &&) noexcept = default; + using operation_state::get; + /// Post the deferred task to the executor + /** + * When we wait for an operation state for the first time, the base + * class operation_state_base is responsible for calling post_deferred, + * which posts any deferred task to the executor. + * + * This is the only overload of operation_state_base that implements + * this function. If the state has no executor, the function is + * dispatched inline from the thread waiting for it. + */ void post_deferred() override { // if this is a continuation, wait for tasks that come before @@ -859,11 +1198,6 @@ namespace futures::detail { asio::post( operation_state::get_executor(), [this]() { - // use shared_from_this to ensure the state lives while - // the deferred function is executed. Some futures might - // wait() until the shared state value is set but - // destroy the state before the continuations and - // related tasks are executed. this->apply( std::move(this->deferred_function_base::get())); }); @@ -875,6 +1209,17 @@ namespace futures::detail { /// Wait for the parent operation state to be set /** + * When we wait for an operation state for the first time, the base + * class operation_state_base cannot call `post_deferred` before waiting + * for the parent task to finish. + * + * This function can be used to ensure this happens by checking if the + * task it encapsulates is `is_unwrap_and_continue_task`. This + * `is_unwrap_and_continue_task` class is the class we always use + * to represent a callable with the logic for continuations. Since + * we always use this class, it also indicates this is a deferred + * continuation and there's a parent operation we should wait for. + * * Deferred states that refer to continuations need to wait for the * parent task to finish before invoking the continuation task. * @@ -888,9 +1233,7 @@ namespace futures::detail { } } - /// Get the current value from this operation state - using operation_state::get; - + /// Swap two deferred operation states void swap(deferred_operation_state &other) { std::swap( @@ -900,8 +1243,48 @@ namespace futures::detail { static_cast(*this), static_cast(other)); } + + /** + * @} + */ + }; + + /// Check if type is an operation state + template + struct is_operation_state : std::false_type + {}; + + template + struct is_operation_state> : std::true_type + {}; + + template + struct is_operation_state> + : std::true_type + {}; + + template + constexpr bool is_operation_state_v = is_operation_state::value; + + template + struct operation_state_options + {}; + + template + struct operation_state_options> + { + using type = Options; }; + template + struct operation_state_options> + { + using type = Options; + }; + + template + using operation_state_options_t = typename operation_state_options::type; + /** @} */ // @addtogroup futures Futures } // namespace futures::detail diff --git a/include/futures/futures/detail/operation_state_storage.hpp b/include/futures/futures/detail/operation_state_storage.hpp index ae3685f74..c593c2ff2 100644 --- a/include/futures/futures/detail/operation_state_storage.hpp +++ b/include/futures/futures/detail/operation_state_storage.hpp @@ -9,38 +9,54 @@ #define FUTURES_FUTURES_DETAIL_OPERATION_STATE_STORAGE_HPP #include +#include #include namespace futures::detail { - /// Determine the type we should use to store a shared state - /// internally - /// - /// We usually need uninitialized storage for a given type, since the - /// shared state needs to be in control of constructors and destructors. - /// - /// For trivial types, we can directly store the value. - /// - /// When the shared state is a reference, we store pointers internally. - /// + /// Determine the type we should use to store a shared state internally + /** + * We usually need uninitialized storage for a given type, since the + * shared state needs to be in control of constructors and destructors. + * + * For trivial types, we can directly store the value. + * + * When the shared state is a reference, we store pointers internally. + */ template - struct operation_state_storage + class operation_state_storage {}; + /// Operation state storage for void + /** + * When the operation state value type is void, we don't need to store + * anything. The function are simply ignored. + * + * @tparam R Operation state value type + */ template - struct operation_state_storage>> + class operation_state_storage>> { + public: + operation_state_storage() = default; + template void set_value(Args&&...) {} void get() {} - - void destroy() {} }; + /// Operation state storage for references + /** + * When the operation state value type is a reference, we internally + * store a pointer. Whenever we access the value, the pointer is + * dereferenced into a reference again. + * + * @tparam R Operation state value type + */ template - struct operation_state_storage< + class operation_state_storage< R, std::enable_if_t< // clang-format off @@ -49,6 +65,13 @@ namespace futures::detail { // clang-format on >> { + public: + operation_state_storage() = default; + + explicit operation_state_storage(R& value) { + set_value(value); + } + void set_value(R& value) { value_ = std::addressof(value); @@ -60,13 +83,20 @@ namespace futures::detail { return *value_; } - void destroy() {} - + private: std::decay_t* value_{ nullptr }; }; + /// Operation state storage for trivial types + /** + * When the operation state value type is trivial, we store the value + * directly. This means we can directly visualize the current value of + * the operation state at any time, even when not explicitly initialized. + * + * @tparam R Operation state value type + */ template - struct operation_state_storage< + class operation_state_storage< R, std::enable_if_t< // clang-format off @@ -76,6 +106,17 @@ namespace futures::detail { // clang-format on >> { + public: + operation_state_storage() = default; + + explicit operation_state_storage(const R& value) { + set_value(value); + } + + explicit operation_state_storage(R&& value) { + set_value(std::move(value)); + } + void set_value(const R& value) { value_ = value; @@ -86,13 +127,22 @@ namespace futures::detail { return value_; } - void destroy() {} - + private: R value_; }; + /// Operation state storage for non-trivial types + /** + * When the operation state value type is not trivial, we store the value + * with inline aligned store. This acts like a std::optional that allows us + * to keep an uninitialized value while the promise has not been set yet. + * However, whether the optional has a value is controlled by the parent + * operation state. + * + * @tparam R Operation state value type + */ template - struct operation_state_storage< + class operation_state_storage< R, std::enable_if_t< // clang-format off @@ -102,23 +152,68 @@ namespace futures::detail { // clang-format on >> { + public: + ~operation_state_storage() { + if (has_value_) { + get().~R(); + } + } + + operation_state_storage() = default; + + operation_state_storage(const operation_state_storage& other) { + set_value(other.get()); + } + + operation_state_storage(operation_state_storage&& other) noexcept { + set_value(std::move(other.get())); + } + + template + explicit operation_state_storage(Args&&... args) { + set_value(std::forward(args)...); + } + + operation_state_storage& + operator=(const operation_state_storage& other) { + set_value(other.get()); + } + + operation_state_storage& + operator=(operation_state_storage&& other) noexcept { + set_value(other.get()); + } + template void set_value(Args&&... args) { - ::new (static_cast(std::addressof(value_))) + if (has_value_) { + destroy(); + } + ::new (static_cast(value_.data())) R(std::forward(args)...); + has_value_ = true; } R& get() { - return *reinterpret_cast(std::addressof(value_)); + if (has_value_) { + return *reinterpret_cast(value_.data()); + } + detail::throw_exception(); } - void destroy() { - reinterpret_cast(std::addressof(value_))->~R(); + private: + void + destroy() { + if (has_value_) { + get().~R(); + has_value_ = false; + } } - std::aligned_storage_t value_{}; + detail::aligned_storage_for value_{}; + bool has_value_{false}; }; } // namespace futures::detail diff --git a/include/futures/futures/detail/traits/append_future_option.hpp b/include/futures/futures/detail/traits/append_future_option.hpp index c474b7221..283dca92f 100644 --- a/include/futures/futures/detail/traits/append_future_option.hpp +++ b/include/futures/futures/detail/traits/append_future_option.hpp @@ -21,7 +21,7 @@ namespace futures::detail { struct append_future_option< Opt, future_options_list, - std::enable_if_t>> + std::enable_if_t>> { using type = future_options_list; }; diff --git a/include/futures/futures/future_error.hpp b/include/futures/futures/future_error.hpp index da88ac20f..af68faec8 100644 --- a/include/futures/futures/future_error.hpp +++ b/include/futures/futures/future_error.hpp @@ -56,7 +56,9 @@ namespace futures { /// Promise has already been fulfilled promise_already_satisfied = 3, /// There is no shared state we can access - no_state = 4 + no_state = 4, + /// Invalid operation on deferred future + future_deferred = 5 }; // fwd-declare @@ -98,6 +100,11 @@ namespace futures { static_cast(future_errc::no_state), future_category() }; + case future_errc::future_deferred: + return std::error_condition{ + static_cast(future_errc::future_deferred), + future_category() + }; default: return std::error_condition{ ev, *this }; } @@ -136,6 +143,10 @@ namespace futures { "Operation not permitted on an object without " "an associated state." }; + case future_errc::future_deferred: + return std::string{ + "Operation not permitted on a deferred future." + }; } return std::string{ "unspecified future_errc value\n" }; } @@ -215,6 +226,13 @@ namespace futures { : future_error{ make_error_code(future_errc::no_state) } {} }; + class future_deferred : public future_error + { + public: + future_deferred() + : future_error{ make_error_code(future_errc::future_deferred) } {} + }; + /** @} */ /** @} */ } // namespace futures diff --git a/include/futures/futures/is_ready.hpp b/include/futures/futures/is_ready.hpp index 9b8db83c3..d48d53567 100644 --- a/include/futures/futures/is_ready.hpp +++ b/include/futures/futures/is_ready.hpp @@ -43,6 +43,6 @@ namespace futures { == std::future_status::ready; } } -} +} // namespace futures #endif // FUTURES_FUTURES_IS_READY_HPP