coro::thread_pool high cpu usage when tasks < threads (#265)
* coro::thread_pool high cpu usage when tasks < threads

The check for m_size > 0 was keeping threads awake in a spin state until
all tasks completed. The wait condition now uses m_queue.size() behind the
lock so threads are only woken on the condition variable when tasks are
waiting to be processed.
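
The shape of the fix can be illustrated with a plain condition-variable worker pool. The sketch below is standalone and uses invented names (tiny_pool and its members); it is not libcoro's thread_pool, only a minimal demonstration of waiting on the queue itself rather than on a counter of submitted-but-unfinished tasks.

```cpp
// Standalone illustration of the wake-up fix. Workers block on the condition variable
// until the queue itself has work; waiting on a counter that stays non-zero while other
// threads are still running tasks keeps idle workers cycling through wakeups.
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class tiny_pool
{
public:
    explicit tiny_pool(std::size_t thread_count)
    {
        for (std::size_t i = 0; i < thread_count; ++i)
        {
            m_threads.emplace_back([this]() { worker(); });
        }
    }

    ~tiny_pool()
    {
        {
            std::scoped_lock lk{m_mutex};
            m_shutdown = true;
        }
        m_cv.notify_all();
        for (auto& t : m_threads)
        {
            t.join();
        }
    }

    void resume(std::function<void()> job)
    {
        {
            std::scoped_lock lk{m_mutex};
            m_queue.push_back(std::move(job));
        }
        m_cv.notify_one(); // one queued job -> wake exactly one idle worker
    }

private:
    void worker()
    {
        while (true)
        {
            std::unique_lock lk{m_mutex};
            // The predicate inspects the queue behind the lock, so a worker only wakes
            // when there is something for it to dequeue (or the pool is shutting down).
            m_cv.wait(lk, [this]() { return !m_queue.empty() || m_shutdown; });
            if (m_queue.empty())
            {
                return; // shutdown requested and nothing left to run
            }
            auto job = std::move(m_queue.front());
            m_queue.pop_front();
            lk.unlock();
            job(); // run the job outside the lock
        }
    }

    std::mutex                        m_mutex;
    std::condition_variable           m_cv;
    std::deque<std::function<void()>> m_queue;
    bool                              m_shutdown{false};
    std::vector<std::thread>          m_threads;
};

int main()
{
    tiny_pool pool{2};
    for (int i = 0; i < 8; ++i)
    {
        pool.resume([]() { /* pretend to do work */ });
    }
    return 0; // ~tiny_pool() drains the queue and joins the workers
}
```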

* Fix deadlock between task_container and tls::client: the client's
  destructor schedules a tls cleanup task, and the task_container's lock
  was being locked twice while the cleanup task was being destroyed.

Closes #262

* Adjust when task_container's user_task is deleted

It is now deleted inline in make_user_task so that any destructors which run
and may schedule more coroutines do not cause a deadlock.
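
In outline, the hazard and the fix look like the following standalone sketch (invented names, not the real coro::task_container): anything whose destructor may re-enter the container, for example by scheduling another task the way coro::tls::client's destructor does, has to be destroyed while the container's mutex is not held.

```cpp
// Standalone sketch of the fix. If the destructor of the user's state re-enters the
// container while m_mutex is already held, a non-recursive mutex is locked twice and
// the thread deadlocks; destroying the state before taking the lock avoids this.
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

class container_sketch
{
public:
    template<typename user_state_type>
    void on_task_finished(std::size_t index, std::unique_ptr<user_state_type> user_state)
    {
        // 1) Destroy the user's state first, with no lock held. Its destructor is free
        //    to call back into this container (e.g. store another task) safely.
        user_state.reset();

        // 2) Only then take the lock to mark the slot as reclaimable by garbage collection.
        std::scoped_lock lk{m_mutex};
        m_tasks_to_delete.push_back(index);
    }

private:
    std::mutex               m_mutex;
    std::vector<std::size_t> m_tasks_to_delete;
};
```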

* io_scheduler is now std::enable_shared_from_this
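
The header change follows the usual passkey factory idiom so that every io_scheduler is guaranteed to be owned by a std::shared_ptr and can safely call shared_from_this(). A minimal standalone sketch of the pattern follows; scheduler_sketch and its members are invented names, and the real io_scheduler::make_shared body lives in io_scheduler.cpp, which is not among the hunks shown here.

```cpp
// Minimal standalone sketch of the passkey + make_shared pattern.
#include <memory>

class scheduler_sketch : public std::enable_shared_from_this<scheduler_sketch>
{
    // Private tag: std::make_shared needs a public constructor, so the constructor is
    // public but requires a token that only this class can produce.
    struct private_constructor
    {
        private_constructor() = default;
    };

public:
    explicit scheduler_sketch(int option, private_constructor) : m_option(option) {}

    static auto make_shared(int option) -> std::shared_ptr<scheduler_sketch>
    {
        // Every instance is created through here, so it is always shared_ptr-owned and
        // shared_from_this() is always valid -- e.g. the scheduler can hand a shared_ptr
        // to background work that must keep it alive.
        return std::make_shared<scheduler_sketch>(option, private_constructor{});
    }

    auto option() const -> int { return m_option; }

private:
    int m_option;
};

int main()
{
    auto s     = scheduler_sketch::make_shared(42);
    auto alias = s->shared_from_this(); // safe by construction
    return alias->option() == 42 ? 0 : 1;
}
```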
jbaldwin authored Jul 5, 2024
1 parent d4a5515 commit 5697678
Showing 20 changed files with 419 additions and 317 deletions.
17 changes: 10 additions & 7 deletions README.md
@@ -362,10 +362,12 @@ int main()
// Complete worker tasks faster on a thread pool, using the io_scheduler version so the worker
// tasks can yield for a specific amount of time to mimic difficult work. The pool is only
// setup with a single thread to showcase yield_for().
coro::io_scheduler tp{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
auto tp = coro::io_scheduler::make_shared(
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

// This task will wait until the given latch setters have completed.
auto make_latch_task = [](coro::latch& l) -> coro::task<void> {
auto make_latch_task = [](coro::latch& l) -> coro::task<void>
{
// It seems like the dependent worker tasks could be created here, but in that case it would
// be superior to simply do: `co_await coro::when_all(tasks);`
// It is also important to note that the last dependent task will resume the waiting latch
@@ -381,14 +383,15 @@

// This task does 'work' and counts down on the latch when completed. The final child task to
// complete will end up resuming the latch task when the latch's count reaches zero.
auto make_worker_task = [](coro::io_scheduler& tp, coro::latch& l, int64_t i) -> coro::task<void> {
auto make_worker_task = [](std::shared_ptr<coro::io_scheduler>& tp, coro::latch& l, int64_t i) -> coro::task<void>
{
// Schedule the worker task onto the thread pool.
co_await tp.schedule();
co_await tp->schedule();
std::cout << "worker task " << i << " is working...\n";
    // Do some expensive calculations, yield to mimic work...! It's also important to never use
// std::this_thread::sleep_for() within the context of coroutines, it will block the thread
// and other tasks that are ready to execute will be blocked.
co_await tp.yield_for(std::chrono::milliseconds{i * 20});
co_await tp->yield_for(std::chrono::milliseconds{i * 20});
std::cout << "worker task " << i << " is done, counting down on the latch\n";
l.count_down();
co_return;
@@ -846,7 +849,7 @@ The example provided here shows an i/o scheduler that spins up a basic `coro::ne

int main()
{
auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
// The scheduler will spawn a dedicated event processing thread. This is the default, but
// it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
@@ -1017,7 +1020,7 @@ All tasks that are stored within a `coro::task_container` must have a `void` ret

int main()
{
auto scheduler = std::make_shared<coro::io_scheduler>(
auto scheduler = coro::io_scheduler::make_shared(
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

auto make_server_task = [&]() -> coro::task<void>
2 changes: 1 addition & 1 deletion examples/coro_http_200_ok_server.cpp
@@ -67,7 +67,7 @@ Connection: keep-alive
std::vector<coro::task<void>> workers{};
for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
{
auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline});

workers.push_back(make_http_200_ok_server(scheduler));
2 changes: 1 addition & 1 deletion examples/coro_io_scheduler.cpp
@@ -3,7 +3,7 @@

int main()
{
auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
// The scheduler will spawn a dedicated event processing thread. This is the default, but
// it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
13 changes: 8 additions & 5 deletions examples/coro_latch.cpp
@@ -6,10 +6,12 @@ int main()
// Complete worker tasks faster on a thread pool, using the io_scheduler version so the worker
// tasks can yield for a specific amount of time to mimic difficult work. The pool is only
// setup with a single thread to showcase yield_for().
coro::io_scheduler tp{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
auto tp = coro::io_scheduler::make_shared(
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

// This task will wait until the given latch setters have completed.
auto make_latch_task = [](coro::latch& l) -> coro::task<void> {
auto make_latch_task = [](coro::latch& l) -> coro::task<void>
{
// It seems like the dependent worker tasks could be created here, but in that case it would
// be superior to simply do: `co_await coro::when_all(tasks);`
// It is also important to note that the last dependent task will resume the waiting latch
@@ -25,14 +27,15 @@

// This task does 'work' and counts down on the latch when completed. The final child task to
// complete will end up resuming the latch task when the latch's count reaches zero.
auto make_worker_task = [](coro::io_scheduler& tp, coro::latch& l, int64_t i) -> coro::task<void> {
auto make_worker_task = [](std::shared_ptr<coro::io_scheduler>& tp, coro::latch& l, int64_t i) -> coro::task<void>
{
// Schedule the worker task onto the thread pool.
co_await tp.schedule();
co_await tp->schedule();
std::cout << "worker task " << i << " is working...\n";
    // Do some expensive calculations, yield to mimic work...! It's also important to never use
// std::this_thread::sleep_for() within the context of coroutines, it will block the thread
// and other tasks that are ready to execute will be blocked.
co_await tp.yield_for(std::chrono::milliseconds{i * 20});
co_await tp->yield_for(std::chrono::milliseconds{i * 20});
std::cout << "worker task " << i << " is done, counting down on the latch\n";
l.count_down();
co_return;
2 changes: 1 addition & 1 deletion examples/coro_task_container.cpp
@@ -3,7 +3,7 @@

int main()
{
auto scheduler = std::make_shared<coro::io_scheduler>(
auto scheduler = coro::io_scheduler::make_shared(
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

auto make_server_task = [&]() -> coro::task<void>
2 changes: 1 addition & 1 deletion examples/coro_tcp_echo_server.cpp
@@ -61,7 +61,7 @@ auto main() -> int
std::vector<coro::task<void>> workers{};
for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
{
auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline});

workers.push_back(make_tcp_echo_server(scheduler));
5 changes: 4 additions & 1 deletion include/coro/concepts/executor.hpp
@@ -21,7 +21,10 @@ concept executor = requires(type t, std::coroutine_handle<> c)
{
{ t.schedule() } -> coro::concepts::awaiter;
{ t.yield() } -> coro::concepts::awaiter;
{ t.resume(c) } -> std::same_as<void>;
{ t.resume(c) } -> std::same_as<bool>;
{ t.size() } -> std::same_as<std::size_t>;
{ t.empty() } -> std::same_as<bool>;
{ t.shutdown() } -> std::same_as<void>;
};

#ifdef LIBCORO_FEATURE_NETWORKING
38 changes: 33 additions & 5 deletions include/coro/io_scheduler.hpp
@@ -21,10 +21,15 @@

namespace coro
{
class io_scheduler
class io_scheduler : public std::enable_shared_from_this<io_scheduler>
{
using timed_events = detail::poll_info::timed_events;

struct private_constructor
{
private_constructor() = default;
};

public:
class schedule_operation;
friend schedule_operation;
@@ -69,7 +74,18 @@ class io_scheduler
const execution_strategy_t execution_strategy{execution_strategy_t::process_tasks_on_thread_pool};
};

explicit io_scheduler(
/**
* @see io_scheduler::make_shared
*/
explicit io_scheduler(options&& opts, private_constructor);

/**
* @brief Creates an io_scheduler.
*
* @param opts
* @return std::shared_ptr<io_scheduler>
*/
static auto make_shared(
options opts = options{
.thread_strategy = thread_strategy_t::spawn,
.on_io_thread_start_functor = nullptr,
Expand All @@ -79,7 +95,7 @@ class io_scheduler
((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1),
.on_thread_start_functor = nullptr,
.on_thread_stop_functor = nullptr},
.execution_strategy = execution_strategy_t::process_tasks_on_thread_pool});
.execution_strategy = execution_strategy_t::process_tasks_on_thread_pool}) -> std::shared_ptr<io_scheduler>;

io_scheduler(const io_scheduler&) = delete;
io_scheduler(io_scheduler&&) = delete;
@@ -229,8 +245,18 @@
* Resumes execution of a direct coroutine handle on this io scheduler.
* @param handle The coroutine handle to resume execution.
*/
auto resume(std::coroutine_handle<> handle) -> void
auto resume(std::coroutine_handle<> handle) -> bool
{
if (handle == nullptr)
{
return false;
}

if (m_shutdown_requested.load(std::memory_order::acquire))
{
return false;
}

if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
{
{
Expand All @@ -245,10 +271,12 @@ class io_scheduler
eventfd_t value{1};
eventfd_write(m_schedule_fd, value);
}

return true;
}
else
{
m_thread_pool->resume(handle);
return m_thread_pool->resume(handle);
}
}

68 changes: 32 additions & 36 deletions include/coro/task_container.hpp
@@ -36,8 +36,7 @@ class task_container
task_container(
std::shared_ptr<executor_type> e, const options opts = options{.reserve_size = 8, .growth_factor = 2})
: m_growth_factor(opts.growth_factor),
m_executor(std::move(e)),
m_executor_ptr(m_executor.get())
m_executor(std::move(e))
{
if (m_executor == nullptr)
{
@@ -78,22 +77,25 @@
{
m_size.fetch_add(1, std::memory_order::relaxed);

std::scoped_lock lk{m_mutex};

if (cleanup == garbage_collect_t::yes)
std::size_t index{};
{
gc_internal();
}
std::unique_lock lk{m_mutex};

// Only grow if completely full and attempting to add more.
if (m_free_task_indices.empty())
{
grow();
}
if (cleanup == garbage_collect_t::yes)
{
gc_internal();
}

// Only grow if completely full and attempting to add more.
if (m_free_task_indices.empty())
{
grow();
}

// Reserve a free task index
std::size_t index = m_free_task_indices.front();
m_free_task_indices.pop();
// Reserve a free task index
index = m_free_task_indices.front();
m_free_task_indices.pop();
}

// Store the task inside a cleanup task for self deletion.
m_tasks[index] = make_cleanup_task(std::move(user_task), index);
@@ -103,7 +105,7 @@
}

/**
* Garbage collects any tasks that are marked as deleted. This frees up space to be re-used by
* Garbage collects any tasks that are marked as deleted. This frees up space to be re-used by
* the task container for newly stored tasks.
* @return The number of tasks that were deleted.
*/
@@ -144,7 +146,7 @@
while (!empty())
{
garbage_collect();
co_await m_executor_ptr->yield();
co_await m_executor->yield();
}
}

@@ -170,7 +172,7 @@
auto gc_internal() -> std::size_t
{
std::size_t deleted{0};
auto pos = std::begin(m_tasks_to_delete);
auto pos = std::begin(m_tasks_to_delete);
while (pos != std::end(m_tasks_to_delete))
{
// Skip tasks that are still running or have yet to start.
@@ -179,7 +181,7 @@
pos++;
continue;
}
// Destroy the cleanup task and the user task.
// Destroy the cleanup task.
m_tasks[*pos].destroy();
// Put the deleted position at the end of the free indexes list.
m_free_task_indices.emplace(*pos);
@@ -207,7 +209,7 @@
auto make_cleanup_task(task<void> user_task, std::size_t index) -> coro::task<void>
{
// Immediately move the task onto the executor.
co_await m_executor_ptr->schedule();
co_await m_executor->schedule();

try
{
@@ -228,8 +230,16 @@
std::cerr << "coro::task_container user_task had unhandled exception, not derived from std::exception.\n";
}

std::scoped_lock lk{m_mutex};
m_tasks_to_delete.emplace_back(index);
// Destroy the user task since it is complete. It is important to do this outside the lock,
// since the user could schedule a new task from the destructor (tls::client does this internally),
// causing a deadlock.
user_task.destroy();

{
std::scoped_lock lk{m_mutex};
m_tasks_to_delete.emplace_back(index);
}

co_return;
}

@@ -248,20 +258,6 @@
double m_growth_factor{};
/// The executor to schedule tasks that have just started.
std::shared_ptr<executor_type> m_executor{nullptr};
/// This is used internally since io_scheduler cannot pass itself in as a shared_ptr.
executor_type* m_executor_ptr{nullptr};

/**
* Special constructor for internal types to create their embedded task containers.
*/

friend io_scheduler;
task_container(executor_type& e, const options opts = options{.reserve_size = 8, .growth_factor = 2})
: m_growth_factor(opts.growth_factor),
m_executor_ptr(&e)
{
init(opts.reserve_size);
}

auto init(std::size_t reserve_size) -> void
{
21 changes: 18 additions & 3 deletions include/coro/thread_pool.hpp
@@ -134,15 +134,17 @@ class thread_pool
/**
* Schedules any coroutine handle that is ready to be resumed.
* @param handle The coroutine handle to schedule.
* @return True if the coroutine is resumed, false if it is a nullptr.
*/
auto resume(std::coroutine_handle<> handle) noexcept -> void;
auto resume(std::coroutine_handle<> handle) noexcept -> bool;

/**
* Schedules the set of coroutine handles that are ready to be resumed.
* @param handles The coroutine handles to schedule.
* @return The number of tasks resumed; any null handles are discarded.
*/
template<coro::concepts::range_of<std::coroutine_handle<>> range_type>
auto resume(const range_type& handles) noexcept -> void
auto resume(const range_type& handles) noexcept -> uint64_t
{
m_size.fetch_add(std::size(handles), std::memory_order::release);

@@ -168,7 +170,20 @@
m_size.fetch_sub(null_handles, std::memory_order::release);
}

m_wait_cv.notify_one();
uint64_t total = std::size(handles) - null_handles;
if (total >= m_threads.size())
{
m_wait_cv.notify_all();
}
else
{
for (uint64_t i = 0; i < total; ++i)
{
m_wait_cv.notify_one();
}
}

return total;
}

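To make the wake-up policy in the new resume(range) concrete, here is a standalone sketch of the same decision. job_queue and submit are invented names and std::function jobs stand in for coroutine handles: broadcast when the batch is at least pool-sized, otherwise wake one worker per accepted job, and report how many jobs were accepted.

```cpp
// Standalone illustration of the batch wake-up policy: notify_all only when the batch
// can occupy the whole pool, otherwise notify_one once per accepted job, and return
// the number of accepted (non-null) jobs.
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

class job_queue
{
public:
    explicit job_queue(std::size_t worker_count) : m_worker_count(worker_count) {}

    // Returns the number of jobs actually enqueued, mirroring resume(range)'s count of
    // non-null coroutine handles.
    uint64_t submit(std::vector<std::function<void()>> jobs)
    {
        uint64_t accepted{0};
        {
            std::scoped_lock lk{m_mutex};
            for (auto& job : jobs)
            {
                if (job) // null jobs are discarded, like null coroutine handles
                {
                    m_queue.push_back(std::move(job));
                    ++accepted;
                }
            }
        }

        if (accepted >= m_worker_count)
        {
            m_cv.notify_all(); // the whole pool has work; one broadcast is cheaper
        }
        else
        {
            for (uint64_t i = 0; i < accepted; ++i)
            {
                m_cv.notify_one(); // wake only as many workers as there are new jobs
            }
        }
        return accepted;
    }

private:
    std::size_t                       m_worker_count;
    std::mutex                        m_mutex;
    std::condition_variable           m_cv;
    std::deque<std::function<void()>> m_queue;
};

int main()
{
    job_queue q{4};
    std::vector<std::function<void()>> batch(2, []() {});
    batch.push_back(nullptr); // discarded, like a null coroutine handle
    return q.submit(std::move(batch)) == 2 ? 0 : 1;
}
```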