Skip to content

Commit

Permalink
Switch to a blocking SPSC queue (included template also supports a no…
Browse files Browse the repository at this point in the history
…n-blocking version of this queue).

A few changes to the consumer logic which will result in a full cleanup once a destructor is called ;)
Change some logging
  • Loading branch information
Avi Saranga committed Nov 21, 2017
1 parent 3bc0065 commit 8e89171
Show file tree
Hide file tree
Showing 4 changed files with 1,553 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace kurento {

TelmateFrameGrabberOpenCVImpl::TelmateFrameGrabberOpenCVImpl() {


this->thrLoop = true;
this->snapInterval = 1000;
this->epName = "EP_NAME_UNINITIALIZED";
Expand All @@ -25,29 +24,24 @@ TelmateFrameGrabberOpenCVImpl::TelmateFrameGrabberOpenCVImpl() {
this->lastQueueTimeStamp = 0;
this->queueLength = 0;

this->frameQueue = new boost::lockfree::queue<VideoFrame *>(0);
this->frameQueue = new BlockingReaderWriterQueue<VideoFrame *>(QUEUE_BASE_ALLOC);
this->thr = new boost::thread(boost::bind(
&TelmateFrameGrabberOpenCVImpl::queueHandler, this));
this->thr->detach();
GST_INFO("TelmateFrameGrabberOpenCVImpl::TelmateFrameGrabberOpenCVImpl()");




GST_INFO("FrameGrabber Constructor was called for %s", this->epName.c_str());
}


TelmateFrameGrabberOpenCVImpl::~TelmateFrameGrabberOpenCVImpl() {

this->thrLoop = false;

while(queueLength > 0) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(10));

}

delete this->frameQueue;
this->frameQueue = NULL;
GST_INFO("FrameGrabber Destructor was called for %s", this->epName.c_str());

GST_INFO("TelmateFrameGrabberOpenCVImpl::"
"~TelmateFrameGrabberOpenCVImpl() "
"called, %s ", this->epName.c_str());
}


Expand All @@ -63,7 +57,7 @@ void TelmateFrameGrabberOpenCVImpl::process(cv::Mat &mat) {
VideoFrame *ptrVf = new VideoFrame();
ptrVf->mat = mat.clone();
ptrVf->ts = std::to_string(this->lastQueueTimeStamp);
this->frameQueue->push(ptrVf);
this->frameQueue->enqueue(ptrVf);
++this->queueLength;
++this->framesCounter;
}
Expand All @@ -81,12 +75,10 @@ void TelmateFrameGrabberOpenCVImpl::queueHandler() {
std::vector<int> params;
std::string image_extension;

while (this->thrLoop) {

if (this->queueLength > 0) {
while (this->thrLoop && this->frameQueue->wait_dequeue_timed(ptrVf,std::chrono::milliseconds(5))) {

params.clear(); // clear the vector since the last iteration.
this->frameQueue->pop(ptrVf);
this->lastQueueTimeStamp = this->getCurrentTimestampLong();
--this->queueLength;

switch (this->outputFormat) {
Expand Down Expand Up @@ -128,7 +120,7 @@ void TelmateFrameGrabberOpenCVImpl::queueHandler() {
cv::imwrite(fullpath.c_str(), ptrVf->mat, params);
}
catch (...) {
GST_ERROR("TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed.");
GST_ERROR("::queueHandler() imgwrite() failed.");
throw KurentoException(NOT_IMPLEMENTED,
"TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed. \n");
}
Expand All @@ -137,16 +129,23 @@ void TelmateFrameGrabberOpenCVImpl::queueHandler() {

delete ptrVf;
ptrVf = NULL;
} else {
boost::this_thread::sleep(boost::posix_time::seconds(1));
}

}

while(this->frameQueue->try_dequeue(ptrVf)) { /* Empty the queue post processing if the dtor was called */
--this->queueLength;
delete ptrVf;
ptrVf = NULL;

}


delete this->frameQueue;
this->frameQueue = NULL;

}


std::string TelmateFrameGrabberOpenCVImpl::getCurrentTimestampString() {
struct timeval tp;
long int ms;
Expand All @@ -160,7 +159,6 @@ std::string TelmateFrameGrabberOpenCVImpl::getCurrentTimestampString() {

long TelmateFrameGrabberOpenCVImpl::getCurrentTimestampLong() {
struct timeval tp;
std::stringstream sstr_ts;

gettimeofday(&tp, NULL);
return (tp.tv_sec * 1000 + tp.tv_usec / 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#include <boost/bind.hpp>

#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>

#include "atomicops.h"
#include "readerwriterqueue.h"

#include <boost/atomic.hpp>

Expand All @@ -33,6 +35,14 @@
#define FG_JPEG_QUALITY 20
#define FG_PNG_QUALITY 9

#define MAX_IDLE_QUEUE_TIME_NS 30000
#define QUEUE_BASE_ALLOC 1000

using namespace moodycamel;




namespace kurento {

class TelmateFrameGrabberOpenCVImpl : public virtual OpenCVProcess {
Expand Down Expand Up @@ -73,7 +83,8 @@ class TelmateFrameGrabberOpenCVImpl : public virtual OpenCVProcess {

boost::asio::io_service ioService;
boost::thread_group tp;
boost::lockfree::queue<VideoFrame*> *frameQueue;

BlockingReaderWriterQueue<VideoFrame*> *frameQueue;
boost::thread* thr;
boost::atomic<bool> thrLoop;

Expand Down
Loading

0 comments on commit 8e89171

Please sign in to comment.