Skip to content
This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #1700 from foglamp/FOGL-3204.170RC
Browse files Browse the repository at this point in the history
FOGL-3204 Fix delays in sending data from south service and storage layer  (1.7.0 RC)
  • Loading branch information
praveen-garg authored Sep 11, 2019
2 parents 85d2ae6 + 2123e0b commit 3bfffff
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 7 deletions.
39 changes: 36 additions & 3 deletions C/services/south/ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,25 @@ void Ingest::waitForQueue()
{
mutex mtx;
unique_lock<mutex> lck(mtx);
if (m_running)
m_cv.wait_for(lck,chrono::milliseconds(m_timeout));
if (m_running && m_queue->size() < m_queueSizeThreshold)
{
// Work out how long to wait based on age of oldest queued reading
long timeout = m_timeout;
if (!m_queue->empty())
{
Reading *reading = (*m_queue)[0];
struct timeval tm, now;
reading->getUserTimestamp(&tm);
gettimeofday(&now, NULL);
long ageMS = (now.tv_sec - tm.tv_sec) * 1000 +
(now.tv_usec - tm.tv_usec) / 1000;
timeout = m_timeout - ageMS;
}
if (timeout > 0)
{
m_cv.wait_for(lck,chrono::milliseconds(timeout));
}
}
}

/**
Expand Down Expand Up @@ -377,6 +394,22 @@ vector<Reading *>* newQ = new vector<Reading *>();
}
++statsEntriesCurrQueue[reading->getAssetName()];
}

/*
* Check the first reading in the list to see if we are meeting the
* latency configuration we have been set
*/
const vector<Reading *>::const_iterator itr = m_data->cbegin();
if (itr != m_data->cend())
{
const Reading *firstReading = *itr;
time_t now = time(0);
unsigned long latency = now - firstReading->getUserTimestamp();
if (latency > m_timeout / 1000) // m_timeout is in milliseconds
{
m_logger->warn("Current send latency of %d seconds exceeds requested maximum latency of %d seconds", latency, m_timeout);
}
}

/**
* 'm_data' vector is ready to be sent to storage service.
Expand All @@ -396,7 +429,7 @@ vector<Reading *>* newQ = new vector<Reading *>();
m_logger->error("Failed to write readings to storage layer, buffering");
lock_guard<mutex> guard(m_qMutex);

// BUffer current data in m_data
// Buffer current data in m_data
m_queue->insert(m_queue->begin(),
m_data->begin(),
m_data->end());
Expand Down
4 changes: 3 additions & 1 deletion C/services/storage/include/storage_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ class StorageRegistry {
void processPayload(char *payload);
void sendPayload(const std::string& url, char *payload);
void filterPayload(const std::string& url, char *payload, const std::string& asset);
typedef std::pair<time_t, char *> Item;
REGISTRY m_registrations;
std::queue<char *> m_queue;
std::queue<StorageRegistry::Item>
m_queue;
std::mutex m_qMutex;
std::thread *m_thread;
std::condition_variable m_cv;
Expand Down
31 changes: 28 additions & 3 deletions C/services/storage/storage_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
#include "logger.h"
#include "strings.h"
#include "client_http.hpp"
#include <chrono>

#define CHECK_QTIMES 0 // Turn on to check length of time data is queued
#define QTIME_THRESHOLD 3 // Threshold to report long queue times

#define REGISTRY_SLEEP_TIME 5 // Time to sleep in the register process thread
// between checks for chutdown

using namespace std;
using namespace rapidjson;
Expand Down Expand Up @@ -78,8 +85,10 @@ StorageRegistry::process(const string& payload)
char *data = NULL;
if ((data = strdup(payload.c_str())) != NULL)
{
time_t now = time(0);
Item item = make_pair(now, data);
lock_guard<mutex> guard(m_qMutex);
m_queue.push(data);
m_queue.push(item);
m_cv.notify_all();
}
}
Expand Down Expand Up @@ -132,14 +141,30 @@ StorageRegistry::run()
while (m_running)
{
char *data = NULL;
time_t qTime;
{
unique_lock<mutex> mlock(m_cvMutex);
m_cv.wait(mlock);
data = m_queue.front();
while (m_queue.size() == 0)
{
m_cv.wait_for(mlock, std::chrono::seconds(REGISTRY_SLEEP_TIME));
if (!m_running)
{
return;
}
}
Item item = m_queue.front();
m_queue.pop();
data = item.second;
qTime = item.first;
}
if (data)
{
#if CHECK_QTIMES
if (time(0) - qTime > QTIME_THRESHOLD)
{
Logger::getLogger()->error("Data has been queued for %d seconds to be sent to registered party", (time(0) - qTime));
}
#endif
processPayload(data);
free(data);
}
Expand Down

0 comments on commit 3bfffff

Please sign in to comment.