Skip to content

Commit

Permalink
Merge pull request #67 from fledge-iot/2.4.0RC
Browse files Browse the repository at this point in the history
2.4.0RC
  • Loading branch information
dianomicbot authored Apr 17, 2024
2 parents f92f893 + 141f5ea commit 4018664
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 57 deletions.
28 changes: 18 additions & 10 deletions C/services/dispatcher/controlrequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ void ControlOperationServiceRequest::execute(DispatcherService *service)
filter(service->getPipelineManager());
string payload = "{ \"operation\" : \"";
payload += m_operation;
payload += "\", ";
payload += "\"";
if (m_parameters.size() > 0)
{
payload += "\"parameters\" : ";
payload += ", \"parameters\" : ";
payload += m_parameters.toJSON();
}
payload += " }";
Expand All @@ -157,10 +157,10 @@ void ControlOperationAssetRequest::execute(DispatcherService *service)
string ingestService = tracker->getIngestService(m_asset);
string payload = "{ \"operation\" : \"";
payload += m_operation;
payload += "\", ";
payload += "\"";
if (m_parameters.size() > 0)
{
payload += "\"parameters\" : ";
payload += ", \"parameters\" : ";
payload += m_parameters.toJSON();
}
payload += " }";
Expand Down Expand Up @@ -190,10 +190,10 @@ void ControlOperationBroadcastRequest::execute(DispatcherService *service)

string payload = "{ \"operation\" : \"";
payload += m_operation;
payload += "\", ";
payload += "\"";
if (m_parameters.size() > 0)
{
payload += "\"parameters\" : ";
payload += ", \"parameters\" : ";
payload += m_parameters.toJSON();
}
payload += " }";
Expand Down Expand Up @@ -251,9 +251,12 @@ void WriteControlRequest::filter(ControlPipelineManager *manager)
}
Reading *reading = m_values.toReading("reading");
// Filter the reading
context->filter(reading);
reading = context->filter(reading);
m_values.fromReading(reading);
delete reading;
if (reading)
{
delete reading;
}
}

/**
Expand Down Expand Up @@ -326,7 +329,12 @@ void ControlOperationRequest::filter(ControlPipelineManager *manager)
}
Reading *reading = m_parameters.toReading(m_operation);
// Filter the reading
context->filter(reading);
reading = context->filter(reading);
m_parameters.fromReading(reading);
delete reading;
// All the filter pipeline to change the operation name
if (reading)
{
m_operation = reading->getAssetName();
delete reading;
}
}
35 changes: 28 additions & 7 deletions C/services/dispatcher/dispatcher_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,16 @@ DispatcherApi::DispatcherApi(const unsigned short port,
*/
DispatcherApi::~DispatcherApi()
{
delete m_server;
if (m_thread)
{
stop();
wait();
delete m_thread;
}
if (m_server)
{
delete m_server;
}
}

