From 4ba54e728540be55fabd796aad579f4edf6992ab Mon Sep 17 00:00:00 2001 From: Avi Saranga Date: Wed, 29 Nov 2017 08:25:00 -0800 Subject: [PATCH] A blocking queue implementation added to ensure no funky stuff happens with higher loads. --- .../objects/TelmateFrameGrabberOpenCVImpl.cpp | 130 +-- .../objects/TelmateFrameGrabberOpenCVImpl.hpp | 11 +- .../server/implementation/objects/atomicops.h | 665 -------------- .../implementation/objects/avisqueue.hpp | 44 + .../objects/readerwriterqueue.h | 854 ------------------ 5 files changed, 116 insertions(+), 1588 deletions(-) delete mode 100644 module/src/server/implementation/objects/atomicops.h create mode 100644 module/src/server/implementation/objects/avisqueue.hpp delete mode 100644 module/src/server/implementation/objects/readerwriterqueue.h diff --git a/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.cpp b/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.cpp index fa9e979..5227336 100644 --- a/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.cpp +++ b/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.cpp @@ -23,24 +23,20 @@ TelmateFrameGrabberOpenCVImpl::TelmateFrameGrabberOpenCVImpl() { this->outputFormat = FGFMT_JPEG; this->lastQueueTimeStamp = 0; this->queueLength = 0; + this->frameQueue = new avis_blocking_queue; - this->frameQueue = new BlockingReaderWriterQueue(QUEUE_BASE_ALLOC); this->thr = new boost::thread(boost::bind( &TelmateFrameGrabberOpenCVImpl::queueHandler, this)); this->thr->detach(); - - - - GST_INFO("FrameGrabber Constructor was called for %s", this->epName.c_str()); + GST_INFO("Constructor was called for %s", this->epName.c_str()); } TelmateFrameGrabberOpenCVImpl::~TelmateFrameGrabberOpenCVImpl() { this->thrLoop = false; - - GST_INFO("FrameGrabber Destructor was called for %s", this->epName.c_str()); + GST_INFO("Destructor was called for %s", this->epName.c_str()); } @@ -51,15 +47,18 @@ TelmateFrameGrabberOpenCVImpl::~TelmateFrameGrabberOpenCVImpl() { * here. Any changes in mat, will be sent through the Media Pipeline. */ void TelmateFrameGrabberOpenCVImpl::process(cv::Mat &mat) { - if ((this->getCurrentTimestampLong() - - this->lastQueueTimeStamp) >= this->snapInterval) { - this->lastQueueTimeStamp = this->getCurrentTimestampLong(); - VideoFrame *ptrVf = new VideoFrame(); - ptrVf->mat = mat.clone(); - ptrVf->ts = std::to_string(this->lastQueueTimeStamp); - this->frameQueue->enqueue(ptrVf); - ++this->queueLength; - ++this->framesCounter; + if ((this->getCurrentTimestampLong() - this->lastQueueTimeStamp) >= this->snapInterval) { + + if(this->thrLoop) { // do not push into the queue if the destructor was called. + this->lastQueueTimeStamp = this->getCurrentTimestampLong(); + VideoFrame *ptrVf = new VideoFrame(); + ptrVf->mat = mat.clone(); + ptrVf->ts = std::to_string(this->lastQueueTimeStamp); + + this->frameQueue->push(ptrVf); + ++this->queueLength; + ++this->framesCounter; + } } } /* @@ -75,64 +74,67 @@ void TelmateFrameGrabberOpenCVImpl::queueHandler() { std::vector params; std::string image_extension; - while (this->thrLoop && this->frameQueue->wait_dequeue_timed(ptrVf,std::chrono::milliseconds(5))) { + while (this->thrLoop) { - params.clear(); // clear the vector since the last iteration. - this->lastQueueTimeStamp = this->getCurrentTimestampLong(); - --this->queueLength; - - switch (this->outputFormat) { - case FGFMT_JPEG: - /* Set jpeg params */ - params.push_back(CV_IMWRITE_JPEG_QUALITY); - params.push_back(FG_JPEG_QUALITY); - image_extension = ".jpeg"; - break; - case FGFMT_PNG: - /* Set PNG parameters, compression etc. */ - params.push_back(CV_IMWRITE_PNG_COMPRESSION); - params.push_back(FG_PNG_QUALITY); - image_extension = ".png"; - break; - default: - /* Defaults to jpeg */ - params.push_back(CV_IMWRITE_JPEG_QUALITY); - params.push_back(FG_JPEG_QUALITY); - image_extension = ".jpeg"; - break; - } - - std::string filename = - std::to_string((long) this->framesCounter) + "_" + ptrVf->ts + image_extension; + this->frameQueue->pop(ptrVf); // blocks + params.clear(); // clear the vector since the last iteration. + this->lastQueueTimeStamp = this->getCurrentTimestampLong(); + --this->queueLength; - if (this->storagePathSubdir.empty()) { - this->storagePathSubdir = this->storagePath + "/frames_" + this->getCurrentTimestampString(); - boost::filesystem::path dir(this->storagePathSubdir.c_str()); - if (!boost::filesystem::create_directories(dir)) { - GST_ERROR("%s create_directories() failed for: %s", this->epName.c_str(), - this->storagePathSubdir.c_str()); - } + switch (this->outputFormat) { + case FGFMT_JPEG: + /* Set jpeg params */ + params.push_back(CV_IMWRITE_JPEG_QUALITY); + params.push_back(FG_JPEG_QUALITY); + image_extension = ".jpeg"; + break; + case FGFMT_PNG: + /* Set PNG parameters, compression etc. */ + params.push_back(CV_IMWRITE_PNG_COMPRESSION); + params.push_back(FG_PNG_QUALITY); + image_extension = ".png"; + break; + default: + /* Defaults to jpeg */ + params.push_back(CV_IMWRITE_JPEG_QUALITY); + params.push_back(FG_JPEG_QUALITY); + image_extension = ".jpeg"; + break; + } + + std::string filename = + std::to_string((long) this->framesCounter) + "_" + ptrVf->ts + image_extension; + + if (this->storagePathSubdir.empty()) { + this->storagePathSubdir = this->storagePath + "/frames_" + this->getCurrentTimestampString(); + boost::filesystem::path dir(this->storagePathSubdir.c_str()); + if (!boost::filesystem::create_directories(dir)) { + GST_ERROR("%s create_directories() failed for: %s", this->epName.c_str(), + this->storagePathSubdir.c_str()); } + } - std::string fullpath = this->storagePathSubdir + "/" + filename; + std::string fullpath = this->storagePathSubdir + "/" + filename; - try { - cv::imwrite(fullpath.c_str(), ptrVf->mat, params); - } - catch (...) { - GST_ERROR("::queueHandler() imgwrite() failed."); - throw KurentoException(NOT_IMPLEMENTED, - "TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed. \n"); - } + try { + cv::imwrite(fullpath.c_str(), ptrVf->mat, params); + } + catch (...) { + GST_ERROR("::queueHandler() imgwrite() failed."); + throw KurentoException(NOT_IMPLEMENTED, + "TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed. \n"); + } - ptrVf->mat.release(); // release internal memory allocations + ptrVf->mat.release(); // release internal memory allocations - delete ptrVf; - ptrVf = NULL; + delete ptrVf; + ptrVf = NULL; } - while(this->frameQueue->try_dequeue(ptrVf)) { /* Empty the queue post processing if the dtor was called */ + while(!this->frameQueue->empty()) { + + this->frameQueue->pop(ptrVf); // blocks --this->queueLength; delete ptrVf; ptrVf = NULL; diff --git a/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.hpp b/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.hpp index 0e29ba7..2873283 100644 --- a/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.hpp +++ b/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.hpp @@ -3,6 +3,8 @@ #ifndef __TELMATE_FRAME_GRABBER_OPENCV_IMPL_HPP__ #define __TELMATE_FRAME_GRABBER_OPENCV_IMPL_HPP__ +#define NDEBUG 1 + #include #include #include @@ -17,8 +19,7 @@ #include -#include "atomicops.h" -#include "readerwriterqueue.h" +#include "avisqueue.hpp" #include @@ -36,9 +37,9 @@ #define FG_PNG_QUALITY 9 #define MAX_IDLE_QUEUE_TIME_NS 30000 -#define QUEUE_BASE_ALLOC 1000 +#define QUEUE_BASE_ELEMENT_ALLOC 1000 -using namespace moodycamel; +//using namespace moodycamel; @@ -84,7 +85,7 @@ class TelmateFrameGrabberOpenCVImpl : public virtual OpenCVProcess { boost::asio::io_service ioService; boost::thread_group tp; - BlockingReaderWriterQueue *frameQueue; + avis_blocking_queue *frameQueue; boost::thread* thr; boost::atomic thrLoop; diff --git a/module/src/server/implementation/objects/atomicops.h b/module/src/server/implementation/objects/atomicops.h deleted file mode 100644 index 6432af4..0000000 --- a/module/src/server/implementation/objects/atomicops.h +++ /dev/null @@ -1,665 +0,0 @@ -// ©2013-2016 Cameron Desrochers. -// Distributed under the simplified BSD license (see the license file that -// should have come with this header). -// Uses Jeff Preshing's semaphore implementation (under the terms of its -// separate zlib license, embedded below). - -#pragma once - -// Provides portable (VC++2010+, Intel ICC 13, GCC 4.7+, and anything C++11 compliant) implementation -// of low-level memory barriers, plus a few semi-portable utility macros (for inlining and alignment). -// Also has a basic atomic type (limited to hardware-supported atomics with no memory ordering guarantees). -// Uses the AE_* prefix for macros (historical reasons), and the "moodycamel" namespace for symbols. - -#include -#include -#include -#include -#include - -// Platform detection -#if defined(__INTEL_COMPILER) -#define AE_ICC -#elif defined(_MSC_VER) -#define AE_VCPP -#elif defined(__GNUC__) -#define AE_GCC -#endif - -#if defined(_M_IA64) || defined(__ia64__) -#define AE_ARCH_IA64 -#elif defined(_WIN64) || defined(__amd64__) || defined(_M_X64) || defined(__x86_64__) -#define AE_ARCH_X64 -#elif defined(_M_IX86) || defined(__i386__) -#define AE_ARCH_X86 -#elif defined(_M_PPC) || defined(__powerpc__) -#define AE_ARCH_PPC -#else -#define AE_ARCH_UNKNOWN -#endif - - -// AE_UNUSED -#define AE_UNUSED(x) ((void)x) - - -// AE_FORCEINLINE -#if defined(AE_VCPP) || defined(AE_ICC) -#define AE_FORCEINLINE __forceinline -#elif defined(AE_GCC) -//#define AE_FORCEINLINE __attribute__((always_inline)) -#define AE_FORCEINLINE inline -#else -#define AE_FORCEINLINE inline -#endif - - -// AE_ALIGN -#if defined(AE_VCPP) || defined(AE_ICC) -#define AE_ALIGN(x) __declspec(align(x)) -#elif defined(AE_GCC) -#define AE_ALIGN(x) __attribute__((aligned(x))) -#else -// Assume GCC compliant syntax... -#define AE_ALIGN(x) __attribute__((aligned(x))) -#endif - - -// Portable atomic fences implemented below: - -namespace moodycamel { - - enum memory_order { - memory_order_relaxed, - memory_order_acquire, - memory_order_release, - memory_order_acq_rel, - memory_order_seq_cst, - - // memory_order_sync: Forces a full sync: - // #LoadLoad, #LoadStore, #StoreStore, and most significantly, #StoreLoad - memory_order_sync = memory_order_seq_cst - }; - -} // end namespace moodycamel - -#if (defined(AE_VCPP) && (_MSC_VER < 1700 || defined(__cplusplus_cli))) || defined(AE_ICC) -// VS2010 and ICC13 don't support std::atomic_*_fence, implement our own fences - -#include - -#if defined(AE_ARCH_X64) || defined(AE_ARCH_X86) -#define AeFullSync _mm_mfence -#define AeLiteSync _mm_mfence -#elif defined(AE_ARCH_IA64) -#define AeFullSync __mf -#define AeLiteSync __mf -#elif defined(AE_ARCH_PPC) -#include -#define AeFullSync __sync -#define AeLiteSync __lwsync -#endif - - -#ifdef AE_VCPP -#pragma warning(push) -#pragma warning(disable: 4365) // Disable erroneous 'conversion from long to unsigned int, signed/unsigned mismatch' error when using `assert` -#ifdef __cplusplus_cli -#pragma managed(push, off) -#endif -#endif - -namespace moodycamel { - -AE_FORCEINLINE void compiler_fence(memory_order order) -{ - switch (order) { - case memory_order_relaxed: break; - case memory_order_acquire: _ReadBarrier(); break; - case memory_order_release: _WriteBarrier(); break; - case memory_order_acq_rel: _ReadWriteBarrier(); break; - case memory_order_seq_cst: _ReadWriteBarrier(); break; - default: assert(false); - } -} - -// x86/x64 have a strong memory model -- all loads and stores have -// acquire and release semantics automatically (so only need compiler -// barriers for those). -#if defined(AE_ARCH_X86) || defined(AE_ARCH_X64) -AE_FORCEINLINE void fence(memory_order order) -{ - switch (order) { - case memory_order_relaxed: break; - case memory_order_acquire: _ReadBarrier(); break; - case memory_order_release: _WriteBarrier(); break; - case memory_order_acq_rel: _ReadWriteBarrier(); break; - case memory_order_seq_cst: - _ReadWriteBarrier(); - AeFullSync(); - _ReadWriteBarrier(); - break; - default: assert(false); - } -} -#else -AE_FORCEINLINE void fence(memory_order order) -{ - // Non-specialized arch, use heavier memory barriers everywhere just in case :-( - switch (order) { - case memory_order_relaxed: - break; - case memory_order_acquire: - _ReadBarrier(); - AeLiteSync(); - _ReadBarrier(); - break; - case memory_order_release: - _WriteBarrier(); - AeLiteSync(); - _WriteBarrier(); - break; - case memory_order_acq_rel: - _ReadWriteBarrier(); - AeLiteSync(); - _ReadWriteBarrier(); - break; - case memory_order_seq_cst: - _ReadWriteBarrier(); - AeFullSync(); - _ReadWriteBarrier(); - break; - default: assert(false); - } -} -#endif -} // end namespace moodycamel -#else -// Use standard library of atomics -#include - -namespace moodycamel { - - AE_FORCEINLINE void compiler_fence(memory_order order) - { - switch (order) { - case memory_order_relaxed: break; - case memory_order_acquire: std::atomic_signal_fence(std::memory_order_acquire); break; - case memory_order_release: std::atomic_signal_fence(std::memory_order_release); break; - case memory_order_acq_rel: std::atomic_signal_fence(std::memory_order_acq_rel); break; - case memory_order_seq_cst: std::atomic_signal_fence(std::memory_order_seq_cst); break; - default: assert(false); - } - } - - AE_FORCEINLINE void fence(memory_order order) - { - switch (order) { - case memory_order_relaxed: break; - case memory_order_acquire: std::atomic_thread_fence(std::memory_order_acquire); break; - case memory_order_release: std::atomic_thread_fence(std::memory_order_release); break; - case memory_order_acq_rel: std::atomic_thread_fence(std::memory_order_acq_rel); break; - case memory_order_seq_cst: std::atomic_thread_fence(std::memory_order_seq_cst); break; - default: assert(false); - } - } - -} // end namespace moodycamel - -#endif - - -#if !defined(AE_VCPP) || (_MSC_VER >= 1700 && !defined(__cplusplus_cli)) -#define AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC -#endif - -#ifdef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC -#include -#endif -#include - -// WARNING: *NOT* A REPLACEMENT FOR std::atomic. READ CAREFULLY: -// Provides basic support for atomic variables -- no memory ordering guarantees are provided. -// The guarantee of atomicity is only made for types that already have atomic load and store guarantees -// at the hardware level -- on most platforms this generally means aligned pointers and integers (only). -namespace moodycamel { - template - class weak_atomic - { - public: - weak_atomic() { } -#ifdef AE_VCPP - #pragma warning(push) -#pragma warning(disable: 4100) // Get rid of (erroneous) 'unreferenced formal parameter' warning -#endif - template weak_atomic(U&& x) : value(std::forward(x)) { } -#ifdef __cplusplus_cli - // Work around bug with universal reference/nullptr combination that only appears when /clr is on - weak_atomic(nullptr_t) : value(nullptr) { } -#endif - weak_atomic(weak_atomic const& other) : value(other.value) { } - weak_atomic(weak_atomic&& other) : value(std::move(other.value)) { } -#ifdef AE_VCPP -#pragma warning(pop) -#endif - - AE_FORCEINLINE operator T() const { return load(); } - - -#ifndef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC - template AE_FORCEINLINE weak_atomic const& operator=(U&& x) { value = std::forward(x); return *this; } - AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other) { value = other.value; return *this; } - - AE_FORCEINLINE T load() const { return value; } - - AE_FORCEINLINE T fetch_add_acquire(T increment) - { -#if defined(AE_ARCH_X64) || defined(AE_ARCH_X86) - if (sizeof(T) == 4) return _InterlockedExchangeAdd((long volatile*)&value, (long)increment); -#if defined(_M_AMD64) - else if (sizeof(T) == 8) return _InterlockedExchangeAdd64((long long volatile*)&value, (long long)increment); -#endif -#else -#error Unsupported platform -#endif - assert(false && "T must be either a 32 or 64 bit type"); - return value; - } - - AE_FORCEINLINE T fetch_add_release(T increment) - { -#if defined(AE_ARCH_X64) || defined(AE_ARCH_X86) - if (sizeof(T) == 4) return _InterlockedExchangeAdd((long volatile*)&value, (long)increment); -#if defined(_M_AMD64) - else if (sizeof(T) == 8) return _InterlockedExchangeAdd64((long long volatile*)&value, (long long)increment); -#endif -#else -#error Unsupported platform -#endif - assert(false && "T must be either a 32 or 64 bit type"); - return value; - } -#else - template - AE_FORCEINLINE weak_atomic const& operator=(U&& x) - { - value.store(std::forward(x), std::memory_order_relaxed); - return *this; - } - - AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other) - { - value.store(other.value.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - - AE_FORCEINLINE T load() const { return value.load(std::memory_order_relaxed); } - - AE_FORCEINLINE T fetch_add_acquire(T increment) - { - return value.fetch_add(increment, std::memory_order_acquire); - } - - AE_FORCEINLINE T fetch_add_release(T increment) - { - return value.fetch_add(increment, std::memory_order_release); - } -#endif - - - private: -#ifndef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC - // No std::atomic support, but still need to circumvent compiler optimizations. - // `volatile` will make memory access slow, but is guaranteed to be reliable. - volatile T value; -#else - std::atomic value; -#endif - }; - -} // end namespace moodycamel - - - -// Portable single-producer, single-consumer semaphore below: - -#if defined(_WIN32) -// Avoid including windows.h in a header; we only need a handful of -// items, so we'll redeclare them here (this is relatively safe since -// the API generally has to remain stable between Windows versions). -// I know this is an ugly hack but it still beats polluting the global -// namespace with thousands of generic names or adding a .cpp for nothing. -extern "C" { - struct _SECURITY_ATTRIBUTES; - __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName); - __declspec(dllimport) int __stdcall CloseHandle(void* hObject); - __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds); - __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount); -} -#elif defined(__MACH__) -#include -#elif defined(__unix__) -#include -#endif - -namespace moodycamel -{ - // Code in the spsc_sema namespace below is an adaptation of Jeff Preshing's - // portable + lightweight semaphore implementations, originally from - // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h - // LICENSE: - // Copyright (c) 2015 Jeff Preshing - // - // This software is provided 'as-is', without any express or implied - // warranty. In no event will the authors be held liable for any damages - // arising from the use of this software. - // - // Permission is granted to anyone to use this software for any purpose, - // including commercial applications, and to alter it and redistribute it - // freely, subject to the following restrictions: - // - // 1. The origin of this software must not be misrepresented; you must not - // claim that you wrote the original software. If you use this software - // in a product, an acknowledgement in the product documentation would be - // appreciated but is not required. - // 2. Altered source versions must be plainly marked as such, and must not be - // misrepresented as being the original software. - // 3. This notice may not be removed or altered from any source distribution. - namespace spsc_sema - { -#if defined(_WIN32) - class Semaphore - { - private: - void* m_hSema; - - Semaphore(const Semaphore& other); - Semaphore& operator=(const Semaphore& other); - - public: - Semaphore(int initialCount = 0) - { - assert(initialCount >= 0); - const long maxLong = 0x7fffffff; - m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr); - } - - ~Semaphore() - { - CloseHandle(m_hSema); - } - - void wait() - { - const unsigned long infinite = 0xffffffff; - WaitForSingleObject(m_hSema, infinite); - } - - bool try_wait() - { - const unsigned long RC_WAIT_TIMEOUT = 0x00000102; - return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT; - } - - bool timed_wait(std::uint64_t usecs) - { - const unsigned long RC_WAIT_TIMEOUT = 0x00000102; - return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT; - } - - void signal(int count = 1) - { - ReleaseSemaphore(m_hSema, count, nullptr); - } - }; -#elif defined(__MACH__) - //--------------------------------------------------------- - // Semaphore (Apple iOS and OSX) - // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html - //--------------------------------------------------------- - class Semaphore - { - private: - semaphore_t m_sema; - - Semaphore(const Semaphore& other); - Semaphore& operator=(const Semaphore& other); - - public: - Semaphore(int initialCount = 0) - { - assert(initialCount >= 0); - semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount); - } - - ~Semaphore() - { - semaphore_destroy(mach_task_self(), m_sema); - } - - void wait() - { - semaphore_wait(m_sema); - } - - bool try_wait() - { - return timed_wait(0); - } - - bool timed_wait(std::int64_t timeout_usecs) - { - mach_timespec_t ts; - ts.tv_sec = static_cast(timeout_usecs / 1000000); - ts.tv_nsec = (timeout_usecs % 1000000) * 1000; - - // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html - kern_return_t rc = semaphore_timedwait(m_sema, ts); - - return rc != KERN_OPERATION_TIMED_OUT; - } - - void signal() - { - semaphore_signal(m_sema); - } - - void signal(int count) - { - while (count-- > 0) - { - semaphore_signal(m_sema); - } - } - }; -#elif defined(__unix__) - //--------------------------------------------------------- - // Semaphore (POSIX, Linux) - //--------------------------------------------------------- - class Semaphore - { - private: - sem_t m_sema; - - Semaphore(const Semaphore& other); - Semaphore& operator=(const Semaphore& other); - - public: - Semaphore(int initialCount = 0) - { - assert(initialCount >= 0); - sem_init(&m_sema, 0, initialCount); - } - - ~Semaphore() - { - sem_destroy(&m_sema); - } - - void wait() - { - // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error - int rc; - do - { - rc = sem_wait(&m_sema); - } - while (rc == -1 && errno == EINTR); - } - - bool try_wait() - { - int rc; - do { - rc = sem_trywait(&m_sema); - } while (rc == -1 && errno == EINTR); - return !(rc == -1 && errno == EAGAIN); - } - - bool timed_wait(std::uint64_t usecs) - { - struct timespec ts; - const int usecs_in_1_sec = 1000000; - const int nsecs_in_1_sec = 1000000000; - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += usecs / usecs_in_1_sec; - ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000; - // sem_timedwait bombs if you have more than 1e9 in tv_nsec - // so we have to clean things up before passing it in - if (ts.tv_nsec >= nsecs_in_1_sec) { - ts.tv_nsec -= nsecs_in_1_sec; - ++ts.tv_sec; - } - - int rc; - do { - rc = sem_timedwait(&m_sema, &ts); - } while (rc == -1 && errno == EINTR); - return !(rc == -1 && errno == ETIMEDOUT); - } - - void signal() - { - sem_post(&m_sema); - } - - void signal(int count) - { - while (count-- > 0) - { - sem_post(&m_sema); - } - } - }; -#else -#error Unsupported platform! (No semaphore wrapper available) -#endif - - //--------------------------------------------------------- - // LightweightSemaphore - //--------------------------------------------------------- - class LightweightSemaphore - { - public: - typedef std::make_signed::type ssize_t; - - private: - weak_atomic m_count; - Semaphore m_sema; - - bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1) - { - ssize_t oldCount; - // Is there a better way to set the initial spin count? - // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC, - // as threads start hitting the kernel semaphore. - int spin = 10000; - while (--spin >= 0) - { - if (m_count.load() > 0) - { - m_count.fetch_add_acquire(-1); - return true; - } - compiler_fence(memory_order_acquire); // Prevent the compiler from collapsing the loop. - } - oldCount = m_count.fetch_add_acquire(-1); - if (oldCount > 0) - return true; - if (timeout_usecs < 0) - { - m_sema.wait(); - return true; - } - if (m_sema.timed_wait(timeout_usecs)) - return true; - // At this point, we've timed out waiting for the semaphore, but the - // count is still decremented indicating we may still be waiting on - // it. So we have to re-adjust the count, but only if the semaphore - // wasn't signaled enough times for us too since then. If it was, we - // need to release the semaphore too. - while (true) - { - oldCount = m_count.fetch_add_release(1); - if (oldCount < 0) - return false; // successfully restored things to the way they were - // Oh, the producer thread just signaled the semaphore after all. Try again: - oldCount = m_count.fetch_add_acquire(-1); - if (oldCount > 0 && m_sema.try_wait()) - return true; - } - } - - public: - LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount) - { - assert(initialCount >= 0); - } - - bool tryWait() - { - if (m_count.load() > 0) - { - m_count.fetch_add_acquire(-1); - return true; - } - return false; - } - - void wait() - { - if (!tryWait()) - waitWithPartialSpinning(); - } - - bool wait(std::int64_t timeout_usecs) - { - return tryWait() || waitWithPartialSpinning(timeout_usecs); - } - - void signal(ssize_t count = 1) - { - assert(count >= 0); - ssize_t oldCount = m_count.fetch_add_release(count); - assert(oldCount >= -1); - if (oldCount < 0) - { - m_sema.signal(1); - } - } - - ssize_t availableApprox() const - { - ssize_t count = m_count.load(); - return count > 0 ? count : 0; - } - }; - } // end namespace spsc_sema -} // end namespace moodycamel - -#if defined(AE_VCPP) && (_MSC_VER < 1700 || defined(__cplusplus_cli)) -#pragma warning(pop) -#ifdef __cplusplus_cli -#pragma managed(pop) -#endif -#endif \ No newline at end of file diff --git a/module/src/server/implementation/objects/avisqueue.hpp b/module/src/server/implementation/objects/avisqueue.hpp new file mode 100644 index 0000000..061b86c --- /dev/null +++ b/module/src/server/implementation/objects/avisqueue.hpp @@ -0,0 +1,44 @@ +/* + * Created by Avi Saranga + */ + +#include +#include + +template +class avis_blocking_queue +{ +private: + std::queue _queue; + mutable boost::mutex _mutex; + boost::condition_variable _condvar; +public: + + void push(VideoFrameElement const& vf) + { + boost::mutex::scoped_lock lock(_mutex); + _queue.push(vf); + lock.unlock(); + _condvar.notify_one(); + } + + bool empty() const + { + boost::mutex::scoped_lock lock(_mutex); + return _queue.empty(); + } + + void pop(VideoFrameElement& val) + { + boost::mutex::scoped_lock lock(_mutex); + while(_queue.empty()) + { + _condvar.wait(lock); + } + val=_queue.front(); + _queue.pop(); + } + + + +}; \ No newline at end of file diff --git a/module/src/server/implementation/objects/readerwriterqueue.h b/module/src/server/implementation/objects/readerwriterqueue.h deleted file mode 100644 index 3199dbd..0000000 --- a/module/src/server/implementation/objects/readerwriterqueue.h +++ /dev/null @@ -1,854 +0,0 @@ -// ©2013-2016 Cameron Desrochers. -// Distributed under the simplified BSD license (see the license file that -// should have come with this header). - -#pragma once - -#include "atomicops.h" -#include -#include -#include -#include -#include -#include -#include // For malloc/free/abort & size_t -#if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012 -#include -#endif - - -// A lock-free queue for a single-consumer, single-producer architecture. -// The queue is also wait-free in the common path (except if more memory -// needs to be allocated, in which case malloc is called). -// Allocates memory sparingly (O(lg(n) times, amortized), and only once if -// the original maximum size estimate is never exceeded. -// Tested on x86/x64 processors, but semantics should be correct for all -// architectures (given the right implementations in atomicops.h), provided -// that aligned integer and pointer accesses are naturally atomic. -// Note that there should only be one consumer thread and producer thread; -// Switching roles of the threads, or using multiple consecutive threads for -// one role, is not safe unless properly synchronized. -// Using the queue exclusively from one thread is fine, though a bit silly. - -#ifndef MOODYCAMEL_CACHE_LINE_SIZE -#define MOODYCAMEL_CACHE_LINE_SIZE 64 -#endif - -#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED -#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__)) -#define MOODYCAMEL_EXCEPTIONS_ENABLED -#endif -#endif - -#ifdef AE_VCPP -#pragma warning(push) -#pragma warning(disable: 4324) // structure was padded due to __declspec(align()) -#pragma warning(disable: 4820) // padding was added -#pragma warning(disable: 4127) // conditional expression is constant -#endif - -namespace moodycamel { - - template - class ReaderWriterQueue - { - // Design: Based on a queue-of-queues. The low-level queues are just - // circular buffers with front and tail indices indicating where the - // next element to dequeue is and where the next element can be enqueued, - // respectively. Each low-level queue is called a "block". Each block - // wastes exactly one element's worth of space to keep the design simple - // (if front == tail then the queue is empty, and can't be full). - // The high-level queue is a circular linked list of blocks; again there - // is a front and tail, but this time they are pointers to the blocks. - // The front block is where the next element to be dequeued is, provided - // the block is not empty. The back block is where elements are to be - // enqueued, provided the block is not full. - // The producer thread owns all the tail indices/pointers. The consumer - // thread owns all the front indices/pointers. Both threads read each - // other's variables, but only the owning thread updates them. E.g. After - // the consumer reads the producer's tail, the tail may change before the - // consumer is done dequeuing an object, but the consumer knows the tail - // will never go backwards, only forwards. - // If there is no room to enqueue an object, an additional block (of - // equal size to the last block) is added. Blocks are never removed. - - public: - // Constructs a queue that can hold maxSize elements without further - // allocations. If more than MAX_BLOCK_SIZE elements are requested, - // then several blocks of MAX_BLOCK_SIZE each are reserved (including - // at least one extra buffer block). - explicit ReaderWriterQueue(size_t maxSize = 15) -#ifndef NDEBUG - : enqueuing(false) - ,dequeuing(false) -#endif - { - assert(maxSize > 0); - assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2"); - assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2"); - - Block* firstBlock = nullptr; - - largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block - if (largestBlockSize > MAX_BLOCK_SIZE * 2) { - // We need a spare block in case the producer is writing to a different block the consumer is reading from, and - // wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the ambiguity - // between front == tail meaning "empty" and "full". - // So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the - // number of blocks - 1. Solving for maxSize and applying a ceiling to the division gives us (after simplifying): - size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1); - largestBlockSize = MAX_BLOCK_SIZE; - Block* lastBlock = nullptr; - for (size_t i = 0; i != initialBlockCount; ++i) { - auto block = make_block(largestBlockSize); - if (block == nullptr) { -#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED - throw std::bad_alloc(); -#else - abort(); -#endif - } - if (firstBlock == nullptr) { - firstBlock = block; - } - else { - lastBlock->next = block; - } - lastBlock = block; - block->next = firstBlock; - } - } - else { - firstBlock = make_block(largestBlockSize); - if (firstBlock == nullptr) { -#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED - throw std::bad_alloc(); -#else - abort(); -#endif - } - firstBlock->next = firstBlock; - } - frontBlock = firstBlock; - tailBlock = firstBlock; - - // Make sure the reader/writer threads will have the initialized memory setup above: - fence(memory_order_sync); - } - - // Note: The queue should not be accessed concurrently while it's - // being moved. It's up to the user to synchronize this. - ReaderWriterQueue(ReaderWriterQueue&& other) - : frontBlock(other.frontBlock.load()), - tailBlock(other.tailBlock.load()), - largestBlockSize(other.largestBlockSize) -#ifndef NDEBUG - ,enqueuing(false) - ,dequeuing(false) -#endif - { - other.largestBlockSize = 32; - Block* b = other.make_block(other.largestBlockSize); - if (b == nullptr) { -#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED - throw std::bad_alloc(); -#else - abort(); -#endif - } - b->next = b; - other.frontBlock = b; - other.tailBlock = b; - } - - // Note: The queue should not be accessed concurrently while it's - // being moved. It's up to the user to synchronize this. - ReaderWriterQueue& operator=(ReaderWriterQueue&& other) - { - Block* b = frontBlock.load(); - frontBlock = other.frontBlock.load(); - other.frontBlock = b; - b = tailBlock.load(); - tailBlock = other.tailBlock.load(); - other.tailBlock = b; - std::swap(largestBlockSize, other.largestBlockSize); - return *this; - } - - // Note: The queue should not be accessed concurrently while it's - // being deleted. It's up to the user to synchronize this. - ~ReaderWriterQueue() - { - // Make sure we get the latest version of all variables from other CPUs: - fence(memory_order_sync); - - // Destroy any remaining objects in queue and free memory - Block* frontBlock_ = frontBlock; - Block* block = frontBlock_; - do { - Block* nextBlock = block->next; - size_t blockFront = block->front; - size_t blockTail = block->tail; - - for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) { - auto element = reinterpret_cast(block->data + i * sizeof(T)); - element->~T(); - (void)element; - } - - auto rawBlock = block->rawThis; - block->~Block(); - std::free(rawBlock); - block = nextBlock; - } while (block != frontBlock_); - } - - - // Enqueues a copy of element if there is room in the queue. - // Returns true if the element was enqueued, false otherwise. - // Does not allocate memory. - AE_FORCEINLINE bool try_enqueue(T const& element) - { - return inner_enqueue(element); - } - - // Enqueues a moved copy of element if there is room in the queue. - // Returns true if the element was enqueued, false otherwise. - // Does not allocate memory. - AE_FORCEINLINE bool try_enqueue(T&& element) - { - return inner_enqueue(std::forward(element)); - } - - - // Enqueues a copy of element on the queue. - // Allocates an additional block of memory if needed. - // Only fails (returns false) if memory allocation fails. - AE_FORCEINLINE bool enqueue(T const& element) - { - return inner_enqueue(element); - } - - // Enqueues a moved copy of element on the queue. - // Allocates an additional block of memory if needed. - // Only fails (returns false) if memory allocation fails. - AE_FORCEINLINE bool enqueue(T&& element) - { - return inner_enqueue(std::forward(element)); - } - - - // Attempts to dequeue an element; if the queue is empty, - // returns false instead. If the queue has at least one element, - // moves front to result using operator=, then returns true. - template - bool try_dequeue(U& result) - { -#ifndef NDEBUG - ReentrantGuard guard(this->dequeuing); -#endif - - // High-level pseudocode: - // Remember where the tail block is - // If the front block has an element in it, dequeue it - // Else - // If front block was the tail block when we entered the function, return false - // Else advance to next block and dequeue the item there - - // Note that we have to use the value of the tail block from before we check if the front - // block is full or not, in case the front block is empty and then, before we check if the - // tail block is at the front block or not, the producer fills up the front block *and - // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently - // reproducible in practice. - // In order to avoid overhead in the common case, though, we do a double-checked pattern - // where we have the fast path if the front block is not empty, then read the tail block, - // then re-read the front block and check if it's not empty again, then check if the tail - // block has advanced. - - Block* frontBlock_ = frontBlock.load(); - size_t blockTail = frontBlock_->localTail; - size_t blockFront = frontBlock_->front.load(); - - if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) { - fence(memory_order_acquire); - - non_empty_front_block: - // Front block not empty, dequeue from here - auto element = reinterpret_cast(frontBlock_->data + blockFront * sizeof(T)); - result = std::move(*element); - element->~T(); - - blockFront = (blockFront + 1) & frontBlock_->sizeMask; - - fence(memory_order_release); - frontBlock_->front = blockFront; - } - else if (frontBlock_ != tailBlock.load()) { - fence(memory_order_acquire); - - frontBlock_ = frontBlock.load(); - blockTail = frontBlock_->localTail = frontBlock_->tail.load(); - blockFront = frontBlock_->front.load(); - fence(memory_order_acquire); - - if (blockFront != blockTail) { - // Oh look, the front block isn't empty after all - goto non_empty_front_block; - } - - // Front block is empty but there's another block ahead, advance to it - Block* nextBlock = frontBlock_->next; - // Don't need an acquire fence here since next can only ever be set on the tailBlock, - // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which - // ensures next is up-to-date on this CPU in case we recently were at tailBlock. - - size_t nextBlockFront = nextBlock->front.load(); - size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load(); - fence(memory_order_acquire); - - // Since the tailBlock is only ever advanced after being written to, - // we know there's for sure an element to dequeue on it - assert(nextBlockFront != nextBlockTail); - AE_UNUSED(nextBlockTail); - - // We're done with this block, let the producer use it if it needs - fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue - frontBlock = frontBlock_ = nextBlock; - - compiler_fence(memory_order_release); // Not strictly needed - - auto element = reinterpret_cast(frontBlock_->data + nextBlockFront * sizeof(T)); - - result = std::move(*element); - element->~T(); - - nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask; - - fence(memory_order_release); - frontBlock_->front = nextBlockFront; - } - else { - // No elements in current block and no other block to advance to - return false; - } - - return true; - } - - - // Returns a pointer to the front element in the queue (the one that - // would be removed next by a call to `try_dequeue` or `pop`). If the - // queue appears empty at the time the method is called, nullptr is - // returned instead. - // Must be called only from the consumer thread. - T* peek() - { -#ifndef NDEBUG - ReentrantGuard guard(this->dequeuing); -#endif - // See try_dequeue() for reasoning - - Block* frontBlock_ = frontBlock.load(); - size_t blockTail = frontBlock_->localTail; - size_t blockFront = frontBlock_->front.load(); - - if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) { - fence(memory_order_acquire); - non_empty_front_block: - return reinterpret_cast(frontBlock_->data + blockFront * sizeof(T)); - } - else if (frontBlock_ != tailBlock.load()) { - fence(memory_order_acquire); - frontBlock_ = frontBlock.load(); - blockTail = frontBlock_->localTail = frontBlock_->tail.load(); - blockFront = frontBlock_->front.load(); - fence(memory_order_acquire); - - if (blockFront != blockTail) { - goto non_empty_front_block; - } - - Block* nextBlock = frontBlock_->next; - - size_t nextBlockFront = nextBlock->front.load(); - fence(memory_order_acquire); - - assert(nextBlockFront != nextBlock->tail.load()); - return reinterpret_cast(nextBlock->data + nextBlockFront * sizeof(T)); - } - - return nullptr; - } - - // Removes the front element from the queue, if any, without returning it. - // Returns true on success, or false if the queue appeared empty at the time - // `pop` was called. - bool pop() - { -#ifndef NDEBUG - ReentrantGuard guard(this->dequeuing); -#endif - // See try_dequeue() for reasoning - - Block* frontBlock_ = frontBlock.load(); - size_t blockTail = frontBlock_->localTail; - size_t blockFront = frontBlock_->front.load(); - - if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) { - fence(memory_order_acquire); - - non_empty_front_block: - auto element = reinterpret_cast(frontBlock_->data + blockFront * sizeof(T)); - element->~T(); - - blockFront = (blockFront + 1) & frontBlock_->sizeMask; - - fence(memory_order_release); - frontBlock_->front = blockFront; - } - else if (frontBlock_ != tailBlock.load()) { - fence(memory_order_acquire); - frontBlock_ = frontBlock.load(); - blockTail = frontBlock_->localTail = frontBlock_->tail.load(); - blockFront = frontBlock_->front.load(); - fence(memory_order_acquire); - - if (blockFront != blockTail) { - goto non_empty_front_block; - } - - // Front block is empty but there's another block ahead, advance to it - Block* nextBlock = frontBlock_->next; - - size_t nextBlockFront = nextBlock->front.load(); - size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load(); - fence(memory_order_acquire); - - assert(nextBlockFront != nextBlockTail); - AE_UNUSED(nextBlockTail); - - fence(memory_order_release); - frontBlock = frontBlock_ = nextBlock; - - compiler_fence(memory_order_release); - - auto element = reinterpret_cast(frontBlock_->data + nextBlockFront * sizeof(T)); - element->~T(); - - nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask; - - fence(memory_order_release); - frontBlock_->front = nextBlockFront; - } - else { - // No elements in current block and no other block to advance to - return false; - } - - return true; - } - - // Returns the approximate number of items currently in the queue. - // Safe to call from both the producer and consumer threads. - inline size_t size_approx() const - { - size_t result = 0; - Block* frontBlock_ = frontBlock.load(); - Block* block = frontBlock_; - do { - fence(memory_order_acquire); - size_t blockFront = block->front.load(); - size_t blockTail = block->tail.load(); - result += (blockTail - blockFront) & block->sizeMask; - block = block->next.load(); - } while (block != frontBlock_); - return result; - } - - - private: - enum AllocationMode { CanAlloc, CannotAlloc }; - - template - bool inner_enqueue(U&& element) - { -#ifndef NDEBUG - ReentrantGuard guard(this->enqueuing); -#endif - - // High-level pseudocode (assuming we're allowed to alloc a new block): - // If room in tail block, add to tail - // Else check next block - // If next block is not the head block, enqueue on next block - // Else create a new block and enqueue there - // Advance tail to the block we just enqueued to - - Block* tailBlock_ = tailBlock.load(); - size_t blockFront = tailBlock_->localFront; - size_t blockTail = tailBlock_->tail.load(); - - size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask; - if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) { - fence(memory_order_acquire); - // This block has room for at least one more element - char* location = tailBlock_->data + blockTail * sizeof(T); - new (location) T(std::forward(element)); - - fence(memory_order_release); - tailBlock_->tail = nextBlockTail; - } - else { - fence(memory_order_acquire); - if (tailBlock_->next.load() != frontBlock) { - // Note that the reason we can't advance to the frontBlock and start adding new entries there - // is because if we did, then dequeue would stay in that block, eventually reading the new values, - // instead of advancing to the next full block (whose values were enqueued first and so should be - // consumed first). - - fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock - - // tailBlock is full, but there's a free block ahead, use it - Block* tailBlockNext = tailBlock_->next.load(); - size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load(); - nextBlockTail = tailBlockNext->tail.load(); - fence(memory_order_acquire); - - // This block must be empty since it's not the head block and we - // go through the blocks in a circle - assert(nextBlockFront == nextBlockTail); - tailBlockNext->localFront = nextBlockFront; - - char* location = tailBlockNext->data + nextBlockTail * sizeof(T); - new (location) T(std::forward(element)); - - tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask; - - fence(memory_order_release); - tailBlock = tailBlockNext; - } - else if (canAlloc == CanAlloc) { - // tailBlock is full and there's no free block ahead; create a new block - auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2; - auto newBlock = make_block(newBlockSize); - if (newBlock == nullptr) { - // Could not allocate a block! - return false; - } - largestBlockSize = newBlockSize; - - new (newBlock->data) T(std::forward(element)); - - assert(newBlock->front == 0); - newBlock->tail = newBlock->localTail = 1; - - newBlock->next = tailBlock_->next.load(); - tailBlock_->next = newBlock; - - // Might be possible for the dequeue thread to see the new tailBlock->next - // *without* seeing the new tailBlock value, but this is OK since it can't - // advance to the next block until tailBlock is set anyway (because the only - // case where it could try to read the next is if it's already at the tailBlock, - // and it won't advance past tailBlock in any circumstance). - - fence(memory_order_release); - tailBlock = newBlock; - } - else if (canAlloc == CannotAlloc) { - // Would have had to allocate a new block to enqueue, but not allowed - return false; - } - else { - assert(false && "Should be unreachable code"); - return false; - } - } - - return true; - } - - - // Disable copying - ReaderWriterQueue(ReaderWriterQueue const&) { } - - // Disable assignment - ReaderWriterQueue& operator=(ReaderWriterQueue const&) { } - - - - AE_FORCEINLINE static size_t ceilToPow2(size_t x) - { - // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 - --x; - x |= x >> 1; - x |= x >> 2; - x |= x >> 4; - for (size_t i = 1; i < sizeof(size_t); i <<= 1) { - x |= x >> (i << 3); - } - ++x; - return x; - } - - template - static AE_FORCEINLINE char* align_for(char* ptr) - { - const std::size_t alignment = std::alignment_of::value; - return ptr + (alignment - (reinterpret_cast(ptr) % alignment)) % alignment; - } - private: -#ifndef NDEBUG - struct ReentrantGuard - { - ReentrantGuard(bool& _inSection) - : inSection(_inSection) - { - assert(!inSection && "ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors"); - inSection = true; - } - - ~ReentrantGuard() { inSection = false; } - - private: - ReentrantGuard& operator=(ReentrantGuard const&); - - private: - bool& inSection; - }; -#endif - - struct Block - { - // Avoid false-sharing by putting highly contended variables on their own cache lines - weak_atomic front; // (Atomic) Elements are read from here - size_t localTail; // An uncontended shadow copy of tail, owned by the consumer - - char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic) - sizeof(size_t)]; - weak_atomic tail; // (Atomic) Elements are enqueued here - size_t localFront; - - char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic) - sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as tail (which is) - weak_atomic next; // (Atomic) - - char* data; // Contents (on heap) are aligned to T's alignment - - const size_t sizeMask; - - - // size must be a power of two (and greater than 0) - Block(size_t const& _size, char* _rawThis, char* _data) - : front(0), localTail(0), tail(0), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis) - { - } - - private: - // C4512 - Assignment operator could not be generated - Block& operator=(Block const&); - - public: - char* rawThis; - }; - - - static Block* make_block(size_t capacity) - { - // Allocate enough memory for the block itself, as well as all the elements it will contain - auto size = sizeof(Block) + std::alignment_of::value - 1; - size += sizeof(T) * capacity + std::alignment_of::value - 1; - auto newBlockRaw = static_cast(std::malloc(size)); - if (newBlockRaw == nullptr) { - return nullptr; - } - - auto newBlockAligned = align_for(newBlockRaw); - auto newBlockData = align_for(newBlockAligned + sizeof(Block)); - return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData); - } - - private: - weak_atomic frontBlock; // (Atomic) Elements are enqueued to this block - - char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic)]; - weak_atomic tailBlock; // (Atomic) Elements are dequeued from this block - - size_t largestBlockSize; - -#ifndef NDEBUG - bool enqueuing; - bool dequeuing; -#endif - }; - -// Like ReaderWriterQueue, but also providees blocking operations - template - class BlockingReaderWriterQueue - { - private: - typedef ::moodycamel::ReaderWriterQueue ReaderWriterQueue; - - public: - explicit BlockingReaderWriterQueue(size_t maxSize = 15) - : inner(maxSize) - { } - - - // Enqueues a copy of element if there is room in the queue. - // Returns true if the element was enqueued, false otherwise. - // Does not allocate memory. - AE_FORCEINLINE bool try_enqueue(T const& element) - { - if (inner.try_enqueue(element)) { - sema.signal(); - return true; - } - return false; - } - - // Enqueues a moved copy of element if there is room in the queue. - // Returns true if the element was enqueued, false otherwise. - // Does not allocate memory. - AE_FORCEINLINE bool try_enqueue(T&& element) - { - if (inner.try_enqueue(std::forward(element))) { - sema.signal(); - return true; - } - return false; - } - - - // Enqueues a copy of element on the queue. - // Allocates an additional block of memory if needed. - // Only fails (returns false) if memory allocation fails. - AE_FORCEINLINE bool enqueue(T const& element) - { - if (inner.enqueue(element)) { - sema.signal(); - return true; - } - return false; - } - - // Enqueues a moved copy of element on the queue. - // Allocates an additional block of memory if needed. - // Only fails (returns false) if memory allocation fails. - AE_FORCEINLINE bool enqueue(T&& element) - { - if (inner.enqueue(std::forward(element))) { - sema.signal(); - return true; - } - return false; - } - - - // Attempts to dequeue an element; if the queue is empty, - // returns false instead. If the queue has at least one element, - // moves front to result using operator=, then returns true. - template - bool try_dequeue(U& result) - { - if (sema.tryWait()) { - bool success = inner.try_dequeue(result); - assert(success); - AE_UNUSED(success); - return true; - } - return false; - } - - - // Attempts to dequeue an element; if the queue is empty, - // waits until an element is available, then dequeues it. - template - void wait_dequeue(U& result) - { - sema.wait(); - bool success = inner.try_dequeue(result); - AE_UNUSED(result); - assert(success); - AE_UNUSED(success); - } - - - // Attempts to dequeue an element; if the queue is empty, - // waits until an element is available up to the specified timeout, - // then dequeues it and returns true, or returns false if the timeout - // expires before an element can be dequeued. - // Using a negative timeout indicates an indefinite timeout, - // and is thus functionally equivalent to calling wait_dequeue. - template - bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs) - { - if (!sema.wait(timeout_usecs)) { - return false; - } - bool success = inner.try_dequeue(result); - AE_UNUSED(result); - assert(success); - AE_UNUSED(success); - return true; - } - - -#if __cplusplus > 199711L || _MSC_VER >= 1700 - // Attempts to dequeue an element; if the queue is empty, - // waits until an element is available up to the specified timeout, - // then dequeues it and returns true, or returns false if the timeout - // expires before an element can be dequeued. - // Using a negative timeout indicates an indefinite timeout, - // and is thus functionally equivalent to calling wait_dequeue. - template - inline bool wait_dequeue_timed(U& result, std::chrono::duration const& timeout) - { - return wait_dequeue_timed(result, std::chrono::duration_cast(timeout).count()); - } -#endif - - - // Returns a pointer to the front element in the queue (the one that - // would be removed next by a call to `try_dequeue` or `pop`). If the - // queue appears empty at the time the method is called, nullptr is - // returned instead. - // Must be called only from the consumer thread. - AE_FORCEINLINE T* peek() - { - return inner.peek(); - } - - // Removes the front element from the queue, if any, without returning it. - // Returns true on success, or false if the queue appeared empty at the time - // `pop` was called. - AE_FORCEINLINE bool pop() - { - if (sema.tryWait()) { - bool result = inner.pop(); - assert(result); - AE_UNUSED(result); - return true; - } - return false; - } - - // Returns the approximate number of items currently in the queue. - // Safe to call from both the producer and consumer threads. - AE_FORCEINLINE size_t size_approx() const - { - return sema.availableApprox(); - } - - - private: - // Disable copying & assignment - BlockingReaderWriterQueue(ReaderWriterQueue const&) { } - BlockingReaderWriterQueue& operator=(ReaderWriterQueue const&) { } - - private: - ReaderWriterQueue inner; - spsc_sema::LightweightSemaphore sema; - }; - -} // end namespace moodycamel - -#ifdef AE_VCPP -#pragma warning(pop) -#endif \ No newline at end of file