Skip to content

Commit

Permalink
added QOS and retain to MQTT Service 2
Browse files Browse the repository at this point in the history
  • Loading branch information
wsobel committed Jul 2, 2024
1 parent 2a79f47 commit 4fad359
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/mtconnect/configuration/config_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ namespace mtconnect {
DECLARE_CONFIGURATION(MqttTls);
DECLARE_CONFIGURATION(MqttPort);
DECLARE_CONFIGURATION(MqttHost);
DECLARE_CONFIGURATION(MqttRetain);
DECLARE_CONFIGURATION(MqttQOS);
DECLARE_CONFIGURATION(MqttWs);
DECLARE_CONFIGURATION(MqttConnectInterval);
DECLARE_CONFIGURATION(MqttUserName);
Expand Down
30 changes: 26 additions & 4 deletions src/mtconnect/sink/mqtt_sink/mqtt2_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ namespace mtconnect {
{configuration::MqttUserName, string()},
{configuration::MqttPassword, string()},
{configuration::MqttPort, int()},
{configuration::MqttRetain, bool()},
{configuration::MqttQOS, string()},
{configuration::MqttHost, string()}});
AddDefaultedOptions(
config, m_options,
Expand Down Expand Up @@ -114,6 +116,26 @@ namespace mtconnect {
{
m_options[configuration::MqttHost] = m_options[configuration::Host];
}

auto retain = GetOption<bool>(m_options, configuration::MqttRetain);
if (retain)
m_retain = *retain;

auto qoso = GetOption<string>(m_options, configuration::MqttQOS);

if (qoso)
{
auto qos = *qoso;
if (qos == "at_most_once")
m_qos = MqttClient::QOS::at_most_once;
else if (qos == "at_least_once")
m_qos = MqttClient::QOS::at_least_once;
else if (qos == "exactly_once")
m_qos = MqttClient::QOS::exactly_once;
else
LOG(warning) << "Invalid QOS for MQTT Client: " << qos
<< ", must be at_most_once, at_least_once, or exactly_once";
}
}

void Mqtt2Service::start()
Expand Down Expand Up @@ -256,7 +278,7 @@ namespace mtconnect {
{
LOG(warning) << "Async publish failed for " << topic << ": " << ec.message();
}
});
}, m_retain, m_qos);

return end;
}
Expand Down Expand Up @@ -299,7 +321,7 @@ namespace mtconnect {
m_sinkContract->getCircularBuffer().getBufferSize(),
seq, firstSeq, seq - 1, observations);

m_client->publish(topic, doc);
m_client->publish(topic, doc, m_retain, m_qos);
}

using std::placeholders::_1;
Expand Down Expand Up @@ -327,7 +349,7 @@ namespace mtconnect {
buffer << doc;

if (m_client)
m_client->publish(topic, buffer.str());
m_client->publish(topic, buffer.str(), m_retain, m_qos);

return true;
}
Expand All @@ -352,7 +374,7 @@ namespace mtconnect {
buffer << doc;

if (m_client)
m_client->publish(topic, buffer.str());
m_client->publish(topic, buffer.str(), m_retain, m_qos);

return true;
}
Expand Down
9 changes: 9 additions & 0 deletions src/mtconnect/sink/mqtt_sink/mqtt2_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ namespace mtconnect {
/// @brief Mqtt Client is Connected or not
/// @return `true` when the client was connected
bool isConnected() { return m_client && m_client->isConnected(); }

/// @name Retain and QOS flags
///@{
auto getRetain() { return m_retain; }
auto getQOS() { return m_qos; }
///@}

protected:
const FilterSet &filterForDevice(const DevicePtr &device)
Expand Down Expand Up @@ -199,6 +205,9 @@ namespace mtconnect {

std::map<std::string, FilterSet> m_filters;
std::map<std::string, std::shared_ptr<AsyncSample>> m_samplers;

bool m_retain {true};
MqttClient::QOS m_qos {MqttClient::QOS::at_least_once};
};
} // namespace mqtt_sink
} // namespace sink
Expand Down
5 changes: 1 addition & 4 deletions src/mtconnect/source/adapter/mqtt/mqtt_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace mtconnect::source::adapter::mqtt_adapter {

unsigned int getPort() const override;
///@}

/// @name Source interface
///@{
bool start() override;
Expand Down Expand Up @@ -101,8 +101,5 @@ namespace mtconnect::source::adapter::mqtt_adapter {
MqttPipeline m_pipeline;

std::shared_ptr<MqttClient> m_client;

bool retain {true};
MqttClient::QOS qos {MqttClient::QOS::at_least_once};
};
} // namespace mtconnect::source::adapter::mqtt_adapter

0 comments on commit 4fad359

Please sign in to comment.