Equalize arena slots #1436

Draft: wants to merge 3 commits into base: master
79 changes: 43 additions & 36 deletions src/tbb/arena.cpp
@@ -149,34 +149,31 @@ void arena::on_thread_leaving(unsigned ref_param) {
     }
 }
 
-std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
-    if ( lower >= upper ) return out_of_arena;
+std::size_t arena::occupy_free_slot(thread_data& tls) {
     // Start search for an empty slot from the one we occupied the last time
     std::size_t index = tls.my_arena_index;
-    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
-    __TBB_ASSERT( index >= lower && index < upper, nullptr);
+    std::size_t locked_slot = out_of_arena;
+    if ( index >= my_num_slots ) index = tls.my_random.get() % my_num_slots;
     // Find a free slot
-    for ( std::size_t i = index; i < upper; ++i )
-        if (my_slots[i].try_occupy()) return i;
-    for ( std::size_t i = lower; i < index; ++i )
-        if (my_slots[i].try_occupy()) return i;
-    return out_of_arena;
-}
-
-template <bool as_worker>
-std::size_t arena::occupy_free_slot(thread_data& tls) {
-    // Firstly, external threads try to occupy reserved slots
-    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
-    if ( index == out_of_arena ) {
-        // Secondly, all threads try to occupy all non-reserved slots
-        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
-        // Likely this arena is already saturated
-        if ( index == out_of_arena )
-            return out_of_arena;
+    for (std::size_t i = index; i < my_num_slots; ++i) {
+        if (my_slots[i].try_occupy()) {
+            locked_slot = i;
+            break;
+        }
+    }
+    if (locked_slot == out_of_arena) {
+        for (std::size_t i = 0; i < index; ++i) {
+            if (my_slots[i].try_occupy()) {
+                locked_slot = i;
+                break;
+            }
+        }
     }
 
-    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
-    return index;
+    if (locked_slot != out_of_arena) {
+        atomic_update( my_limit, (unsigned)(locked_slot + 1), std::less<unsigned>() );
+    }
+    return locked_slot;
 }
 
 std::uintptr_t arena::calculate_stealing_threshold() {
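
The hunk above collapses the old two-phase search (reserved slots first for external threads, then the shared range) into a single wrap-around scan over every slot, starting from the slot the thread held last time. A minimal, self-contained sketch of that scan pattern, assuming a hypothetical `Slot` type in place of TBB's arena slot and a plain CAS in place of the real `try_occupy()`:

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for TBB's arena slot; the real try_occupy() differs.
struct Slot {
    std::atomic<bool> occupied{false};
    bool try_occupy() {
        bool expected = false;
        // Exactly one contender can flip a free slot to occupied.
        return occupied.compare_exchange_strong(expected, true);
    }
};

constexpr std::size_t out_of_arena = ~std::size_t(0);

// Scan [hint, n) first, then [0, hint): a thread tends to reclaim the slot
// it used last time, yet still probes every slot exactly once.
std::size_t occupy_any_slot(std::vector<Slot>& slots, std::size_t hint) {
    const std::size_t n = slots.size();
    if (hint >= n) hint = 0; // the real code draws a random index instead
    for (std::size_t i = hint; i < n; ++i)
        if (slots[i].try_occupy()) return i;
    for (std::size_t i = 0; i < hint; ++i)
        if (slots[i].try_occupy()) return i;
    return out_of_arena;
}
```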
@@ -189,16 +186,16 @@ void arena::process(thread_data& tls) {
     __TBB_ASSERT( is_alive(my_guard), nullptr);
     __TBB_ASSERT( my_num_slots >= 1, nullptr);
 
-    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
+    std::size_t index = occupy_free_slot(tls);
     if (index == out_of_arena) {
         on_thread_leaving(ref_worker);
         return;
     }
 
     my_tc_client.get_pm_client()->register_thread();
 
-    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
-    tls.attach_arena(*this, index);
+    // remember that we occupied a slot from workers' quota
+    tls.attach_arena(*this, index, /*is_worker_slot*/ true);
     // worker thread enters the dispatch loop to look for a work
     tls.my_inbox.set_is_idle(true);
     if (tls.my_arena_slot->is_task_pool_published()) {
@@ -396,7 +393,13 @@ bool arena::is_top_priority() const {
 
 bool arena::try_join() {
     if (is_joinable()) {
-        my_references += arena::ref_worker;
+        // check quota for number of workers in arena
+        unsigned curr = my_references.fetch_add(arena::ref_worker) + arena::ref_worker;
+        unsigned workers = curr >> arena::ref_external_bits;
+        if (workers > my_num_slots - my_num_reserved_slots) {
+            my_references -= arena::ref_worker;
+            return false;
+        }
         return true;
     }
     return false;
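
The new try_join() is an optimistic admission check: publish a worker reference first, then retract it if the workers' quota would be exceeded. A compilable sketch of the pattern, assuming (as the shift suggests) that the low `ref_external_bits` bits of the counter hold external references and the high bits count workers; the constant's value below is a placeholder, not TBB's:

```cpp
#include <atomic>

constexpr unsigned ref_external_bits = 12;       // placeholder value
constexpr unsigned ref_worker = 1u << ref_external_bits;

struct arena_counters {
    std::atomic<unsigned> my_references{0};
    unsigned my_num_slots = 4;                   // illustrative sizes
    unsigned my_num_reserved_slots = 1;

    bool try_join() {
        // Optimistically count ourselves as a worker...
        unsigned curr = my_references.fetch_add(ref_worker) + ref_worker;
        unsigned workers = curr >> ref_external_bits;
        if (workers > my_num_slots - my_num_reserved_slots) {
            // ...and roll back if that exceeds the workers' quota.
            my_references -= ref_worker;
            return false;
        }
        return true;
    }
};
```

Because each joiner counts itself before comparing, concurrent joiners may transiently overshoot the quota, but every over-quota joiner observes the overshoot and retracts its own reference, so no lock is needed on the join path.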
@@ -626,24 +629,25 @@ void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_a
 
 class nested_arena_context : no_copy {
 public:
-    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
+    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index, bool is_worker_slot)
         : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
     {
         if (td.my_arena != &nested_arena) {
             m_orig_arena = td.my_arena;
             m_orig_slot_index = td.my_arena_index;
+            m_orig_is_worker_slot = td.my_is_workers_slot_occupied;
             m_orig_last_observer = td.my_last_observer;
 
             td.detach_task_dispatcher();
-            td.attach_arena(nested_arena, slot_index);
+            td.attach_arena(nested_arena, slot_index, is_worker_slot);
             if (td.my_inbox.is_idle_state(true))
                 td.my_inbox.set_is_idle(false);
             task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
             td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);
 
             // If the calling thread occupies the slots out of external thread reserve we need to notify the
             // market that this arena requires one worker less.
-            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
+            if (td.my_is_workers_slot_occupied) {
                 td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
             }
 
@@ -679,15 +683,15 @@ class nested_arena_context : no_copy {
 
             // Notify the market that this thread releasing a one slot
             // that can be used by a worker thread.
-            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
+            if (td.my_is_workers_slot_occupied) {
                 td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
            }
 
            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!
 
-            td.attach_arena(*m_orig_arena, m_orig_slot_index);
+            td.attach_arena(*m_orig_arena, m_orig_slot_index, m_orig_is_worker_slot);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
@@ -700,6 +704,7 @@ class nested_arena_context : no_copy {
     observer_proxy* m_orig_last_observer{ nullptr };
     task_dispatcher* m_task_dispatcher{ nullptr };
     unsigned m_orig_slot_index{};
+    bool m_orig_is_worker_slot{};
     bool m_orig_fifo_tasks_allowed{};
     bool m_orig_critical_task_allowed{};
 };
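
Taken together, the two hunks above show that nested_arena_context is a scoped guard: each field it mutates on entry (slot index, the new worker-slot flag, observers, dispatcher) is saved into an m_orig_* member and restored on exit, and a slot borrowed from the workers' quota is reported to the market on entry and handed back on exit. A reduced sketch of that shape, using hypothetical ThreadState and request_workers() stand-ins rather than the real TBB types:

```cpp
// Hypothetical stand-ins; the real guard operates on thread_data and arena.
struct ThreadState {
    unsigned slot_index = 0;
    bool is_worker_slot = false;
};

inline void request_workers(int /*workers_delta*/) {} // assumed market hook

class nested_scope_guard {
    ThreadState& td_;
    unsigned orig_slot_index_;
    bool orig_is_worker_slot_;
public:
    nested_scope_guard(ThreadState& td, unsigned slot, bool is_worker_slot)
        : td_(td)
        , orig_slot_index_(td.slot_index)         // save every field we mutate
        , orig_is_worker_slot_(td.is_worker_slot)
    {
        td_.slot_index = slot;
        td_.is_worker_slot = is_worker_slot;
        if (td_.is_worker_slot)
            request_workers(-1); // borrowed a slot from the workers' quota
    }
    ~nested_scope_guard() {
        if (td_.is_worker_slot)
            request_workers(+1); // hand the borrowed slot back
        td_.slot_index = orig_slot_index_;        // restore saved state
        td_.is_worker_slot = orig_is_worker_slot_;
    }
};
```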
Expand Down Expand Up @@ -757,8 +762,10 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {

bool same_arena = td->my_arena == a;
std::size_t index1 = td->my_arena_index;
bool is_worker_slot = td->my_is_workers_slot_occupied;
if (!same_arena) {
index1 = a->occupy_free_slot</*as_worker */false>(*td);
index1 = a->occupy_free_slot(*td);
is_worker_slot = false;
(inline review thread on the is_worker_slot = false; line)

Contributor:
Why do we unset the flag? I did not get the idea: how do we distinguish whether an external thread occupied a worker's slot?

Contributor (Author):
Good point, this could be a problem spot. The logic behind is_worker_slot = false; is that the calling thread got the index1 slot as a master, so it does not consume from the workers' slot quota. Is that reasoning invalid?

Contributor:
What if there are more external threads than reserved slots? Then these external threads will take slots from the workers' quota. Am I missing something?
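
A worked instance of this concern, with illustrative numbers (not taken from the PR):

```cpp
#include <cassert>

int main() {
    // Illustrative arena: 4 slots, 1 reserved for external threads.
    unsigned num_slots = 4, num_reserved_slots = 1;
    unsigned workers_quota = num_slots - num_reserved_slots;              // 3

    // Suppose three external threads call execute(): one takes the reserved
    // slot, two land in non-reserved slots yet report is_worker_slot=false.
    unsigned externals_in_worker_slots = 2;

    // try_join() still admits up to workers_quota workers, but only this
    // many non-reserved slots remain free for them:
    unsigned free_worker_slots = workers_quota - externals_in_worker_slots; // 1
    assert(free_worker_slots < workers_quota); // admitted workers may find no slot
    return 0;
}
```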

     if (index1 == arena::out_of_arena) {
         concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
         d1::wait_context wo(1);
@@ -774,10 +781,10 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
                 a->my_exit_monitors.cancel_wait(waiter);
                 break;
             }
-            index2 = a->occupy_free_slot</*as_worker*/false>(*td);
+            index2 = a->occupy_free_slot(*td);
             if (index2 != arena::out_of_arena) {
                 a->my_exit_monitors.cancel_wait(waiter);
-                nested_arena_context scope(*td, *a, index2 );
+                nested_arena_context scope(*td, *a, index2, /*is_worker_slot=*/false);
                 r1::wait(wo, exec_context);
                 __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                 break;
@@ -802,7 +809,7 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
 
     context_guard_helper</*report_tasks=*/false> context_guard;
     context_guard.set_ctx(a->my_default_ctx);
-    nested_arena_context scope(*td, *a, index1);
+    nested_arena_context scope(*td, *a, index1, is_worker_slot);
 #if _WIN64
     try {
 #endif
3 changes: 0 additions & 3 deletions src/tbb/arena.h
@@ -366,10 +366,7 @@ class arena: public padded<arena_base>
 
     static const std::size_t out_of_arena = ~size_t(0);
     //! Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
-    template <bool as_worker>
     std::size_t occupy_free_slot(thread_data&);
-    //! Tries to occupy a slot in the specified range.
-    std::size_t occupy_free_slot_in_range(thread_data& tls, std::size_t lower, std::size_t upper);
 
     std::uintptr_t calculate_stealing_threshold();
 
2 changes: 1 addition & 1 deletion src/tbb/governor.cpp
@@ -216,7 +216,7 @@ void governor::init_external_thread() {
     arena& a = arena::create(thr_control, num_slots, num_reserved_slots, arena_priority_level);
     // External thread always occupies the first slot
     thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
-    td.attach_arena(a, /*slot index*/ 0);
+    td.attach_arena(a, /*slot index*/ 0, /*is_worker_slot*/ false);
     __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
 
     stack_size = a.my_threading_control->worker_stack_size();
11 changes: 8 additions & 3 deletions src/tbb/thread_data.h
@@ -1,5 +1,5 @@
 /*
-    Copyright (c) 2020-2023 Intel Corporation
+    Copyright (c) 2020-2024 Intel Corporation
 
     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
@@ -101,6 +101,7 @@ class thread_data : public ::rml::job
     thread_data(unsigned short index, bool is_worker)
         : my_arena_index{ index }
         , my_is_worker{ is_worker }
+        , my_is_workers_slot_occupied{ false }
         , my_task_dispatcher{ nullptr }
         , my_arena{ nullptr }
         , my_last_client{ nullptr }
@@ -131,7 +132,7 @@ class thread_data : public ::rml::job
 #endif /* __TBB_RESUMABLE_TASKS */
     }
 
-    void attach_arena(arena& a, std::size_t index);
+    void attach_arena(arena& a, std::size_t index, bool is_worker_slot);
     bool is_attached_to(arena*);
     void attach_task_dispatcher(task_dispatcher&);
     void detach_task_dispatcher();
@@ -145,6 +146,9 @@ class thread_data : public ::rml::job
     //! Indicates if the thread is created by RML
     const bool my_is_worker;
 
+    //! Does the occupied arena slot belong to the workers' quota?
+    bool my_is_workers_slot_occupied;
+
     //! The current task dipsatcher
     task_dispatcher* my_task_dispatcher;
 
@@ -202,9 +206,10 @@ class thread_data : public ::rml::job
     d1::task_group_context my_default_context;
 };
 
-inline void thread_data::attach_arena(arena& a, std::size_t index) {
+inline void thread_data::attach_arena(arena& a, std::size_t index, bool is_worker_slot) {
     my_arena = &a;
     my_arena_index = static_cast<unsigned short>(index);
+    my_is_workers_slot_occupied = is_worker_slot;
     my_arena_slot = a.my_slots + index;
     // Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
     my_inbox.attach(my_arena->mailbox(index));
45 changes: 26 additions & 19 deletions test/tbb/test_task_arena.cpp
@@ -1,5 +1,5 @@
 /*
-    Copyright (c) 2005-2023 Intel Corporation
+    Copyright (c) 2005-2024 Intel Corporation
 
     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
@@ -111,31 +111,37 @@ class ArenaObserver : public tbb::task_scheduler_observer {
     int myId; // unique observer/arena id within a test
     int myMaxConcurrency; // concurrency of the associated arena
     int myNumReservedSlots; // reserved slots in the associated arena
+    std::atomic<int> myNumActiveWorkers;
     void on_scheduler_entry( bool is_worker ) override {
         int current_index = tbb::this_task_arena::current_thread_index();
         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
         if (is_worker) {
-            CHECK(current_index >= myNumReservedSlots);
+            int currNumActiveWorkers = ++myNumActiveWorkers;
+            CHECK(currNumActiveWorkers <= myMaxConcurrency - myNumReservedSlots);
         }
         CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
         old_id.local() = local_id.local();
         CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
         local_id.local() = myId;
         slot_id.local() = current_index;
     }
-    void on_scheduler_exit( bool /*is_worker*/ ) override {
+    void on_scheduler_exit( bool is_worker ) override {
         CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
         CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
         slot_id.local() = -2;
         local_id.local() = old_id.local();
         old_id.local() = 0;
+        if (is_worker) {
+            --myNumActiveWorkers;
+        }
     }
 public:
     ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
         : tbb::task_scheduler_observer(a)
         , myId(id)
         , myMaxConcurrency(maxConcurrency)
-        , myNumReservedSlots(numReservedSlots) {
+        , myNumReservedSlots(numReservedSlots)
+        , myNumActiveWorkers(0) {
         CHECK(myId);
         observe(true);
     }
@@ -433,12 +439,11 @@ void TestArenaEntryConsistency() {
 class TestArenaConcurrencyBody : utils::NoAssign {
     tbb::task_arena &my_a;
     int my_max_concurrency;
-    int my_reserved_slots;
     utils::SpinBarrier *my_barrier;
     utils::SpinBarrier *my_worker_barrier;
 public:
-    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = nullptr, utils::SpinBarrier *wb = nullptr )
-        : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
+    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, utils::SpinBarrier *b = nullptr, utils::SpinBarrier *wb = nullptr )
+        : my_a(a), my_max_concurrency(max_concurrency), my_barrier(b), my_worker_barrier(wb) {}
     // NativeParallelFor's functor
     void operator()( int ) const {
         CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
@@ -453,12 +458,8 @@ class TestArenaConcurrencyBody : utils::NoAssign {
         int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
         REQUIRE( max_arena_concurrency == my_max_concurrency );
         if ( my_worker_barrier ) {
-            if ( local_id.local() == 1 ) {
-                // External thread in a reserved slot
-                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
-            } else {
+            if ( local_id.local() != 1 ) {
                 // Worker thread
-                CHECK( idx >= my_reserved_slots );
                 my_worker_barrier->wait();
             }
         } else if ( my_barrier )
@@ -476,7 +477,7 @@ void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
         ResetTLS();
         utils::SpinBarrier b( p );
         utils::SpinBarrier wb( p-reserved );
-        TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
+        TestArenaConcurrencyBody test( a, p, &b, &wb );
         for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
             a.enqueue( test );
         if ( reserved==1 )
@@ -487,7 +488,7 @@ void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
     } { // Check if multiple external threads alone can achieve maximum concurrency.
         ResetTLS();
         utils::SpinBarrier b( p );
-        utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
+        utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, &b ) );
         a.debug_wait_until_empty();
     } { // Check oversubscription by external threads.
 #if !_WIN32 || !_WIN64
@@ -500,7 +501,7 @@ void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
 #endif
         {
             ResetTLS();
-            utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
+            utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p));
             a.debug_wait_until_empty();
         }
     }
@@ -1545,20 +1546,26 @@ class simple_observer : public tbb::task_scheduler_observer {
     int my_idx;
     int myMaxConcurrency; // concurrency of the associated arena
     int myNumReservedSlots; // reserved slots in the associated arena
+    std::atomic<int> myNumActiveWorkers;
     void on_scheduler_entry( bool is_worker ) override {
         int current_index = tbb::this_task_arena::current_thread_index();
         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
         if (is_worker) {
-            CHECK(current_index >= myNumReservedSlots);
+            int currNumActiveWorkers = ++myNumActiveWorkers;
+            CHECK(currNumActiveWorkers <= myMaxConcurrency - myNumReservedSlots);
         }
     }
-    void on_scheduler_exit( bool /*is_worker*/ ) override
-    {}
+    void on_scheduler_exit( bool is_worker ) override {
+        if (is_worker) {
+            --myNumActiveWorkers;
+        }
+    }
 public:
     simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
         : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
         , myMaxConcurrency(maxConcurrency)
-        , myNumReservedSlots(numReservedSlots) {
+        , myNumReservedSlots(numReservedSlots)
+        , myNumActiveWorkers(0) {
         observe(true);
     }