Skip to content

Commit

Permalink
Merge pull request #473 from mtconnect/467_468_check_uuid_device_upda…
Browse files Browse the repository at this point in the history
…te_in_mqtt_and_add_retain_flag

467 468 check UUID device update in mqtt and add retain flag
  • Loading branch information
wsobel authored Jul 2, 2024
2 parents 43331b2 + c1960d4 commit a5ba74c
Show file tree
Hide file tree
Showing 29 changed files with 286 additions and 97 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set(AGENT_VERSION_MAJOR 2)
set(AGENT_VERSION_MINOR 3)
set(AGENT_VERSION_PATCH 0)
set(AGENT_VERSION_BUILD 14)
set(AGENT_VERSION_BUILD 15)
set(AGENT_VERSION_RC "")

# This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent
Expand Down
129 changes: 69 additions & 60 deletions src/mtconnect/agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
#include <sys/stat.h>
#include <thread>

#include <libxml/xmlwriter.h>

#include "mtconnect/asset/asset.hpp"
#include "mtconnect/asset/component_configuration_parameters.hpp"
#include "mtconnect/asset/cutting_tool.hpp"
Expand Down Expand Up @@ -436,28 +438,56 @@ namespace mtconnect {
}
}

void Agent::loadDevices(list<DevicePtr> devices, const optional<string> source)
void Agent::loadDevices(list<DevicePtr> devices, const optional<string> source,
bool force)
{
if (!IsOptionSet(m_options, config::EnableSourceDeviceModels))
if (!force && !IsOptionSet(m_options, config::EnableSourceDeviceModels))
{
LOG(warning) << "Device updates are disabled, skipping update";
return;
}

m_context.pause([=](config::AsyncContext &context) {
auto callback = [=](config::AsyncContext &context) {
try
{
bool changed = false;
for (auto device : devices)
{
auto oldName = *device->getComponentName();
auto oldDev = getDeviceByName(oldName);
optional<string> oldUuid;
if (oldDev)
{
oldUuid = *oldDev->getUuid();
}

auto uuid = *device->getUuid();
auto name = *device->getComponentName();

changed = receiveDevice(device, true) || changed;
if (source)
if (changed)
{
auto s = findSource(*source);
if (s)
if (source)
{
auto s = findSource(*source);
if (s)
{
s->setOptions({{config::Device, uuid}});
}
}

for (auto src : m_sources)
{
const auto &name = device->getComponentName();
s->setOptions({{config::Device, *name}});
auto adapter = std::dynamic_pointer_cast<source::adapter::Adapter>(src);
if (adapter)
{
auto &options = adapter->getOptions();
auto dev = GetOption<std::string>(options, config::Device);
if (dev == oldName || dev == oldUuid)
{
adapter->setOptions({{config::Device, uuid}});
}
}
}
}
}
Expand All @@ -483,7 +513,14 @@ namespace mtconnect {
LOG(error) << "Error detail: " << f.what();
cerr << f.what() << endl;
}
});
};

// Gets around a race condition in the loading of adapaters and setting of
// UUID.
if (m_context.isRunning() && !m_context.isPauased())
m_context.pause(callback);
else
callback(m_context);
}

bool Agent::receiveDevice(device_model::DevicePtr device, bool version)
Expand Down Expand Up @@ -518,14 +555,18 @@ namespace mtconnect {
addDevice(device);
if (version)
versionDeviceXml();

for (auto &sink : m_sinks)
sink->publish(device);

return true;
}
else
{
auto name = device->getComponentName();
if (!name)
{
LOG(error) << "Device does not have a name" << *device->getUuid();
LOG(error) << "Device does not have a name: " << *device->getUuid();
return false;
}

Expand Down Expand Up @@ -603,6 +644,9 @@ namespace mtconnect {
m_loopback->receive(d, props);
}

for (auto &sink : m_sinks)
sink->publish(device);

return true;
}
else
Expand Down Expand Up @@ -974,13 +1018,12 @@ namespace mtconnect {
printer.second->setModelChangeTime(getCurrentTime(GMT_UV_SEC));
}

void Agent::deviceChanged(DevicePtr device, const std::string &oldUuid,
const std::string &oldName)
void Agent::deviceChanged(DevicePtr device, const std::string &uuid)
{
NAMED_SCOPE("Agent::deviceChanged");

bool changed = false;
string uuid = *device->getUuid();
string oldUuid = *device->getUuid();
if (uuid != oldUuid)
{
changed = true;
Expand All @@ -992,46 +1035,15 @@ namespace mtconnect {
}
}

if (*device->getComponentName() != oldName)
{
changed = true;
}

if (changed)
{
createUniqueIds(device);
if (m_intSchemaVersion >= SCHEMA_VERSION(2, 2))
device->addHash();

versionDeviceXml();
loadCachedProbe();

if (m_agentDevice)
{
for (auto &printer : m_printers)
printer.second->setModelChangeTime(getCurrentTime(GMT_UV_SEC));
// Create a new device
auto xmlPrinter = dynamic_cast<printer::XmlPrinter *>(m_printers["xml"].get());
auto newDevice = m_xmlParser->parseDevice(xmlPrinter->printDevice(device), xmlPrinter);

entity::Properties props {{"VALUE", uuid}};
if (m_intSchemaVersion >= SCHEMA_VERSION(2, 2))
{
const auto &hash = device->getProperty("hash");
if (ValueType(hash.index()) != ValueType::EMPTY)
props.insert_or_assign("hash", hash);
}
newDevice->setUuid(uuid);

if (device->getUuid() != oldUuid)
{
auto d = m_agentDevice->getDeviceDataItem("device_added");
if (d)
m_loopback->receive(d, props);
}
else
{
auto d = m_agentDevice->getDeviceDataItem("device_changed");
if (d)
m_loopback->receive(d, props);
}
}
m_loopback->receive(newDevice);
}
}

Expand Down Expand Up @@ -1436,13 +1448,7 @@ namespace mtconnect {
DevicePtr device {nullptr};
device = findDeviceByUUIDorName(deviceName);

std::string oldName, oldUuid;
if (device)
{
oldName = *device->getComponentName();
oldUuid = *device->getUuid();
}
else
if (!device)
{
LOG(warning) << source << ": Cannot find device for name " << deviceName;
}
Expand Down Expand Up @@ -1502,12 +1508,15 @@ namespace mtconnect {
if (!device->preserveUuid())
{
auto &idx = m_deviceIndex.get<ByUuid>();
auto it = idx.find(oldUuid);
auto it = idx.find(*device->getUuid());
if (it != idx.end())
{
idx.modify(it, [&value](DevicePtr &ptr) { ptr->setUuid(value); });
deviceChanged(device, value);
}
else
{
LOG(warning) << "Could not find device by uuid: " << *device->getUuid();
}
deviceChanged(device, oldUuid, oldName);
}
}
else
Expand Down Expand Up @@ -1536,7 +1545,7 @@ namespace mtconnect {
else
{
action->second(device, value);
deviceChanged(device, oldUuid, oldName);
m_loopback->receive(device);
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/mtconnect/agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ namespace mtconnect {
/// @param[in] device The modified device
/// @param[in] oldUuid The old uuid
/// @param[in] oldName The old name
void deviceChanged(DevicePtr device, const std::string &oldUuid, const std::string &oldName);
void deviceChanged(DevicePtr device, const std::string &uuid);
/// @brief Reload the devices from a device file after updates
/// @param[in] deviceFile The device file to load
/// @return true if successful
Expand All @@ -279,7 +279,8 @@ namespace mtconnect {
/// @param[in] deviceXml the device xml as a string
/// @param[in] source the source loading the device
void loadDevices(std::list<DevicePtr> device,
const std::optional<std::string> source = std::nullopt);
const std::optional<std::string> source = std::nullopt,
bool force = false);

/// @brief receive and parse a single device from a source
/// @param[in] deviceXml the device xml as a string
Expand Down Expand Up @@ -590,6 +591,8 @@ namespace mtconnect {
void deliverConnectStatus(entity::EntityPtr, const StringList &devices,
bool autoAvailable) override;
void deliverCommand(entity::EntityPtr) override;
void deliverDevice(DevicePtr device) override { m_agent->loadDevices({device}, std::nullopt,
true); }
void deliverDevices(std::list<DevicePtr> devices) override { m_agent->loadDevices(devices); }

void sourceFailed(const std::string &identity) override { m_agent->sourceFailed(identity); }
Expand Down
4 changes: 1 addition & 3 deletions src/mtconnect/configuration/agent_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,9 +996,7 @@ namespace mtconnect::configuration {
else if (auto uuid = GetOption<string>(adapterOptions, configuration::UUID))
{
// Set the UUID of the device
auto oldUuid = *device->getUuid();
device->setUuid(*uuid);
m_agent->deviceChanged(device, oldUuid, *device->getComponentName());
m_agent->deviceChanged(device, *uuid);
}

auto preserve = GetOption<bool>(adapterOptions, configuration::PreserveUUID);
Expand Down
8 changes: 8 additions & 0 deletions src/mtconnect/configuration/async_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ namespace mtconnect::configuration {
/// @brief removes the copy constructor
AsyncContext(const AsyncContext &) = delete;
~AsyncContext() {}

/// @brief is the context running
/// @returns running status
auto isRunning() { return m_running; }

/// @brief return the paused state
/// @returns the paused state
auto isPauased() { return m_paused; }

/// @brief Testing only: method to remove the run guard from the context
void removeGuard() { m_guard.reset(); }
Expand Down
2 changes: 2 additions & 0 deletions src/mtconnect/configuration/config_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ namespace mtconnect {
DECLARE_CONFIGURATION(MqttTls);
DECLARE_CONFIGURATION(MqttPort);
DECLARE_CONFIGURATION(MqttHost);
DECLARE_CONFIGURATION(MqttRetain);
DECLARE_CONFIGURATION(MqttQOS);
DECLARE_CONFIGURATION(MqttWs);
DECLARE_CONFIGURATION(MqttConnectInterval);
DECLARE_CONFIGURATION(MqttUserName);
Expand Down
13 changes: 11 additions & 2 deletions src/mtconnect/mqtt/mqtt_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ namespace mtconnect {
class MqttClient : public std::enable_shared_from_this<MqttClient>
{
public:
enum class QOS
{
at_most_once,
at_least_once,
exactly_once
};

/// @brief Create an Mqtt Client with an asio context and ClientHandler
/// @param context a boost asio context
/// @param ClientHandler configuration options
Expand Down Expand Up @@ -81,14 +88,16 @@ namespace mtconnect {
/// @param topic Publishing to the topic
/// @param payload Publishing to the payload
/// @return boolean either topic sucessfully connected and published
virtual bool publish(const std::string &topic, const std::string &payload) = 0;
virtual bool publish(const std::string &topic, const std::string &payload, bool retain = true,
QOS qos = QOS::at_least_once) = 0;

/// @brief Publish Topic to the Mqtt Client and call the async handler
/// @param topic Publishing to the topic
/// @param payload Publishing to the payload
/// @return boolean either topic sucessfully connected and published
virtual bool asyncPublish(const std::string &topic, const std::string &payload,
std::function<void(std::error_code)> callback) = 0;
std::function<void(std::error_code)> callback, bool retain = true,
QOS qos = QOS::at_least_once) = 0;

/// @brief Mqtt Client is connected
/// @return bool Either Client is sucessfully connected or not
Expand Down
Loading

0 comments on commit a5ba74c

Please sign in to comment.