Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.

#2878 Change MQTT Implementation #262

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
892 changes: 447 additions & 445 deletions agent_lib/CMakeLists.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ branches:
- /.*master.*/

image:
- Ubuntu2004
- Visual Studio 2019
- Ubuntu2004
- macos-monterey

environment:
Expand Down
2 changes: 2 additions & 0 deletions src/mtconnect/configuration/agent_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "mtconnect/device_model/device.hpp"
#include "mtconnect/printer/xml_printer.hpp"
#include "mtconnect/sink/mqtt_sink/mqtt_service.hpp"
#include "mtconnect/sink/mqtt_sink/mqtt2_service.hpp"
#include "mtconnect/sink/rest_sink/rest_service.hpp"
#include "mtconnect/source/adapter/agent_adapter/agent_adapter.hpp"
#include "mtconnect/source/adapter/mqtt/mqtt_adapter.hpp"
Expand Down Expand Up @@ -110,6 +111,7 @@ namespace mtconnect::configuration {
bool success = false;

sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory);
sink::mqtt_sink::Mqtt2Service::registerFactory(m_sinkFactory);
sink::rest_sink::RestService::registerFactory(m_sinkFactory);
adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory);
adapter::mqtt_adapter::MqttAdapter::registerFactory(m_sourceFactory);
Expand Down
88 changes: 75 additions & 13 deletions src/mtconnect/mqtt/mqtt_authorization.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace mtconnect {
};

public:

MqttTopicPermission(const std::string& topic)
{
m_topic = topic;
Expand Down Expand Up @@ -74,49 +75,111 @@ namespace mtconnect {
return false;
}

const std::string& getTopic() const
{
return m_topic;
}

protected:
TopicMode m_mode;
AuthorizationType m_type;
std::string m_topic;

}; // namespace MqttTopicPermission

using MqttTopicPermissionPtr = std::shared_ptr<MqttTopicPermission>;

class MqttAuthorization
{
public:
MqttAuthorization(const ConfigOptions& options) : m_options(options)
{
m_clientId = *GetOption<std::string>(options, configuration::MqttClientId);
m_username = GetOption<std::string>(options, configuration::MqttUserName);
m_password = GetOption<std::string>(options, configuration::MqttPassword);
}

virtual ~MqttAuthorization() = default;

MqttTopicPermission getPermissionsForClient(const std::string& topic)
void addTopicPermissionForClient(const std::string& packetId, const std::string& topic)
{
MqttTopicPermission mqttTopicPerm = *new MqttTopicPermission(topic);
return mqttTopicPerm;
}

list<MqttTopicPermission> getPermissionsForClient(const std::list<std::string>& topics)
if (m_mapMqttTopicPermissions.empty())
{
list<MqttTopicPermissionPtr> mqttTopicPermissions;
MqttTopicPermissionPtr mqttTopicPerm = make_shared<MqttTopicPermission>(topic);
mqttTopicPermissions.emplace_back(mqttTopicPerm);
m_mapMqttTopicPermissions.emplace(packetId, mqttTopicPermissions);
}
else
{
list<MqttTopicPermissionPtr> mqttTopicPermissions = getTopicPermissionsForClient(packetId);

if (!mqttTopicPermissions.empty())
{
MqttTopicPermissionPtr mqttTopicPerm = make_shared<MqttTopicPermission>(topic);
mqttTopicPermissions.emplace_back(mqttTopicPerm);
m_mapMqttTopicPermissions[packetId] = mqttTopicPermissions;
}
}
}

void addTopicPermissionsForClient(const std::string& packetId,
const std::list<std::string>& topics)
{
list<MqttTopicPermission> mqttTopicPermissions;
list<MqttTopicPermissionPtr> mqttTopicPermissions;

for (auto& topic : topics)
{
mqttTopicPermissions.push_back(*new MqttTopicPermission(topic));
MqttTopicPermissionPtr mqttTopicPerm = make_shared<MqttTopicPermission>(topic);
mqttTopicPermissions.emplace_back(mqttTopicPerm);
}
m_mapMqttTopicPermissions.emplace(packetId, mqttTopicPermissions);
}

