diff --git a/C/services/dispatcher/controlrequest.cpp b/C/services/dispatcher/controlrequest.cpp index f0ca69e..55d5c52 100644 --- a/C/services/dispatcher/controlrequest.cpp +++ b/C/services/dispatcher/controlrequest.cpp @@ -127,10 +127,10 @@ void ControlOperationServiceRequest::execute(DispatcherService *service) filter(service->getPipelineManager()); string payload = "{ \"operation\" : \""; payload += m_operation; - payload += "\", "; + payload += "\""; if (m_parameters.size() > 0) { - payload += "\"parameters\" : "; + payload += ", \"parameters\" : "; payload += m_parameters.toJSON(); } payload += " }"; @@ -157,10 +157,10 @@ void ControlOperationAssetRequest::execute(DispatcherService *service) string ingestService = tracker->getIngestService(m_asset); string payload = "{ \"operation\" : \""; payload += m_operation; - payload += "\", "; + payload += "\""; if (m_parameters.size() > 0) { - payload += "\"parameters\" : "; + payload += ", \"parameters\" : "; payload += m_parameters.toJSON(); } payload += " }"; @@ -190,10 +190,10 @@ void ControlOperationBroadcastRequest::execute(DispatcherService *service) string payload = "{ \"operation\" : \""; payload += m_operation; - payload += "\", "; + payload += "\""; if (m_parameters.size() > 0) { - payload += "\"parameters\" : "; + payload += ", \"parameters\" : "; payload += m_parameters.toJSON(); } payload += " }"; @@ -251,9 +251,12 @@ void WriteControlRequest::filter(ControlPipelineManager *manager) } Reading *reading = m_values.toReading("reading"); // Filter the reading - context->filter(reading); + reading = context->filter(reading); m_values.fromReading(reading); - delete reading; + if (reading) + { + delete reading; + } } /** @@ -326,7 +329,12 @@ void ControlOperationRequest::filter(ControlPipelineManager *manager) } Reading *reading = m_parameters.toReading(m_operation); // Filter the reading - context->filter(reading); + reading = context->filter(reading); m_parameters.fromReading(reading); - delete reading; + // All the filter pipeline to change the operation name + if (reading) + { + m_operation = reading->getAssetName(); + delete reading; + } } diff --git a/C/services/dispatcher/dispatcher_api.cpp b/C/services/dispatcher/dispatcher_api.cpp index 25f8ec9..31bea10 100644 --- a/C/services/dispatcher/dispatcher_api.cpp +++ b/C/services/dispatcher/dispatcher_api.cpp @@ -49,9 +49,16 @@ DispatcherApi::DispatcherApi(const unsigned short port, */ DispatcherApi::~DispatcherApi() { - delete m_server; if (m_thread) + { + stop(); + wait(); delete m_thread; + } + if (m_server) + { + delete m_server; + } } /** @@ -87,7 +94,8 @@ void startService() /** * Start the HTTP server */ -void DispatcherApi::start(DispatcherService *service) { +void DispatcherApi::start(DispatcherService *service) +{ m_service = service; m_thread = new thread(startService); } @@ -95,15 +103,20 @@ void DispatcherApi::start(DispatcherService *service) { /** * Start method for HTTP server */ -void DispatcherApi::startServer() { +void DispatcherApi::startServer() +{ m_server->start(); } /** * Stop method for HTTP server */ -void DispatcherApi::stopServer() { - m_server->stop(); +void DispatcherApi::stopServer() +{ + if (m_server) + { + m_server->stop(); + } } /** @@ -117,8 +130,16 @@ void DispatcherApi::stop() /** * Wait for the HTTP server to shutdown */ -void DispatcherApi::wait() { - m_thread->join(); +void DispatcherApi::wait() +{ + if (m_thread) + { + try { + m_thread->join(); + } catch (exception& e) { + // ignore + } + } } /** diff --git a/C/services/dispatcher/dispatcher_service.cpp b/C/services/dispatcher/dispatcher_service.cpp index 2a8f732..3286913 100644 --- a/C/services/dispatcher/dispatcher_service.cpp +++ b/C/services/dispatcher/dispatcher_service.cpp @@ -161,13 +161,16 @@ bool DispatcherService::start(string& coreAddress, managementListener, // Management port m_token); // Token - if (!m_mgtClient->registerService(record)) + if (!m_dryRun) { - m_logger->fatal("Unable to register service " - "\"Dispatcher\" for service '" + m_name + "'"); + if (!m_mgtClient->registerService(record)) + { + m_logger->fatal("Unable to register service " + "\"Dispatcher\" for service '" + m_name + "'"); - this->cleanupResources(); - return false; + this->cleanupResources(); + return false; + } } // Make sure we have an instance of the asset tracker @@ -230,10 +233,13 @@ bool DispatcherService::start(string& coreAddress, m_mgtClient->addChildCategories(m_name, children1); } - // Register m_name category to Fledge Core - registerCategory(m_name); - registerCategory(advancedCatName); - registerCategory(serverCatName); + if (!m_dryRun) + { + // Register m_name category to Fledge Core + registerCategory(m_name); + registerCategory(advancedCatName); + registerCategory(serverCatName); + } ConfigCategory serverCategory = m_mgtClient->getCategory(serverCatName); if (serverCategory.itemExists("enable")) @@ -299,10 +305,13 @@ bool DispatcherService::start(string& coreAddress, m_mgtClient->addAuditEntry("DSPST", "INFORMATION", "{\"name\": \"" + m_name + "\"}"); + } - // Create default security category - this->createSecurityCategories(m_mgtClient, m_dryRun); + // Create default security category + this->createSecurityCategories(m_mgtClient, m_dryRun); + if (!m_dryRun) + { // Start the control filter pipeline manager m_pipelineManager = new ControlPipelineManager(m_mgtClient, m_storage); m_pipelineManager->setService(this); @@ -335,7 +344,7 @@ bool DispatcherService::start(string& coreAddress, // Request the core to restart the service m_mgtClient->restartService(); } - else if (m_removeFromCore) + else if (m_removeFromCore && (!m_dryRun)) { // Unregister from storage service m_mgtClient->unregisterService(); diff --git a/C/services/dispatcher/include/controlrequest.h b/C/services/dispatcher/include/controlrequest.h index ee01047..4ef717e 100644 --- a/C/services/dispatcher/include/controlrequest.h +++ b/C/services/dispatcher/include/controlrequest.h @@ -217,7 +217,7 @@ class ControlOperationRequest : public ControlRequest { protected: void filter(ControlPipelineManager *manager); protected: - const std::string m_operation; + std::string m_operation; KVList m_parameters; }; diff --git a/C/services/dispatcher/kvlist.cpp b/C/services/dispatcher/kvlist.cpp index 192fa4c..d0a7d19 100644 --- a/C/services/dispatcher/kvlist.cpp +++ b/C/services/dispatcher/kvlist.cpp @@ -202,6 +202,13 @@ vector values; } } } + // We can not have a reading with no data points, so if we have no parameters + // we must add a dummy datapoint to pass the operation through the filter + if (values.size() == 0) + { + DatapointValue dpv("None"); + values.push_back(new Datapoint("__None__", dpv)); + } return new Reading(asset, values); } @@ -215,13 +222,23 @@ vector values; void KVList::fromReading(Reading *reading) { m_list.clear(); + if (!reading) + return; vectordatapoints = reading->getReadingData(); for (Datapoint *dp : datapoints) { - if (dp->getData().getType() == DatapointValue::T_STRING) - add(dp->getName(), dp->getData().toStringValue()); - else - add(dp->getName(), dp->getData().toString()); + // Remove the dummy datapoint that was added + if (dp->getName().compare("__None__") != 0) + { + try { + if (dp->getData().getType() == DatapointValue::T_STRING) + add(dp->getName(), dp->getData().toStringValue()); + else + add(dp->getName(), dp->getData().toString()); + } catch (exception& e) { + Logger::getLogger()->warn("Unable to add datapoint %s of type %s returned from pipeline, %s.", dp->getName(), dp->getData().getTypeStr(), e.what()); + } + } } } diff --git a/C/services/dispatcher/pipeline_manager.cpp b/C/services/dispatcher/pipeline_manager.cpp index d944e94..ea07623 100644 --- a/C/services/dispatcher/pipeline_manager.cpp +++ b/C/services/dispatcher/pipeline_manager.cpp @@ -107,7 +107,7 @@ ControlPipelineManager::loadPipelines() m_logger->error("Exception loading control pipelines"); } - m_logger->debug("%d pipelines have benn loaded", m_pipelines.size()); + m_logger->debug("%d pipelines have been loaded", m_pipelines.size()); // Register for updates to the table m_dispatcher->registerTable(PIPELINES_TABLE); @@ -359,7 +359,7 @@ ControlPipelineManager::EndpointLookup::EndpointLookup(const ControlPipelineMana /** * Register a category name for a filter plugin. This allows the pipeline manager - * to reconfigure the filters inthe various pipelines when a category is changed. + * to reconfigure the filters in the various pipelines when a category is changed. * * @param category The name of the category to register * @param plugin The plugin that requires the category @@ -405,6 +405,7 @@ void ControlPipelineManager::categoryChanged(const string& name, const string& c while (it != m_categories.end()) { it->second->reconfigure(content); + it++; } } @@ -561,7 +562,7 @@ void ControlPipelineManager::insertPipeline(const Document& doc) * Called when a new filter is inserted into a pipeline. The document * passed contains the database row that was inserted. * - * {"cpid": "3", "forder": 1, "fname": "ctrl_test3_exp1"} + * {"cpid": 3, "forder": 1, "fname": "ctrl_test3_exp1"} * @param doc The new filter table contents */ void ControlPipelineManager::insertPipelineFilter(const Document& doc) @@ -569,10 +570,9 @@ void ControlPipelineManager::insertPipelineFilter(const Document& doc) int id, order; string filter; - if (doc.HasMember("cpid") && doc["cpid"].IsString()) + if (doc.HasMember("cpid") && doc["cpid"].IsInt()) { - string s = doc["cpid"].GetString(); - id = strtol(s.c_str(), NULL, 10); + id = doc["cpid"].GetInt(); } else { @@ -622,7 +622,7 @@ void ControlPipelineManager::insertPipelineFilter(const Document& doc) * * The document passed will look as follows * - * {"values": {"enabled": "t", "execution": "Shared", "stype": 1, "sname": "", "dtype": 4, "dname": ""}, "where": {"column": "cpid", "condition": "=", "value": "1"}} + * {"values": {"enabled": "t", "execution": "Shared", "stype": 1, "sname": "", "dtype": 4, "dname": ""}, "where": {"column": "cpid", "condition": "=", "value": 1}} * @param doc The new pipeline table contents */ @@ -637,14 +637,19 @@ void ControlPipelineManager::updatePipeline(const Document& doc) lock_guard guard(m_pipelinesMtx); long cpid = strtol(value.c_str(), NULL, 10); - string pipelineName = m_pipelineIds[cpid]; - if (pipelineName.empty()) - { + auto pipelineIDIterator = m_pipelineIds.find(cpid); + if (pipelineIDIterator == m_pipelineIds.end()) { m_logger->error("Unable to determine name of updated pipeline %d, ignoring update", cpid); return; } + string pipelineName = pipelineIDIterator->second; + auto pipelineIterator = m_pipelines.find(pipelineName); + if (pipelineIterator == m_pipelines.end()) { + m_logger->error("Pipeline %s has not been loaded, update ignored", pipelineName.c_str()); + return; + } - ControlPipeline *pipeline = m_pipelines[pipelineName]; + ControlPipeline *pipeline = pipelineIterator->second; if (doc.HasMember("values") && doc["values"].IsObject()) { const Value& values = doc["values"]; @@ -670,7 +675,7 @@ void ControlPipelineManager::updatePipeline(const Document& doc) * Called when a new filter is inserted into a pipeline. The document * passed contains the database row that was inserted. * - * {"values": {"forder": 2}, "where": {"column": "fname", "condition": "=", "value": "ctrl_test1_rename", "and": {"column": "cpid", "condition": "=", "value": "1"}}} + * {"values": {"forder": 2}, "where": {"column": "fname", "condition": "=", "value": "ctrl_test1_rename", "and": {"column": "cpid", "condition": "=", "value": 1}}} * @param doc The new filter table contents */ void ControlPipelineManager::updatePipelineFilter(const Document& doc) @@ -688,14 +693,20 @@ void ControlPipelineManager::updatePipelineFilter(const Document& doc) m_logger->error("Unable to determine the name of the filter to reorder"); return; } - string name = m_pipelineIds[cpid]; - ControlPipeline *pipeline = m_pipelines[name]; - if (!pipeline) - { + + auto pipelineIDIterator = m_pipelineIds.find(cpid); + if (pipelineIDIterator == m_pipelineIds.end()) { + m_logger->error("Unable to find pipeline with id %d, filter pipeline update ignored", cpid); + return; + } + string name = pipelineIDIterator->second; + auto pipelineIterator = m_pipelines.find(name); + if (pipelineIterator == m_pipelines.end()) { m_logger->error("Pipeline %s has not been loaded, update ignored", name.c_str()); return; } + ControlPipeline *pipeline = pipelineIterator->second; // We have the pipeline ID, not work out what has changed if (doc.HasMember("values") && doc["values"].IsObject()) { @@ -752,7 +763,7 @@ void ControlPipelineManager::deletePipeline(const Document& doc) * Called when a new filter is inserted into a pipeline. The document * passed contains the database row that was inserted. * - * {"where": {"column": "cpid", "condition": "=", "value": "1", "and": {"column": "fname", "condition": "=", "value": "ctrl_test1_del"}}} + * {"where": {"column": "cpid", "condition": "=", "value": 1, "and": {"column": "fname", "condition": "=", "value": "ctrl_test1_del"}}} * @param doc The new filter table contents */ void ControlPipelineManager::deletePipelineFilter(const Document& doc) @@ -792,6 +803,7 @@ void ControlPipelineManager::deletePipelineFilter(const Document& doc) * @param doc The JSON document containing the where clause * @param key The key to extract * @return string The key value + * */ string ControlPipelineManager::getFromJSONWhere(const Document& doc, const string& key) { @@ -805,7 +817,7 @@ string ControlPipelineManager::getFromJSONWhere(const Document& doc, const strin { if (where.HasMember("value") && where["value"].IsString()) { - result = std::to_string(strtol(where["value"].GetString(), NULL, 10)); + result = where["value"].GetString(); } else if (where.HasMember("value") && where["value"].IsInt64()) { @@ -827,17 +839,17 @@ string ControlPipelineManager::getFromJSONWhere(const Document& doc, const strin { if (second.HasMember("value") && second["value"].IsString()) { - result = strtol(second["value"].GetString(), NULL, 10); + result = second["value"].GetString(); } else if (second.HasMember("value") && second["value"].IsInt64()) { - result = second["value"].GetInt64(); + result = std::to_string(second["value"].GetInt64()); } else { if (second.HasMember("value") && second["value"].IsInt()) { - result = second["value"].GetInt(); + result = std::to_string(second["value"].GetInt()); } } } diff --git a/VERSION b/VERSION index 88967b0..2d53296 100755 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ -fledge_version>=2.3 -dispatcher_version=2.3.0 +fledge_version>=2.4 +dispatcher_version=2.4.0