Skip to content

Commit

Permalink
Minor cleanup of future_data
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Jun 17, 2024
1 parent 5898e7e commit 782a901
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 38 deletions.
50 changes: 28 additions & 22 deletions libs/core/futures/include/hpx/futures/detail/future_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ namespace hpx::lcos::detail {
template <typename Result>
struct future_data_base;

// make sure continuation invocation does not recurse deeper than allowed
HPX_CORE_EXPORT void handle_on_completed(
future_data_refcnt_base::completed_callback_type&& on_completed);
HPX_CORE_EXPORT void handle_on_completed(
future_data_refcnt_base::completed_callback_vector_type&& on_completed);

template <>
struct HPX_CORE_EXPORT future_data_base<traits::detail::future_data_void>
: future_data_refcnt_base
Expand Down Expand Up @@ -305,11 +311,6 @@ namespace hpx::lcos::detail {
static void run_on_completed(
completed_callback_vector_type&& on_completed) noexcept;

// make sure continuation invocation does not recurse deeper than
// allowed
template <typename Callback>
static void handle_on_completed(Callback&& on_completed);

// Set the callback which needs to be invoked when the future becomes
// ready. If the future is ready the function will be invoked
// immediately.
Expand Down Expand Up @@ -361,14 +362,13 @@ namespace hpx::lcos::detail {
public:
using result_type = future_data_result_t<Result>;
using base_type = future_data_base<traits::detail::future_data_void>;
using init_no_addref = typename base_type::init_no_addref;
using completed_callback_type =
typename base_type::completed_callback_type;
using init_no_addref = base_type::init_no_addref;
using completed_callback_type = base_type::completed_callback_type;
using completed_callback_vector_type =
typename base_type::completed_callback_vector_type;
base_type::completed_callback_vector_type;

protected:
using mutex_type = typename base_type::mutex_type;
using mutex_type = base_type::mutex_type;

public:
// Variable 'hpx::lcos::detail::future_data_base<void>::storage_' is
Expand Down Expand Up @@ -501,12 +501,15 @@ namespace hpx::lcos::detail {
#endif

// Note: we use notify_one repeatedly instead of notify_all as we
// know: a) that most of the time we have at most one thread
// waiting on the future (most futures are not shared), and
// b) our implementation of condition_variable::notify_one
// relinquishes the lock before resuming the waiting thread
// that avoids suspension of this thread when it tries to
// re-lock the mutex while exiting from condition_variable::wait
// know:
//
// a. that most of the time we have at most one thread
// waiting on the future (most futures are not shared), and
// b. our implementation of condition_variable::notify_one
// relinquishes the lock before resuming the waiting thread
// that avoids suspension of this thread when it tries to
// re-lock the mutex while exiting from
// condition_variable::wait
while (
cond_.notify_one(HPX_MOVE(l), threads::thread_priority::boost))
{
Expand Down Expand Up @@ -580,12 +583,15 @@ namespace hpx::lcos::detail {
#endif

// Note: we use notify_one repeatedly instead of notify_all as we
// know: a) that most of the time we have at most one thread
// waiting on the future (most futures are not shared), and
// b) our implementation of condition_variable::notify_one
// relinquishes the lock before resuming the waiting thread
// that avoids suspension of this thread when it tries to
// re-lock the mutex while exiting from condition_variable::wait
// know:
//
// a. that most of the time we have at most one thread
// waiting on the future (most futures are not shared), and
// b. our implementation of condition_variable::notify_one
// relinquishes the lock before resuming the waiting thread
// that avoids suspension of this thread when it tries to
// re-lock the mutex while exiting from
// condition_variable::wait
while (
cond_.notify_one(HPX_MOVE(l), threads::thread_priority::boost))
{
Expand Down
33 changes: 19 additions & 14 deletions libs/core/futures/src/future_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ namespace hpx::lcos::detail {
LTM_(debug).format(
"task_object::~task_object({}), description({}): "
"destroy runs_as_child thread",
thrd, thrd->get_description(), thrd->get_thread_phase());
thrd, thrd->get_description());

runs_child_ = threads::invalid_thread_id;
}
Expand Down Expand Up @@ -253,9 +253,7 @@ namespace hpx::lcos::detail {

// make sure continuation invocation does not recurse deeper than allowed
template <typename Callback>
void
future_data_base<traits::detail::future_data_void>::handle_on_completed(
Callback&& on_completed)
void handle_on_completed_impl(Callback&& on_completed)
{
// We need to run the completion on a new thread if we are on a non HPX
// thread.
Expand All @@ -273,10 +271,14 @@ namespace hpx::lcos::detail {
#endif
}

using future_data_base =
future_data_base<traits::detail::future_data_void>;

if (!is_hpx_thread || !recurse_asynchronously)
{
// directly execute continuation on this thread
run_on_completed(HPX_FORWARD(Callback, on_completed));
future_data_base::run_on_completed(
HPX_FORWARD(Callback, on_completed));
}
else
{
Expand Down Expand Up @@ -306,14 +308,17 @@ namespace hpx::lcos::detail {
}
}

// We need only one explicit instantiation here as the second version
// (single callback) is implicitly instantiated below.
using completed_callback_vector_type =
future_data_refcnt_base::completed_callback_vector_type;
void handle_on_completed(
future_data_refcnt_base::completed_callback_type&& on_completed)
{
handle_on_completed_impl(HPX_MOVE(on_completed));
}

template HPX_CORE_EXPORT void
future_data_base<traits::detail::future_data_void>::handle_on_completed<
completed_callback_vector_type>(completed_callback_vector_type&&);
void handle_on_completed(
future_data_refcnt_base::completed_callback_vector_type&& on_completed)
{
handle_on_completed_impl(HPX_MOVE(on_completed));
}

// Set the callback which needs to be invoked when the future becomes ready.
// If the future is ready the function will be invoked immediately.
Expand All @@ -326,7 +331,7 @@ namespace hpx::lcos::detail {
if (is_ready(std::memory_order_relaxed))
{
// invoke the callback (continuation) function right away
handle_on_completed(HPX_MOVE(data_sink));
handle_on_completed_impl(HPX_MOVE(data_sink));
}
else
{
Expand All @@ -338,7 +343,7 @@ namespace hpx::lcos::detail {
l.unlock();

// invoke the callback (continuation) function
handle_on_completed(HPX_MOVE(data_sink));
handle_on_completed_impl(HPX_MOVE(data_sink));
}
else
{
Expand Down
7 changes: 5 additions & 2 deletions libs/full/components/include/hpx/components/client_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,15 @@ template <>
struct HPX_EXPORT hpx::lcos::detail::future_data<hpx::id_type>
: future_data_base<id_type>
{
HPX_NON_COPYABLE(future_data);

using init_no_addref = future_data_base<hpx::id_type>::init_no_addref;

future_data() = default;

future_data(future_data const&) = delete;
future_data(future_data&&) = delete;
future_data& operator=(future_data const&) = delete;
future_data& operator=(future_data&&) = delete;

explicit future_data(init_no_addref no_addref)
: future_data_base(no_addref)
{
Expand Down

0 comments on commit 782a901

Please sign in to comment.