MqttTopicPermissionPtr getTopicPermissionForClient(const std::string& packetId,
const std::string& topic) const
{
MqttTopicPermissionPtr mqttTopicPerm;
for (const auto& mqttPerms : m_mapMqttTopicPermissions)
{
if (!mqttPerms.second.empty())
{
for (MqttTopicPermissionPtr mqttperm : mqttPerms.second)
{
if (mqttperm->getTopic() == topic)
return mqttperm;
}
}
}
return mqttTopicPerm;
}

return mqttTopicPermissions;
list<MqttTopicPermissionPtr> getTopicPermissionsForClient(const std::string& packetId)
{
return m_mapMqttTopicPermissions[packetId];
}

bool hasAuthorization(const std::string& packetId, const std::string& topic)
{
for (const auto& mqttPerms : m_mapMqttTopicPermissions)
{
if (!mqttPerms.second.empty())
{
for (MqttTopicPermissionPtr mqttperm : mqttPerms.second)
{
if (mqttperm->getTopic() == topic)
return mqttperm->hasAuthorization();
}
}
}
return false;
}

protected:
std::optional<std::string> m_username;
std::optional<std::string> m_password;
std::string m_clientId;
std::uint16_t m_packetId;
ConfigOptions m_options;

std::map<std::string, list<MqttTopicPermissionPtr> > m_mapMqttTopicPermissions;
}; // namespace MqttAuthorization

class MqttAuthentication
Expand All @@ -138,7 +201,6 @@ namespace mtconnect {
LOG(error) << "MQTT USERNAME_OR_PASSWORD are Not Available";
return false;
}

return true;
}

Expand Down
228 changes: 228 additions & 0 deletions src/mtconnect/sink/mqtt_sink/mqtt2_service.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
//
// Copyright Copyright 2009-2023, AMT – The Association For Manufacturing Technology (“AMT”)
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "mqtt2_service.hpp"

#include "mtconnect/configuration/config_options.hpp"
#include "mtconnect/entity/entity.hpp"
#include "mtconnect/entity/factory.hpp"
#include "mtconnect/entity/json_parser.hpp"
#include "mtconnect/mqtt/mqtt_client_impl.hpp"
#include "mtconnect/printer/json_printer.hpp"

using ptree = boost::property_tree::ptree;

using namespace std;
using namespace mtconnect::asset;

namespace asio = boost::asio;
namespace config = ::mtconnect::configuration;