/**
Expand Down Expand Up @@ -87,23 +94,29 @@ void startService()
/**
* Start the HTTP server
*/
void DispatcherApi::start(DispatcherService *service) {
void DispatcherApi::start(DispatcherService *service)
{
m_service = service;
m_thread = new thread(startService);
}

/**
* Start method for HTTP server
*/
void DispatcherApi::startServer() {
void DispatcherApi::startServer()
{
m_server->start();
}

/**
* Stop method for HTTP server
*/
void DispatcherApi::stopServer() {
m_server->stop();
void DispatcherApi::stopServer()
{
if (m_server)
{
m_server->stop();
}
}

/**
Expand All @@ -117,8 +130,16 @@ void DispatcherApi::stop()
/**
* Wait for the HTTP server to shutdown
*/
void DispatcherApi::wait() {
m_thread->join();
void DispatcherApi::wait()
{
if (m_thread)
{
try {
m_thread->join();
} catch (exception& e) {
// ignore
}
}
}

/**
Expand Down
33 changes: 21 additions & 12 deletions C/services/dispatcher/dispatcher_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,16 @@ bool DispatcherService::start(string& coreAddress,
managementListener, // Management port
m_token); // Token

if (!m_mgtClient->registerService(record))
if (!m_dryRun)
{
m_logger->fatal("Unable to register service "
"\"Dispatcher\" for service '" + m_name + "'");
if (!m_mgtClient->registerService(record))
{
m_logger->fatal("Unable to register service "
"\"Dispatcher\" for service '" + m_name + "'");

this->cleanupResources();
return false;
this->cleanupResources();
return false;
}
}

// Make sure we have an instance of the asset tracker
Expand Down Expand Up @@ -230,10 +233,13 @@ bool DispatcherService::start(string& coreAddress,
m_mgtClient->addChildCategories(m_name, children1);
}

// Register m_name category to Fledge Core
registerCategory(m_name);
registerCategory(advancedCatName);
registerCategory(serverCatName);
if (!m_dryRun)
{
// Register m_name category to Fledge Core
registerCategory(m_name);
registerCategory(advancedCatName);
registerCategory(serverCatName);
}

ConfigCategory serverCategory = m_mgtClient->getCategory(serverCatName);
if (serverCategory.itemExists("enable"))
Expand Down Expand Up @@ -299,10 +305,13 @@ bool DispatcherService::start(string& coreAddress,
m_mgtClient->addAuditEntry("DSPST",
"INFORMATION",
"{\"name\": \"" + m_name + "\"}");
}

// Create default security category
this->createSecurityCategories(m_mgtClient, m_dryRun);
// Create default security category
this->createSecurityCategories(m_mgtClient, m_dryRun);

if (!m_dryRun)
{
// Start the control filter pipeline manager
m_pipelineManager = new ControlPipelineManager(m_mgtClient, m_storage);
m_pipelineManager->setService(this);
Expand Down Expand Up @@ -335,7 +344,7 @@ bool DispatcherService::start(string& coreAddress,
// Request the core to restart the service
m_mgtClient->restartService();
}
else if (m_removeFromCore)
else if (m_removeFromCore && (!m_dryRun))
{
// Unregister from storage service
m_mgtClient->unregisterService();
Expand Down
2 changes: 1 addition & 1 deletion C/services/dispatcher/include/controlrequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class ControlOperationRequest : public ControlRequest {
protected:
void filter(ControlPipelineManager *manager);
protected:
const std::string m_operation;
std::string m_operation;
KVList m_parameters;
};

Expand Down
25 changes: 21 additions & 4 deletions C/services/dispatcher/kvlist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ vector<Datapoint *> values;
}
}
}
// We can not have a reading with no data points, so if we have no parameters
// we must add a dummy datapoint to pass the operation through the filter
if (values.size() == 0)
{
DatapointValue dpv("None");
values.push_back(new Datapoint("__None__", dpv));
}
return new Reading(asset, values);
}

Expand All @@ -215,13 +222,23 @@ vector<Datapoint *> values;
void KVList::fromReading(Reading *reading)
{
m_list.clear();
if (!reading)
return;
vector<Datapoint *>datapoints = reading->getReadingData();
for (Datapoint *dp : datapoints)
{
if (dp->getData().getType() == DatapointValue::T_STRING)
add(dp->getName(), dp->getData().toStringValue());
else
add(dp->getName(), dp->getData().toString());
// Remove the dummy datapoint that was added
if (dp->getName().compare("__None__") != 0)
{
try {
if (dp->getData().getType() == DatapointValue::T_STRING)
add(dp->getName(), dp->getData().toStringValue());
else
add(dp->getName(), dp->getData().toString());
} catch (exception& e) {
Logger::getLogger()->warn("Unable to add datapoint %s of type %s returned from pipeline, %s.", dp->getName(), dp->getData().getTypeStr(), e.what());
}
}
}
}

Expand Down
Loading

0 comments on commit 4018664

Please sign in to comment.