Skip to content

Commit

Permalink
Add workers_leave::automatic, let hybric systems take advantage of pa…
Browse files Browse the repository at this point in the history
…rallel_block

Signed-off-by: Isaev, Ilya <[email protected]>
  • Loading branch information
isaevil committed Nov 26, 2024
1 parent 4486959 commit a7483ee
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
11 changes: 6 additions & 5 deletions include/oneapi/tbb/task_arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class task_arena_base {

#if __TBB_PREVIEW_PARALLEL_BLOCK
enum class workers_leave : int {
automatic = 0,
fast = 1,
delayed = 2
};
Expand Down Expand Up @@ -293,7 +294,7 @@ class task_arena : public task_arena_base {
task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl = workers_leave::delayed
, workers_leave wl = workers_leave::automatic
#endif
)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority
Expand All @@ -308,7 +309,7 @@ class task_arena : public task_arena_base {
task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl = workers_leave::delayed
, workers_leave wl = workers_leave::automatic
#endif
)
: task_arena_base(constraints_, reserved_for_masters, a_priority
Expand Down Expand Up @@ -350,7 +351,7 @@ class task_arena : public task_arena_base {
explicit task_arena( attach )
: task_arena_base(automatic, 1, priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave::delayed
, workers_leave::automatic
#endif
) // use default settings if attach fails
{
Expand All @@ -373,7 +374,7 @@ class task_arena : public task_arena_base {
void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl = workers_leave::delayed
, workers_leave wl = workers_leave::automatic
#endif
)
{
Expand All @@ -394,7 +395,7 @@ class task_arena : public task_arena_base {
void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl = workers_leave::delayed
, workers_leave wl = workers_leave::automatic
#endif
)
{
Expand Down
28 changes: 16 additions & 12 deletions src/tbb/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,19 @@ class atomic_flag {

#if __TBB_PREVIEW_PARALLEL_BLOCK
class thread_leave_manager {
static const std::uintptr_t FAST_LEAVE = 1;
static const std::uintptr_t ONE_TIME_FAST_LEAVE = 1 << 1;
static const std::uintptr_t DELAYED_LEAVE = 1 << 2;
static const std::uintptr_t PARALLEL_BLOCK = 1 << 3;
static const std::uintptr_t PARALLEL_BLOCK_MASK = ~((1LLU << 32) - 1) & ~(0x7);
static const std::uint64_t FAST_LEAVE = 1;
static const std::uint64_t ONE_TIME_FAST_LEAVE = 1 << 1;
static const std::uint64_t DELAYED_LEAVE = 1 << 2;
static const std::uint64_t PARALLEL_BLOCK = 1 << 3;
static const std::uint64_t PARALLEL_BLOCK_MASK = ~((1LLU << 32) - 1) & ~(0x7);

std::atomic<std::uintptr_t> my_state{0};
std::atomic<std::uint64_t> my_state{0};
public:
void set_initial_state(tbb::task_arena::workers_leave wl) {
if (wl == tbb::task_arena::workers_leave::automatic) {
wl = governor::hybrid_cpu() ? tbb::task_arena::workers_leave::fast
: tbb::task_arena::workers_leave::delayed;
}
if (wl == tbb::task_arena::workers_leave::delayed) {
my_state.store(DELAYED_LEAVE, std::memory_order_relaxed);
} else {
Expand All @@ -198,7 +202,7 @@ class thread_leave_manager {
}

void restore_state_if_needed() {
std::uintptr_t curr = ONE_TIME_FAST_LEAVE;
std::uint64_t curr = ONE_TIME_FAST_LEAVE;
if (my_state.load(std::memory_order_relaxed) == curr) {
// Potentially can override desicion of the parallel block from future epoch
// but it is not a problem because it does not violate the correctness
Expand All @@ -209,8 +213,8 @@ class thread_leave_manager {
void register_parallel_block() {
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set");

std::uintptr_t prev = my_state.load(std::memory_order_relaxed);
std::uintptr_t desired{};
std::uint64_t prev = my_state.load(std::memory_order_relaxed);
std::uint64_t desired{};
do {
if (prev & PARALLEL_BLOCK_MASK) {
desired = PARALLEL_BLOCK + prev;
Expand All @@ -225,8 +229,8 @@ class thread_leave_manager {
void unregister_parallel_block(bool enable_fast_leave) {
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set");

std::uintptr_t prev = my_state.load(std::memory_order_relaxed);
std::uintptr_t desired{};
std::uint64_t prev = my_state.load(std::memory_order_relaxed);
std::uint64_t desired{};
do {
if (((prev - PARALLEL_BLOCK) & PARALLEL_BLOCK_MASK) != 0) {
desired = prev - PARALLEL_BLOCK;
Expand All @@ -239,7 +243,7 @@ class thread_leave_manager {
bool should_leave() {
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set");

std::uintptr_t curr = my_state.load(std::memory_order_relaxed);
std::uint64_t curr = my_state.load(std::memory_order_relaxed);
return curr == FAST_LEAVE || curr == ONE_TIME_FAST_LEAVE;
}
};
Expand Down
2 changes: 2 additions & 0 deletions src/tbb/def/lin32-tbb.def
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ _ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE;
_ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE;
_ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE;
_ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE;
_ZN3tbb6detail2r123register_parallel_blockERNS0_2d115task_arena_baseE;
_ZN3tbb6detail2r125unregister_parallel_blockERNS0_2d115task_arena_baseEb;

/* System topology parsing and threads pinning (governor.cpp) */
_ZN3tbb6detail2r115numa_node_countEv;
Expand Down
11 changes: 7 additions & 4 deletions src/tbb/waiters.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ class outermost_worker_waiter : public waiter_base {
__TBB_ASSERT(t == nullptr, nullptr);

if (is_worker_should_leave(slot)) {
if (!governor::hybrid_cpu()
if (
#if __TBB_PREVIEW_PARALLEL_BLOCK
&& !my_arena.my_thread_leave.should_leave()
!my_arena.my_thread_leave.should_leave()
#else
!governor::hybrid_cpu()
#endif
)
{
Expand All @@ -75,10 +77,11 @@ class outermost_worker_waiter : public waiter_base {
return true;
}

if (my_arena.my_threading_control->is_any_other_client_active()
if (
#if __TBB_PREVIEW_PARALLEL_BLOCK
|| my_arena.my_thread_leave.should_leave()
my_arena.my_thread_leave.should_leave() ||
#endif
my_arena.my_threading_control->is_any_other_client_active()
)
{
break;
Expand Down
6 changes: 3 additions & 3 deletions test/tbb/test_task_arena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2137,7 +2137,7 @@ TEST_CASE("Check that workers leave faster with workers_leave::fast") {
std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end());
auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials;
auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials;
REQUIRE(delayed_avg < fast_avg);
WARN_MESSAGE(delayed_avg < fast_avg, "Expected workers start new work faster with delayed leave");
}

TEST_CASE("parallel_block retains workers in task_arena") {
Expand Down Expand Up @@ -2166,7 +2166,7 @@ TEST_CASE("parallel_block retains workers in task_arena") {
std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end());
auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials;
auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials;
REQUIRE(delayed_avg < fast_avg);
WARN_MESSAGE(delayed_avg < fast_avg, "Expected workers start new work faster when using parallel_block");
}

TEST_CASE("Test one-time fast leave") {
Expand Down Expand Up @@ -2195,7 +2195,7 @@ TEST_CASE("Test one-time fast leave") {
std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end());
auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials;
auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials;
REQUIRE(delayed_avg < fast_avg);
WARN_MESSAGE(delayed_avg < fast_avg, "Expected one-time fast leave setting to slow workers to start new work");
}

#endif

0 comments on commit a7483ee

Please sign in to comment.