Skip to content

Commit

Permalink
A blocking queue implementation added to ensure no funky stuff happen…
Browse files Browse the repository at this point in the history
…s with higher loads.
  • Loading branch information
Avi Saranga committed Nov 29, 2017
1 parent 8e89171 commit 4ba54e7
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 1,588 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,20 @@ TelmateFrameGrabberOpenCVImpl::TelmateFrameGrabberOpenCVImpl() {
this->outputFormat = FGFMT_JPEG;
this->lastQueueTimeStamp = 0;
this->queueLength = 0;
this->frameQueue = new avis_blocking_queue<VideoFrame*>;

this->frameQueue = new BlockingReaderWriterQueue<VideoFrame *>(QUEUE_BASE_ALLOC);
this->thr = new boost::thread(boost::bind(
&TelmateFrameGrabberOpenCVImpl::queueHandler, this));
this->thr->detach();




GST_INFO("FrameGrabber Constructor was called for %s", this->epName.c_str());
GST_INFO("Constructor was called for %s", this->epName.c_str());
}


TelmateFrameGrabberOpenCVImpl::~TelmateFrameGrabberOpenCVImpl() {

this->thrLoop = false;

GST_INFO("FrameGrabber Destructor was called for %s", this->epName.c_str());
GST_INFO("Destructor was called for %s", this->epName.c_str());

}

