diff --git a/quantum/impl/quantum_coroutine_pool_allocator_impl.h b/quantum/impl/quantum_coroutine_pool_allocator_impl.h index a572454..a69159d 100644 --- a/quantum/impl/quantum_coroutine_pool_allocator_impl.h +++ b/quantum/impl/quantum_coroutine_pool_allocator_impl.h @@ -219,7 +219,7 @@ void CoroutinePoolAllocator::deallocate(const boost::context::stac { SpinLock::Guard lock(_spinlock); --_numHeapAllocatedBlocks; - assert(_numHeapAllocatedBlocks >= 0); + assert(_numHeapAllocatedBlocks != (size_t)-1); } if (deallocateCoroutine(stackEnd(ctx)) != 0) { diff --git a/quantum/impl/quantum_io_queue_impl.h b/quantum/impl/quantum_io_queue_impl.h index 0147b6f..327c3de 100644 --- a/quantum/impl/quantum_io_queue_impl.h +++ b/quantum/impl/quantum_io_queue_impl.h @@ -70,6 +70,23 @@ IoQueue::IoQueue(const IoQueue& other) : } } +inline +IoQueue::IoQueue(IoQueue&& other) noexcept: + _sharedIoQueues(other._sharedIoQueues), + _loadBalanceSharedIoQueues(other._loadBalanceSharedIoQueues), + _loadBalancePollIntervalMs(other._loadBalancePollIntervalMs), + _loadBalancePollIntervalBackoffPolicy(other._loadBalancePollIntervalBackoffPolicy), + _loadBalancePollIntervalNumBackoffs(other._loadBalancePollIntervalNumBackoffs), + _loadBalanceBackoffNum(other._loadBalanceBackoffNum), + _thread(std::move(other._thread)), + _queue(std::move(other._queue)), + _isEmpty(other._isEmpty.load()), + _isInterrupted(other._isInterrupted.load()), + _isIdle(other._isIdle.load()), + _terminated(other._terminated.load()) +{ +} + inline IoQueue::~IoQueue() { diff --git a/quantum/impl/quantum_io_task_impl.h b/quantum/impl/quantum_io_task_impl.h index dfe1e0c..b11b037 100644 --- a/quantum/impl/quantum_io_task_impl.h +++ b/quantum/impl/quantum_io_task_impl.h @@ -63,6 +63,30 @@ IoTask::IoTask(std::false_type, { } +inline +IoTask::IoTask(IoTask&& other) noexcept : + _func(std::move(other._func)), + _terminated(other._terminated.load()), + _queueId(other._queueId), + _isHighPriority(other._isHighPriority), + _taskId(ThreadContextTag{}) +{ +} + +inline +IoTask& IoTask::operator=(IoTask&& other) noexcept +{ + if (this != &other) { + _func = std::move(other._func); + _terminated.store(other._terminated.load()); + _queueId = other._queueId; + _isHighPriority = other._isHighPriority; + _taskId = other._taskId; + _localStorage = std::move(other._localStorage); + } + return *this; +} + inline IoTask::~IoTask() { diff --git a/quantum/impl/quantum_spinlock_impl.h b/quantum/impl/quantum_spinlock_impl.h index e226fd8..34fd717 100644 --- a/quantum/impl/quantum_spinlock_impl.h +++ b/quantum/impl/quantum_spinlock_impl.h @@ -28,19 +28,6 @@ namespace quantum { //============================================================================== // SpinLock //============================================================================== -inline -SpinLock::SpinLock(SpinLock&& o) : _flag(o._flag.load()) { } - -inline -SpinLock& SpinLock::operator=(SpinLock&& o) -{ - if(this != &o) - { - _flag.store(o._flag.load()); - } - return *this; -} - inline void SpinLock::lock() { diff --git a/quantum/impl/quantum_task_impl.h b/quantum/impl/quantum_task_impl.h index 2c79275..7d0224d 100644 --- a/quantum/impl/quantum_task_impl.h +++ b/quantum/impl/quantum_task_impl.h @@ -72,6 +72,38 @@ Task::Task(std::true_type, _suspendedState((int)State::Suspended) {} +inline +Task::Task(Task&& other) noexcept : + _coroContext(std::move(other._coroContext)), + _coro(std::move(other._coro)), + _isHighPriority(other._isHighPriority), + _next(std::move(other._next)), + _prev(std::move(other._prev)), + _type(other._type), + _taskId(other._taskId), + _terminated(other._terminated.load()), + _suspendedState(other._suspendedState.load()), + _localStorage(std::move(other._localStorage)) +{} + +inline +Task& Task::operator=(Task&& other) noexcept +{ + if (this != &other) { + _coroContext = std::move(other._coroContext); + _coro = std::move(other._coro); + _isHighPriority = other._isHighPriority; + _next = std::move(other._next); + _prev = std::move(other._prev); + _type = other._type; + _taskId = other._taskId; + _terminated.store(other._terminated.load()); + _suspendedState.store(other._suspendedState.load()); + _localStorage = std::move(other._localStorage); + } + return *this; +} + inline Task::~Task() { diff --git a/quantum/impl/quantum_task_queue_impl.h b/quantum/impl/quantum_task_queue_impl.h index 3d1ae2b..38ade73 100644 --- a/quantum/impl/quantum_task_queue_impl.h +++ b/quantum/impl/quantum_task_queue_impl.h @@ -25,11 +25,11 @@ namespace Bloomberg { namespace quantum { inline -TaskQueue::WorkItem::WorkItem(TaskPtr task, +TaskQueue::WorkItem::WorkItem(ITaskPtr task, TaskListIter iter, bool isBlocked, unsigned int blockedQueueRound) : - _task(task), + _task(std::move(task)), _iter(iter), _isBlocked(isBlocked), _blockedQueueRound(blockedQueueRound) @@ -52,9 +52,8 @@ TaskQueue::TaskQueue() : inline TaskQueue::TaskQueue(const Configuration&, std::shared_ptr sharedQueue) : - _alloc(Allocator::instance(AllocatorTraits::queueListAllocSize())), - _runQueue(_alloc), - _waitQueue(_alloc), + _runQueue(Allocator::instance(AllocatorTraits::queueListAllocSize())), + _waitQueue(Allocator::instance(AllocatorTraits::queueListAllocSize())), _queueIt(_runQueue.end()), _blockedIt(_runQueue.end()), _isBlocked(false), @@ -64,7 +63,7 @@ TaskQueue::TaskQueue(const Configuration&, std::shared_ptr sharedQueu _isIdle(true), _terminated(false), _isAdvanced(false), - _sharedQueue(sharedQueue), + _sharedQueue(std::move(sharedQueue)), _queueRound(0), _lastSleptQueueRound(std::numeric_limits::max()), _lastSleptSharedQueueRound(std::numeric_limits::max()) @@ -80,7 +79,29 @@ inline TaskQueue::TaskQueue(const TaskQueue&) : TaskQueue() { +} +inline +TaskQueue::TaskQueue(TaskQueue&& other) noexcept: + _thread(std::move(other._thread)), + _runQueue(std::move(other._runQueue)), + _waitQueue(std::move(other._waitQueue)), + _queueIt(other._queueIt), + _blockedIt(_runQueue.end()), + _isBlocked(other._isBlocked), + _isEmpty(other._isEmpty.load()), + _isSharedQueueEmpty(other._isSharedQueueEmpty.load()), + _isInterrupted(other._isInterrupted.load()), + _isIdle(other._isIdle.load()), + _terminated(other._terminated.load()), + _isAdvanced(other._isAdvanced), + _stats(other._stats), + _sharedQueue(std::move(other._sharedQueue)), + _helpers(other._helpers), + _queueRound(other._queueRound), + _lastSleptQueueRound(other._lastSleptQueueRound), + _lastSleptSharedQueueRound(other._lastSleptSharedQueueRound) +{ } inline @@ -183,10 +204,10 @@ TaskQueue::ProcessTaskResult TaskQueue::processTask() //Process a task workItem = grabWorkItem(); - TaskPtr task = workItem._task; + ITaskPtr task = workItem._task; if (!task) { - return ProcessTaskResult(workItem._isBlocked, workItem._blockedQueueRound); + return { workItem._isBlocked, workItem._blockedQueueRound }; } int rc; @@ -231,7 +252,7 @@ TaskQueue::ProcessTaskResult TaskQueue::processTask() { handleException(workItem); } - return ProcessTaskResult(workItem._isBlocked, workItem._blockedQueueRound); + return {workItem._isBlocked, workItem._blockedQueueRound}; } inline @@ -453,7 +474,7 @@ bool TaskQueue::handleSuccess(const WorkItem& workItem) { ITaskContinuation::Ptr nextTask; //check if there's another task scheduled to run after this one - nextTask = workItem._task->getNextTask(); + nextTask = std::static_pointer_cast(workItem._task)->getNextTask(); if (nextTask && (nextTask->getType() == ITask::Type::ErrorHandler)) { //skip error handler since we don't have any errors @@ -473,7 +494,7 @@ bool TaskQueue::handleError(const WorkItem& workItem) { ITaskContinuation::Ptr nextTask; //Check if we have a final task to run - nextTask = workItem._task->getErrorHandlerOrFinalTask(); + nextTask = std::static_pointer_cast(workItem._task)->getErrorHandlerOrFinalTask(); //queue next task and de-queue current one enqueue(nextTask); doDequeue(_isIdle, workItem._iter); @@ -537,9 +558,9 @@ TaskQueue::grabWorkItem() _isIdle = _runQueue.empty(); if (_runQueue.empty()) { - return WorkItem(nullptr, _runQueue.end(), _isBlocked, _queueRound); + return {nullptr, _runQueue.end(), _isBlocked, _queueRound}; } - return WorkItem((*_queueIt), _queueIt, false, 0); + return {(*_queueIt), _queueIt, false, 0}; } inline diff --git a/quantum/interface/quantum_itask.h b/quantum/interface/quantum_itask.h index 0700902..047039d 100644 --- a/quantum/interface/quantum_itask.h +++ b/quantum/interface/quantum_itask.h @@ -53,8 +53,8 @@ struct ITask : public ITerminate Sleeping = (int)Running-5, ///< Coroutine is sleeping Max = (int)Running-10, ///< Value of the max reserved return code }; - - ~ITask() = default; + + ~ITask() override = default; virtual int run() = 0; diff --git a/quantum/quantum_io_queue.h b/quantum/quantum_io_queue.h index 1865513..80b5c97 100644 --- a/quantum/quantum_io_queue.h +++ b/quantum/quantum_io_queue.h @@ -40,7 +40,7 @@ namespace quantum { class IoQueue : public IQueue { public: - using TaskList = std::list; + using TaskList = std::list; using TaskListIter = TaskList::iterator; IoQueue(); @@ -48,11 +48,11 @@ class IoQueue : public IQueue IoQueue(const Configuration& config, std::vector* sharedIoQueues); - IoQueue(const IoQueue& other); + IoQueue(const IoQueue&); - IoQueue(IoQueue&& other) = default; + IoQueue(IoQueue&& other) noexcept; - ~IoQueue(); + ~IoQueue() override; void terminate() final; diff --git a/quantum/quantum_io_task.h b/quantum/quantum_io_task.h index dcb5eba..c80770f 100644 --- a/quantum/quantum_io_task.h +++ b/quantum/quantum_io_task.h @@ -52,13 +52,13 @@ class IoTask : public ITask bool isHighPriority, FUNC&& func, ARGS&&... args); + + IoTask(const IoTask&) = delete; + IoTask(IoTask&& other) noexcept; + IoTask& operator=(const IoTask&) = delete; + IoTask& operator=(IoTask&& other) noexcept; - IoTask(const IoTask& task) = delete; - IoTask(IoTask&& task) = default; - IoTask& operator=(const IoTask& task) = delete; - IoTask& operator=(IoTask&& task) = default; - - ~IoTask(); + ~IoTask() override; //ITerminate void terminate() final; @@ -70,7 +70,7 @@ class IoTask : public ITask Type getType() const final; TaskId getTaskId() const final; bool isBlocked() const final; - bool isSleeping(bool updateTimer = false) final; + bool isSleeping(bool updateTimer) final; bool isHighPriority() const final; bool isSuspended() const final; ITask::LocalStorage& getLocalStorage() final; diff --git a/quantum/quantum_read_write_mutex.h b/quantum/quantum_read_write_mutex.h index dda7882..f9935e0 100644 --- a/quantum/quantum_read_write_mutex.h +++ b/quantum/quantum_read_write_mutex.h @@ -31,16 +31,8 @@ class ReadWriteMutex /// @brief Copy constructor ReadWriteMutex(const ReadWriteMutex&) = delete; - - /// @brief Move constructor - ReadWriteMutex(ReadWriteMutex&&) = default; - - /// @brief Copy assignment operator ReadWriteMutex& operator=(const ReadWriteMutex&) = delete; - /// @brief Move assignment operator - ReadWriteMutex& operator=(ReadWriteMutex&&) = default; - /// @brief Lock this object as a reader (shared with other readers) /// @details The current context will be yielded until the lock is acquired. /// @note From a non-coroutine context, call the first. From a coroutine context, diff --git a/quantum/quantum_read_write_spinlock.h b/quantum/quantum_read_write_spinlock.h index 06b2a0a..3aa7ff6 100644 --- a/quantum/quantum_read_write_spinlock.h +++ b/quantum/quantum_read_write_spinlock.h @@ -31,16 +31,8 @@ class ReadWriteSpinLock /// @brief Copy constructor. ReadWriteSpinLock(const ReadWriteSpinLock&) = delete; - - /// @brief Move constructor. - ReadWriteSpinLock(ReadWriteSpinLock&&) = default; - - /// @brief Copy assignment operator. ReadWriteSpinLock& operator=(const ReadWriteSpinLock&) = delete; - /// @brief Move assignment operator. - ReadWriteSpinLock& operator=(ReadWriteSpinLock&&) = default; - /// @brief Lock this object as a reader (shared with other readers) void lockRead(); diff --git a/quantum/quantum_spinlock.h b/quantum/quantum_spinlock.h index 6955ff5..93a5a7c 100644 --- a/quantum/quantum_spinlock.h +++ b/quantum/quantum_spinlock.h @@ -37,16 +37,8 @@ class SpinLock /// @brief Copy constructor. SpinLock(const SpinLock&) = delete; - - /// @brief Move constructor. - SpinLock(SpinLock&&); - - /// @brief Copy assignment operator. SpinLock& operator=(const SpinLock&) = delete; - /// @brief Move assignment operator. - SpinLock& operator=(SpinLock&&); - /// @brief Locks this object. /// @note Blocks the current thread until the lock is acquired. Blocking is achieved /// via a busy loop and the thread is not re-scheduled by the operating system. diff --git a/quantum/quantum_stack_allocator.h b/quantum/quantum_stack_allocator.h index 7ab3b7b..ffdba3d 100644 --- a/quantum/quantum_stack_allocator.h +++ b/quantum/quantum_stack_allocator.h @@ -63,7 +63,7 @@ struct StackAllocator : public ContiguousPoolManager StackAllocator(const this_type&) : StackAllocator() {} - StackAllocator(this_type&&) : StackAllocator() + StackAllocator(this_type&&) noexcept : StackAllocator() {} StackAllocator& operator=(const this_type&) = delete; StackAllocator& operator=(this_type&&) = delete; diff --git a/quantum/quantum_task.h b/quantum/quantum_task.h index ef23f3f..c9c32b2 100644 --- a/quantum/quantum_task.h +++ b/quantum/quantum_task.h @@ -64,12 +64,12 @@ class Task : public ITaskContinuation, FUNC&& func, ARGS&&... args); - Task(const Task& task) = delete; - Task(Task&& task) = default; - Task& operator=(const Task& task) = delete; - Task& operator=(Task&& task) = default; + Task(const Task&) = delete; + Task(Task&& other) noexcept; + Task& operator=(const Task&) = delete; + Task& operator=(Task&& other) noexcept; - ~Task(); + ~Task() override; //ITerminate void terminate() final; @@ -81,7 +81,7 @@ class Task : public ITaskContinuation, Type getType() const final; TaskId getTaskId() const final; bool isBlocked() const final; - bool isSleeping(bool updateTimer = false) final; + bool isSleeping(bool updateTimer) final; bool isHighPriority() const final; bool isSuspended() const final; @@ -110,29 +110,29 @@ class Task : public ITaskContinuation, private: struct SuspensionGuard { - SuspensionGuard(std::atomic_int& suspendedState) : + explicit SuspensionGuard(std::atomic_int& suspendedState) : _isLocked(false), _suspendedState(suspendedState) { int suspended = (int)State::Suspended; _isLocked = _suspendedState.compare_exchange_strong(suspended, (int)State::Running, - std::memory_order::memory_order_acq_rel); + std::memory_order_acq_rel); } ~SuspensionGuard() { if (_isLocked) { - _suspendedState.store((int)State::Suspended, std::memory_order::memory_order_acq_rel); + _suspendedState.store((int)State::Suspended, std::memory_order_acq_rel); } } void set(int newState) { - _suspendedState.store(newState, std::memory_order::memory_order_acq_rel); + _suspendedState.store(newState, std::memory_order_acq_rel); _isLocked = false; } - operator bool() const + explicit operator bool() const { return _isLocked; } diff --git a/quantum/quantum_task_queue.h b/quantum/quantum_task_queue.h index 720cb58..5e11c97 100644 --- a/quantum/quantum_task_queue.h +++ b/quantum/quantum_task_queue.h @@ -46,7 +46,7 @@ namespace quantum { class TaskQueue : public IQueue { public: - using TaskList = std::list>; + using TaskList = std::list>; using TaskListIter = TaskList::iterator; TaskQueue(); @@ -56,9 +56,9 @@ class TaskQueue : public IQueue TaskQueue(const TaskQueue& other); - TaskQueue(TaskQueue&& other) = default; + TaskQueue(TaskQueue&& other) noexcept; - ~TaskQueue(); + ~TaskQueue() override; void pinToCore(int coreId) final; @@ -91,12 +91,12 @@ class TaskQueue : public IQueue private: struct WorkItem { - WorkItem(TaskPtr task, + WorkItem(ITaskPtr task, TaskListIter iter, bool isBlocked, unsigned int blockedQueueRound); - TaskPtr _task; // task pointer + ITaskPtr _task; // task pointer TaskListIter _iter; // task iterator bool _isBlocked; // true if the entire queue is blocked unsigned int _blockedQueueRound; // blocked queue round id @@ -135,8 +135,7 @@ class TaskQueue : public IQueue void sleepOnBlockedQueue(const ProcessTaskResult& mainQueueResult, const ProcessTaskResult& sharedQueueResult); - - QueueListAllocator _alloc; + std::shared_ptr _thread; TaskList _runQueue; TaskList _waitQueue;