Skip to content

Commit

Permalink
Always return outermost thread id
Browse files Browse the repository at this point in the history
-flyby: deprecate get_outer_self_id
  • Loading branch information
hkaiser committed Jan 18, 2024
1 parent c0d79c8 commit b086b2e
Show file tree
Hide file tree
Showing 36 changed files with 585 additions and 436 deletions.
11 changes: 5 additions & 6 deletions components/iostreams/src/server/output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace hpx::iostreams::detail {
ar << valid;
if (valid)
{
ar& data_;
ar & data_;
}
}

Expand All @@ -44,7 +44,7 @@ namespace hpx::iostreams::detail {
ar >> valid;
if (valid)
{
ar& data_;
ar & data_;
}
}
} // namespace hpx::iostreams::detail
Expand Down Expand Up @@ -89,10 +89,9 @@ namespace hpx::iostreams::server {
{ // {{{
// Perform the IO in another OS thread.
detail::buffer in(buf_in);
hpx::get_thread_pool("io_pool")->get_io_service().post(
hpx::bind_front(&output_stream::call_write_sync, this, locality_id,
count, std::ref(in),
threads::thread_id_ref_type(threads::get_outer_self_id())));
hpx::get_thread_pool("io_pool")->get_io_service().post(hpx::bind_front(
&output_stream::call_write_sync, this, locality_id, count,
std::ref(in), threads::thread_id_ref_type(threads::get_self_id())));

// Sleep until the worker thread wakes us up.
this_thread::suspend(threads::thread_schedule_state::suspended,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -63,6 +63,9 @@ namespace examples::server {
}
~reset_id()
{
auto const mtx = outer_.mtx_;
std::lock_guard<hpx::mutex> l(*mtx);

[[maybe_unused]] hpx::thread::id const old_value = outer_.id_;
outer_.id_ = hpx::thread::id();
HPX_ASSERT(old_value != hpx::thread::id());
Expand Down Expand Up @@ -104,9 +107,15 @@ namespace examples::server {
});

auto const mtx = mtx_;
std::lock_guard<hpx::mutex> l(*mtx);
HPX_ASSERT(id_ != hpx::thread::id());
hpx::thread::interrupt(id_);

std::unique_lock<hpx::mutex> l(*mtx);
auto const id = id_;

if (id != hpx::thread::id())
{
l.unlock();
hpx::thread::interrupt(id);
}
}

HPX_DEFINE_COMPONENT_ACTION(cancelable_action, do_it, do_it_action)
Expand Down
8 changes: 7 additions & 1 deletion libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -425,6 +425,12 @@ namespace hpx::threads {
runs_as_child_mode_bits = static_cast<std::uint8_t>(bits);
}

void schedule_hint(std::int16_t core) noexcept
{
mode = thread_schedule_hint_mode::thread;
hint = core;
}

/// The hint associated with the mode. The interpretation of this hint
/// depends on the given mode.
std::int16_t hint = -1;
Expand Down
35 changes: 24 additions & 11 deletions libs/core/execution/tests/unit/bulk_async.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2015 Daniel Bourgeois
// Copyright (c) 2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -16,16 +17,16 @@
#include <vector>

////////////////////////////////////////////////////////////////////////////////
int bulk_test(
hpx::thread::id tid, int value, bool is_par, int passed_through) //-V813
int bulk_test(hpx::thread::id const& tid, int value, bool is_par,
int passed_through) //-V813
{
HPX_TEST_EQ(is_par, (tid != hpx::this_thread::get_id()));
HPX_TEST_EQ(passed_through, 42);
return value;
}

template <typename Executor>
void test_bulk_sync(Executor& exec)
void test_bulk_sync(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -35,14 +36,15 @@ void test_bulk_sync(Executor& exec)
using hpx::placeholders::_1;
using hpx::placeholders::_2;

std::vector<int> results = hpx::parallel::execution::bulk_sync_execute(
exec, hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);
std::vector<int> results =
hpx::parallel::execution::bulk_sync_execute(HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v)));
}

template <typename Executor>
void test_bulk_async(Executor& exec)
void test_bulk_async(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -54,31 +56,42 @@ void test_bulk_async(Executor& exec)

std::vector<hpx::future<int>> results =
hpx::parallel::execution::bulk_async_execute(
exec, hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);
HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v),
[](hpx::future<int>& lhs, const int& rhs) {
return lhs.get() == rhs;
}));
}

template <typename Executor>
decltype(auto) disable_run_as_child(Executor&& exec)
{
auto hint = hpx::execution::experimental::get_hint(exec);
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
HPX_FORWARD(Executor, exec), hint);
}

////////////////////////////////////////////////////////////////////////////////
int hpx_main()
{
hpx::execution::sequenced_executor seq_exec;
test_bulk_sync(seq_exec);
test_bulk_sync(disable_run_as_child(seq_exec));

hpx::execution::parallel_executor par_exec;
hpx::execution::parallel_executor par_fork_exec(hpx::launch::fork);
test_bulk_async(par_exec);
test_bulk_async(par_fork_exec);
test_bulk_async(disable_run_as_child(par_exec));
test_bulk_async(disable_run_as_child(par_fork_exec));

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
// By default, this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
Expand Down
93 changes: 56 additions & 37 deletions libs/core/execution/tests/unit/minimal_async_executor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2022 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -36,7 +36,8 @@ void apply_test(hpx::latch& l, hpx::thread::id& id, int passed_through)
l.count_down(1);
}

