From 5697678d79e479cf318ab0f3fd9e246871b6e345 Mon Sep 17 00:00:00 2001
From: Josh Baldwin
Date: Fri, 5 Jul 2024 12:29:31 -0600
Subject: [PATCH] coro::thread_pool high cpu usage when tasks < threads (#265)

* coro::thread_pool high cpu usage when tasks < threads

The check for m_size > 0 was keeping threads awake in a spin state until
all tasks completed. The wait predicate now checks m_queue.size() behind
the lock, so threads are only woken up on the condition variable when
tasks are waiting to be processed.

* Fix deadlock with task_container and tls::client

With the client's destructor scheduling a tls cleanup task, the
task_container's lock was being locked twice when the cleanup task was
being destroyed.

Closes #262

* Adjust when task_container's user_task is deleted

It is now destroyed inline in make_cleanup_task so any destructors that
are invoked and possibly schedule more coroutines do not cause a
deadlock.

* io_scheduler is now std::enable_shared_from_this
---
 README.md                            |  17 +-
 examples/coro_http_200_ok_server.cpp |   2 +-
 examples/coro_io_scheduler.cpp       |   2 +-
 examples/coro_latch.cpp              |  13 +-
 examples/coro_task_container.cpp     |   2 +-
 examples/coro_tcp_echo_server.cpp    |   2 +-
 include/coro/concepts/executor.hpp   |   5 +-
 include/coro/io_scheduler.hpp        |  38 ++-
 include/coro/task_container.hpp      |  68 +++--
 include/coro/thread_pool.hpp         |  21 +-
 src/io_scheduler.cpp                 |  31 ++-
 src/thread_pool.cpp                  |  62 +++--
 test/bench.cpp                       |  41 +--
 test/net/test_dns_resolver.cpp       |   2 +-
 test/net/test_tcp_server.cpp         |   4 +-
 test/net/test_tls_server.cpp         |   2 +-
 test/net/test_udp_peers.cpp          |   4 +-
 test/test_io_scheduler.cpp           | 386 +++++++++++++--------------
 test/test_shared_mutex.cpp           |   2 +-
 test/test_thread_pool.cpp            |  32 +++
 20 files changed, 419 insertions(+), 317 deletions(-)

diff --git a/README.md b/README.md
index 96a183b0..d6719952 100644
--- a/README.md
+++ b/README.md
@@ -362,10 +362,12 @@ int main()
     // Complete worker tasks faster on a thread pool, using the io_scheduler version so the worker
     // tasks can yield for a specific amount of time to mimic difficult work. The pool is only
     // setup with a single thread to showcase yield_for().
-    coro::io_scheduler tp{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+    auto tp = coro::io_scheduler::make_shared(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
 
     // This task will wait until the given latch setters have completed.
-    auto make_latch_task = [](coro::latch& l) -> coro::task<void> {
+    auto make_latch_task = [](coro::latch& l) -> coro::task<void>
+    {
         // It seems like the dependent worker tasks could be created here, but in that case it would
         // be superior to simply do: `co_await coro::when_all(tasks);`
         // It is also important to note that the last dependent task will resume the waiting latch
@@ -381,14 +383,15 @@ int main()
 
     // This task does 'work' and counts down on the latch when completed. The final child task to
     // complete will end up resuming the latch task when the latch's count reaches zero.
-    auto make_worker_task = [](coro::io_scheduler& tp, coro::latch& l, int64_t i) -> coro::task<void> {
+    auto make_worker_task = [](std::shared_ptr<coro::io_scheduler>& tp, coro::latch& l, int64_t i) -> coro::task<void>
+    {
         // Schedule the worker task onto the thread pool.
-        co_await tp.schedule();
+        co_await tp->schedule();
         std::cout << "worker task " << i << " is working...\n";
         // Do some expensive calculations, yield to mimic work...! Its also important to never use
        // std::this_thread::sleep_for() within the context of coroutines, it will block the thread
         // and other tasks that are ready to execute will be blocked.
-        co_await tp.yield_for(std::chrono::milliseconds{i * 20});
+        co_await tp->yield_for(std::chrono::milliseconds{i * 20});
         std::cout << "worker task " << i << " is done, counting down on the latch\n";
         l.count_down();
         co_return;
@@ -846,7 +849,7 @@ The example provided here shows an i/o scheduler that spins up a basic `coro::ne
 
 int main()
 {
-    auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
+    auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
         // The scheduler will spawn a dedicated event processing thread. This is the default, but
         // it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
         .thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
@@ -1017,7 +1020,7 @@ All tasks that are stored within a `coro::task_container` must have a `void` ret
 
 int main()
 {
-    auto scheduler = std::make_shared<coro::io_scheduler>(
+    auto scheduler = coro::io_scheduler::make_shared(
         coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
 
     auto make_server_task = [&]() -> coro::task<void>
diff --git a/examples/coro_http_200_ok_server.cpp b/examples/coro_http_200_ok_server.cpp
index 7b783f5f..e33b98cf 100644
--- a/examples/coro_http_200_ok_server.cpp
+++ b/examples/coro_http_200_ok_server.cpp
@@ -67,7 +67,7 @@ Connection: keep-alive
     std::vector<coro::task<void>> workers{};
     for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
     {
-        auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
+        auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
             .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline});
 
         workers.push_back(make_http_200_ok_server(scheduler));
diff --git a/examples/coro_io_scheduler.cpp b/examples/coro_io_scheduler.cpp
index 8fc2d291..3c0dec5a 100644
--- a/examples/coro_io_scheduler.cpp
+++ b/examples/coro_io_scheduler.cpp
@@ -3,7 +3,7 @@
 
 int main()
 {
-    auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
+    auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
         // The scheduler will spawn a dedicated event processing thread. This is the default, but
         // it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
         .thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
diff --git a/examples/coro_latch.cpp b/examples/coro_latch.cpp
index 01fc899a..75777ccc 100644
--- a/examples/coro_latch.cpp
+++ b/examples/coro_latch.cpp
@@ -6,10 +6,12 @@ int main()
     // Complete worker tasks faster on a thread pool, using the io_scheduler version so the worker
     // tasks can yield for a specific amount of time to mimic difficult work. The pool is only
     // setup with a single thread to showcase yield_for().
-    coro::io_scheduler tp{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+    auto tp = coro::io_scheduler::make_shared(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
 
     // This task will wait until the given latch setters have completed.
-    auto make_latch_task = [](coro::latch& l) -> coro::task<void> {
+    auto make_latch_task = [](coro::latch& l) -> coro::task<void>
+    {
         // It seems like the dependent worker tasks could be created here, but in that case it would
         // be superior to simply do: `co_await coro::when_all(tasks);`
         // It is also important to note that the last dependent task will resume the waiting latch
@@ -25,14 +27,15 @@ int main()
 
     // This task does 'work' and counts down on the latch when completed. The final child task to
     // complete will end up resuming the latch task when the latch's count reaches zero.
-    auto make_worker_task = [](coro::io_scheduler& tp, coro::latch& l, int64_t i) -> coro::task<void> {
+    auto make_worker_task = [](std::shared_ptr<coro::io_scheduler>& tp, coro::latch& l, int64_t i) -> coro::task<void>
+    {
         // Schedule the worker task onto the thread pool.
-        co_await tp.schedule();
+        co_await tp->schedule();
         std::cout << "worker task " << i << " is working...\n";
         // Do some expensive calculations, yield to mimic work...! Its also important to never use
         // std::this_thread::sleep_for() within the context of coroutines, it will block the thread
         // and other tasks that are ready to execute will be blocked.
-        co_await tp.yield_for(std::chrono::milliseconds{i * 20});
+        co_await tp->yield_for(std::chrono::milliseconds{i * 20});
         std::cout << "worker task " << i << " is done, counting down on the latch\n";
         l.count_down();
         co_return;
diff --git a/examples/coro_task_container.cpp b/examples/coro_task_container.cpp
index 31a64ddc..7ae29c55 100644
--- a/examples/coro_task_container.cpp
+++ b/examples/coro_task_container.cpp
@@ -3,7 +3,7 @@
 
 int main()
 {
-    auto scheduler = std::make_shared<coro::io_scheduler>(
+    auto scheduler = coro::io_scheduler::make_shared(
         coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
 
     auto make_server_task = [&]() -> coro::task<void>
diff --git a/examples/coro_tcp_echo_server.cpp b/examples/coro_tcp_echo_server.cpp
index a318176a..dba150d0 100644
--- a/examples/coro_tcp_echo_server.cpp
+++ b/examples/coro_tcp_echo_server.cpp
@@ -61,7 +61,7 @@ auto main() -> int
     std::vector<coro::task<void>> workers{};
     for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
     {
-        auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
+        auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
             .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline});
 
         workers.push_back(make_tcp_echo_server(scheduler));
diff --git a/include/coro/concepts/executor.hpp b/include/coro/concepts/executor.hpp
index dc8394f9..ecb009cf 100644
--- a/include/coro/concepts/executor.hpp
+++ b/include/coro/concepts/executor.hpp
@@ -21,7 +21,10 @@ concept executor = requires(type t, std::coroutine_handle<> c)
 {
     { t.schedule() } -> coro::concepts::awaiter;
     { t.yield() } -> coro::concepts::awaiter;
-    { t.resume(c) } -> std::same_as<void>;
+    { t.resume(c) } -> std::same_as<bool>;
+    { t.size() } -> std::same_as<std::size_t>;
+    { t.empty() } -> std::same_as<bool>;
+    { t.shutdown() } -> std::same_as<void>;
 };
 
 #ifdef LIBCORO_FEATURE_NETWORKING
diff --git a/include/coro/io_scheduler.hpp b/include/coro/io_scheduler.hpp
index 1c1f46d7..db48b7c7 100644
--- a/include/coro/io_scheduler.hpp
+++ b/include/coro/io_scheduler.hpp
@@ -21,10 +21,15 @@
 namespace coro
 {
-class io_scheduler
+class io_scheduler : public std::enable_shared_from_this<io_scheduler>
 {
     using timed_events = detail::poll_info::timed_events;
 
+    struct private_constructor
+    {
+        private_constructor() = default;
+    };
+
 public:
     class schedule_operation;
     friend schedule_operation;
@@ -69,7 +74,18 @@ class io_scheduler
        const execution_strategy_t execution_strategy{execution_strategy_t::process_tasks_on_thread_pool};
     };
 
-    explicit io_scheduler(
+    /**
+     * @see io_scheduler::make_shared
+     */
+    explicit io_scheduler(options&& opts, private_constructor);
+
+    /**
+     * @brief Creates an io_scheduler.
+     *
+     * @param opts
+     * @return std::shared_ptr<io_scheduler>
+     */
+    static auto make_shared(
         options opts = options{
             .thread_strategy            = thread_strategy_t::spawn,
             .on_io_thread_start_functor = nullptr,
@@ -79,7 +95,7 @@ class io_scheduler
                     ((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1),
                 .on_thread_start_functor = nullptr,
                 .on_thread_stop_functor  = nullptr},
-            .execution_strategy = execution_strategy_t::process_tasks_on_thread_pool});
+            .execution_strategy = execution_strategy_t::process_tasks_on_thread_pool}) -> std::shared_ptr<io_scheduler>;
 
     io_scheduler(const io_scheduler&) = delete;
     io_scheduler(io_scheduler&&)      = delete;
@@ -229,8 +245,18 @@ class io_scheduler
      * Resumes execution of a direct coroutine handle on this io scheduler.
      * @param handle The coroutine handle to resume execution.
      */
-    auto resume(std::coroutine_handle<> handle) -> void
+    auto resume(std::coroutine_handle<> handle) -> bool
     {
+        if (handle == nullptr)
+        {
+            return false;
+        }
+
+        if (m_shutdown_requested.load(std::memory_order::acquire))
+        {
+            return false;
+        }
+
         if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
         {
             {
@@ -245,10 +271,12 @@ class io_scheduler
                 eventfd_t value{1};
                 eventfd_write(m_schedule_fd, value);
             }
+
+            return true;
         }
         else
         {
-            m_thread_pool->resume(handle);
+            return m_thread_pool->resume(handle);
         }
     }
 
diff --git a/include/coro/task_container.hpp b/include/coro/task_container.hpp
index 6d0497b8..373ea900 100644
--- a/include/coro/task_container.hpp
+++ b/include/coro/task_container.hpp
@@ -36,8 +36,7 @@ class task_container
     task_container(
         std::shared_ptr<executor_type> e, const options opts = options{.reserve_size = 8, .growth_factor = 2})
         : m_growth_factor(opts.growth_factor),
-          m_executor(std::move(e)),
-          m_executor_ptr(m_executor.get())
+          m_executor(std::move(e))
     {
         if (m_executor == nullptr)
         {
@@ -78,22 +77,25 @@ class task_container
     {
         m_size.fetch_add(1, std::memory_order::relaxed);
 
-        std::scoped_lock lk{m_mutex};
-
-        if (cleanup == garbage_collect_t::yes)
+        std::size_t index{};
         {
-            gc_internal();
-        }
+            std::unique_lock lk{m_mutex};
 
-        // Only grow if completely full and attempting to add more.
-        if (m_free_task_indices.empty())
-        {
-            grow();
-        }
+            if (cleanup == garbage_collect_t::yes)
+            {
+                gc_internal();
+            }
+
+            // Only grow if completely full and attempting to add more.
+            if (m_free_task_indices.empty())
+            {
+                grow();
+            }
 
-        // Reserve a free task index
-        std::size_t index = m_free_task_indices.front();
-        m_free_task_indices.pop();
+            // Reserve a free task index
+            index = m_free_task_indices.front();
+            m_free_task_indices.pop();
+        }
 
         // Store the task inside a cleanup task for self deletion.
         m_tasks[index] = make_cleanup_task(std::move(user_task), index);
@@ -103,7 +105,7 @@ class task_container
     }
 
     /**
-     * Garbage collects any tasks that are marked as deleted.  This frees up space to be re-used by
+     * Garbage collects any tasks that are marked as deleted. This frees up space to be re-used by
      * the task container for newly stored tasks.
      * @return The number of tasks that were deleted.
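+     * Note: store() performs this same collection internally when called with garbage_collect_t::yes.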
     */
@@ -144,7 +146,7 @@ class task_container
         while (!empty())
         {
             garbage_collect();
-            co_await m_executor_ptr->yield();
+            co_await m_executor->yield();
         }
     }
 
@@ -170,7 +172,7 @@ class task_container
     auto gc_internal() -> std::size_t
     {
         std::size_t deleted{0};
-        auto pos     = std::begin(m_tasks_to_delete);
+        auto pos = std::begin(m_tasks_to_delete);
         while (pos != std::end(m_tasks_to_delete))
         {
             // Skip tasks that are still running or have yet to start.
@@ -179,7 +181,7 @@ class task_container
                 pos++;
                 continue;
             }
-            // Destroy the cleanup task and the user task.
+            // Destroy the cleanup task.
             m_tasks[*pos].destroy();
             // Put the deleted position at the end of the free indexes list.
             m_free_task_indices.emplace(*pos);
@@ -207,7 +209,7 @@ class task_container
     auto make_cleanup_task(task<void> user_task, std::size_t index) -> coro::task<void>
     {
         // Immediately move the task onto the executor.
-        co_await m_executor_ptr->schedule();
+        co_await m_executor->schedule();
 
         try
         {
@@ -228,8 +230,16 @@ class task_container
             std::cerr << "coro::task_container user_task had unhandle exception, not derived from std::exception.\n";
         }
 
-        std::scoped_lock lk{m_mutex};
-        m_tasks_to_delete.emplace_back(index);
+        // Destroy the user task since it is complete. It is important to do this outside the lock
+        // since the user could schedule a new task from the destructor (tls::client does this internally),
+        // causing a deadlock.
+        user_task.destroy();
+
+        {
+            std::scoped_lock lk{m_mutex};
+            m_tasks_to_delete.emplace_back(index);
+        }
+
         co_return;
     }
 
@@ -248,20 +258,6 @@ class task_container
     double m_growth_factor{};
     /// The executor to schedule tasks that have just started.
     std::shared_ptr<executor_type> m_executor{nullptr};
-    /// This is used internally since io_scheduler cannot pass itself in as a shared_ptr.
-    executor_type* m_executor_ptr{nullptr};
-
-    /**
-     * Special constructor for internal types to create their embeded task containers.
-     */
-
-    friend io_scheduler;
-    task_container(executor_type& e, const options opts = options{.reserve_size = 8, .growth_factor = 2})
-        : m_growth_factor(opts.growth_factor),
-          m_executor_ptr(&e)
-    {
-        init(opts.reserve_size);
-    }
 
     auto init(std::size_t reserve_size) -> void
     {
diff --git a/include/coro/thread_pool.hpp b/include/coro/thread_pool.hpp
index 56d81893..c8e8c0bf 100644
--- a/include/coro/thread_pool.hpp
+++ b/include/coro/thread_pool.hpp
@@ -134,15 +134,17 @@ class thread_pool
     /**
     * Schedules any coroutine handle that is ready to be resumed.
     * @param handle The coroutine handle to schedule.
+    * @return True if the coroutine is resumed, false if it's a nullptr or the pool is shutting down.
     */
-    auto resume(std::coroutine_handle<> handle) noexcept -> void;
+    auto resume(std::coroutine_handle<> handle) noexcept -> bool;
 
     /**
     * Schedules the set of coroutine handles that are ready to be resumed.
     * @param handles The coroutine handles to schedule.
+    * @return uint64_t The number of tasks resumed; any null handles are discarded.
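+    * Note: wakes one worker per resumed handle, or every worker once the batch size reaches the
+    *       thread count (see the notify logic below).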
    */
    template<coro::concepts::range_of<std::coroutine_handle<>> range_type>
-    auto resume(const range_type& handles) noexcept -> void
+    auto resume(const range_type& handles) noexcept -> uint64_t
     {
         m_size.fetch_add(std::size(handles), std::memory_order::release);
 
@@ -168,7 +170,20 @@ class thread_pool
             m_size.fetch_sub(null_handles, std::memory_order::release);
         }
 
-        m_wait_cv.notify_one();
+        uint64_t total = std::size(handles) - null_handles;
+        if (total >= m_threads.size())
+        {
+            m_wait_cv.notify_all();
+        }
+        else
+        {
+            for (uint64_t i = 0; i < total; ++i)
+            {
+                m_wait_cv.notify_one();
+            }
+        }
+
+        return total;
     }
 
     /**
diff --git a/src/io_scheduler.cpp b/src/io_scheduler.cpp
index e639bd82..7b7f3b9a 100644
--- a/src/io_scheduler.cpp
+++ b/src/io_scheduler.cpp
@@ -14,36 +14,47 @@ using namespace std::chrono_literals;
 
 namespace coro
 {
-io_scheduler::io_scheduler(options opts)
-    : m_opts(std::move(opts)),
+io_scheduler::io_scheduler(options&& opts, private_constructor)
+    : m_opts(opts),
       m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
       m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
       m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
-      m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
-      m_owned_tasks(new coro::task_container<io_scheduler>(*this))
+      m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
+
+{
+}
+
+auto io_scheduler::make_shared(options opts) -> std::shared_ptr<io_scheduler>
 {
+    auto s = std::make_shared<io_scheduler>(std::move(opts), private_constructor{});
+
+    // std::enable_shared_from_this cannot be used until the object is fully created.
+    s->m_owned_tasks = new coro::task_container<io_scheduler>(s->shared_from_this());
+
     if (opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
     {
-        m_thread_pool = std::make_unique<thread_pool>(std::move(m_opts.pool));
+        s->m_thread_pool = std::make_unique<thread_pool>(std::move(s->m_opts.pool));
     }
 
     epoll_event e{};
     e.events = EPOLLIN;
 
     e.data.ptr = const_cast<void*>(m_shutdown_ptr);
-    epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_shutdown_fd, &e);
+    epoll_ctl(s->m_epoll_fd, EPOLL_CTL_ADD, s->m_shutdown_fd, &e);
 
     e.data.ptr = const_cast<void*>(m_timer_ptr);
-    epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e);
+    epoll_ctl(s->m_epoll_fd, EPOLL_CTL_ADD, s->m_timer_fd, &e);
 
     e.data.ptr = const_cast<void*>(m_schedule_ptr);
-    epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_schedule_fd, &e);
+    epoll_ctl(s->m_epoll_fd, EPOLL_CTL_ADD, s->m_schedule_fd, &e);
 
-    if (m_opts.thread_strategy == thread_strategy_t::spawn)
+    if (s->m_opts.thread_strategy == thread_strategy_t::spawn)
     {
-        m_io_thread = std::thread([this]() { process_events_dedicated_thread(); });
+        s->m_io_thread = std::thread([s]() { s->process_events_dedicated_thread(); });
     }
     // else manual mode, the user must call process_events.
+
+    return s;
 }
 
 io_scheduler::~io_scheduler()
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 07d37f19..3c33b0fe 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -44,15 +44,21 @@ auto thread_pool::schedule() -> operation
     throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
 }
 
-auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void
+auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
 {
     if (handle == nullptr)
     {
-        return;
+        return false;
+    }
+
+    if (m_shutdown_requested.load(std::memory_order::acquire))
+    {
+        return false;
     }
 
     m_size.fetch_add(1, std::memory_order::release);
     schedule_impl(handle);
+    return true;
 }
 
 auto thread_pool::shutdown() noexcept -> void
@@ -84,29 +90,44 @@ auto thread_pool::executor(std::size_t idx) -> void
         m_opts.on_thread_start_functor(idx);
     }
 
-    // Process until shutdown is requested and the total number of tasks reaches zero.
-    while (!m_shutdown_requested.load(std::memory_order::acquire) || m_size.load(std::memory_order::acquire) > 0)
+    // Process until shutdown is requested.
+    while (!m_shutdown_requested.load(std::memory_order::acquire))
     {
         std::unique_lock lk{m_wait_mutex};
-        m_wait_cv.wait(
-            lk,
-            [&] {
-                return m_size.load(std::memory_order::acquire) > 0 ||
-                       m_shutdown_requested.load(std::memory_order::acquire);
-            });
+        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
+
+        if (m_queue.empty())
+        {
+            continue;
+        }
+
+        auto handle = m_queue.front();
+        m_queue.pop_front();
+        lk.unlock();
 
-        // Process this batch until the queue is empty.
-        while (!m_queue.empty())
-        {
-            auto handle = m_queue.front();
-            m_queue.pop_front();
+        // Release the lock while executing the coroutine.
+        handle.resume();
+        m_size.fetch_sub(1, std::memory_order::release);
+    }
 
-            // Release the lock while executing the coroutine.
-            lk.unlock();
-            handle.resume();
+    // Process until there are no ready tasks left.
+    while (m_size.load(std::memory_order::acquire) > 0)
+    {
+        std::unique_lock lk{m_wait_mutex};
+        // m_size will only drop to zero once all executing coroutines are finished
+        // but the queue could be empty for threads that finished early.
+        if (m_queue.empty())
+        {
+            break;
         }
 
-            m_size.fetch_sub(1, std::memory_order::release);
-            lk.lock();
+        auto handle = m_queue.front();
+        m_queue.pop_front();
+        lk.unlock();
+
+        // Release the lock while executing the coroutine.
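+        // m_size is decremented after every resume, so the enclosing drain loop always makes progress.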
+ handle.resume(); + m_size.fetch_sub(1, std::memory_order::release); } if (m_opts.on_thread_stop_functor != nullptr) @@ -125,9 +146,8 @@ auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void { std::scoped_lock lk{m_wait_mutex}; m_queue.emplace_back(handle); + m_wait_cv.notify_one(); } - - m_wait_cv.notify_one(); } } // namespace coro diff --git a/test/bench.cpp b/test/bench.cpp index 9cd7196f..f102fe26 100644 --- a/test/bench.cpp +++ b/test/bench.cpp @@ -237,7 +237,8 @@ TEST_CASE("benchmark counter task scheduler{1} yield", "[benchmark]") constexpr std::size_t iterations = default_iterations; constexpr std::size_t ops = iterations * 2; // the external resume is still a resume op - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); std::atomic counter{0}; std::vector> tasks{}; @@ -245,8 +246,8 @@ TEST_CASE("benchmark counter task scheduler{1} yield", "[benchmark]") auto make_task = [&]() -> coro::task { - co_await s.schedule(); - co_await s.yield(); + co_await s->schedule(); + co_await s->yield(); counter.fetch_add(1, std::memory_order::relaxed); co_return; }; @@ -262,7 +263,7 @@ TEST_CASE("benchmark counter task scheduler{1} yield", "[benchmark]") auto stop = sc::now(); print_stats("benchmark counter task scheduler{1} yield", ops, start, stop); - REQUIRE(s.empty()); + REQUIRE(s->empty()); REQUIRE(counter == iterations); } @@ -271,7 +272,8 @@ TEST_CASE("benchmark counter task scheduler{1} yield_for", "[benchmark]") constexpr std::size_t iterations = default_iterations; constexpr std::size_t ops = iterations * 2; // the external resume is still a resume op - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); std::atomic counter{0}; std::vector> tasks{}; @@ -279,8 +281,8 @@ TEST_CASE("benchmark counter task scheduler{1} yield_for", "[benchmark]") auto make_task = [&]() -> coro::task { - co_await s.schedule(); - co_await s.yield_for(std::chrono::milliseconds{1}); + co_await s->schedule(); + co_await s->yield_for(std::chrono::milliseconds{1}); counter.fetch_add(1, std::memory_order::relaxed); co_return; }; @@ -296,7 +298,7 @@ TEST_CASE("benchmark counter task scheduler{1} yield_for", "[benchmark]") auto stop = sc::now(); print_stats("benchmark counter task scheduler{1} yield", ops, start, stop); - REQUIRE(s.empty()); + REQUIRE(s->empty()); REQUIRE(counter == iterations); } @@ -305,7 +307,8 @@ TEST_CASE("benchmark counter task scheduler await event from another coroutine", constexpr std::size_t iterations = default_iterations; constexpr std::size_t ops = iterations * 3; // two tasks + event resume - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); std::vector> events{}; events.reserve(iterations); @@ -321,7 +324,7 @@ TEST_CASE("benchmark counter task scheduler await event from another coroutine", auto wait_func = [&](std::size_t index) -> coro::task { - co_await s.schedule(); + co_await s->schedule(); co_await *events[index]; counter.fetch_add(1, std::memory_order::relaxed); co_return; @@ -329,7 +332,7 @@ 
TEST_CASE("benchmark counter task scheduler await event from another coroutine", auto resume_func = [&](std::size_t index) -> coro::task { - co_await s.schedule(); + co_await s->schedule(); events[index]->set(); co_return; }; @@ -349,11 +352,11 @@ TEST_CASE("benchmark counter task scheduler await event from another coroutine", REQUIRE(counter == iterations); // valgrind workaround - while (!s.empty()) + while (!s->empty()) { std::this_thread::sleep_for(std::chrono::milliseconds{1}); } - REQUIRE(s.empty()); + REQUIRE(s->empty()); } #ifdef LIBCORO_FEATURE_NETWORKING @@ -399,7 +402,7 @@ TEST_CASE("benchmark tcp::server echo server thread pool", "[benchmark]") co_return; }; - auto server_scheduler = std::make_shared(coro::io_scheduler::options{ + auto server_scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{ .pool = coro::thread_pool::options{}, .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}); auto make_server_task = [&]() -> coro::task @@ -433,7 +436,7 @@ TEST_CASE("benchmark tcp::server echo server thread pool", "[benchmark]") std::mutex g_histogram_mutex; std::map g_histogram; - auto client_scheduler = std::make_shared(coro::io_scheduler::options{ + auto client_scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{ .pool = coro::thread_pool::options{}, .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}); auto make_client_task = [&]() -> coro::task @@ -538,7 +541,7 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]") struct server { uint64_t id; - std::shared_ptr scheduler{std::make_shared( + std::shared_ptr scheduler{coro::io_scheduler::make_shared( coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline})}; uint64_t live_clients{0}; coro::event wait_for_clients{}; @@ -546,7 +549,7 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]") struct client { - std::shared_ptr scheduler{std::make_shared( + std::shared_ptr scheduler{coro::io_scheduler::make_shared( coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline})}; std::vector> tasks{}; }; @@ -785,7 +788,7 @@ TEST_CASE("benchmark tls::server echo server thread pool", "[benchmark]") co_return; }; - auto server_scheduler = std::make_shared(coro::io_scheduler::options{ + auto server_scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{ .pool = coro::thread_pool::options{}, .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}); auto make_server_task = [&]() -> coro::task @@ -823,7 +826,7 @@ TEST_CASE("benchmark tls::server echo server thread pool", "[benchmark]") coro::mutex histogram_mutex; std::map g_histogram; - auto client_scheduler = std::make_shared(coro::io_scheduler::options{ + auto client_scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{ .pool = coro::thread_pool::options{}, .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}); auto make_client_task = [&](coro::mutex& histogram_mutex) -> coro::task diff --git a/test/net/test_dns_resolver.cpp b/test/net/test_dns_resolver.cpp index 071d5dc6..4c69fc70 100644 --- a/test/net/test_dns_resolver.cpp +++ b/test/net/test_dns_resolver.cpp @@ -8,7 +8,7 @@ TEST_CASE("dns_resolver basic", "[dns]") { - auto scheduler = std::make_shared( + auto scheduler = coro::io_scheduler::make_shared( coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 
1}}); coro::net::dns::resolver dns_resolver{scheduler, std::chrono::milliseconds{5000}}; diff --git a/test/net/test_tcp_server.cpp b/test/net/test_tcp_server.cpp index 33ebebf7..09966482 100644 --- a/test/net/test_tcp_server.cpp +++ b/test/net/test_tcp_server.cpp @@ -11,7 +11,7 @@ TEST_CASE("tcp_server ping server", "[tcp_server]") const std::string client_msg{"Hello from client"}; const std::string server_msg{"Reply from server!"}; - auto scheduler = std::make_shared( + auto scheduler = coro::io_scheduler::make_shared( coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_client_task = [&]() -> coro::task @@ -91,7 +91,7 @@ TEST_CASE("tcp_server concurrent polling on the same socket", "[tcp_server]") // Issue 224: This test duplicates a client and issues two different poll operations per coroutine. using namespace std::chrono_literals; - auto scheduler = std::make_shared(coro::io_scheduler::options{ + auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{ .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}); auto make_read_task = [](coro::net::tcp::client client) -> coro::task diff --git a/test/net/test_tls_server.cpp b/test/net/test_tls_server.cpp index 1ddebab4..230d4d43 100644 --- a/test/net/test_tls_server.cpp +++ b/test/net/test_tls_server.cpp @@ -9,7 +9,7 @@ TEST_CASE("tls_server hello world server", "[tls_server]") { - auto scheduler = std::make_shared( + auto scheduler = coro::io_scheduler::make_shared( coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); std::string client_msg = "Hello world from TLS client!"; diff --git a/test/net/test_udp_peers.cpp b/test/net/test_udp_peers.cpp index 4d96fe72..bfa6b7dc 100644 --- a/test/net/test_udp_peers.cpp +++ b/test/net/test_udp_peers.cpp @@ -8,7 +8,7 @@ TEST_CASE("udp one way") { const std::string msg{"aaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbcccccccccccccccccc"}; - auto scheduler = std::make_shared( + auto scheduler = coro::io_scheduler::make_shared( coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_send_task = [&]() -> coro::task @@ -54,7 +54,7 @@ TEST_CASE("udp echo peers") const std::string peer1_msg{"Hello from peer1!"}; const std::string peer2_msg{"Hello from peer2!!"}; - auto scheduler = std::make_shared( + auto scheduler = coro::io_scheduler::make_shared( coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_peer_task = [&scheduler]( diff --git a/test/test_io_scheduler.cpp b/test/test_io_scheduler.cpp index 012dc4b6..9e5d73cd 100644 --- a/test/test_io_scheduler.cpp +++ b/test/test_io_scheduler.cpp @@ -18,20 +18,21 @@ using namespace std::chrono_literals; TEST_CASE("io_scheduler schedule single task", "[io_scheduler]") { - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); co_return 42; }; auto value = coro::sync_wait(make_task()); REQUIRE(value == 42); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << 
"io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler submit mutiple tasks", "[io_scheduler]") @@ -40,11 +41,11 @@ TEST_CASE("io_scheduler submit mutiple tasks", "[io_scheduler]") std::atomic counter{0}; std::vector> tasks{}; tasks.reserve(n); - coro::io_scheduler s{}; + auto s = coro::io_scheduler::make_shared(); auto make_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); counter++; co_return; }; @@ -57,16 +58,17 @@ TEST_CASE("io_scheduler submit mutiple tasks", "[io_scheduler]") REQUIRE(counter == n); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler task with multiple events", "[io_scheduler]") { std::atomic counter{0}; - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); coro::event e1; coro::event e2; @@ -74,7 +76,7 @@ TEST_CASE("io_scheduler task with multiple events", "[io_scheduler]") auto make_wait_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); co_await e1; counter++; co_await e2; @@ -86,7 +88,7 @@ TEST_CASE("io_scheduler task with multiple events", "[io_scheduler]") auto make_set_task = [&](coro::event& e) -> coro::task { - co_await s.schedule(); + co_await s->schedule(); e.set(); }; @@ -94,28 +96,29 @@ TEST_CASE("io_scheduler task with multiple events", "[io_scheduler]") REQUIRE(counter == 3); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler task with read poll", "[io_scheduler]") { - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_poll_read_task = [&]() -> coro::task { - co_await s.schedule(); - auto status = co_await s.poll(trigger_fd, coro::poll_op::read); + co_await s->schedule(); + auto status = co_await s->poll(trigger_fd, coro::poll_op::read); REQUIRE(status == coro::poll_status::event); co_return; }; auto make_poll_write_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); uint64_t value{42}; auto unused = write(trigger_fd, &value, sizeof(value)); (void)unused; @@ -124,30 +127,31 @@ TEST_CASE("io_scheduler task with read poll", "[io_scheduler]") coro::sync_wait(coro::when_all(make_poll_read_task(), make_poll_write_task())); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + 
std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); close(trigger_fd); } TEST_CASE("io_scheduler task with read poll with timeout", "[io_scheduler]") { - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_poll_read_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); // Poll with a timeout but don't timeout. - auto status = co_await s.poll(trigger_fd, coro::poll_op::read, 50ms); + auto status = co_await s->poll(trigger_fd, coro::poll_op::read, 50ms); REQUIRE(status == coro::poll_status::event); co_return; }; auto make_poll_write_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); uint64_t value{42}; auto unused = write(trigger_fd, &value, sizeof(value)); (void)unused; @@ -156,73 +160,49 @@ TEST_CASE("io_scheduler task with read poll with timeout", "[io_scheduler]") coro::sync_wait(coro::when_all(make_poll_read_task(), make_poll_write_task())); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); close(trigger_fd); } TEST_CASE("io_scheduler task with read poll timeout", "[io_scheduler]") { - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); // Poll with a timeout and timeout. - auto status = co_await s.poll(trigger_fd, coro::poll_op::read, 10ms); + auto status = co_await s->poll(trigger_fd, coro::poll_op::read, 10ms); REQUIRE(status == coro::poll_status::timeout); co_return; }; coro::sync_wait(make_task()); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); close(trigger_fd); } -// TODO: This probably requires a TCP socket? 
-// TEST_CASE("io_scheduler task with read poll closed socket", "[io_scheduler]") -// { -// auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); -// coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options { .thread_count = 1 }}}; - -// auto make_poll_task = [&]() -> coro::task { -// co_await s.schedule(); -// auto status = co_await s.poll(trigger_fd, coro::poll_op::read, 1000ms); -// REQUIRE(status == coro::poll_status::closed); -// co_return; -// }; - -// auto make_close_task = [&]() -> coro::task { -// co_await s.schedule(); -// std::this_thread::sleep_for(100ms); -// // shutdown(trigger_fd, SHUT_RDWR); -// close(trigger_fd); -// co_return; -// }; - -// coro::sync_wait(coro::when_all(make_poll_task(), make_close_task())); - -// s.shutdown(); -// REQUIRE(s.empty()); -// } - TEST_CASE("io_scheduler separate thread resume", "[io_scheduler]") { - coro::io_scheduler s1{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; - coro::io_scheduler s2{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s1 = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); + auto s2 = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); coro::event e{}; auto make_s1_task = [&]() -> coro::task { - co_await s1.schedule(); + co_await s1->schedule(); auto tid = std::this_thread::get_id(); co_await e; @@ -234,7 +214,7 @@ TEST_CASE("io_scheduler separate thread resume", "[io_scheduler]") auto make_s2_task = [&]() -> coro::task { - co_await s2.schedule(); + co_await s2->schedule(); // Wait a bit to be sure the wait on 'e' in the other scheduler is done first. std::this_thread::sleep_for(10ms); e.set(); @@ -243,19 +223,20 @@ TEST_CASE("io_scheduler separate thread resume", "[io_scheduler]") coro::sync_wait(coro::when_all(make_s1_task(), make_s2_task())); - s1.shutdown(); - REQUIRE(s1.empty()); - s2.shutdown(); - REQUIRE(s2.empty()); + s1->shutdown(); + REQUIRE(s1->empty()); + s2->shutdown(); + REQUIRE(s2->empty()); } TEST_CASE("io_scheduler separate thread resume spawned thread", "[io_scheduler]") { - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); coro::event e{}; auto tid = std::this_thread::get_id(); @@ -269,7 +250,7 @@ TEST_CASE("io_scheduler separate thread resume spawned thread", "[io_scheduler]" { // mimic some expensive computation // Resume the coroutine back onto the scheduler, not this background thread. 
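+                // event::set(executor) schedules the awaiting coroutine's resumption on the given
+                // executor instead of resuming it inline on this background thread.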
- e.set(s); + e.set(*s); }); third_party_thread.detach(); @@ -280,14 +261,15 @@ TEST_CASE("io_scheduler separate thread resume spawned thread", "[io_scheduler]" coro::sync_wait(make_task()); - s.shutdown(); - REQUIRE(s.empty()); + s->shutdown(); + REQUIRE(s->empty()); } TEST_CASE("io_scheduler separate thread resume with return", "[io_scheduler]") { constexpr uint64_t expected_value{1337}; - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); coro::event start_service{}; coro::event service_done{}; @@ -302,7 +284,7 @@ TEST_CASE("io_scheduler separate thread resume with return", "[io_scheduler]") } output = expected_value; - service_done.set(s); + service_done.set(*s); }}; auto third_party_service = [&](int multiplier) -> coro::task @@ -314,7 +296,7 @@ TEST_CASE("io_scheduler separate thread resume with return", "[io_scheduler]") auto make_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); int multiplier{5}; uint64_t value = co_await third_party_service(multiplier); @@ -324,26 +306,27 @@ TEST_CASE("io_scheduler separate thread resume with return", "[io_scheduler]") coro::sync_wait(make_task()); service.join(); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler with basic task", "[io_scheduler]") { constexpr std::size_t expected_value{5}; - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto add_data = [&](uint64_t val) -> coro::task { - co_await s.schedule(); + co_await s->schedule(); co_return val; }; auto func = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); auto output_tasks = co_await coro::when_all(add_data(1), add_data(1), add_data(1), add_data(1), add_data(1)); @@ -357,10 +340,10 @@ TEST_CASE("io_scheduler with basic task", "[io_scheduler]") REQUIRE(counter == expected_value); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler scheduler_after", "[io_scheduler]") @@ -379,38 +362,38 @@ TEST_CASE("io_scheduler scheduler_after", "[io_scheduler]") }; { - coro::io_scheduler s{coro::io_scheduler::options{ - .pool = coro::thread_pool::options{ - .thread_count = 1, .on_thread_start_functor = [&](std::size_t) { tid = std::this_thread::get_id(); }}}}; - auto start = std::chrono::steady_clock::now(); - coro::sync_wait(func(s, 0ms)); + auto s = coro::io_scheduler::make_shared(coro::io_scheduler::options{ + .pool = coro::thread_pool::options{ + .thread_count = 1, .on_thread_start_functor = [&](std::size_t) { tid = std::this_thread::get_id(); }}}); + auto start = 
std::chrono::steady_clock::now(); + coro::sync_wait(func(*s, 0ms)); auto stop = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(stop - start); REQUIRE(counter == 1); REQUIRE(duration < wait_for); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } { - coro::io_scheduler s{coro::io_scheduler::options{ + auto s = coro::io_scheduler::make_shared(coro::io_scheduler::options{ .pool = coro::thread_pool::options{ - .thread_count = 1, .on_thread_start_functor = [&](std::size_t) { tid = std::this_thread::get_id(); }}}}; + .thread_count = 1, .on_thread_start_functor = [&](std::size_t) { tid = std::this_thread::get_id(); }}}); auto start = std::chrono::steady_clock::now(); - coro::sync_wait(func(s, wait_for)); + coro::sync_wait(func(*s, wait_for)); auto stop = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(stop - start); REQUIRE(counter == 2); REQUIRE(duration >= wait_for); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } } @@ -422,13 +405,13 @@ TEST_CASE("io_scheduler schedule_at", "[io_scheduler]") std::atomic counter{0}; std::thread::id tid; - coro::io_scheduler s{coro::io_scheduler::options{ + auto s = coro::io_scheduler::make_shared(coro::io_scheduler::options{ .pool = coro::thread_pool::options{ - .thread_count = 1, .on_thread_start_functor = [&](std::size_t) { tid = std::this_thread::get_id(); }}}}; + .thread_count = 1, .on_thread_start_functor = [&](std::size_t) { tid = std::this_thread::get_id(); }}}); auto func = [&](std::chrono::steady_clock::time_point time) -> coro::task { - co_await s.schedule_at(time); + co_await s->schedule_at(time); ++counter; REQUIRE(tid == std::this_thread::get_id()); co_return; @@ -467,55 +450,57 @@ TEST_CASE("io_scheduler schedule_at", "[io_scheduler]") TEST_CASE("io_scheduler yield", "[io_scheduler]") { - std::thread::id tid; - coro::io_scheduler s{coro::io_scheduler::options{ - .pool = coro::thread_pool::options{ - .thread_count = 1, .on_thread_start_functor = [&](std::size_t) { tid = std::this_thread::get_id(); }}}}; + std::thread::id tid; + auto s = coro::io_scheduler::make_shared(coro::io_scheduler::options{ + .pool = coro::thread_pool::options{ + .thread_count = 1, .on_thread_start_functor = [&](std::size_t) { tid = std::this_thread::get_id(); }}}); auto func = [&]() -> coro::task { REQUIRE(tid != std::this_thread::get_id()); - co_await s.schedule(); + co_await s->schedule(); REQUIRE(tid == std::this_thread::get_id()); - co_await s.yield(); // this is really a thread pool function but /shrug + co_await s->yield(); // this is really a thread pool function but /shrug REQUIRE(tid == std::this_thread::get_id()); co_return; }; coro::sync_wait(func()); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + 
std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler yield_for", "[io_scheduler]") { - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); const std::chrono::milliseconds wait_for{50}; auto make_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); auto start = std::chrono::steady_clock::now(); - co_await s.yield_for(wait_for); + co_await s->yield_for(wait_for); co_return std::chrono::duration_cast(std::chrono::steady_clock::now() - start); }; auto duration = coro::sync_wait(make_task()); REQUIRE(duration >= wait_for); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler yield_until", "[io_scheduler]") { - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); // Because yield_until() takes its own time internally the wait_for might be off by a bit. const std::chrono::milliseconds epsilon{3}; @@ -523,26 +508,26 @@ TEST_CASE("io_scheduler yield_until", "[io_scheduler]") auto make_task = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); auto start = std::chrono::steady_clock::now(); - co_await s.yield_until(start + wait_for); + co_await s->yield_until(start + wait_for); co_return std::chrono::duration_cast(std::chrono::steady_clock::now() - start); }; auto duration = coro::sync_wait(make_task()); REQUIRE(duration >= (wait_for - epsilon)); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler multipler event waiters", "[io_scheduler]") { const constexpr std::size_t total{10}; coro::event e{}; - coro::io_scheduler s{}; + auto s = coro::io_scheduler::make_shared(); auto func = [&]() -> coro::task { @@ -552,7 +537,7 @@ TEST_CASE("io_scheduler multipler event waiters", "[io_scheduler]") auto spawn = [&]() -> coro::task { - co_await s.schedule(); + co_await s->schedule(); std::vector> tasks; for (size_t i = 0; i < total; ++i) { @@ -571,30 +556,31 @@ TEST_CASE("io_scheduler multipler event waiters", "[io_scheduler]") auto release = [&]() -> coro::task { - co_await s.schedule_after(10ms); - e.set(s); + co_await s->schedule_after(10ms); + e.set(*s); }; coro::sync_wait(coro::when_all(spawn(), release())); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() 
before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler self generating coroutine (stack overflow check)", "[io_scheduler]") { const constexpr std::size_t total{1'000'000}; uint64_t counter{0}; - coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto s = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); std::vector> tasks; tasks.reserve(total); auto func = [&](auto f) -> coro::task { - co_await s.schedule(); + co_await s->schedule(); ++counter; if (counter % total == 0) @@ -619,44 +605,44 @@ TEST_CASE("io_scheduler self generating coroutine (stack overflow check)", "[io_ REQUIRE(tasks.size() == total - 1); - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); } TEST_CASE("io_scheduler manual process events thread pool", "[io_scheduler]") { - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - coro::io_scheduler s{coro::io_scheduler::options{ - .thread_strategy = coro::io_scheduler::thread_strategy_t::manual, - .pool = coro::thread_pool::options{ - .thread_count = 1, - }}}; + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + auto s = coro::io_scheduler::make_shared(coro::io_scheduler::options{ + .thread_strategy = coro::io_scheduler::thread_strategy_t::manual, + .pool = coro::thread_pool::options{ + .thread_count = 1, + }}); std::atomic polling{false}; auto make_poll_read_task = [&]() -> coro::task { - std::cerr << "poll task start s.size() == " << s.size() << "\n"; - co_await s.schedule(); + std::cerr << "poll task start s.size() == " << s->size() << "\n"; + co_await s->schedule(); polling = true; - std::cerr << "poll task polling s.size() == " << s.size() << "\n"; - auto status = co_await s.poll(trigger_fd, coro::poll_op::read); + std::cerr << "poll task polling s.size() == " << s->size() << "\n"; + auto status = co_await s->poll(trigger_fd, coro::poll_op::read); REQUIRE(status == coro::poll_status::event); - std::cerr << "poll task exiting s.size() == " << s.size() << "\n"; + std::cerr << "poll task exiting s.size() == " << s->size() << "\n"; co_return; }; auto make_poll_write_task = [&]() -> coro::task { - std::cerr << "write task start s.size() == " << s.size() << "\n"; - co_await s.schedule(); + std::cerr << "write task start s.size() == " << s->size() << "\n"; + co_await s->schedule(); uint64_t value{42}; - std::cerr << "write task writing s.size() == " << s.size() << "\n"; + std::cerr << "write task writing s.size() == " << s->size() << "\n"; auto unused = write(trigger_fd, &value, sizeof(value)); (void)unused; - std::cerr << "write task exiting s.size() == " << s.size() << "\n"; + std::cerr << "write task exiting s.size() == " << s->size() << "\n"; co_return; }; @@ -671,43 +657,43 @@ TEST_CASE("io_scheduler manual process events thread pool", "[io_scheduler]") write_task.resume(); - while (s.process_events(100ms) > 0) + while (s->process_events(100ms) > 0) ; - std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; - s.shutdown(); - std::cerr << "io_scheduler.size() 
after shutdown = " << s.size() << "\n"; - REQUIRE(s.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n"; + s->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n"; + REQUIRE(s->empty()); close(trigger_fd); } TEST_CASE("io_scheduler manual process events inline", "[io_scheduler]") { - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - coro::io_scheduler s{coro::io_scheduler::options{ - .thread_strategy = coro::io_scheduler::thread_strategy_t::manual, - .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}}; + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + auto s = coro::io_scheduler::make_shared(coro::io_scheduler::options{ + .thread_strategy = coro::io_scheduler::thread_strategy_t::manual, + .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}); auto make_poll_read_task = [&]() -> coro::task { - std::cerr << "poll task start s.size() == " << s.size() << "\n"; - co_await s.schedule(); - std::cerr << "poll task polling s.size() == " << s.size() << "\n"; - auto status = co_await s.poll(trigger_fd, coro::poll_op::read); + std::cerr << "poll task start s.size() == " << s->size() << "\n"; + co_await s->schedule(); + std::cerr << "poll task polling s.size() == " << s->size() << "\n"; + auto status = co_await s->poll(trigger_fd, coro::poll_op::read); REQUIRE(status == coro::poll_status::event); - std::cerr << "poll task exiting s.size() == " << s.size() << "\n"; + std::cerr << "poll task exiting s.size() == " << s->size() << "\n"; co_return; }; auto make_poll_write_task = [&]() -> coro::task { - std::cerr << "write task start s.size() == " << s.size() << "\n"; - co_await s.schedule(); + std::cerr << "write task start s.size() == " << s->size() << "\n"; + co_await s->schedule(); uint64_t value{42}; - std::cerr << "write task writing s.size() == " << s.size() << "\n"; + std::cerr << "write task writing s.size() == " << s->size() << "\n"; auto unused = write(trigger_fd, &value, sizeof(value)); (void)unused; - std::cerr << "write task exiting s.size() == " << s.size() << "\n"; + std::cerr << "write task exiting s.size() == " << s->size() << "\n"; co_return; }; @@ -721,7 +707,7 @@ TEST_CASE("io_scheduler manual process events inline", "[io_scheduler]") // Now process them to completion. 
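+    // Note: process_events() returns the number of tasks still pending on the scheduler, so the
+    // loop below drives the scheduler until none remain.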
    while (true)
     {
-        auto remaining = s.process_events(100ms);
+        auto remaining = s->process_events(100ms);
         std::cerr << "remaining " << remaining << "\n";
         if (remaining == 0)
         {
             break;
         }
     };
 
-    std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
-    s.shutdown();
-    std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
-    REQUIRE(s.empty());
+    std::cerr << "io_scheduler.size() before shutdown = " << s->size() << "\n";
+    s->shutdown();
+    std::cerr << "io_scheduler.size() after shutdown = " << s->size() << "\n";
+    REQUIRE(s->empty());
 
     close(trigger_fd);
 }
 
 TEST_CASE("io_scheduler task throws", "[io_scheduler]")
 {
-    coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+    auto s = coro::io_scheduler::make_shared(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
 
     auto func = [&]() -> coro::task<uint64_t>
     {
-        co_await s.schedule();
+        co_await s->schedule();
         throw std::runtime_error{"I always throw."};
         co_return 42;
     };
@@ -752,13 +739,14 @@
 
 TEST_CASE("io_scheduler task throws after resume", "[io_scheduler]")
 {
-    coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+    auto s = coro::io_scheduler::make_shared(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
 
     auto make_thrower = [&]() -> coro::task<bool>
     {
-        co_await s.schedule();
+        co_await s->schedule();
         std::cerr << "Throwing task is doing some work...\n";
-        co_await s.yield();
+        co_await s->yield();
         throw std::runtime_error{"I always throw."};
         co_return true;
     };
diff --git a/test/test_shared_mutex.cpp b/test/test_shared_mutex.cpp
index af2167a0..8f9b64be 100644
--- a/test/test_shared_mutex.cpp
+++ b/test/test_shared_mutex.cpp
@@ -83,7 +83,7 @@ TEST_CASE("mutex single waiter not locked shared", "[shared_mutex]")
 #ifdef LIBCORO_FEATURE_NETWORKING
 TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex]")
 {
-    auto tp = std::make_shared<coro::io_scheduler>(
+    auto tp = coro::io_scheduler::make_shared(
         coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 8}});
 
     coro::shared_mutex<coro::io_scheduler> m{tp};
 
diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp
index 434a0902..d0ebf2b1 100644
--- a/test/test_thread_pool.cpp
+++ b/test/test_thread_pool.cpp
@@ -199,4 +199,36 @@ TEST_CASE("thread_pool event jump threads", "[thread_pool]")
     };
 
     coro::sync_wait(coro::when_all(make_tp1_task(), make_tp2_task()));
+}
+
+TEST_CASE("thread_pool high cpu usage when threadcount is greater than the number of tasks", "[thread_pool]")
+{
+    // https://github.com/jbaldwin/libcoro/issues/262
+    // This test doesn't trigger any error condition on its own; the reported issue was that
+    // thread_pool threads with no work to do would spin on the CPU whenever fewer tasks were
+    // running than there were threads in the pool.
+    // This was due to using m_size instead of m_queue.size(), causing the threads
+    // that had no work to go into a spin trying to acquire work.
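+    //
+    // The fix (see src/thread_pool.cpp in this patch) makes idle workers sleep on the condition
+    // variable until the queue actually holds work, i.e.
+    //     m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
+    // so threads block instead of busy-waiting.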
+
+    auto sleep_for_task = [](std::chrono::seconds duration) -> coro::task<int64_t>
+    {
+        std::this_thread::sleep_for(duration);
+        co_return duration.count();
+    };
+
+    auto wait_for_task = [&](coro::thread_pool& pool, std::chrono::seconds delay) -> coro::task<>
+    {
+        co_await pool.schedule();
+        for (int i = 0; i < 5; ++i)
+        {
+            co_await sleep_for_task(delay);
+            std::cout << std::chrono::system_clock::now().time_since_epoch().count() << " wait for " << delay.count()
+                      << " seconds\n";
+        }
+        co_return;
+    };
+
+    coro::thread_pool pool{coro::thread_pool::options{.thread_count = 3}};
+    coro::sync_wait(
+        coro::when_all(wait_for_task(pool, std::chrono::seconds{1}), wait_for_task(pool, std::chrono::seconds{3})));
+}
\ No newline at end of file