From 3ff8d50dfd1be6beb143d329a126caa2cb5ed2e2 Mon Sep 17 00:00:00 2001 From: David-Haim Date: Tue, 28 Nov 2017 09:02:41 +0200 Subject: [PATCH] * various fixes and improvements * added when_any and when_all * threadpool improved --- concurrencpp.h | 458 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 338 insertions(+), 120 deletions(-) diff --git a/concurrencpp.h b/concurrencpp.h index f1a1499..f28563e 100644 --- a/concurrencpp.h +++ b/concurrencpp.h @@ -1,23 +1,23 @@ /* - Copyright (c) 2017 David Haim - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. +Copyright (c) 2017 David Haim + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. */ #ifndef CONCURRENCPP_H @@ -38,7 +38,7 @@ #include #include -namespace concurrencpp { +namespace concurrencpp { namespace details { class spinlock { @@ -191,8 +191,8 @@ namespace concurrencpp { m_head = m_head->next; --m_block_count; - assert(m_block_count == 0 ? - (m_head == nullptr) : + assert(m_block_count == 0 ? + (m_head == nullptr) : (m_head != nullptr)); return block; @@ -324,7 +324,7 @@ namespace concurrencpp { if (block == nullptr) { throw std::bad_alloc(); } - + return block; } @@ -428,6 +428,8 @@ namespace concurrencpp { struct callback_base { virtual ~callback_base() = default; virtual void execute() = 0; + + std::unique_ptr next; }; template @@ -441,7 +443,7 @@ namespace concurrencpp { callback(function_type&& function) : m_function(std::forward(function)) {} - virtual void execute() override final { + virtual void execute() override final { m_function(); } @@ -449,7 +451,7 @@ namespace concurrencpp { return memory_pool::allocate(size); } - static void operator delete(void* block){ + static void operator delete(void* block) { memory_pool::deallocate(block, sizeof(callback)); } @@ -461,11 +463,52 @@ namespace concurrencpp { new callback(std::forward(function))); } + class work_queue { + + private: + std::unique_ptr m_head; + callback_base* m_tail; + + public: + + void push(decltype(m_head) task) noexcept { + if (m_head == nullptr) { + m_tail = task.get(); + m_head = std::move(task); + } + else { + m_tail->next = std::move(task); + m_tail = m_tail->next.get(); + } + } + + auto try_pop() noexcept { + if (m_head.get() == nullptr) { + return decltype(m_head){}; + } + + auto& next = m_head->next; + auto task = std::move(m_head); + m_head = std::move(next); + + if (m_head == nullptr) { + m_tail = nullptr; + } + + return task; + } + + bool empty() const noexcept { + return !static_cast(m_head); + } + + }; + class worker_thread { private: spinlock m_lock; - std::queue> m_tasks; + work_queue m_tasks; std::vector& m_thread_group; std::atomic_bool& m_stop; std::condition_variable_any m_condition; @@ -488,10 +531,11 @@ namespace concurrencpp { { std::lock_guard lock(m_lock); - if (!m_tasks.empty()) { + /*if (!m_tasks.empty()) { task = std::move(m_tasks.front()); m_tasks.pop(); - } + }*/ + task = m_tasks.try_pop(); } if (task) { @@ -523,9 +567,7 @@ namespace concurrencpp { std::unique_lock lock(m_lock, std::try_to_lock); if (lock.owns_lock() && !m_tasks.empty()) { - auto task = std::move(m_tasks.front()); - m_tasks.pop(); - return task; + return m_tasks.try_pop(); } return{}; @@ -548,7 +590,7 @@ namespace concurrencpp { template void enqueue_task(bool self, function_type&& function) { - enqueue_task(make_callback(std::forward(function))); + enqueue_task(make_callback(std::forward(function))); } void enqueue_task(bool self, std::unique_ptr task) { @@ -586,7 +628,6 @@ namespace concurrencpp { thread_pool(const size_t number_of_workers) : m_stop(false) { - std::lock_guard construction_lock(m_pool_lock); m_workers.reserve(number_of_workers); for (auto i = 0ul; i < number_of_workers; i++) { @@ -634,7 +675,7 @@ namespace concurrencpp { void enqueue_task(function_type&& function, arguments&& ... args) { enqueue_task(std::bind( std::forward(function), - std::forward(args)...)); + std::forward(args)...)); } template @@ -660,6 +701,12 @@ namespace concurrencpp { return default_thread_pool; } + static thread_pool& blocking_tasks_instance() { + static thread_pool blocking_tasks_thread_pool( + static_cast(std::thread::hardware_concurrency() * 2)); + return blocking_tasks_thread_pool; + } + }; enum class future_result_state { @@ -692,7 +739,6 @@ namespace concurrencpp { protected: //members are ordered in the order of their importance. mutable recursive_spinlock m_lock; - //mutable std::recursive_mutex m_lock; std::unique_ptr m_then; future_result_state m_state; std::unique_ptr m_deffered; @@ -742,7 +788,7 @@ namespace concurrencpp { return ::std::future_status::deferred; } - if (m_state != future_result_state::NOT_READY) { + if (m_state != future_result_state::NOT_READY) { return ::std::future_status::ready; } @@ -757,9 +803,9 @@ namespace concurrencpp { void wait() { /* - According to the standard, only non-timed wait on the future - will cause the deffered-function to be launched. this is why - this segment is not in wait_for implementation. + According to the standard, only non-timed wait on the future + will cause the deffered-function to be launched. this is why + this segment is not in wait_for implementation. */ if (m_deffered) { m_deffered->execute(); @@ -767,7 +813,7 @@ namespace concurrencpp { assert(m_state != future_result_state::NOT_READY); return; } - + while (wait_for(std::chrono::hours(365 * 24)) == std::future_status::timeout); } @@ -791,6 +837,39 @@ namespace concurrencpp { return static_cast(m_deffered); } + template + void wrap_continuation(function_type&& callback) noexcept { + std::unique_lock lock(m_lock); + + if (m_state != future_result_state::NOT_READY) { + //nothing to wrap, just call the callback + callback(); + return; + } + + std::unique_ptr new_then; + auto then = std::move(m_then); + + if (static_cast(then)) { + new_then = make_callback([ + then = std::move(then), + callback = std::forward(callback)]() mutable{ + callback(); + then->execute(); + }); + } + else { + new_then = make_callback([ + callback = std::forward(callback)]() mutable{ + callback(); + }); + } + + assert(static_cast(new_then)); + assert(!static_cast(m_then)); + m_then = std::move(new_then); + } + bool is_ready() const noexcept { std::unique_lock lock(m_lock); return m_state != future_result_state::NOT_READY; @@ -805,13 +884,13 @@ namespace concurrencpp { if (static_cast(m_then)) { m_then->execute(); m_then.reset(); - } + } } - + void set_deffered_task(std::unique_ptr task) { /* - this function should only be called once, - by using async + launch::deffered + this function should only be called once, + by using async + launch::deffered */ std::unique_lock lock(m_lock); @@ -820,6 +899,10 @@ namespace concurrencpp { m_deffered = std::move(task); } + + bool has_continuation() const noexcept { + return static_cast(m_then); + } }; template @@ -836,15 +919,15 @@ namespace concurrencpp { ~future_associated_state() noexcept { switch (m_state) { - case future_result_state::RESULT: { - m_result.result.~T(); - return; - } + case future_result_state::RESULT: { + m_result.result.~T(); + return; + } - case future_result_state::EXCEPTION: { - m_result.exception.~exception_ptr(); - return; - } + case future_result_state::EXCEPTION: { + m_result.exception.~exception_ptr(); + return; + } } } @@ -875,7 +958,7 @@ namespace concurrencpp { T result_or_exception() { std::unique_lock lock(m_lock); - return result_or_exception_unlocked(); + return result_or_exception_unlocked(); } T result_or_exception_unlocked() { @@ -892,9 +975,9 @@ namespace concurrencpp { wait(); return result_or_exception_unlocked(); } - + template - static std::unique_ptr + static std::unique_ptr make_future_callable(std::shared_ptr> _this, function_type&& function) { return make_callback([_this = std::move(_this), _function = std::forward(function)]() mutable{ try { @@ -955,7 +1038,7 @@ namespace concurrencpp { } template - static std::unique_ptr + static std::unique_ptr make_future_callable(std::shared_ptr> _this, function_type&& function) { return make_callback([_this = std::move(_this), _function = std::forward(function)]() mutable{ try { @@ -1034,7 +1117,7 @@ namespace concurrencpp { } template - static std::unique_ptr + static std::unique_ptr make_future_callable(std::shared_ptr> _this, function_type&& function) { return make_callback([_this = std::move(_this), _function = std::forward(function)]() mutable{ try { @@ -1061,7 +1144,7 @@ namespace concurrencpp { promise_base() noexcept : m_fulfilled(false), m_future_retreived(false) {} promise_base(promise_base&& rhs) noexcept = default; promise_base& operator = (promise_base&& rhs) noexcept = default; - + }; template @@ -1069,17 +1152,17 @@ namespace concurrencpp { return state_holder.m_state; } - template + template struct async_impl { template inline static future do_async(F&& task) { pool_allocator> allocator; - auto future_state = + auto future_state = std::allocate_shared>(allocator); - auto future_task = + auto future_task = future_associated_state::make_future_callable(future_state, std::forward(task)); - + scheduler_type::schedule(std::move(future_task), future_state.get()); return future_state; } @@ -1091,24 +1174,24 @@ namespace concurrencpp { template static inline future do_async(F&& task) { pool_allocator> allocator; - auto future_state = + auto future_state = std::allocate_shared>(allocator); auto bridge_task = make_callback([ future_state = future_state, - task = std::forward(task)]() mutable { - auto future = task(); - future.then([future_state = std::move(future_state)](auto done_future){ - try { - future_state->set_result(done_future.get()); - } - catch (...) { - future_state->set_exception(std::current_exception()); - } - }); - }); + task = std::forward(task)]() mutable { + auto future = task(); + future.then([future_state = std::move(future_state)](auto done_future){ + try { + future_state->set_result(done_future.get()); + } + catch (...) { + future_state->set_exception(std::current_exception()); + } + }); + }); - scheduler_type::schedule(std::move(bridge_task), future_state.get()); + scheduler_type::schedule(std::move(bridge_task), future_state.get()); return future_state; } }; @@ -1117,7 +1200,7 @@ namespace concurrencpp { struct async_impl, scheduler_type> { template - static inline future do_async(F&& task) { + static inline future do_async(F&& task) { pool_allocator> allocator; auto future_state = std::allocate_shared>(allocator); @@ -1125,17 +1208,17 @@ namespace concurrencpp { auto bridge_task = make_callback([ future_state = future_state, task = std::forward(task)]() mutable -> void{ - auto future = task(); - future.then([future_state = std::move(future_state)](auto done_future){ - try { - done_future.get(); - future_state->set_result(); - } - catch (...) { - future_state->set_exception(std::current_exception()); - } + auto future = task(); + future.then([future_state = std::move(future_state)](auto done_future){ + try { + done_future.get(); + future_state->set_result(); + } + catch (...) { + future_state->set_exception(std::current_exception()); + } + }); }); - }); scheduler_type::schedule(std::move(bridge_task), future_state.get()); return future_state; @@ -1271,15 +1354,15 @@ namespace concurrencpp { throw_if_empty(); using return_type = typename std::result_of_t)>; - + details::pool_allocator> allocator; - auto new_associated_state = + auto new_associated_state = std::allocate_shared>(allocator); auto task = details::future_associated_state:: - make_future_callable(new_associated_state, - [state = m_state, - continuation = std::forward(continuation)]{ + make_future_callable(new_associated_state, + [state = m_state, + continuation = std::forward(continuation)]{ return continuation(future(state)); }); @@ -1294,12 +1377,23 @@ namespace concurrencpp { future ready_future; details::pool_allocator> allocator; auto& future_inner_state = details::get_inner_state(ready_future); - future_inner_state = + future_inner_state = std::allocate_shared>(allocator); future_inner_state->set_result(std::forward(result)); return ready_future; } + template + future make_ready_future() { + future ready_future; + details::pool_allocator> allocator; + auto& future_inner_state = details::get_inner_state(ready_future); + future_inner_state = + std::allocate_shared>(allocator); + future_inner_state->set_result(); + return ready_future; + } + template future make_exceptional_future(std::exception_ptr exception_pointer) { future ready_future; @@ -1339,8 +1433,8 @@ namespace concurrencpp { promise& operator = (promise&& rhs) noexcept = default; ~promise() noexcept { - if (static_cast(m_state) && - !m_fulfilled && + if (static_cast(m_state) && + !m_fulfilled && !m_state->has_deffered_task()) { m_state->set_exception(std::future_error(std::future_errc::broken_promise)); } @@ -1353,7 +1447,7 @@ namespace concurrencpp { if (!static_cast(m_state)) { details::pool_allocator> allocator; - m_state = + m_state = std::allocate_shared>(allocator); } @@ -1459,23 +1553,23 @@ namespace concurrencpp { }; template - auto async(launch launch_policy, F&& f, Args&&... args){ + auto async(launch launch_policy, F&& f, Args&&... args) { using function_type = typename std::decay_t; using result = typename std::result_of_t; switch (launch_policy) { - case launch::task: { - return details::async_impl::do_async(std::bind(std::forward(f), std::forward(args)...)); - } + case launch::task: { + return details::async_impl::do_async(std::bind(std::forward(f), std::forward(args)...)); + } - case launch::deferred: { - return details::async_impl::do_async(std::bind(std::forward(f), std::forward(args)...)); - } + case launch::deferred: { + return details::async_impl::do_async(std::bind(std::forward(f), std::forward(args)...)); + } - case launch::async: { - return details::async_impl::do_async(std::bind(std::forward(f), std::forward(args)...)); - } + case launch::async: { + return details::async_impl::do_async(std::bind(std::forward(f), std::forward(args)...)); + } } @@ -1485,24 +1579,24 @@ namespace concurrencpp { std::bind(std::forward(f), std::forward(args)...))){}; } - template + template auto async(launch launch_policy, F&& f) { using function_type = typename std::decay_t; using result = typename std::result_of_t; switch (launch_policy) { - case launch::task: { - return details::async_impl::do_async(std::forward(f)); - } + case launch::task: { + return details::async_impl::do_async(std::forward(f)); + } - case launch::deferred: { - return details::async_impl::do_async(std::forward(f)); - } + case launch::deferred: { + return details::async_impl::do_async(std::forward(f)); + } - case launch::async: { - return details::async_impl::do_async(std::forward(f)); - } + case launch::async: { + return details::async_impl::do_async(std::forward(f)); + } } @@ -1512,6 +1606,16 @@ namespace concurrencpp { details::thread_pool_scheduler>::do_async(std::forward(f))){}; } + template + auto async(F&& f, Args&&... args) { + return async(launch::task, std::forward(f), std::forward(args)...); + } + + template + auto async(F&& f) { + return async(launch::task, std::forward(f)); + } + template bool await_ready(const future& future) noexcept { auto& inner_future_state = details::get_inner_state(future); @@ -1531,13 +1635,127 @@ namespace concurrencpp { } } +namespace concurrencpp { + + namespace details { + + class when_all_state { + + private: + concurrencpp::promise m_promise; + std::atomic_size_t m_counter; + + public: + when_all_state(size_t counter) noexcept : m_counter(counter){} + + void on_task_finished() { + auto new_count = m_counter.fetch_sub(1, std::memory_order_acq_rel); + if (new_count == 0) { + m_promise.set_value(); + } + } + + future future() { + return m_promise.get_future(); + } + }; + + inline void when_all_once(const std::shared_ptr& state) noexcept {} + + template + void when_all_once(std::shared_ptr state, ::concurrencpp::future& future, types&& ... future_types) { + if (!future.valid()) { + throw std::future_error(std::future_errc::no_state); + } + + auto& inner_state = get_inner_state(future); + inner_state->wrap_continuation([state = state] { + state->on_task_finished(); + }); + + when_all_once(std::move(state), std::forward(future_types)...); + } + + template + future when_all_wrapper(future_types&& ... futures) { + pool_allocator allocator; + auto state = std::allocate_shared(allocator); + when_all_once(state, std::forward(futures)...); + return state->future(); + } + + struct when_any_state { + ::std::atomic_bool fulfilled; + ::concurrencpp::promise promise; + + when_any_state() noexcept : fulfilled(false) {} + + void on_task_finished(size_t index) { + const auto _fulfilled = std::atomic_exchange_explicit( + &fulfilled, + true, + std::memory_order_acquire); + + if (_fulfilled == false) { //this is the first finished task + promise.set_value(index); + } + } + }; + + inline void when_any_once(const std::shared_ptr& state, size_t task_index) noexcept {} + + template + void when_any_once( + std::shared_ptr state, + size_t task_index, + ::concurrencpp::future& future, + types&& ... future_types) { + + if (!future.valid()) { + throw std::future_error(std::future_errc::no_state); + } + + if (future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + state->on_task_finished(task_index); + return; + } + + auto& inner_state = get_inner_state(future); + inner_state->wrap_continuation([state = state, task_index] { + state->on_task_finished(task_index); + }); + + when_any_once(std::move(state), task_index + 1, std::forward(future_types)...); + } + + template + future when_any_wrapper(types&& ... future_types) { + pool_allocator allocator; + auto state = std::allocate_shared(allocator); + details::when_any_once(state, 0, std::forward(future_types)...); + return state->promise.get_future(); + } + } + + template + future when_all(future_types&& ... futures) { + return details::when_all_wrapper(std::forward(futures)...); + } + + template + future when_any(future_types&& ... futures) { + return details::when_any_wrapper(std::forward(futures)...); + } + +} + namespace std { namespace experimental { template struct coroutine_traits<::concurrencpp::future, args...> { - struct promise_type : + struct promise_type : public ::concurrencpp::details::promise_type_base { template @@ -1550,9 +1768,9 @@ namespace std { template struct coroutine_traits<::concurrencpp::future, args...> { - - struct promise_type : - public ::concurrencpp::details::promise_type_base{ + + struct promise_type : + public ::concurrencpp::details::promise_type_base { void return_void() { m_future_state->set_result(); @@ -1581,7 +1799,7 @@ namespace std { template void set_exception(exception_type&& exception) noexcept {} - + void* operator new (const size_t size) { return ::concurrencpp::details::memory_pool::allocate(size); }