From 44144e38f57b99e0a12ebaafe4b9a126c25ec446 Mon Sep 17 00:00:00 2001 From: Antoine Gouby Date: Tue, 19 Sep 2023 01:12:31 +0200 Subject: [PATCH] WIP: working on BPS --- examples/bps.cxx | 2 +- source/CMakeLists.txt | 6 +- source/pza/core/client.cxx | 98 ++++++++++++++++++++++++++++-- source/pza/core/client.hxx | 16 ++++- source/pza/core/device.cxx | 2 +- source/pza/core/device.hxx | 4 +- source/pza/core/device_factory.cxx | 18 ++++++ source/pza/core/device_factory.hxx | 30 +++++++++ source/pza/utils/topic.cxx | 24 ++++++++ source/pza/utils/topic.hxx | 25 ++++++++ 10 files changed, 212 insertions(+), 13 deletions(-) create mode 100644 source/pza/core/device_factory.cxx create mode 100644 source/pza/core/device_factory.hxx create mode 100644 source/pza/utils/topic.cxx create mode 100644 source/pza/utils/topic.hxx diff --git a/examples/bps.cxx b/examples/bps.cxx index 355c9c8..0899fa4 100644 --- a/examples/bps.cxx +++ b/examples/bps.cxx @@ -4,7 +4,7 @@ int main(void) { - pza::core::set_log_level(pza::core::log_level::info); + pza::core::set_log_level(pza::core::log_level::trace); pza::client::ptr cli = std::make_shared("localhost", 1883); diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index af323ea..070c1d6 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -7,16 +7,18 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/pza/version.hxx.in target_sources(${LIBRARY_NAME} PRIVATE - + pza/core/core.cxx pza/core/client.cxx pza/core/device.cxx + pza/core/device_factory.cxx pza/core/interface.cxx pza/core/attribute.cxx pza/utils/json.cxx pza/utils/string.cxx - + pza/utils/topic.cxx + pza/devices/bps.cxx pza/interfaces/ampermeter.cxx diff --git a/source/pza/core/client.cxx b/source/pza/core/client.cxx index e379916..2e7da91 100644 --- a/source/pza/core/client.cxx +++ b/source/pza/core/client.cxx @@ -196,17 +196,17 @@ int client::scan_devices(void) _subscribe("pza/+/+/device/atts/info", [&](const mqtt::const_message_ptr &msg) { std::string base_topic = msg->get_topic().substr(0, msg->get_topic().find("/device/atts/info")); - spdlog::trace("received device info: {} {}", msg->get_topic(), msg->get_payload_str()); + spdlog::debug("received device info: {} {}", msg->get_topic(), msg->get_payload_str()); _scan_device_results.emplace(base_topic, msg->get_payload_str()); cv.notify_all(); }); - _subscribe("pza/+/platforms/+/atts/info", [&](const mqtt::const_message_ptr &msg) { + _subscribe("pza/server/+/+/atts/info", [&](const mqtt::const_message_ptr &msg) { std::string payload = msg->get_payload_str(); std::string topic = msg->get_topic(); unsigned int val; - spdlog::trace("received platform info: {}", payload); + spdlog::debug("received platform info: {}", payload); if (pza::json::get_unsigned_int(payload, "info", "number_of_devices", val) == -1) { spdlog::error("failed to parse platform info: {}", payload); return; @@ -222,11 +222,11 @@ int client::scan_devices(void) }); _unsubscribe("pza/+/+/device/atts/info"); - _unsubscribe("pza/+/platforms/+/atts/info"); + _unsubscribe("pza/server/+/+/atts/info"); if (ret == false) { spdlog::error("timed out waiting for scan results"); - spdlog::trace("_scan_device_count_expected = {}, got = {}", _scan_device_count_expected, _scan_device_results.size()); + spdlog::debug("_scan_device_count_expected = {}, got = {}", _scan_device_count_expected, _scan_device_results.size()); return -1; } @@ -300,6 +300,11 @@ int client::register_device(const device::ptr &device) return -1; } + if (_devices.find(device->_get_base_topic()) != _devices.end()) { + spdlog::warn("Device {} is already registered", device->_get_base_topic()); + return 0; + } + if (_scan_device_results.find(device->_get_base_topic()) == _scan_device_results.end()) { spdlog::error("Device {} was not scanned", device->_get_base_topic()); return -1; @@ -322,5 +327,86 @@ int client::register_device(const device::ptr &device) device->_cli = this; - return device->_register_interfaces(_scan_itf_results); + if (device->_register_interfaces(_scan_itf_results) == -1) + return -1; + + _devices.emplace(device->_get_base_topic(), device); + return 0; } + +device::ptr client::create_device(const std::string &topic_str) +{ + bool recv = false; + bool ret; + std::condition_variable cv; + std::unique_lock lock(_mtx); + mqtt::const_message_ptr identify_msg; + std::string family; + topic t(topic_str); + + if (t.is_valid() == false) { + spdlog::error("Invalid topic {}", topic_str); + return nullptr; + } + + _subscribe(topic_str + "/device/atts/identity", [&](const mqtt::const_message_ptr &msg) { + identify_msg = msg; + recv = true; + cv.notify_all(); + }); + + ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) { return (recv); }); + if (ret == false) { + spdlog::error("Device is not sane, that's very troubling"); + return nullptr; + } + + if (json::get_string(identify_msg->get_payload_str(), "identity", "family", family) == -1) { + spdlog::error("Failed to get family from device"); + return nullptr; + } + + device::ptr dev = device_factory::create_device(family, t.get_group(), t.get_device()); + + if (dev == nullptr) { + spdlog::error("Failed to create device"); + return nullptr; + } + + if (dev->_set_identity(identify_msg->get_payload_str()) == -1) { + spdlog::error("Failed to set identity"); + return nullptr; + } + + if (_scan_interfaces(lock, dev) == -1) + return nullptr; + + dev->_cli = this; + + if (dev->_register_interfaces(_scan_itf_results) == -1) + return nullptr; + + _devices.emplace(dev->_get_base_topic(), dev); + return dev; +} + + +int client::register_all_devices() +{ + int ret = 0; + + for (auto &it : _scan_device_results) { + if (create_device(it.first) == nullptr) + ret = -1; + } + return ret; +} + +device::ptr client::find_device(const std::string &group, const std::string &name) +{ + std::string base_topic = "pza/" + group + "/" + name; + + if (_devices.find(base_topic) == _devices.end()) + return nullptr; + return _devices[base_topic]; +} \ No newline at end of file diff --git a/source/pza/core/client.hxx b/source/pza/core/client.hxx index a41089a..2b3842f 100644 --- a/source/pza/core/client.hxx +++ b/source/pza/core/client.hxx @@ -10,9 +10,12 @@ #include #include +#include + #include -#include +#include #include +#include namespace pza { @@ -39,6 +42,13 @@ namespace pza int get_port(void) const { return _port; } int register_device(const device::ptr &device); + int register_all_devices(); + + device::ptr find_device(const std::string &group, const std::string &name); + + using device_map = std::map; + + const device_map &get_devices(void) const { return _devices; } private: using listener_map = std::map>; @@ -58,7 +68,8 @@ namespace pza std::map _scan_itf_results; unsigned int _scan_itf_count_expected = 0; - + + std::map _devices; void connection_lost(const std::string &cause) override; void message_arrived(mqtt::const_message_ptr msg) override; @@ -72,5 +83,6 @@ namespace pza bool _topic_matches(const std::string &str, const std::string &fnmatchPattern); void _count_devices_to_scan(const std::string &payload); int _scan_interfaces(std::unique_lock &lock, const device::ptr &device); + device::ptr create_device(const std::string &topic); }; }; diff --git a/source/pza/core/device.cxx b/source/pza/core/device.cxx index 8e2f0c0..cd8bc1c 100644 --- a/source/pza/core/device.cxx +++ b/source/pza/core/device.cxx @@ -3,7 +3,7 @@ using namespace pza; -device::device(const std::string group, const std::string name) +device::device(const std::string &group, const std::string &name) : _name(name), _group(group), _base_topic("pza/" + group + "/" + name), diff --git a/source/pza/core/device.hxx b/source/pza/core/device.hxx index a0056d3..24b4b16 100644 --- a/source/pza/core/device.hxx +++ b/source/pza/core/device.hxx @@ -28,6 +28,8 @@ namespace pza running }; + const std::string &get_name() { return _name; } + const std::string &get_group() { return _group; } const std::string &get_model() { return _model; } const std::string &get_manufacturer() { return _manufacturer; } client *get_client() { return _cli; } @@ -38,7 +40,7 @@ namespace pza void register_interface(interface &interface); protected: - device(const std::string group, const std::string name); + device(const std::string &group, const std::string &name); virtual int _register_interfaces(const std::map &map) = 0; int _set_identity(const std::string &payload); diff --git a/source/pza/core/device_factory.cxx b/source/pza/core/device_factory.cxx new file mode 100644 index 0000000..82b6776 --- /dev/null +++ b/source/pza/core/device_factory.cxx @@ -0,0 +1,18 @@ +#include "device_factory.hxx" + +using namespace pza; + +std::map device_factory::_factory_map = { + { "bps", device_factory::allocate_device } +}; + +device::ptr device_factory::create_device(const std::string &family, const std::string &group, const std::string &name) +{ + auto it = _factory_map.find(family); + if (it == _factory_map.end()) { + spdlog::error("Unknown device type {}", family); + return nullptr; + } + + return it->second(group, name); +} \ No newline at end of file diff --git a/source/pza/core/device_factory.hxx b/source/pza/core/device_factory.hxx new file mode 100644 index 0000000..2a036c9 --- /dev/null +++ b/source/pza/core/device_factory.hxx @@ -0,0 +1,30 @@ +#pragma once + +#include + +#include +#include + +namespace pza +{ + class device_factory + { + public: + device_factory() = delete; + ~device_factory() = delete; + device_factory(const device_factory &) = delete; + device_factory &operator=(const device_factory &) = delete; + + static device::ptr create_device(const std::string &family, const std::string &group, const std::string &name); + + template + static device::ptr allocate_device(const std::string &group, const std::string &name) + { + return std::make_shared(group, name); + } + + private: + using factory_function = std::function; + static std::map _factory_map; + }; +}; \ No newline at end of file diff --git a/source/pza/utils/topic.cxx b/source/pza/utils/topic.cxx new file mode 100644 index 0000000..a772753 --- /dev/null +++ b/source/pza/utils/topic.cxx @@ -0,0 +1,24 @@ +#include "topic.hxx" + +namespace pza +{ + topic::topic(const std::string &topic) + : _topic(topic), + _is_valid(false) + { + std::stringstream strs(topic); + std::string buf; + + _list.resize(3); + for (unsigned int i = 0; std::getline(strs, buf, '/') && i < 3; i++) { + _list[i] = buf; + } + if (_list[0].empty() || _list[1].empty() || _list[2].empty()) { + return ; + } + if (_list[0] != "pza") { + return ; + } + _is_valid = true; + } +}; \ No newline at end of file diff --git a/source/pza/utils/topic.hxx b/source/pza/utils/topic.hxx new file mode 100644 index 0000000..38429ff --- /dev/null +++ b/source/pza/utils/topic.hxx @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include + +namespace pza +{ + class topic + { + public: + topic(const std::string &topic); + + bool is_valid() const { return _is_valid; } + + std::string get_topic() const { return _topic;} + std::string get_group() const { return _list[1]; } + std::string get_device() const { return _list[2]; } + + private: + std::string _topic; + bool _is_valid = false; + std::vector _list; + }; +}; \ No newline at end of file