Skip to content

Commit

Permalink
Always return outermost thread id
Browse files Browse the repository at this point in the history
-flyby: deprecate get_outer_self_id
  • Loading branch information
hkaiser committed Jan 12, 2024
1 parent 7b35448 commit 3eff942
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 115 deletions.
11 changes: 5 additions & 6 deletions components/iostreams/src/server/output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace hpx::iostreams::detail {
ar << valid;
if (valid)
{
ar& data_;
ar & data_;
}
}

Expand All @@ -44,7 +44,7 @@ namespace hpx::iostreams::detail {
ar >> valid;
if (valid)
{
ar& data_;
ar & data_;
}
}
} // namespace hpx::iostreams::detail
Expand Down Expand Up @@ -89,10 +89,9 @@ namespace hpx::iostreams::server {
{ // {{{
// Perform the IO in another OS thread.
detail::buffer in(buf_in);
hpx::get_thread_pool("io_pool")->get_io_service().post(
hpx::bind_front(&output_stream::call_write_sync, this, locality_id,
count, std::ref(in),
threads::thread_id_ref_type(threads::get_outer_self_id())));
hpx::get_thread_pool("io_pool")->get_io_service().post(hpx::bind_front(
&output_stream::call_write_sync, this, locality_id, count,
std::ref(in), threads::thread_id_ref_type(threads::get_self_id())));

// Sleep until the worker thread wakes us up.
this_thread::suspend(threads::thread_schedule_state::suspended,
Expand Down
63 changes: 31 additions & 32 deletions libs/core/threading/include/hpx/threading/thread.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2022 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -41,7 +41,7 @@ namespace hpx {
thread_termination_handler_type f);

/// The class thread represents a single thread of execution. Threads allow
/// multiple functions to execute concurrently. hreads begin execution
/// multiple functions to execute concurrently. Threads begin execution
/// immediately upon construction of the associated thread object (pending
/// any OS scheduling delays), starting at the top-level function provided
/// as a constructor argument. The return value of the top-level function is
Expand All @@ -60,8 +60,6 @@ namespace hpx {
{
using mutex_type = hpx::spinlock;

void terminate(char const* function, char const* reason) const;

public:
class id;
using native_handle_type = threads::thread_id_type;
Expand All @@ -73,7 +71,7 @@ namespace hpx {
std::enable_if_t<!std::is_same_v<std::decay_t<F>, thread>>>
explicit thread(F&& f)
{
auto thrd_data = threads::get_self_id_data();
auto const thrd_data = threads::get_self_id_data();
HPX_ASSERT(thrd_data);
start_thread(thrd_data->get_scheduler_base()->get_parent_pool(),
util::deferred_call(HPX_FORWARD(F, f)));
Expand All @@ -82,7 +80,7 @@ namespace hpx {
template <typename F, typename... Ts>
explicit thread(F&& f, Ts&&... vs)
{
auto thrd_data = threads::get_self_id_data();
auto const thrd_data = threads::get_self_id_data();
HPX_ASSERT(thrd_data);
start_thread(thrd_data->get_scheduler_base()->get_parent_pool(),
util::deferred_call(HPX_FORWARD(F, f), HPX_FORWARD(Ts, vs)...));
Expand Down Expand Up @@ -143,15 +141,15 @@ namespace hpx {
[[nodiscard]] static unsigned int hardware_concurrency() noexcept;

// extensions
void interrupt(bool flag = true);
void interrupt(bool flag = true) const;
bool interruption_requested() const;

static void interrupt(id, bool flag = true);
static void interrupt(id const&, bool flag = true);

hpx::future<void> get_future(error_code& ec = throws);
hpx::future<void> get_future(error_code& ec = throws) const;

std::size_t get_thread_data() const;
std::size_t set_thread_data(std::size_t);
std::size_t set_thread_data(std::size_t) const;

#if defined(HPX_HAVE_LIBCDS)
std::size_t get_libcds_data() const;
Expand Down Expand Up @@ -295,7 +293,7 @@ namespace hpx {
/// (and if there are no other threads at the same priority,
/// yield has no effect).
HPX_CORE_EXPORT void yield() noexcept;
HPX_CORE_EXPORT void yield_to(thread::id) noexcept;
HPX_CORE_EXPORT void yield_to(thread::id const&) noexcept;

// extensions
HPX_CORE_EXPORT threads::thread_priority get_priority() noexcept;
Expand Down Expand Up @@ -350,45 +348,46 @@ namespace hpx {

class HPX_CORE_EXPORT disable_interruption
{
private:
disable_interruption(disable_interruption const&);
disable_interruption& operator=(disable_interruption const&);
public:
disable_interruption(disable_interruption&&) = delete;
disable_interruption& operator=(disable_interruption&&) = delete;
disable_interruption(disable_interruption const&) = delete;
disable_interruption& operator=(
disable_interruption const&) = delete;

bool interruption_was_enabled_;
friend class restore_interruption;

public:
disable_interruption();
~disable_interruption();
};

class HPX_CORE_EXPORT restore_interruption
{
private:
restore_interruption(restore_interruption const&);
restore_interruption& operator=(restore_interruption const&);
public:
restore_interruption(restore_interruption&&) = delete;
restore_interruption& operator=(restore_interruption&&) = delete;
restore_interruption(restore_interruption const&) = delete;
restore_interruption& operator=(
restore_interruption const&) = delete;

bool interruption_was_enabled_;

public:
explicit restore_interruption(disable_interruption& d);
explicit restore_interruption(disable_interruption const& d);
~restore_interruption();
};
} // namespace this_thread
} // namespace hpx

namespace std {

// specialize std::hash for hpx::thread::id
template <>
struct hash<::hpx::thread::id>
// specialize std::hash for hpx::thread::id
template <>
struct std::hash<::hpx::thread::id>
{
std::size_t operator()(::hpx::thread::id const& id) const noexcept
{
std::size_t operator()(::hpx::thread::id const& id) const
{
std::hash<::hpx::threads::thread_id_ref_type> hasher_;
return hasher_(id.native_handle());
}
};
} // namespace std
std::hash<::hpx::threads::thread_id_ref_type> const hasher_;
return hasher_(id.native_handle());
}
}; // namespace std

#include <hpx/config/warnings_suffix.hpp>
80 changes: 40 additions & 40 deletions libs/core/threading/src/thread.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2022 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -22,7 +22,6 @@

#include <cstddef>
#include <exception>
#include <functional>
#include <mutex>
#include <utility>

Expand All @@ -32,14 +31,14 @@

namespace hpx {

namespace detail {
namespace {

static thread_termination_handler_type thread_termination_handler;
thread_termination_handler_type thread_termination_handler;
}

void set_thread_termination_handler(thread_termination_handler_type f)
{
detail::thread_termination_handler = HPX_MOVE(f);
thread_termination_handler = HPX_MOVE(f);
}

thread::thread() noexcept
Expand Down Expand Up @@ -74,7 +73,7 @@ namespace hpx {
{
if (joinable())
{
if (detail::thread_termination_handler)
if (thread_termination_handler)
{
try
{
Expand All @@ -83,8 +82,7 @@ namespace hpx {
}
catch (...)
{
detail::thread_termination_handler(
std::current_exception());
thread_termination_handler(std::current_exception());
}
}
else
Expand All @@ -103,17 +101,20 @@ namespace hpx {
std::swap(id_, rhs.id_);
}

static void run_thread_exit_callbacks()
{
threads::thread_id_type id = threads::get_self_id();
if (id == threads::invalid_thread_id)
namespace {

void run_thread_exit_callbacks()
{
HPX_THROW_EXCEPTION(hpx::error::null_thread_id,
"run_thread_exit_callbacks", "null thread id encountered");
threads::thread_id_type const id = threads::get_self_id();
if (id == threads::invalid_thread_id)
{
HPX_THROW_EXCEPTION(hpx::error::null_thread_id,
"run_thread_exit_callbacks", "null thread id encountered");
}
threads::run_thread_exit_callbacks(id);
threads::free_thread_exit_callbacks(id);
}
threads::run_thread_exit_callbacks(id);
threads::free_thread_exit_callbacks(id);
}
} // namespace

threads::thread_result_type thread::thread_function_nullary(
hpx::move_only_function<void()> const& func)
Expand Down Expand Up @@ -146,9 +147,8 @@ namespace hpx {
// run all callbacks attached to the exit event for this thread
run_thread_exit_callbacks();

return threads::thread_result_type(
threads::thread_schedule_state::terminated,
threads::invalid_thread_id);
return {threads::thread_schedule_state::terminated,
threads::invalid_thread_id};
}

thread::id thread::get_id() const noexcept
Expand Down Expand Up @@ -182,15 +182,17 @@ namespace hpx {
{
HPX_THROW_EXCEPTION(hpx::error::thread_resource_error,
"thread::start_thread", "Could not create thread");
return;
}
}

static void resume_thread(threads::thread_id_ref_type const& id)
{
threads::set_thread_state(
id.noref(), threads::thread_schedule_state::pending);
}
namespace {

void resume_thread(threads::thread_id_ref_type const& id)
{
threads::set_thread_state(
id.noref(), threads::thread_schedule_state::pending);
}
} // namespace

void thread::join()
{
Expand All @@ -210,7 +212,6 @@ namespace hpx {
l.unlock();
HPX_THROW_EXCEPTION(hpx::error::thread_resource_error,
"thread::join", "hpx::thread: trying joining itself");
return;
}
this_thread::interruption_point();

Expand All @@ -228,7 +229,7 @@ namespace hpx {
}

// extensions
void thread::interrupt(bool flag)
void thread::interrupt(bool flag) const
{
threads::interrupt_thread(native_handle(), flag);
}
Expand All @@ -238,7 +239,7 @@ namespace hpx {
return threads::get_thread_interruption_requested(native_handle());
}

void thread::interrupt(thread::id id, bool flag)
void thread::interrupt(thread::id const& id, bool flag)
{
threads::interrupt_thread(id.id_, flag);
}
Expand All @@ -247,7 +248,7 @@ namespace hpx {
{
return threads::get_thread_data(native_handle());
}
std::size_t thread::set_thread_data(std::size_t data)
std::size_t thread::set_thread_data(std::size_t data) const
{
return threads::set_thread_data(native_handle(), data);
}
Expand Down Expand Up @@ -297,7 +298,7 @@ namespace hpx {
using base_type::mtx_;

public:
thread_task_base(threads::thread_id_ref_type const& id)
explicit thread_task_base(threads::thread_id_ref_type const& id)
{
if (threads::add_thread_exit_callback(id.noref(),
hpx::bind_front(&thread_task_base::thread_exit_function,
Expand Down Expand Up @@ -345,13 +346,13 @@ namespace hpx {
};
} // namespace detail

hpx::future<void> thread::get_future(error_code& ec)
hpx::future<void> thread::get_future(error_code& ec) const
{
if (id_ == threads::invalid_thread_id)
{
HPX_THROWS_IF(ec, hpx::error::null_thread_id, "thread::get_future",
"null thread id encountered");
return hpx::future<void>();
return {};
}

detail::thread_task_base* p = new detail::thread_task_base(id_);
Expand All @@ -361,7 +362,7 @@ namespace hpx {
HPX_THROWS_IF(ec, hpx::error::thread_resource_error,
"thread::get_future",
"Could not create future as thread has been terminated.");
return hpx::future<void>();
return {};
}

using traits::future_access;
Expand All @@ -371,7 +372,7 @@ namespace hpx {
///////////////////////////////////////////////////////////////////////////
namespace this_thread {

void yield_to(thread::id id) noexcept
void yield_to(thread::id const& id) noexcept
{
this_thread::suspend(threads::thread_schedule_state::pending,
id.native_handle(), "this_thread::yield_to");
Expand Down Expand Up @@ -486,16 +487,16 @@ namespace hpx {

disable_interruption::~disable_interruption()
{
threads::thread_self* p = threads::get_self_ptr();
if (p)
if (threads::get_self_ptr() != nullptr)
{
threads::set_thread_interruption_enabled(
threads::get_self_id(), interruption_was_enabled_);
}
}

///////////////////////////////////////////////////////////////////////
restore_interruption::restore_interruption(disable_interruption& d)
restore_interruption::restore_interruption(
disable_interruption const& d)
: interruption_was_enabled_(d.interruption_was_enabled_)
{
if (!interruption_was_enabled_)
Expand All @@ -508,8 +509,7 @@ namespace hpx {

restore_interruption::~restore_interruption()
{
threads::thread_self* p = threads::get_self_ptr();
if (p)
if (threads::get_self_ptr() != nullptr)
{
threads::set_thread_interruption_enabled(
threads::get_self_id(), interruption_was_enabled_);
Expand Down
Loading

0 comments on commit 3eff942

Please sign in to comment.