void async_bulk_test(int, hpx::thread::id tid, int passed_through) //-V813
void async_bulk_test(
int, hpx::thread::id const& tid, int passed_through) //-V813
{
HPX_TEST_NEQ(tid, hpx::this_thread::get_id());
HPX_TEST_EQ(passed_through, 42);
Expand All @@ -57,22 +58,22 @@ void test_apply(Executor& exec)
}

template <typename Executor>
void test_sync(Executor& exec)
void test_sync(Executor&& exec)
{
HPX_TEST(hpx::parallel::execution::sync_execute(exec, &async_test, 42) !=
hpx::this_thread::get_id());
}

template <typename Executor>
void test_async(Executor& exec)
void test_async(Executor&& exec)
{
HPX_TEST(
hpx::parallel::execution::async_execute(exec, &async_test, 42).get() !=
hpx::this_thread::get_id());
}

template <typename Executor>
void test_bulk_sync(Executor& exec)
void test_bulk_sync(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -89,7 +90,7 @@ void test_bulk_sync(Executor& exec)
}

template <typename Executor>
void test_bulk_async(Executor& exec)
void test_bulk_async(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand Down Expand Up @@ -153,17 +154,21 @@ struct test_async_executor1
test_async_executor1 const&, F&& f, Ts&&... ts)
{
++count_async;
return hpx::async(
hpx::launch::async, std::forward<F>(f), std::forward<Ts>(ts)...);

auto policy = hpx::launch::async;
auto hint = policy.hint();
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
policy.set_hint(hint);

return hpx::async(policy, std::forward<F>(f), std::forward<Ts>(ts)...);
}
};

namespace hpx::parallel::execution {
template <>
struct is_two_way_executor<test_async_executor1> : std::true_type
{
};
} // namespace hpx::parallel::execution
template <>
struct hpx::parallel::execution::is_two_way_executor<test_async_executor1>
: std::true_type
{
};

struct test_async_executor2 : test_async_executor1
{
Expand All @@ -174,18 +179,22 @@ struct test_async_executor2 : test_async_executor1
test_async_executor2 const&, F&& f, Ts&&... ts)
{
++count_sync;
return hpx::async(
hpx::launch::async, std::forward<F>(f), std::forward<Ts>(ts)...)

auto policy = hpx::launch::async;
auto hint = policy.hint();
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
policy.set_hint(hint);

return hpx::async(policy, std::forward<F>(f), std::forward<Ts>(ts)...)
.get();
}
};

namespace hpx::parallel::execution {
template <>
struct is_two_way_executor<test_async_executor2> : std::true_type
{
};
} // namespace hpx::parallel::execution
template <>
struct hpx::parallel::execution::is_two_way_executor<test_async_executor2>
: std::true_type
{
};

struct test_async_executor3 : test_async_executor1
{
Expand All @@ -197,21 +206,26 @@ struct test_async_executor3 : test_async_executor1
test_async_executor3 const&, F f, Shape const& shape, Ts&&... ts)
{
++count_bulk_sync;

auto policy = hpx::launch::async;
auto hint = policy.hint();
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
policy.set_hint(hint);

std::vector<hpx::future<void>> results;
for (auto const& elem : shape)
{
results.push_back(hpx::async(hpx::launch::async, f, elem, ts...));
results.push_back(hpx::async(policy, f, elem, ts...));
}
hpx::when_all(results).get();
}
};

namespace hpx::parallel::execution {
template <>
struct is_two_way_executor<test_async_executor3> : std::true_type
{
};
} // namespace hpx::parallel::execution
template <>
struct hpx::parallel::execution::is_two_way_executor<test_async_executor3>
: std::true_type
{
};

struct test_async_executor4 : test_async_executor1
{
Expand All @@ -223,10 +237,16 @@ struct test_async_executor4 : test_async_executor1
test_async_executor4 const&, F f, Shape const& shape, Ts&&... ts)
{
++count_bulk_async;

auto policy = hpx::launch::async;
auto hint = policy.hint();
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
policy.set_hint(hint);

std::vector<hpx::future<void>> results;
for (auto const& elem : shape)
{
results.push_back(hpx::async(hpx::launch::async, f, elem, ts...));
results.push_back(hpx::async(policy, f, elem, ts...));
}
return results;
}
Expand Down Expand Up @@ -257,12 +277,11 @@ struct test_async_executor5 : test_async_executor1
}
};

namespace hpx::parallel::execution {
template <>
struct is_two_way_executor<test_async_executor5> : std::true_type
{
};
} // namespace hpx::parallel::execution
template <>
struct hpx::parallel::execution::is_two_way_executor<test_async_executor5>
: std::true_type
{
};

///////////////////////////////////////////////////////////////////////////////
int hpx_main()
Expand All @@ -278,7 +297,7 @@ int hpx_main()

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
// By default, this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
Expand Down
Loading

0 comments on commit b086b2e

Please sign in to comment.