Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor cleanup of future_data #6512

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions libs/core/futures/include/hpx/futures/detail/future_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ namespace hpx::lcos::detail {
template <typename Result>
struct future_data_base;

// make sure continuation invocation does not recurse deeper than allowed
HPX_CORE_EXPORT void handle_on_completed(
future_data_refcnt_base::completed_callback_type&& on_completed);
HPX_CORE_EXPORT void handle_on_completed(
future_data_refcnt_base::completed_callback_vector_type&& on_completed);

template <>
struct HPX_CORE_EXPORT future_data_base<traits::detail::future_data_void>
: future_data_refcnt_base
Expand Down Expand Up @@ -305,11 +311,6 @@ namespace hpx::lcos::detail {
static void run_on_completed(
completed_callback_vector_type&& on_completed) noexcept;

// make sure continuation invocation does not recurse deeper than
// allowed
template <typename Callback>
static void handle_on_completed(Callback&& on_completed);

// Set the callback which needs to be invoked when the future becomes
// ready. If the future is ready the function will be invoked
// immediately.
Expand Down Expand Up @@ -361,14 +362,13 @@ namespace hpx::lcos::detail {
public:
using result_type = future_data_result_t<Result>;
using base_type = future_data_base<traits::detail::future_data_void>;
using init_no_addref = typename base_type::init_no_addref;
using completed_callback_type =
typename base_type::completed_callback_type;
using init_no_addref = base_type::init_no_addref;
using completed_callback_type = base_type::completed_callback_type;
using completed_callback_vector_type =
typename base_type::completed_callback_vector_type;
base_type::completed_callback_vector_type;

protected:
using mutex_type = typename base_type::mutex_type;
using mutex_type = base_type::mutex_type;

public:
// Variable 'hpx::lcos::detail::future_data_base<void>::storage_' is
Expand Down Expand Up @@ -501,12 +501,15 @@ namespace hpx::lcos::detail {
#endif

// Note: we use notify_one repeatedly instead of notify_all as we
// know: a) that most of the time we have at most one thread
// waiting on the future (most futures are not shared), and
// b) our implementation of condition_variable::notify_one
// relinquishes the lock before resuming the waiting thread
// that avoids suspension of this thread when it tries to
// re-lock the mutex while exiting from condition_variable::wait
// know:
//
// a. that most of the time we have at most one thread
// waiting on the future (most futures are not shared), and
// b. our implementation of condition_variable::notify_one
// relinquishes the lock before resuming the waiting thread
// that avoids suspension of this thread when it tries to
// re-lock the mutex while exiting from
// condition_variable::wait
while (
cond_.notify_one(HPX_MOVE(l), threads::thread_priority::boost))
{
Expand Down Expand Up @@ -580,12 +583,15 @@ namespace hpx::lcos::detail {
#endif

// Note: we use notify_one repeatedly instead of notify_all as we
// know: a) that most of the time we have at most one thread
// waiting on the future (most futures are not shared), and
// b) our implementation of condition_variable::notify_one
// relinquishes the lock before resuming the waiting thread
// that avoids suspension of this thread when it tries to
// re-lock the mutex while exiting from condition_variable::wait
// know:
//
// a. that most of the time we have at most one thread
// waiting on the future (most futures are not shared), and
// b. our implementation of condition_variable::notify_one
// relinquishes the lock before resuming the waiting thread
// that avoids suspension of this thread when it tries to
// re-lock the mutex while exiting from
// condition_variable::wait
while (
cond_.notify_one(HPX_MOVE(l), threads::thread_priority::boost))
{
Expand Down
33 changes: 19 additions & 14 deletions libs/core/futures/src/future_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ namespace hpx::lcos::detail {
LTM_(debug).format(
"task_object::~task_object({}), description({}): "
"destroy runs_as_child thread",
thrd, thrd->get_description(), thrd->get_thread_phase());
thrd, thrd->get_description());

runs_child_ = threads::invalid_thread_id;
}
Expand Down Expand Up @@ -252,9 +252,7 @@ namespace hpx::lcos::detail {

// make sure continuation invocation does not recurse deeper than allowed
template <typename Callback>
void
future_data_base<traits::detail::future_data_void>::handle_on_completed(
Callback&& on_completed)
void handle_on_completed_impl(Callback&& on_completed)
{
// We need to run the completion on a new thread if we are on a non HPX
// thread.
Expand All @@ -272,10 +270,14 @@ namespace hpx::lcos::detail {
#endif
}

using future_data_base =
future_data_base<traits::detail::future_data_void>;

if (!is_hpx_thread || !recurse_asynchronously)
{
// directly execute continuation on this thread
run_on_completed(HPX_FORWARD(Callback, on_completed));
future_data_base::run_on_completed(
HPX_FORWARD(Callback, on_completed));
}
else
{
Expand Down Expand Up @@ -305,14 +307,17 @@ namespace hpx::lcos::detail {
}
}

// We need only one explicit instantiation here as the second version
// (single callback) is implicitly instantiated below.
using completed_callback_vector_type =
future_data_refcnt_base::completed_callback_vector_type;
void handle_on_completed(
future_data_refcnt_base::completed_callback_type&& on_completed)
{
handle_on_completed_impl(HPX_MOVE(on_completed));
}

template HPX_CORE_EXPORT void
future_data_base<traits::detail::future_data_void>::handle_on_completed<
completed_callback_vector_type>(completed_callback_vector_type&&);
void handle_on_completed(
future_data_refcnt_base::completed_callback_vector_type&& on_completed)
{
handle_on_completed_impl(HPX_MOVE(on_completed));
}

// Set the callback which needs to be invoked when the future becomes ready.
// If the future is ready the function will be invoked immediately.
Expand All @@ -326,7 +331,7 @@ namespace hpx::lcos::detail {
if (is_ready(std::memory_order_relaxed))
{
// invoke the callback (continuation) function right away
handle_on_completed(HPX_MOVE(data_sink));
handle_on_completed_impl(HPX_MOVE(data_sink));
}
else
{
Expand All @@ -336,7 +341,7 @@ namespace hpx::lcos::detail {
l.unlock();

// invoke the callback (continuation) function
handle_on_completed(HPX_MOVE(data_sink));
handle_on_completed_impl(HPX_MOVE(data_sink));
}
else
{
Expand Down
7 changes: 5 additions & 2 deletions libs/full/components/include/hpx/components/client_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,15 @@ template <>
struct HPX_EXPORT hpx::lcos::detail::future_data<hpx::id_type>
: future_data_base<id_type>
{
HPX_NON_COPYABLE(future_data);

using init_no_addref = future_data_base<hpx::id_type>::init_no_addref;

future_data() = default;

future_data(future_data const&) = delete;
future_data(future_data&&) = delete;
future_data& operator=(future_data const&) = delete;
future_data& operator=(future_data&&) = delete;

explicit future_data(init_no_addref no_addref)
: future_data_base(no_addref)
{
Expand Down
Loading