Skip to content

Commit

Permalink
Merge pull request #28 from Telmate/PS-2221-FIX
Browse files Browse the repository at this point in the history
Ps 2221 fix
  • Loading branch information
avis authored Jul 26, 2018
2 parents 27ecc94 + 27f25a1 commit 60e1168
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,31 @@ public interface TelmateFrameGrabber extends OpenCVFilter {



/**
*
* clear plugin data
* @return 1 for true 0 for false *
**/
int cleanup();

/**
*
* Asynchronous version of cleanup:
* {@link Continuation#onSuccess} is called when the action is
* done. If an error occurs, {@link Continuation#onError} is called.
* @see TelmateFrameGrabber#cleanup
*
**/
void cleanup(Continuation<Integer> cont);

/**
*
* clear plugin data
* @return 1 for true 0 for false *
**/
TFuture<Integer> cleanup(Transaction tx);


/**
*
* get Snapshot interval
Expand Down Expand Up @@ -202,6 +227,124 @@ public interface TelmateFrameGrabber extends OpenCVFilter {
void setOutputFormat(Transaction tx, @org.kurento.client.internal.server.Param("outputFormat") int outputFormat);


/**
*
* get monitoring Thread timeout in seconds
* @return path of snapshots location *
**/
int getMonitorTimeoutSec();

/**
*
* Asynchronous version of getMonitorTimeoutSec:
* {@link Continuation#onSuccess} is called when the action is
* done. If an error occurs, {@link Continuation#onError} is called.
* @see TelmateFrameGrabber#getMonitorTimeoutSec
*
**/
void getMonitorTimeoutSec(Continuation<Integer> cont);

/**
*
* get monitoring Thread timeout in seconds
* @return path of snapshots location *
**/
TFuture<Integer> getMonitorTimeoutSec(Transaction tx);


/**
*
* set monitoring Thread timeout in seconds
*
* @param timeout
* session timeout in sec
*
**/
void setMonitorTimeoutSec(@org.kurento.client.internal.server.Param("timeout") int timeout);

/**
*
* Asynchronous version of setMonitorTimeoutSec:
* {@link Continuation#onSuccess} is called when the action is
* done. If an error occurs, {@link Continuation#onError} is called.
* @see TelmateFrameGrabber#setMonitorTimeoutSec
*
* @param timeout
* session timeout in sec
*
**/
void setMonitorTimeoutSec(@org.kurento.client.internal.server.Param("timeout") int timeout, Continuation<Void> cont);

/**
*
* set monitoring Thread timeout in seconds
*
* @param timeout
* session timeout in sec
*
**/
void setMonitorTimeoutSec(Transaction tx, @org.kurento.client.internal.server.Param("timeout") int timeout);


/**
*
* get the session id for this session
* @return UUID set for session *
**/
String getSessionUUID();

/**
*
* Asynchronous version of getSessionUUID:
* {@link Continuation#onSuccess} is called when the action is
* done. If an error occurs, {@link Continuation#onError} is called.
* @see TelmateFrameGrabber#getSessionUUID
*
**/
void getSessionUUID(Continuation<String> cont);

/**
*
* get the session id for this session
* @return UUID set for session *
**/
TFuture<String> getSessionUUID(Transaction tx);


/**
*
* set the session UUID
*
* @param puuid
* path of snapshots location
*
**/
void setSessionUUID(@org.kurento.client.internal.server.Param("puuid") String puuid);

/**
*
* Asynchronous version of setSessionUUID:
* {@link Continuation#onSuccess} is called when the action is
* done. If an error occurs, {@link Continuation#onError} is called.
* @see TelmateFrameGrabber#setSessionUUID
*
* @param puuid
* path of snapshots location
*
**/
void setSessionUUID(@org.kurento.client.internal.server.Param("puuid") String puuid, Continuation<Void> cont);

/**
*
* set the session UUID
*
* @param puuid
* path of snapshots location
*
**/
void setSessionUUID(Transaction tx, @org.kurento.client.internal.server.Param("puuid") String puuid);





Expand All @@ -219,13 +362,13 @@ public Builder(org.kurento.client.MediaPipeline mediaPipeline){
props.add("mediaPipeline",mediaPipeline);
}

public Builder withProperties(Properties properties) {
return (Builder)super.withProperties(properties);
}
public Builder withProperties(Properties properties) {
return (Builder)super.withProperties(properties);
}

public Builder with(String name, Object value) {
return (Builder)super.with(name, value);
}
public Builder with(String name, Object value) {
return (Builder)super.with(name, value);
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,50 @@ TelmateFrameGrabberOpenCVImpl::TelmateFrameGrabberOpenCVImpl ()
this->outputFormat = FGFMT_JPEG;
this->lastQueueTimeStamp = 0;
this->queueLength = 0;
this->frameQueue = new avis_blocking_queue<VideoFrame*>;
this->monitorTimeoutMs = 60000; // 60 sec.

this->isThreaded = false;

this->thr = new boost::thread(boost::bind(
&TelmateFrameGrabberOpenCVImpl::queueHandler, this));
this->thr->detach();
if(this->isThreaded) {
this->frameQueue = new avis_blocking_queue<VideoFrame *>;

this->wdThr = new boost::thread(boost::bind(
&TelmateFrameGrabberOpenCVImpl::watchDogThread, this));
this->wdThr->detach();
this->thr = new boost::thread(boost::bind(
&TelmateFrameGrabberOpenCVImpl::queueHandler, this));
this->thr->detach();

GST_INFO("Constructor was called for %s", this->epName.c_str());
this->wdThr = new boost::thread(boost::bind(
&TelmateFrameGrabberOpenCVImpl::watchDogThread, this));
this->wdThr->detach();

GST_INFO("*** THREADED *** Constructor was called for %s", this->epName.c_str());
}
else {
GST_INFO("*** NON-THREADED *** Constructor was called for %s", this->epName.c_str());
}
}

TelmateFrameGrabberOpenCVImpl::~TelmateFrameGrabberOpenCVImpl() {

VideoFrame *ptrVf;
this->thrLoop = false;
this->monThreadLoop = false;
if(this->isThreaded) {
VideoFrame *ptrVf;
this->thrLoop = false;
this->monThreadLoop = false;

boost::this_thread::sleep_for(boost::chrono::milliseconds(50));
boost::this_thread::sleep_for(boost::chrono::milliseconds(50));


while(this->queueLength > 0) {
this->frameQueue->pop(ptrVf); // blocks
--this->queueLength;
if(ptrVf != NULL) {
while (this->queueLength > 0) {
this->frameQueue->pop(ptrVf); // blocks
--this->queueLength;
if (ptrVf != NULL) {
delete ptrVf;
ptrVf = NULL;
}
}
}

delete this->frameQueue;
this->frameQueue = NULL;

delete this->frameQueue;
this->frameQueue = NULL;
}

GST_INFO("Destructor was called for %s", this->epName.c_str());

Expand All @@ -88,19 +95,94 @@ int TelmateFrameGrabberOpenCVImpl::cleanup() {
* be performed here and will be sent back to Media Pipline.
*/
void TelmateFrameGrabberOpenCVImpl::process(cv::Mat &mat) {



if ((this->getCurrentTimestampLong() - this->lastQueueTimeStamp) >= this->snapInterval) {

if(this->thrLoop) { // do not push into the queue if the destructor was called.
if(this->isThreaded) {

if (this->thrLoop) { // do not push into the queue if the destructor was called.
this->lastQueueTimeStamp = this->getCurrentTimestampLong();
VideoFrame *ptrVf = new VideoFrame();
ptrVf->mat = mat.clone();
ptrVf->ts = std::to_string(this->lastQueueTimeStamp);

this->frameQueue->push(ptrVf);
++this->queueLength;
++this->framesCounter;
}
}
else {

std::vector<int> params;
std::string image_extension;

this->lastQueueTimeStamp = this->getCurrentTimestampLong();
VideoFrame *ptrVf = new VideoFrame();
ptrVf->mat = mat.clone();
ptrVf->ts = std::to_string(this->lastQueueTimeStamp);

this->frameQueue->push(ptrVf);
++this->queueLength;

switch (this->outputFormat) {
case FGFMT_JPEG:
/* Set jpeg params */
params.push_back(CV_IMWRITE_JPEG_QUALITY);
params.push_back(FG_JPEG_QUALITY);
image_extension = ".jpeg";
break;
case FGFMT_PNG:
/* Set PNG parameters, compression etc. */
params.push_back(CV_IMWRITE_PNG_COMPRESSION);
params.push_back(FG_PNG_QUALITY);
image_extension = ".png";
break;
default:
/* Defaults to jpeg */
params.push_back(CV_IMWRITE_JPEG_QUALITY);
params.push_back(FG_JPEG_QUALITY);
image_extension = ".jpeg";
break;
}

std::string filename =
std::to_string((long) this->framesCounter) + "_" +
std::to_string(this->getCurrentTimestampLong()) +
image_extension;


if (this->storagePathSubdir.empty()) {

this->storagePathSubdir = this->storagePath + "/frames_" + this->getCurrentTimestampString();
boost::filesystem::path dir(this->storagePathSubdir.c_str());
GST_INFO("going to create a directory in %s", this->storagePathSubdir.c_str());
if (!boost::filesystem::create_directories(dir)) {
GST_ERROR("%s create_directories() failed for: %s", this->epName.c_str(),
this->storagePathSubdir.c_str());
}

}

std::string fullpath = this->storagePathSubdir + "/" + filename;

try {
cv::imwrite(fullpath.c_str(), mat, params);
}
catch (...) {
GST_ERROR("::queueHandler() imgwrite() failed.");
/*throw KurentoException(NOT_IMPLEMENTED,
"TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed. \n");*/
}

++this->framesCounter;


}


}





}
/*
* This function is executed inside the queueHandler thread as a main() function.
Expand All @@ -110,7 +192,7 @@ void TelmateFrameGrabberOpenCVImpl::process(cv::Mat &mat) {
void TelmateFrameGrabberOpenCVImpl::queueHandler() {

VideoFrame *ptrVf;
cv::Mat image;
//cv::Mat image;
std::vector<int> params;
std::string image_extension;

Expand Down Expand Up @@ -298,8 +380,6 @@ void TelmateFrameGrabberOpenCVImpl::setSessionUUID(const std::string &puuid) {
return;
}



} /* telmateframegrabber */
} /* module */
} /* kurento */
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class TelmateFrameGrabberOpenCVImpl : public virtual OpenCVProcess
boost::atomic<int> queueLength;
int64 monitorTimeoutMs;

boost::atomic<bool> isThreaded;

void queueHandler();
void watchDogThread();

Expand Down
2 changes: 1 addition & 1 deletion module/src/server/interface/telmateframegrabber.kmd.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "telmateframegrabber",
"version": "2.0.2",
"version": "2.0.4",
"kurentoVersion": "^6.7.1"
}
2 changes: 1 addition & 1 deletion module/src/server/kmd/telmate.kmd.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "telmate",
"version": "2.0.2",
"version": "2.0.4",
"kurentoVersion": "^6.7.1",
"imports": [
{
Expand Down

0 comments on commit 60e1168

Please sign in to comment.