From 58172e7b462e6ea08071f4d09ebb5259175bde4b Mon Sep 17 00:00:00 2001 From: pintomax Date: Fri, 5 Jan 2024 12:03:13 +0100 Subject: [PATCH 01/11] FOGL.8343: skip service authentication for table operations (#55) FOGL.8343: skip service authentication for table operations --- C/services/dispatcher/dispatcher_api.cpp | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/C/services/dispatcher/dispatcher_api.cpp b/C/services/dispatcher/dispatcher_api.cpp index 1fdd3d3..25f8ec9 100644 --- a/C/services/dispatcher/dispatcher_api.cpp +++ b/C/services/dispatcher/dispatcher_api.cpp @@ -385,6 +385,11 @@ void DispatcherApi::operation(shared_ptr response, void DispatcherApi::tableInsert(shared_ptr response, shared_ptr request) { + // Currently skipping authentication for requests like + // /dispatch/table/{operation}/{object} + // TODO: Implement Core service bearer token to be passed in the request + bool checkAuth = false; + string destination, name, key, value; string payload = request->content.string(); @@ -398,7 +403,7 @@ void DispatcherApi::tableInsert(shared_ptr response, string callerName, callerType; // If authentication is set verify input token and service/URL ACLs - if (auth_set) + if (checkAuth && auth_set) { // Verify access token from caller and check caller can access dispatcher // Routine sends HTTP reply in case of errors @@ -443,6 +448,11 @@ void DispatcherApi::tableInsert(shared_ptr response, void DispatcherApi::tableUpdate(shared_ptr response, shared_ptr request) { + // Currently skipping authentication for requests like + // /dispatch/table/{operation}/{object} + // TODO: Implement Core service bearer token to be passed in the request + bool checkAuth = false; + string destination, name, key, value; string payload = request->content.string(); @@ -456,7 +466,7 @@ void DispatcherApi::tableUpdate(shared_ptr response, string callerName, callerType; // If authentication is set verify input token and service/URL ACLs - if (auth_set) + if (checkAuth && auth_set) { // Verify access token from caller and check caller can access dispatcher // Routine sends HTTP reply in case of errors @@ -500,6 +510,11 @@ void DispatcherApi::tableUpdate(shared_ptr response, void DispatcherApi::tableDelete(shared_ptr response, shared_ptr request) { + // Currently skipping authentication for requests like + // /dispatch/table/{operation}/{object} + // TODO: Implement Core service bearer token to be passed in the request + bool checkAuth = false; + string destination, name, key, value; string payload = request->content.string(); @@ -513,7 +528,7 @@ void DispatcherApi::tableDelete(shared_ptr response, string callerName, callerType; // If authentication is set verify input token and service/URL ACLs - if (auth_set) + if (checkAuth && auth_set) { // Verify access token from caller and check caller can access dispatcher // Routine sends HTTP reply in case of errors From a6525c5ba4e0426cc10eb735434cdeb3b2e682c5 Mon Sep 17 00:00:00 2001 From: pintomax Date: Mon, 8 Jan 2024 09:14:28 +0100 Subject: [PATCH 02/11] FOGL-8343: retrieve cpid from storage after pipeline creation (#56) FOGL-8343: retrieve cpid from storage after pipeline creation getFromJSONWhere now deals with long values --- .../dispatcher/include/pipeline_manager.h | 8 +- C/services/dispatcher/pipeline_manager.cpp | 111 +++++++++++++++++- 2 files changed, 112 insertions(+), 7 deletions(-) diff --git a/C/services/dispatcher/include/pipeline_manager.h b/C/services/dispatcher/include/pipeline_manager.h index e7583cc..ca320cb 100644 --- a/C/services/dispatcher/include/pipeline_manager.h +++ b/C/services/dispatcher/include/pipeline_manager.h @@ -7,7 +7,7 @@ * * Released under the Apache 2.0 Licence * - * Author: Mark Riddoch + * Author: Mark Riddoch, Massimiliano Pinto * */ #include @@ -156,8 +156,10 @@ class ControlPipelineManager { public: ControlPipelineManager(ManagementClient *mgtClient, StorageClient *storage); ~ControlPipelineManager(); - void loadPipelines(); - ControlPipeline *findPipeline(const PipelineEndpoint& source, const PipelineEndpoint& dest); + void loadPipelines(); // From storage + long loadPipeline(std::string& pName); // From storage + ControlPipeline *findPipeline(const PipelineEndpoint& source, + const PipelineEndpoint& dest); // From memory /** * Return a point to the management client diff --git a/C/services/dispatcher/pipeline_manager.cpp b/C/services/dispatcher/pipeline_manager.cpp index 938c98c..d944e94 100644 --- a/C/services/dispatcher/pipeline_manager.cpp +++ b/C/services/dispatcher/pipeline_manager.cpp @@ -5,7 +5,7 @@ * * Released under the Apache 2.0 Licence * - * Author: Mark Riddoch + * Author: Mark Riddoch, Massimiliano Pinto * */ @@ -538,8 +538,22 @@ void ControlPipelineManager::insertPipeline(const Document& doc) { pipe->exclusive(false); } + lock_guard guard(m_pipelinesMtx); - m_pipelines[pname] = pipe; + + // Load pipeline from storage and get cpid value + long pipelineId = loadPipeline(pname); + if (pipelineId > 0) + { + // store pipeline object + m_pipelines[pname] = pipe; + + // store pipeline id + m_pipelineIds[pipelineId] = pname; + } else { + m_logger->error("Failed to setup control pipeline '%s'", + pname.c_str()); + } } } @@ -720,6 +734,7 @@ void ControlPipelineManager::updatePipelineFilter(const Document& doc) void ControlPipelineManager::deletePipeline(const Document& doc) { string value = getFromJSONWhere(doc, "cpid"); + if (value.empty()) { m_logger->error("Unable to determine ID of pipeline to delete, ignoring delete"); @@ -727,6 +742,7 @@ void ControlPipelineManager::deletePipeline(const Document& doc) } long cpid = strtol(value.c_str(), NULL, 10); string pipelineName = m_pipelineIds[cpid]; + m_pipelineIds.erase(cpid); ControlPipeline *pipeline = m_pipelines[pipelineName]; m_pipelines.erase(pipelineName); @@ -789,8 +805,19 @@ string ControlPipelineManager::getFromJSONWhere(const Document& doc, const strin { if (where.HasMember("value") && where["value"].IsString()) { - result = where["value"].GetString(); + result = std::to_string(strtol(where["value"].GetString(), NULL, 10)); } + else if (where.HasMember("value") && where["value"].IsInt64()) + { + result = std::to_string(where["value"].GetInt64()); + } + else + { + if (where.HasMember("value") && where["value"].IsInt()) + { + result = std::to_string(where["value"].GetInt()); + } + } } else if (where.HasMember("and") && where["and"].IsObject()) { @@ -800,10 +827,86 @@ string ControlPipelineManager::getFromJSONWhere(const Document& doc, const strin { if (second.HasMember("value") && second["value"].IsString()) { - result = second["value"].GetString(); + result = strtol(second["value"].GetString(), NULL, 10); + } + else if (second.HasMember("value") && second["value"].IsInt64()) + { + result = second["value"].GetInt64(); + } + else + { + if (second.HasMember("value") && second["value"].IsInt()) + { + result = second["value"].GetInt(); + } } } } } + return result; } + +/** + * Do the load of a given control pipeline name (just added) + * from storage and get the cpid value and store it in memory + * + * @param name The pipeline name to load from storage + * @return piplineId or -1 in case of errors + */ +long +ControlPipelineManager::loadPipeline(string& pName) +{ + vectorcolumns; + // Look for pname pipeline in storage + Where *where = new Where("name", Equals, pName); + // Get back cpid and name columns + columns.push_back(new Returns ("cpid")); + columns.push_back(new Returns ("name")); + Query aPipeline(columns, where); + + long pipelineId = -1; + + try { + ResultSet *pipeline = m_storage->queryTable(PIPELINES_TABLE, aPipeline); + if (pipeline->rowCount() > 0) + { + ResultSet::RowIterator it = pipeline->firstRow(); + do + { + ResultSet::Row *row = *it; + if (row) + { + ResultSet::ColumnValue *name = row->getColumn("name"); + string pipelineName = name->getString(); + if (pipelineName == pName) { + // Match found + ResultSet::ColumnValue *cpid = row->getColumn("cpid"); + + // Set return value + pipelineId = cpid->getInteger(); + break; + } + } + if (! pipeline->isLastRow(it)) + { + it++; + } + } while (! pipeline->isLastRow(it)); + } + delete pipeline; + } catch (exception* exp) { + m_logger->error("Exception loading control pipeline '%s': %s", + pName.c_str(), + exp->what()); + } catch (exception& ex) { + m_logger->error("Exception loading control pipeline '%s': %s", + pName.c_str(), + ex.what()); + } catch (...) { + m_logger->error("Exception loading control pipeline '%s'", + pName.c_str()); + } + + return pipelineId; +} From 987a22a08a211a59ced931ff399e167bccd1403c Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Tue, 9 Jan 2024 10:10:58 +0000 Subject: [PATCH 03/11] FOGL-8348 Allow filter pipelines to change operation name Signed-off-by: Mark Riddoch --- C/services/dispatcher/controlrequest.cpp | 6 ++++-- C/services/dispatcher/include/controlrequest.h | 2 +- C/services/dispatcher/kvlist.cpp | 12 ++++++++---- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/C/services/dispatcher/controlrequest.cpp b/C/services/dispatcher/controlrequest.cpp index f0ca69e..49ee489 100644 --- a/C/services/dispatcher/controlrequest.cpp +++ b/C/services/dispatcher/controlrequest.cpp @@ -251,7 +251,7 @@ void WriteControlRequest::filter(ControlPipelineManager *manager) } Reading *reading = m_values.toReading("reading"); // Filter the reading - context->filter(reading); + reading = context->filter(reading); m_values.fromReading(reading); delete reading; } @@ -326,7 +326,9 @@ void ControlOperationRequest::filter(ControlPipelineManager *manager) } Reading *reading = m_parameters.toReading(m_operation); // Filter the reading - context->filter(reading); + reading = context->filter(reading); m_parameters.fromReading(reading); + // All the filter pipeline to change the operation name + m_operation = reading->getAssetName(); delete reading; } diff --git a/C/services/dispatcher/include/controlrequest.h b/C/services/dispatcher/include/controlrequest.h index ee01047..4ef717e 100644 --- a/C/services/dispatcher/include/controlrequest.h +++ b/C/services/dispatcher/include/controlrequest.h @@ -217,7 +217,7 @@ class ControlOperationRequest : public ControlRequest { protected: void filter(ControlPipelineManager *manager); protected: - const std::string m_operation; + std::string m_operation; KVList m_parameters; }; diff --git a/C/services/dispatcher/kvlist.cpp b/C/services/dispatcher/kvlist.cpp index 192fa4c..db4678d 100644 --- a/C/services/dispatcher/kvlist.cpp +++ b/C/services/dispatcher/kvlist.cpp @@ -218,10 +218,14 @@ void KVList::fromReading(Reading *reading) vectordatapoints = reading->getReadingData(); for (Datapoint *dp : datapoints) { - if (dp->getData().getType() == DatapointValue::T_STRING) - add(dp->getName(), dp->getData().toStringValue()); - else - add(dp->getName(), dp->getData().toString()); + try { + if (dp->getData().getType() == DatapointValue::T_STRING) + add(dp->getName(), dp->getData().toStringValue()); + else + add(dp->getName(), dp->getData().toString()); + } catch (exception& e) { + Logger::getLogger()->warn("Unable to add datapoint %s of type %s returned from pipeline, %s.", dp->getName(), dp->getData().getTypeStr(), e.what()); + } } } From a6418f2ce708f78c1d16d5e2f0e25e95e9f202cd Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Tue, 9 Jan 2024 17:56:13 +0000 Subject: [PATCH 04/11] FOGL-8393 Empty datapoint support Signed-off-by: Mark Riddoch --- C/services/dispatcher/kvlist.cpp | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/C/services/dispatcher/kvlist.cpp b/C/services/dispatcher/kvlist.cpp index 192fa4c..2db4174 100644 --- a/C/services/dispatcher/kvlist.cpp +++ b/C/services/dispatcher/kvlist.cpp @@ -202,6 +202,13 @@ vector values; } } } + // We can not have a reading with no data points, so if we have no parameters + // we must add a dummy datapoint to pass the operation through the filter + if (values.size() == 0) + { + DatapointValue dpv("None"); + values.push_back(new Datapoint("__None__", dpv)); + } return new Reading(asset, values); } @@ -218,10 +225,14 @@ void KVList::fromReading(Reading *reading) vectordatapoints = reading->getReadingData(); for (Datapoint *dp : datapoints) { - if (dp->getData().getType() == DatapointValue::T_STRING) - add(dp->getName(), dp->getData().toStringValue()); - else - add(dp->getName(), dp->getData().toString()); + // Remove the dummy datapoint that was added + if (dp->getName().compare("__None__") != 0) + { + if (dp->getData().getType() == DatapointValue::T_STRING) + add(dp->getName(), dp->getData().toStringValue()); + else + add(dp->getName(), dp->getData().toString()); + } } } From 3d1ab09e5fd56d2fa2ab16f6c5c14c3be9e48406 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Tue, 9 Jan 2024 18:17:04 +0000 Subject: [PATCH 05/11] FOGL-8393 Supprot empty parameter sets Signed-off-by: Mark Riddoch --- C/services/dispatcher/controlrequest.cpp | 18 ++++++++++-------- C/services/dispatcher/include/controlrequest.h | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/C/services/dispatcher/controlrequest.cpp b/C/services/dispatcher/controlrequest.cpp index f0ca69e..de4b931 100644 --- a/C/services/dispatcher/controlrequest.cpp +++ b/C/services/dispatcher/controlrequest.cpp @@ -127,10 +127,10 @@ void ControlOperationServiceRequest::execute(DispatcherService *service) filter(service->getPipelineManager()); string payload = "{ \"operation\" : \""; payload += m_operation; - payload += "\", "; + payload += "\""; if (m_parameters.size() > 0) { - payload += "\"parameters\" : "; + payload += ", \"parameters\" : "; payload += m_parameters.toJSON(); } payload += " }"; @@ -157,10 +157,10 @@ void ControlOperationAssetRequest::execute(DispatcherService *service) string ingestService = tracker->getIngestService(m_asset); string payload = "{ \"operation\" : \""; payload += m_operation; - payload += "\", "; + payload += "\""; if (m_parameters.size() > 0) { - payload += "\"parameters\" : "; + payload += ", \"parameters\" : "; payload += m_parameters.toJSON(); } payload += " }"; @@ -190,10 +190,10 @@ void ControlOperationBroadcastRequest::execute(DispatcherService *service) string payload = "{ \"operation\" : \""; payload += m_operation; - payload += "\", "; + payload += "\""; if (m_parameters.size() > 0) { - payload += "\"parameters\" : "; + payload += ", \"parameters\" : "; payload += m_parameters.toJSON(); } payload += " }"; @@ -251,7 +251,7 @@ void WriteControlRequest::filter(ControlPipelineManager *manager) } Reading *reading = m_values.toReading("reading"); // Filter the reading - context->filter(reading); + reading = context->filter(reading); m_values.fromReading(reading); delete reading; } @@ -326,7 +326,9 @@ void ControlOperationRequest::filter(ControlPipelineManager *manager) } Reading *reading = m_parameters.toReading(m_operation); // Filter the reading - context->filter(reading); + reading = context->filter(reading); m_parameters.fromReading(reading); + // All the filter pipeline to change the operation name + m_operation = reading->getAssetName(); delete reading; } diff --git a/C/services/dispatcher/include/controlrequest.h b/C/services/dispatcher/include/controlrequest.h index ee01047..4ef717e 100644 --- a/C/services/dispatcher/include/controlrequest.h +++ b/C/services/dispatcher/include/controlrequest.h @@ -217,7 +217,7 @@ class ControlOperationRequest : public ControlRequest { protected: void filter(ControlPipelineManager *manager); protected: - const std::string m_operation; + std::string m_operation; KVList m_parameters; }; From 28bc8c7645e538379234fe3fe641882c59f95932 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Tue, 23 Jan 2024 10:57:26 +0000 Subject: [PATCH 06/11] FOGL-8450 Add missing iterator increment Signed-off-by: Mark Riddoch --- C/services/dispatcher/pipeline_manager.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/C/services/dispatcher/pipeline_manager.cpp b/C/services/dispatcher/pipeline_manager.cpp index d944e94..f17c5ad 100644 --- a/C/services/dispatcher/pipeline_manager.cpp +++ b/C/services/dispatcher/pipeline_manager.cpp @@ -359,7 +359,7 @@ ControlPipelineManager::EndpointLookup::EndpointLookup(const ControlPipelineMana /** * Register a category name for a filter plugin. This allows the pipeline manager - * to reconfigure the filters inthe various pipelines when a category is changed. + * to reconfigure the filters in the various pipelines when a category is changed. * * @param category The name of the category to register * @param plugin The plugin that requires the category @@ -405,6 +405,7 @@ void ControlPipelineManager::categoryChanged(const string& name, const string& c while (it != m_categories.end()) { it->second->reconfigure(content); + it++; } } From 0be75c98dfe8d8e529d7b5a0747c8034c1f7001b Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 9 Feb 2024 12:11:59 +0000 Subject: [PATCH 07/11] FOGL-8498 Deal with pipelines that remove the readings Signed-off-by: Mark Riddoch --- C/services/dispatcher/controlrequest.cpp | 12 +++++++++--- C/services/dispatcher/kvlist.cpp | 2 ++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/C/services/dispatcher/controlrequest.cpp b/C/services/dispatcher/controlrequest.cpp index de4b931..55d5c52 100644 --- a/C/services/dispatcher/controlrequest.cpp +++ b/C/services/dispatcher/controlrequest.cpp @@ -253,7 +253,10 @@ void WriteControlRequest::filter(ControlPipelineManager *manager) // Filter the reading reading = context->filter(reading); m_values.fromReading(reading); - delete reading; + if (reading) + { + delete reading; + } } /** @@ -329,6 +332,9 @@ void ControlOperationRequest::filter(ControlPipelineManager *manager) reading = context->filter(reading); m_parameters.fromReading(reading); // All the filter pipeline to change the operation name - m_operation = reading->getAssetName(); - delete reading; + if (reading) + { + m_operation = reading->getAssetName(); + delete reading; + } } diff --git a/C/services/dispatcher/kvlist.cpp b/C/services/dispatcher/kvlist.cpp index 7705e7c..d0a7d19 100644 --- a/C/services/dispatcher/kvlist.cpp +++ b/C/services/dispatcher/kvlist.cpp @@ -222,6 +222,8 @@ vector values; void KVList::fromReading(Reading *reading) { m_list.clear(); + if (!reading) + return; vectordatapoints = reading->getReadingData(); for (Datapoint *dp : datapoints) { From 4133b716eab5b74ca22c50a56d8a88af1d897a4b Mon Sep 17 00:00:00 2001 From: Himanshu Vimal <67678828+cyberwalk3r@users.noreply.github.com> Date: Tue, 26 Mar 2024 14:19:50 +0530 Subject: [PATCH 08/11] FOGL-8523: Control Pipeline filters not getting added on pipeline creation. (#65) - Fixed JSON parsing for cpid. Signed-off-by: Himanshu Vimal --- C/services/dispatcher/pipeline_manager.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/C/services/dispatcher/pipeline_manager.cpp b/C/services/dispatcher/pipeline_manager.cpp index f17c5ad..7c8c063 100644 --- a/C/services/dispatcher/pipeline_manager.cpp +++ b/C/services/dispatcher/pipeline_manager.cpp @@ -107,7 +107,7 @@ ControlPipelineManager::loadPipelines() m_logger->error("Exception loading control pipelines"); } - m_logger->debug("%d pipelines have benn loaded", m_pipelines.size()); + m_logger->debug("%d pipelines have been loaded", m_pipelines.size()); // Register for updates to the table m_dispatcher->registerTable(PIPELINES_TABLE); @@ -570,10 +570,9 @@ void ControlPipelineManager::insertPipelineFilter(const Document& doc) int id, order; string filter; - if (doc.HasMember("cpid") && doc["cpid"].IsString()) + if (doc.HasMember("cpid") && doc["cpid"].IsInt()) { - string s = doc["cpid"].GetString(); - id = strtol(s.c_str(), NULL, 10); + id = doc["cpid"].GetInt(); } else { From c82ccd9fa646d4ca7e5e6569623121af61c1668c Mon Sep 17 00:00:00 2001 From: Himanshu Vimal <67678828+cyberwalk3r@users.noreply.github.com> Date: Mon, 8 Apr 2024 18:02:08 +0530 Subject: [PATCH 09/11] FOGL-8513: Service crash when destination is changed in control pipeline (#66) * FOGL-8513: Service crash when destination is changed in control pipeline. - Fixed empty entries inserted in m_pipeline with [] operator. - Fixed JSON parsing for cpid for CRUD operations. Signed-off-by: Himanshu Vimal --- C/services/dispatcher/pipeline_manager.cpp | 44 ++++++++++++++-------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/C/services/dispatcher/pipeline_manager.cpp b/C/services/dispatcher/pipeline_manager.cpp index 7c8c063..ea07623 100644 --- a/C/services/dispatcher/pipeline_manager.cpp +++ b/C/services/dispatcher/pipeline_manager.cpp @@ -562,7 +562,7 @@ void ControlPipelineManager::insertPipeline(const Document& doc) * Called when a new filter is inserted into a pipeline. The document * passed contains the database row that was inserted. * - * {"cpid": "3", "forder": 1, "fname": "ctrl_test3_exp1"} + * {"cpid": 3, "forder": 1, "fname": "ctrl_test3_exp1"} * @param doc The new filter table contents */ void ControlPipelineManager::insertPipelineFilter(const Document& doc) @@ -622,7 +622,7 @@ void ControlPipelineManager::insertPipelineFilter(const Document& doc) * * The document passed will look as follows * - * {"values": {"enabled": "t", "execution": "Shared", "stype": 1, "sname": "", "dtype": 4, "dname": ""}, "where": {"column": "cpid", "condition": "=", "value": "1"}} + * {"values": {"enabled": "t", "execution": "Shared", "stype": 1, "sname": "", "dtype": 4, "dname": ""}, "where": {"column": "cpid", "condition": "=", "value": 1}} * @param doc The new pipeline table contents */ @@ -637,14 +637,19 @@ void ControlPipelineManager::updatePipeline(const Document& doc) lock_guard guard(m_pipelinesMtx); long cpid = strtol(value.c_str(), NULL, 10); - string pipelineName = m_pipelineIds[cpid]; - if (pipelineName.empty()) - { + auto pipelineIDIterator = m_pipelineIds.find(cpid); + if (pipelineIDIterator == m_pipelineIds.end()) { m_logger->error("Unable to determine name of updated pipeline %d, ignoring update", cpid); return; } + string pipelineName = pipelineIDIterator->second; + auto pipelineIterator = m_pipelines.find(pipelineName); + if (pipelineIterator == m_pipelines.end()) { + m_logger->error("Pipeline %s has not been loaded, update ignored", pipelineName.c_str()); + return; + } - ControlPipeline *pipeline = m_pipelines[pipelineName]; + ControlPipeline *pipeline = pipelineIterator->second; if (doc.HasMember("values") && doc["values"].IsObject()) { const Value& values = doc["values"]; @@ -670,7 +675,7 @@ void ControlPipelineManager::updatePipeline(const Document& doc) * Called when a new filter is inserted into a pipeline. The document * passed contains the database row that was inserted. * - * {"values": {"forder": 2}, "where": {"column": "fname", "condition": "=", "value": "ctrl_test1_rename", "and": {"column": "cpid", "condition": "=", "value": "1"}}} + * {"values": {"forder": 2}, "where": {"column": "fname", "condition": "=", "value": "ctrl_test1_rename", "and": {"column": "cpid", "condition": "=", "value": 1}}} * @param doc The new filter table contents */ void ControlPipelineManager::updatePipelineFilter(const Document& doc) @@ -688,14 +693,20 @@ void ControlPipelineManager::updatePipelineFilter(const Document& doc) m_logger->error("Unable to determine the name of the filter to reorder"); return; } - string name = m_pipelineIds[cpid]; - ControlPipeline *pipeline = m_pipelines[name]; - if (!pipeline) - { + + auto pipelineIDIterator = m_pipelineIds.find(cpid); + if (pipelineIDIterator == m_pipelineIds.end()) { + m_logger->error("Unable to find pipeline with id %d, filter pipeline update ignored", cpid); + return; + } + string name = pipelineIDIterator->second; + auto pipelineIterator = m_pipelines.find(name); + if (pipelineIterator == m_pipelines.end()) { m_logger->error("Pipeline %s has not been loaded, update ignored", name.c_str()); return; } + ControlPipeline *pipeline = pipelineIterator->second; // We have the pipeline ID, not work out what has changed if (doc.HasMember("values") && doc["values"].IsObject()) { @@ -752,7 +763,7 @@ void ControlPipelineManager::deletePipeline(const Document& doc) * Called when a new filter is inserted into a pipeline. The document * passed contains the database row that was inserted. * - * {"where": {"column": "cpid", "condition": "=", "value": "1", "and": {"column": "fname", "condition": "=", "value": "ctrl_test1_del"}}} + * {"where": {"column": "cpid", "condition": "=", "value": 1, "and": {"column": "fname", "condition": "=", "value": "ctrl_test1_del"}}} * @param doc The new filter table contents */ void ControlPipelineManager::deletePipelineFilter(const Document& doc) @@ -792,6 +803,7 @@ void ControlPipelineManager::deletePipelineFilter(const Document& doc) * @param doc The JSON document containing the where clause * @param key The key to extract * @return string The key value + * */ string ControlPipelineManager::getFromJSONWhere(const Document& doc, const string& key) { @@ -805,7 +817,7 @@ string ControlPipelineManager::getFromJSONWhere(const Document& doc, const strin { if (where.HasMember("value") && where["value"].IsString()) { - result = std::to_string(strtol(where["value"].GetString(), NULL, 10)); + result = where["value"].GetString(); } else if (where.HasMember("value") && where["value"].IsInt64()) { @@ -827,17 +839,17 @@ string ControlPipelineManager::getFromJSONWhere(const Document& doc, const strin { if (second.HasMember("value") && second["value"].IsString()) { - result = strtol(second["value"].GetString(), NULL, 10); + result = second["value"].GetString(); } else if (second.HasMember("value") && second["value"].IsInt64()) { - result = second["value"].GetInt64(); + result = std::to_string(second["value"].GetInt64()); } else { if (second.HasMember("value") && second["value"].IsInt()) { - result = second["value"].GetInt(); + result = std::to_string(second["value"].GetInt()); } } } From 124b7efffc62d6ae372d145f6aa413ff80a9610b Mon Sep 17 00:00:00 2001 From: dianomicbot Date: Wed, 10 Apr 2024 12:08:39 +0000 Subject: [PATCH 10/11] VERSION changed Signed-off-by: dianomicbot --- VERSION | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 88967b0..2d53296 100755 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ -fledge_version>=2.3 -dispatcher_version=2.3.0 +fledge_version>=2.4 +dispatcher_version=2.4.0 From 141f5ea9e1617b2ed67b0bf3089b53388151971a Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Mon, 15 Apr 2024 12:01:21 +0100 Subject: [PATCH 11/11] Merge pull request #58 from fledge-iot/FOGL-8380 FOGL-8380 Fix shutdown issue in dryrun mode --- C/services/dispatcher/dispatcher_api.cpp | 35 ++++++++++++++++---- C/services/dispatcher/dispatcher_service.cpp | 33 +++++++++++------- 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/C/services/dispatcher/dispatcher_api.cpp b/C/services/dispatcher/dispatcher_api.cpp index 25f8ec9..31bea10 100644 --- a/C/services/dispatcher/dispatcher_api.cpp +++ b/C/services/dispatcher/dispatcher_api.cpp @@ -49,9 +49,16 @@ DispatcherApi::DispatcherApi(const unsigned short port, */ DispatcherApi::~DispatcherApi() { - delete m_server; if (m_thread) + { + stop(); + wait(); delete m_thread; + } + if (m_server) + { + delete m_server; + } } /** @@ -87,7 +94,8 @@ void startService() /** * Start the HTTP server */ -void DispatcherApi::start(DispatcherService *service) { +void DispatcherApi::start(DispatcherService *service) +{ m_service = service; m_thread = new thread(startService); } @@ -95,15 +103,20 @@ void DispatcherApi::start(DispatcherService *service) { /** * Start method for HTTP server */ -void DispatcherApi::startServer() { +void DispatcherApi::startServer() +{ m_server->start(); } /** * Stop method for HTTP server */ -void DispatcherApi::stopServer() { - m_server->stop(); +void DispatcherApi::stopServer() +{ + if (m_server) + { + m_server->stop(); + } } /** @@ -117,8 +130,16 @@ void DispatcherApi::stop() /** * Wait for the HTTP server to shutdown */ -void DispatcherApi::wait() { - m_thread->join(); +void DispatcherApi::wait() +{ + if (m_thread) + { + try { + m_thread->join(); + } catch (exception& e) { + // ignore + } + } } /** diff --git a/C/services/dispatcher/dispatcher_service.cpp b/C/services/dispatcher/dispatcher_service.cpp index 2a8f732..3286913 100644 --- a/C/services/dispatcher/dispatcher_service.cpp +++ b/C/services/dispatcher/dispatcher_service.cpp @@ -161,13 +161,16 @@ bool DispatcherService::start(string& coreAddress, managementListener, // Management port m_token); // Token - if (!m_mgtClient->registerService(record)) + if (!m_dryRun) { - m_logger->fatal("Unable to register service " - "\"Dispatcher\" for service '" + m_name + "'"); + if (!m_mgtClient->registerService(record)) + { + m_logger->fatal("Unable to register service " + "\"Dispatcher\" for service '" + m_name + "'"); - this->cleanupResources(); - return false; + this->cleanupResources(); + return false; + } } // Make sure we have an instance of the asset tracker @@ -230,10 +233,13 @@ bool DispatcherService::start(string& coreAddress, m_mgtClient->addChildCategories(m_name, children1); } - // Register m_name category to Fledge Core - registerCategory(m_name); - registerCategory(advancedCatName); - registerCategory(serverCatName); + if (!m_dryRun) + { + // Register m_name category to Fledge Core + registerCategory(m_name); + registerCategory(advancedCatName); + registerCategory(serverCatName); + } ConfigCategory serverCategory = m_mgtClient->getCategory(serverCatName); if (serverCategory.itemExists("enable")) @@ -299,10 +305,13 @@ bool DispatcherService::start(string& coreAddress, m_mgtClient->addAuditEntry("DSPST", "INFORMATION", "{\"name\": \"" + m_name + "\"}"); + } - // Create default security category - this->createSecurityCategories(m_mgtClient, m_dryRun); + // Create default security category + this->createSecurityCategories(m_mgtClient, m_dryRun); + if (!m_dryRun) + { // Start the control filter pipeline manager m_pipelineManager = new ControlPipelineManager(m_mgtClient, m_storage); m_pipelineManager->setService(this); @@ -335,7 +344,7 @@ bool DispatcherService::start(string& coreAddress, // Request the core to restart the service m_mgtClient->restartService(); } - else if (m_removeFromCore) + else if (m_removeFromCore && (!m_dryRun)) { // Unregister from storage service m_mgtClient->unregisterService();