Expand All @@ -51,15 +47,18 @@ TelmateFrameGrabberOpenCVImpl::~TelmateFrameGrabberOpenCVImpl() {
* here. Any changes in mat, will be sent through the Media Pipeline.
*/
void TelmateFrameGrabberOpenCVImpl::process(cv::Mat &mat) {
if ((this->getCurrentTimestampLong() -
this->lastQueueTimeStamp) >= this->snapInterval) {
this->lastQueueTimeStamp = this->getCurrentTimestampLong();
VideoFrame *ptrVf = new VideoFrame();
ptrVf->mat = mat.clone();
ptrVf->ts = std::to_string(this->lastQueueTimeStamp);
this->frameQueue->enqueue(ptrVf);
++this->queueLength;
++this->framesCounter;
if ((this->getCurrentTimestampLong() - this->lastQueueTimeStamp) >= this->snapInterval) {

if(this->thrLoop) { // do not push into the queue if the destructor was called.
this->lastQueueTimeStamp = this->getCurrentTimestampLong();
VideoFrame *ptrVf = new VideoFrame();
ptrVf->mat = mat.clone();
ptrVf->ts = std::to_string(this->lastQueueTimeStamp);

this->frameQueue->push(ptrVf);
++this->queueLength;
++this->framesCounter;
}
}
}
/*
Expand All @@ -75,64 +74,67 @@ void TelmateFrameGrabberOpenCVImpl::queueHandler() {
std::vector<int> params;
std::string image_extension;

while (this->thrLoop && this->frameQueue->wait_dequeue_timed(ptrVf,std::chrono::milliseconds(5))) {
while (this->thrLoop) {

params.clear(); // clear the vector since the last iteration.
this->lastQueueTimeStamp = this->getCurrentTimestampLong();
--this->queueLength;

switch (this->outputFormat) {
case FGFMT_JPEG:
/* Set jpeg params */
params.push_back(CV_IMWRITE_JPEG_QUALITY);
params.push_back(FG_JPEG_QUALITY);
image_extension = ".jpeg";
break;
case FGFMT_PNG:
/* Set PNG parameters, compression etc. */
params.push_back(CV_IMWRITE_PNG_COMPRESSION);
params.push_back(FG_PNG_QUALITY);
image_extension = ".png";
break;
default:
/* Defaults to jpeg */
params.push_back(CV_IMWRITE_JPEG_QUALITY);
params.push_back(FG_JPEG_QUALITY);
image_extension = ".jpeg";
break;
}

std::string filename =
std::to_string((long) this->framesCounter) + "_" + ptrVf->ts + image_extension;
this->frameQueue->pop(ptrVf); // blocks
params.clear(); // clear the vector since the last iteration.
this->lastQueueTimeStamp = this->getCurrentTimestampLong();
--this->queueLength;

if (this->storagePathSubdir.empty()) {
this->storagePathSubdir = this->storagePath + "/frames_" + this->getCurrentTimestampString();
boost::filesystem::path dir(this->storagePathSubdir.c_str());
if (!boost::filesystem::create_directories(dir)) {
GST_ERROR("%s create_directories() failed for: %s", this->epName.c_str(),
this->storagePathSubdir.c_str());
}
switch (this->outputFormat) {
case FGFMT_JPEG:
/* Set jpeg params */
params.push_back(CV_IMWRITE_JPEG_QUALITY);
params.push_back(FG_JPEG_QUALITY);
image_extension = ".jpeg";
break;
case FGFMT_PNG:
/* Set PNG parameters, compression etc. */
params.push_back(CV_IMWRITE_PNG_COMPRESSION);
params.push_back(FG_PNG_QUALITY);
image_extension = ".png";
break;
default:
/* Defaults to jpeg */
params.push_back(CV_IMWRITE_JPEG_QUALITY);
params.push_back(FG_JPEG_QUALITY);
image_extension = ".jpeg";
break;
}

std::string filename =
std::to_string((long) this->framesCounter) + "_" + ptrVf->ts + image_extension;

if (this->storagePathSubdir.empty()) {
this->storagePathSubdir = this->storagePath + "/frames_" + this->getCurrentTimestampString();
boost::filesystem::path dir(this->storagePathSubdir.c_str());
if (!boost::filesystem::create_directories(dir)) {
GST_ERROR("%s create_directories() failed for: %s", this->epName.c_str(),
this->storagePathSubdir.c_str());
}
}

std::string fullpath = this->storagePathSubdir + "/" + filename;
std::string fullpath = this->storagePathSubdir + "/" + filename;

try {
cv::imwrite(fullpath.c_str(), ptrVf->mat, params);
}
catch (...) {
GST_ERROR("::queueHandler() imgwrite() failed.");
throw KurentoException(NOT_IMPLEMENTED,
"TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed. \n");
}
try {
cv::imwrite(fullpath.c_str(), ptrVf->mat, params);
}
catch (...) {
GST_ERROR("::queueHandler() imgwrite() failed.");
throw KurentoException(NOT_IMPLEMENTED,
"TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed. \n");
}

ptrVf->mat.release(); // release internal memory allocations
ptrVf->mat.release(); // release internal memory allocations

delete ptrVf;
ptrVf = NULL;
delete ptrVf;
ptrVf = NULL;

}

while(this->frameQueue->try_dequeue(ptrVf)) { /* Empty the queue post processing if the dtor was called */
while(!this->frameQueue->empty()) {

this->frameQueue->pop(ptrVf); // blocks
--this->queueLength;
delete ptrVf;
ptrVf = NULL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#ifndef __TELMATE_FRAME_GRABBER_OPENCV_IMPL_HPP__
#define __TELMATE_FRAME_GRABBER_OPENCV_IMPL_HPP__

#define NDEBUG 1

#include <ctime>
#include <iostream>
#include <OpenCVProcess.hpp>
Expand All @@ -17,8 +19,7 @@

#include <boost/thread/thread.hpp>

#include "atomicops.h"
#include "readerwriterqueue.h"
#include "avisqueue.hpp"

#include <boost/atomic.hpp>

Expand All @@ -36,9 +37,9 @@
#define FG_PNG_QUALITY 9

#define MAX_IDLE_QUEUE_TIME_NS 30000
#define QUEUE_BASE_ALLOC 1000
#define QUEUE_BASE_ELEMENT_ALLOC 1000

using namespace moodycamel;
//using namespace moodycamel;



Expand Down Expand Up @@ -84,7 +85,7 @@ class TelmateFrameGrabberOpenCVImpl : public virtual OpenCVProcess {
boost::asio::io_service ioService;
boost::thread_group tp;

BlockingReaderWriterQueue<VideoFrame*> *frameQueue;
avis_blocking_queue<VideoFrame*> *frameQueue;
boost::thread* thr;
boost::atomic<bool> thrLoop;

Expand Down
Loading

0 comments on commit 4ba54e7

Please sign in to comment.