diff --git a/C/services/south/ingest.cpp b/C/services/south/ingest.cpp index d549357d05..288f6e5708 100644 --- a/C/services/south/ingest.cpp +++ b/C/services/south/ingest.cpp @@ -289,8 +289,25 @@ void Ingest::waitForQueue() { mutex mtx; unique_lock lck(mtx); - if (m_running) - m_cv.wait_for(lck,chrono::milliseconds(m_timeout)); + if (m_running && m_queue->size() < m_queueSizeThreshold) + { + // Work out how long to wait based on age of oldest queued reading + long timeout = m_timeout; + if (!m_queue->empty()) + { + Reading *reading = (*m_queue)[0]; + struct timeval tm, now; + reading->getUserTimestamp(&tm); + gettimeofday(&now, NULL); + long ageMS = (now.tv_sec - tm.tv_sec) * 1000 + + (now.tv_usec - tm.tv_usec) / 1000; + timeout = m_timeout - ageMS; + } + if (timeout > 0) + { + m_cv.wait_for(lck,chrono::milliseconds(timeout)); + } + } } /** @@ -377,6 +394,22 @@ vector* newQ = new vector(); } ++statsEntriesCurrQueue[reading->getAssetName()]; } + + /* + * Check the first reading in the list to see if we are meeting the + * latency configuration we have been set + */ + const vector::const_iterator itr = m_data->cbegin(); + if (itr != m_data->cend()) + { + const Reading *firstReading = *itr; + time_t now = time(0); + unsigned long latency = now - firstReading->getUserTimestamp(); + if (latency > m_timeout / 1000) // m_timeout is in milliseconds + { + m_logger->warn("Current send latency of %d seconds exceeds requested maximum latency of %d seconds", latency, m_timeout); + } + } /** * 'm_data' vector is ready to be sent to storage service. @@ -396,7 +429,7 @@ vector* newQ = new vector(); m_logger->error("Failed to write readings to storage layer, buffering"); lock_guard guard(m_qMutex); - // BUffer current data in m_data + // Buffer current data in m_data m_queue->insert(m_queue->begin(), m_data->begin(), m_data->end()); diff --git a/C/services/storage/include/storage_registry.h b/C/services/storage/include/storage_registry.h index e9707566f9..fa3b9ddb6a 100644 --- a/C/services/storage/include/storage_registry.h +++ b/C/services/storage/include/storage_registry.h @@ -27,8 +27,10 @@ class StorageRegistry { void processPayload(char *payload); void sendPayload(const std::string& url, char *payload); void filterPayload(const std::string& url, char *payload, const std::string& asset); + typedef std::pair Item; REGISTRY m_registrations; - std::queue m_queue; + std::queue + m_queue; std::mutex m_qMutex; std::thread *m_thread; std::condition_variable m_cv; diff --git a/C/services/storage/storage_registry.cpp b/C/services/storage/storage_registry.cpp index d1650674e7..71fdadfe6b 100644 --- a/C/services/storage/storage_registry.cpp +++ b/C/services/storage/storage_registry.cpp @@ -17,6 +17,13 @@ #include "logger.h" #include "strings.h" #include "client_http.hpp" +#include + +#define CHECK_QTIMES 0 // Turn on to check length of time data is queued +#define QTIME_THRESHOLD 3 // Threshold to report long queue times + +#define REGISTRY_SLEEP_TIME 5 // Time to sleep in the register process thread + // between checks for chutdown using namespace std; using namespace rapidjson; @@ -78,8 +85,10 @@ StorageRegistry::process(const string& payload) char *data = NULL; if ((data = strdup(payload.c_str())) != NULL) { + time_t now = time(0); + Item item = make_pair(now, data); lock_guard guard(m_qMutex); - m_queue.push(data); + m_queue.push(item); m_cv.notify_all(); } } @@ -132,14 +141,30 @@ StorageRegistry::run() while (m_running) { char *data = NULL; + time_t qTime; { unique_lock mlock(m_cvMutex); - m_cv.wait(mlock); - data = m_queue.front(); + while (m_queue.size() == 0) + { + m_cv.wait_for(mlock, std::chrono::seconds(REGISTRY_SLEEP_TIME)); + if (!m_running) + { + return; + } + } + Item item = m_queue.front(); m_queue.pop(); + data = item.second; + qTime = item.first; } if (data) { +#if CHECK_QTIMES + if (time(0) - qTime > QTIME_THRESHOLD) + { + Logger::getLogger()->error("Data has been queued for %d seconds to be sent to registered party", (time(0) - qTime)); + } +#endif processPayload(data); free(data); }