Skip to content

Commit

Permalink
feat: support wildcard mqtt subscribe
Browse files Browse the repository at this point in the history
Subscribe supports wildcard topics
There is a second callback option that provides the full topic of the received
message.

Signed-off-by: James Chapman <[email protected]>
  • Loading branch information
james-ctc committed Nov 4, 2024
1 parent 313709f commit 9b8dd4a
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 35 deletions.
6 changes: 6 additions & 0 deletions include/framework/ModuleAdapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct ModuleAdapter {
using GetErrorStateMonitorReqFunc = std::function<std::shared_ptr<error::ErrorStateMonitor>(const Requirement&)>;
using ExtMqttPublishFunc = std::function<void(const std::string&, const std::string&)>;
using ExtMqttSubscribeFunc = std::function<UnsubscribeToken(const std::string&, StringHandler)>;
using ExtMqttSubscribePairFunc = std::function<UnsubscribeToken(const std::string&, StringPairHandler)>;
using TelemetryPublishFunc =
std::function<void(const std::string&, const std::string&, const std::string&, const TelemetryMap&)>;

Expand All @@ -105,6 +106,7 @@ struct ModuleAdapter {
GetGlobalErrorStateMonitorFunc get_global_error_state_monitor;
ExtMqttPublishFunc ext_mqtt_publish;
ExtMqttSubscribeFunc ext_mqtt_subscribe;
ExtMqttSubscribePairFunc ext_mqtt_subscribe_pair;
std::vector<cmd> registered_commands;
TelemetryPublishFunc telemetry_publish;

Expand Down Expand Up @@ -156,6 +158,10 @@ class MqttProvider {
return ev.ext_mqtt_subscribe(topic, std::move(handler));
}

UnsubscribeToken subscribe(const std::string& topic, StringPairHandler handler) const {
return ev.ext_mqtt_subscribe_pair(topic, std::move(handler));
}

private:
ModuleAdapter& ev;
};
Expand Down
5 changes: 5 additions & 0 deletions include/framework/everest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class Everest {
///
UnsubscribeToken provide_external_mqtt_handler(const std::string& topic, const StringHandler& handler);

///
/// \brief Allows a module to indicate that it provides a external mqtt \p handler at the given \p topic
///
UnsubscribeToken provide_external_mqtt_handler(const std::string& topic, const StringPairHandler& handler);

///
/// \brief publishes the given telemetry \p data on the given \p topic
///
Expand Down
1 change: 1 addition & 0 deletions include/utils/error/error_database_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace error {
class ErrorDatabaseMap : public ErrorDatabase {
public:
ErrorDatabaseMap() = default;
virtual ~ErrorDatabaseMap() = default;

void add_error(ErrorPtr error) override;
std::list<ErrorPtr> get_errors(const std::list<ErrorFilter>& filters) const override;
Expand Down
10 changes: 8 additions & 2 deletions include/utils/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,16 @@ class MessageQueue {

/// \brief Contains a message queue driven list of handler callbacks
class MessageHandler {
public:
struct MessageDetails {
std::string topic;

Check notice on line 56 in include/utils/message_queue.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

include/utils/message_queue.hpp#L56

struct member 'MessageDetails::topic' is never used.
std::shared_ptr<json> data;
};

private:
std::unordered_set<std::shared_ptr<TypedHandler>> handlers;
std::thread handler_thread;
std::queue<std::shared_ptr<json>> message_queue;
std::queue<MessageDetails> message_queue;

Check notice on line 63 in include/utils/message_queue.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

include/utils/message_queue.hpp#L63

class member 'MessageHandler::message_queue' is never used.
std::mutex handler_ctrl_mutex;
std::mutex handler_list_mutex;
std::condition_variable cv;
Expand All @@ -68,7 +74,7 @@ class MessageHandler {
~MessageHandler();

/// \brief Adds a \p message to the message queue which will be delivered to the registered handlers
void add(std::shared_ptr<json> message);
void add(MessageDetails message);

/// \brief Stops the message handler
void stop();
Expand Down
3 changes: 2 additions & 1 deletion include/utils/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ using ModuleConfigs = std::map<std::string, ConfigMap>;
using Array = json::array_t;
using Object = json::object_t;
// TODO (aw): can we pass the handler arguments by const ref?
using Handler = std::function<void(json)>;
using Handler = std::function<void(const std::string&, json)>;
using StringHandler = std::function<void(std::string)>;
using StringPairHandler = std::function<void(const std::string& topic, const std::string& data)>;

enum class HandlerType {
Call,
Expand Down
46 changes: 37 additions & 9 deletions lib/everest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ Everest::Everest(std::string module_id_, const Config& config_, bool validate_da
}

// register handler for global ready signal
Handler handle_ready_wrapper = [this](json data) { this->handle_ready(data); };
Handler handle_ready_wrapper = [this](const std::string&, json data) { this->handle_ready(data); };
std::shared_ptr<TypedHandler> everest_ready =
std::make_shared<TypedHandler>(HandlerType::ExternalMQTT, std::make_shared<Handler>(handle_ready_wrapper));
this->mqtt_abstraction.register_handler(fmt::format("{}ready", mqtt_everest_prefix), everest_ready, QOS::QOS2);
Expand Down Expand Up @@ -357,7 +357,8 @@ json Everest::call_cmd(const Requirement& req, const std::string& cmd_name, json
std::promise<json> res_promise;
std::future<json> res_future = res_promise.get_future();

Handler res_handler = [this, &res_promise, call_id, connection, cmd_name, return_type](json data) {
Handler res_handler = [this, &res_promise, call_id, connection, cmd_name, return_type](const std::string&,
json data) {
auto& data_id = data.at("id");
if (data_id != call_id) {
EVLOG_debug << fmt::format("RES: data_id != call_id ({} != {})", data_id, call_id);
Expand Down Expand Up @@ -474,7 +475,7 @@ void Everest::subscribe_var(const Requirement& req, const std::string& var_name,
auto requirement_manifest_vardef = requirement_impl_manifest["vars"][var_name];

Handler handler = [this, requirement_module_id, requirement_impl_id, requirement_manifest_vardef, var_name,
callback](json const& data) {
callback](const std::string&, json const& data) {
EVLOG_verbose << fmt::format(
"Incoming {}->{}", this->config.printable_identifier(requirement_module_id, requirement_impl_id), var_name);

Expand Down Expand Up @@ -539,7 +540,8 @@ void Everest::subscribe_error(const Requirement& req, const error::ErrorType& er
return;
}

Handler raise_handler = [this, requirement_module_id, requirement_impl_id, error_type, callback](json const& data) {
Handler raise_handler = [this, requirement_module_id, requirement_impl_id, error_type, callback](const std::string&,
json const& data) {
EVLOG_debug << fmt::format("Incoming error {}->{}",
this->config.printable_identifier(requirement_module_id, requirement_impl_id),
error_type);
Expand All @@ -548,7 +550,7 @@ void Everest::subscribe_error(const Requirement& req, const error::ErrorType& er
};

Handler clear_handler = [this, requirement_module_id, requirement_impl_id, error_type,
clear_callback](json const& data) {
clear_callback](const std::string&, json const& data) {
EVLOG_debug << fmt::format("Error cleared {}->{}",
this->config.printable_identifier(requirement_module_id, requirement_impl_id),
error_type);
Expand Down Expand Up @@ -636,15 +638,15 @@ void Everest::subscribe_global_all_errors(const error::ErrorCallback& callback,
return;
}

Handler raise_handler = [this, callback](json const& data) {
Handler raise_handler = [this, callback](const std::string&, json const& data) {
error::Error error = data.get<error::Error>();
EVLOG_debug << fmt::format(
"Incoming error {}->{}",
this->config.printable_identifier(error.origin.module_id, error.origin.implementation_id), error.type);
callback(error);
};

Handler clear_handler = [this, clear_callback](json const& data) {
Handler clear_handler = [this, clear_callback](const std::string&, json const& data) {
error::Error error = data.get<error::Error>();
EVLOG_debug << fmt::format(
"Incoming error cleared {}->{}",
Expand Down Expand Up @@ -726,7 +728,7 @@ UnsubscribeToken Everest::provide_external_mqtt_handler(const std::string& topic

std::string external_topic = fmt::format("{}{}", this->mqtt_external_prefix, topic);

Handler external_handler = [this, handler, external_topic](json const& data) {
Handler external_handler = [handler, external_topic](const std::string&, json const& data) {
EVLOG_verbose << fmt::format("Incoming external mqtt data for topic '{}'...", external_topic);
if (!data.is_string()) {
EVLOG_AND_THROW(EverestInternalError("External mqtt result is not a string (that should never happen)"));
Expand All @@ -740,6 +742,32 @@ UnsubscribeToken Everest::provide_external_mqtt_handler(const std::string& topic
return [this, topic, token]() { this->mqtt_abstraction.unregister_handler(topic, token); };
}

UnsubscribeToken Everest::provide_external_mqtt_handler(const std::string& topic, const StringPairHandler& handler) {
BOOST_LOG_FUNCTION();

// check if external mqtt is enabled
if (!this->module_manifest.contains("enable_external_mqtt") &&
this->module_manifest["enable_external_mqtt"] == false) {
EVLOG_AND_THROW(EverestApiError(fmt::format("Module {} tries to provide an external MQTT handler, but didn't "
"set 'enable_external_mqtt' to 'true' in its manifest",
this->config.printable_identifier(this->module_id))));
}

std::string external_topic = fmt::format("{}{}", this->mqtt_external_prefix, topic);

// must be json and not std::string
Handler external_handler = [handler](const std::string& topic, const json& data) {
EVLOG_verbose << fmt::format("Incoming external mqtt data for topic '{}'...", topic);
std::string data_s = (data.is_string()) ? std::string(data) : data.dump();
handler(topic, data_s);
};

std::shared_ptr<TypedHandler> token =
std::make_shared<TypedHandler>(HandlerType::ExternalMQTT, std::make_shared<Handler>(external_handler));
this->mqtt_abstraction.register_handler(external_topic, token, QOS::QOS0);
return [this, topic, token]() { this->mqtt_abstraction.unregister_handler(topic, token); };
}

void Everest::telemetry_publish(const std::string& topic, const std::string& data) {
BOOST_LOG_FUNCTION();

Expand Down Expand Up @@ -834,7 +862,7 @@ void Everest::provide_cmd(const std::string impl_id, const std::string cmd_name,
const auto cmd_topic = fmt::format("{}/cmd", this->config.mqtt_prefix(this->module_id, impl_id));

// define command wrapper
Handler wrapper = [this, cmd_topic, impl_id, cmd_name, handler, cmd_definition](json data) {
Handler wrapper = [this, cmd_topic, impl_id, cmd_name, handler, cmd_definition](const std::string&, json data) {
BOOST_LOG_FUNCTION();

std::set<std::string> arg_names;
Expand Down
14 changes: 7 additions & 7 deletions lib/message_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ MessageHandler::MessageHandler() : running(true) {
this->message_queue.pop();
lock.unlock();

auto data = *message.get();
auto data = *message.data;

// get the registered handlers
std::vector<std::shared_ptr<TypedHandler>> local_handlers;
Expand All @@ -89,7 +89,7 @@ MessageHandler::MessageHandler() : running(true) {
continue;
}
if (data.at("type") == "call") {
handler(data.at("data"));
handler(message.topic, data.at("data"));
}
} else if (handler_->type == HandlerType::Result) {
// unpack result
Expand All @@ -99,28 +99,28 @@ MessageHandler::MessageHandler() : running(true) {
if (data.at("type") == "result") {
// only deliver result to handler with matching id
if (handler_->id == data.at("data").at("id")) {
handler(data.at("data"));
handler(message.topic, data.at("data"));
}
}
} else if (handler_->type == HandlerType::SubscribeVar) {
// unpack var
if (handler_->name != data.at("name")) {
continue;
}
handler(data.at("data"));
handler(message.topic, data.at("data"));
} else {
// external or unknown, no preprocessing
handler(data);
handler(message.topic, data);
}
}
}
});
}

void MessageHandler::add(std::shared_ptr<json> message) {
void MessageHandler::add(MessageDetails message) {
{
std::lock_guard<std::mutex> lock(this->handler_ctrl_mutex);
this->message_queue.push(message);
this->message_queue.emplace(message);
}
this->cv.notify_all();
}
Expand Down
15 changes: 3 additions & 12 deletions lib/mqtt_abstraction_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest
#include "utils/message_queue.hpp"
#include <algorithm>
#include <chrono>
#include <cstdio>
Expand Down Expand Up @@ -275,10 +276,8 @@ void MQTTAbstractionImpl::on_mqtt_message(std::shared_ptr<Message> message) {

try {
std::shared_ptr<json> data;
bool is_everest_topic = false;
if (topic.find(mqtt_everest_prefix) == 0) {
EVLOG_verbose << fmt::format("topic {} starts with {}", topic, mqtt_everest_prefix);
is_everest_topic = true;
try {
data = std::make_shared<json>(json::parse(payload));
} catch (nlohmann::detail::parse_error& e) {
Expand All @@ -296,18 +295,10 @@ void MQTTAbstractionImpl::on_mqtt_message(std::shared_ptr<Message> message) {
std::unique_lock<std::mutex> lock(handlers_mutex);
std::vector<Handler> local_handlers;
for (auto& [handler_topic, handler] : this->message_handlers) {
bool topic_matches = false;
if (is_everest_topic) {
// everest topics never contain wildcards, so a direct comparison is enough
if (topic == handler_topic) {
topic_matches = true;
}
} else {
topic_matches = MQTTAbstractionImpl::check_topic_matches(topic, handler_topic);
}
bool topic_matches = MQTTAbstractionImpl::check_topic_matches(topic, handler_topic);
if (topic_matches) {
found = true;
handler.add(data);
handler.add({topic, data});
}
}
lock.unlock();
Expand Down
11 changes: 8 additions & 3 deletions lib/runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,14 @@ int ModuleLoader::initialize() {
module_adapter.ext_mqtt_publish =
std::bind(&Everest::Everest::external_mqtt_publish, &everest, std::placeholders::_1, std::placeholders::_2);

// NOLINTNEXTLINE(modernize-avoid-bind): prefer bind here for readability
module_adapter.ext_mqtt_subscribe = std::bind(&Everest::Everest::provide_external_mqtt_handler, &everest,
std::placeholders::_1, std::placeholders::_2);
module_adapter.ext_mqtt_subscribe = [&everest](const std::string& topic, const StringHandler& handler) {
return everest.provide_external_mqtt_handler(topic, handler);
};

module_adapter.ext_mqtt_subscribe_pair = [&everest](const std::string& topic,
const StringPairHandler& handler) {
return everest.provide_external_mqtt_handler(topic, handler);
};

module_adapter.telemetry_publish = [&everest](const std::string& category, const std::string& subcategory,
const std::string& type, const TelemetryMap& telemetry) {
Expand Down
2 changes: 1 addition & 1 deletion src/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ static std::map<pid_t, std::string> start_modules(Config& config, MQTTAbstractio

Handler module_ready_handler = [module_name, &mqtt_abstraction, standalone_modules,
mqtt_everest_prefix = rs->mqtt_everest_prefix,
&status_fifo](nlohmann::json json) {
&status_fifo](const std::string&, nlohmann::json json) {
EVLOG_debug << fmt::format("received module ready signal for module: {}({})", module_name, json.dump());
std::unique_lock<std::mutex> lock(modules_ready_mutex);
// FIXME (aw): here are race conditions, if the ready handler gets called while modules are shut down!
Expand Down

0 comments on commit 9b8dd4a

Please sign in to comment.