namespace mtconnect {
namespace sink {
namespace mqtt_sink {
// get obeservation in
// create a json printer
// call print

Mqtt2Service::Mqtt2Service(boost::asio::io_context &context, sink::SinkContractPtr &&contract,
const ConfigOptions &options, const ptree &config)
: Sink("Mqtt2Service", std::move(contract)), m_context(context), m_options(options)
{
auto jsonPrinter = dynamic_cast<printer::JsonPrinter *>(m_sinkContract->getPrinter("json"));
m_jsonPrinter = make_unique<entity::JsonEntityPrinter>(jsonPrinter->getJsonVersion());

GetOptions(config, m_options, options);
AddOptions(config, m_options,
{{configuration::MqttCaCert, string()},
{configuration::MqttPrivateKey, string()},
{configuration::MqttCert, string()},
{configuration::MqttClientId, string()}});
AddDefaultedOptions(config, m_options,
{{configuration::MqttHost, "127.0.0.1"s},
{configuration::DeviceTopic, "MTConnect/Device/"s},
{configuration::AssetTopic, "MTConnect/Asset/"s},
{configuration::ObservationTopic, "MTConnect/Observation/"s},
{configuration::MqttPort, 1883},
{configuration::MqttTls, false}});

auto clientHandler = make_unique<ClientHandler>();
clientHandler->m_connected = [this](shared_ptr<MqttClient> client) {
// Publish latest devices, assets, and observations
auto &circ = m_sinkContract->getCircularBuffer();
std::lock_guard<buffer::CircularBuffer> lock(circ);
client->connectComplete();

for (auto &dev : m_sinkContract->getDevices())
{
publish(dev);
}

auto obsList {circ.getLatest().getObservations()};
for (auto &obs : obsList)
{
observation::ObservationPtr p {obs.second};
publish(p);
}

AssetList list;
m_sinkContract->getAssetStorage()->getAssets(list, 100000);
for (auto &asset : list)
{
publish(asset);
}
};

m_devicePrefix = get<string>(m_options[configuration::DeviceTopic]);
m_assetPrefix = get<string>(m_options[configuration::AssetTopic]);
m_observationPrefix = get<string>(m_options[configuration::ObservationTopic]);

if (IsOptionSet(m_options, configuration::MqttTls))
{
m_client = make_shared<MqttTlsClient>(m_context, m_options, std::move(clientHandler));
}
else
{
m_client = make_shared<MqttTcpClient>(m_context, m_options, std::move(clientHandler));
}
}

void Mqtt2Service::start()
{
// mqtt client side not a server side...
if (!m_client)
return;

m_client->start();
}

void Mqtt2Service::stop()
{
// stop client side
if (m_client)
m_client->stop();
}

std::shared_ptr<MqttClient> Mqtt2Service::getClient() { return m_client; }

bool Mqtt2Service::publish(observation::ObservationPtr &observation)
{
// get the data item from observation
if (observation->isOrphan())
return false;

DataItemPtr dataItem = observation->getDataItem();

auto topic = m_observationPrefix + dataItem->getTopic(); // client asyn topic
auto content = dataItem->getTopicName(); // client asyn content

// We may want to use the observation from the checkpoint.
auto doc = m_jsonPrinter->printEntity(observation);
if (m_client)
m_client->publish(topic, doc);

return true;
}

bool Mqtt2Service::publish(device_model::DevicePtr device)
{
auto topic = m_devicePrefix + *device->getUuid();
auto doc = m_jsonPrinter->print(device);

stringstream buffer;
buffer << doc;

if (m_client)

m_client->publish(topic, buffer.str());

return true;
}

bool Mqtt2Service::publish_Probe(device_model::DevicePtr device)
{
auto topic = m_devicePrefix + *device->getUuid();
auto doc = m_jsonPrinter->print(device);

stringstream buffer;
buffer << doc;

if (m_client)

m_client->publish(topic, buffer.str());

return true;
}

bool Mqtt2Service::publish_Current(device_model::DevicePtr device)
{
auto topic = m_devicePrefix + *device->getUuid();
auto doc = m_jsonPrinter->print(device);

stringstream buffer;
buffer << doc;

if (m_client)

m_client->publish(topic, buffer.str());

return true;
}

bool Mqtt2Service::publish_Samples(device_model::DevicePtr device)
{
auto topic = m_devicePrefix + *device->getUuid();
auto doc = m_jsonPrinter->print(device);

stringstream buffer;
buffer << doc;

if (m_client)

m_client->publish(topic, buffer.str());

return true;
}

bool Mqtt2Service::publish(asset::AssetPtr asset)
{
auto topic = m_assetPrefix + get<string>(asset->getIdentity());
auto doc = m_jsonPrinter->print(asset);

stringstream buffer;
buffer << doc;

if (m_client)
m_client->publish(topic, buffer.str());

return true;
}

// Register the service with the sink factory
void Mqtt2Service::registerFactory(SinkFactory &factory)
{
factory.registerFactory(
"Mqtt2Service",
[](const std::string &name, boost::asio::io_context &io, SinkContractPtr &&contract,
const ConfigOptions &options, const boost::property_tree::ptree &block) -> SinkPtr {
auto sink = std::make_shared<Mqtt2Service>(io, std::move(contract), options, block);
return sink;
});
}
} // namespace mqtt_sink
} // namespace sink
} // namespace mtconnect
Loading