diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp index fa15909e4..fc271ff6e 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp +++ b/ddsrouter_utils/include/ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp @@ -29,12 +29,16 @@ #include #include #include +#include #include namespace eprosima { namespace ddsrouter { namespace utils { +// Declarations to make code less verbose +using SlotsMapType = SharedAtomicable>; + /** * This class represents a thread pool that can register tasks inside. * @@ -151,12 +155,9 @@ class SlotThreadPool /** * @brief Map of tasks indexed by their task Id. * - * This object is protected by the \c slots_mutex_ mutex. + * This object is protected by itself (atomicable). */ - std::map slots_; - - //! Protects access to \c slots_ . - std::mutex slots_mutex_; + SlotsMapType slots_; //! Whether the object is currently enabled std::atomic enabled_; diff --git a/ddsrouter_utils/src/cpp/thread_pool/pool/SlotThreadPool.cpp b/ddsrouter_utils/src/cpp/thread_pool/pool/SlotThreadPool.cpp index ff5ea97d0..17ea1bc83 100644 --- a/ddsrouter_utils/src/cpp/thread_pool/pool/SlotThreadPool.cpp +++ b/ddsrouter_utils/src/cpp/thread_pool/pool/SlotThreadPool.cpp @@ -81,27 +81,27 @@ void SlotThreadPool::disable() noexcept void SlotThreadPool::emit( const TaskId& task_id) { - // Lock to access the slot map - std::lock_guard lock(slots_mutex_); - - auto it = slots_.find(task_id); - - if (it == slots_.end()) - { - throw utils::ValueNotAllowedException(STR_ENTRY << "Slot " << task_id << " not registered."); - } - else - { - task_queue_.produce(it->first); - } + // TODO check if we want this check or we trust the user of this class + // { + // // Lock to access the slot map while searching the task exists + // std::shared_lock lock(slots_); + + // auto it = slots_.find(task_id); + // if (it == slots_.end()) + // { + // throw utils::ValueNotAllowedException(STR_ENTRY << "Slot " << task_id << " not registered."); + // } + // } + + task_queue_.produce(task_id); } void SlotThreadPool::slot( const TaskId& task_id, Task&& task) { - // Lock to access the slot map - std::lock_guard lock(slots_mutex_); + // Lock to access the slot map to modify it + std::unique_lock lock(slots_); auto it = slots_.find(task_id); @@ -126,22 +126,24 @@ void SlotThreadPool::thread_routine_() logDebug(DDSROUTER_THREAD_POOL, "Thread: " << std::this_thread::get_id() << " free, getting new callback."); TaskId task_id = task_queue_.consume(); - // Lock to access the slot map - slots_mutex_.lock(); - - auto it = slots_.find(task_id); - // Check the slot is correct - if (it == slots_.end()) { - utils::tsnh(STR_ENTRY << "Slot in Queue must be stored in slots register"); + // Lock to access the slot map while executing task + // NOTE: this can end before calling task if the task object is copied, but if it is used with + // a reference, it cannot unlock before calling method + std::shared_lock lock(slots_); + + auto it = slots_.find(task_id); + // Check the slot is correct + if (it == slots_.end()) + { + throw utils::ValueNotAllowedException(STR_ENTRY << "Slot " << task_id << " in queue not registered."); + } + + Task& task = it->second; + + logDebug(DDSROUTER_THREAD_POOL, "Thread: " << std::this_thread::get_id() << " executing callback."); + task(); } - - Task& task = it->second; - - slots_mutex_.unlock(); - - logDebug(DDSROUTER_THREAD_POOL, "Thread: " << std::this_thread::get_id() << " executing callback."); - task(); } } catch (const utils::DisabledException& e)