Skip to content

Commit

Permalink
update scan mecanism (partial/need some precision on the spec + platf…
Browse files Browse the repository at this point in the history
…orm fix)
  • Loading branch information
Rodriguez committed Oct 10, 2023
1 parent 0864374 commit f0fc5f0
Showing 1 changed file with 51 additions and 17 deletions.
68 changes: 51 additions & 17 deletions source/pza/core/client.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -176,69 +176,99 @@ void client::message_arrived(mqtt::const_message_ptr msg)
}
}

// ============================================================================
//
int client::scan_devices(void)
{
bool ret;
std::condition_variable cv;
std::unique_lock<std::mutex> lock(_mtx);

// Check client connection
if (is_connected() == false) {
spdlog::error("scan failed. Not connected to {}", _addr);
return -1;
}

// Reset result variables
_scan_device_count_expected = 0;
_scan_device_results.clear();

// Log
spdlog::debug("scanning for devices on {}...", _addr);

_subscribe("pza/+/+/device/atts/info", [&](const mqtt::const_message_ptr &msg) {
std::string base_topic = msg->get_topic().substr(0, msg->get_topic().find("/device/atts/info"));
spdlog::debug("received device info: {} {}", msg->get_topic(), msg->get_payload_str());
_scan_device_results.emplace(base_topic, msg->get_payload_str());
cv.notify_all();
});
// STEP 1 : Scan platforms

_subscribe("pza/server/+/+/atts/info", [&](const mqtt::const_message_ptr &msg) {
// Prepare platform scan message processing
_subscribe("pza/+/+/+/atts/info", [&](const mqtt::const_message_ptr &msg) {
// Prepare data
std::string payload = msg->get_payload_str();
std::string topic = msg->get_topic();
std::string type;
unsigned int val;

spdlog::debug("received platform info: {}", payload);

// Exclude other messages than platform
if (pza::json::get_string(payload, "info", "type", type) == -1) {
spdlog::error("failed to parse type info: {}", payload);
return;
}
if (type != "platform") return;

// @TODO HERE we should also check that we did not get the same platform twice (in the case 2 scan is performed the same time)

// Get the number of devices
if (pza::json::get_unsigned_int(payload, "info", "number_of_devices", val) == -1) {
spdlog::error("failed to parse platform info: {}", payload);
return;
}
_scan_device_count_expected += val;
cv.notify_all();

// @TODO Dirty hack because platform does not implement device interface for the "platform" YET
_scan_device_count_expected -= 1;
});

_publish("pza", "*");
// Request scan for platforms and just wait for answers
_publish("pza", "p");
std::this_thread::sleep_for(std::chrono::seconds(1));
_unsubscribe("pza/+/+/+/atts/info");

// STEP 2 : Scan devices

// Prepare device scan message processing
_subscribe("pza/+/+/device/atts/info", [&](const mqtt::const_message_ptr &msg) {
std::string base_topic = msg->get_topic().substr(0, msg->get_topic().find("/device/atts/info"));
spdlog::debug("received device info: {} {}", msg->get_topic(), msg->get_payload_str());
_scan_device_results.emplace(base_topic, msg->get_payload_str());
cv.notify_all();
});

// Request for device scan
_publish("pza", "d");
ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) {
return (_scan_device_count_expected && (_scan_device_count_expected == _scan_device_results.size()));
});

_unsubscribe("pza/+/+/device/atts/info");
_unsubscribe("pza/server/+/+/atts/info");

// Process timeout error
if (ret == false) {
spdlog::error("timed out waiting for scan results");
spdlog::debug("Expected {} devices, got {}", _scan_device_count_expected, _scan_device_results.size());
return -1;
}

// Process success logs
spdlog::debug("scan successful, found {} devices", _scan_device_results.size());

if (core::get_log_level() == core::log_level::trace) {
for (auto &it : _scan_device_results) {
spdlog::trace("device: {}", it.first);
}
}

return 0;
}

// ============================================================================
//
int client::_scan_interfaces(std::unique_lock<std::mutex> &lock, const device::ptr &device)
{
bool ret;
Expand All @@ -251,6 +281,9 @@ int client::_scan_interfaces(std::unique_lock<std::mutex> &lock, const device::p
return -1;
}

// @TODO Dirty hack because the platform count the "device" as an interface but the scan mecanism exclude it
_scan_itf_count_expected -= 1;

_scan_itf_results.clear();

_subscribe(itf_topic, [&](const mqtt::const_message_ptr &msg) {
Expand All @@ -261,8 +294,9 @@ int client::_scan_interfaces(std::unique_lock<std::mutex> &lock, const device::p
cv.notify_all();
});

_publish(device->_get_device_topic(), "*");

// Trigger the scan for the given device and wait for all interface info
auto device_short_topic = device->get_group() + "/" + device->get_name();
_publish("pza", device_short_topic);
ret = cv.wait_for(lock, std::chrono::seconds(_scan_timeout), [&](void) {
return (_scan_itf_count_expected && (_scan_itf_count_expected == _scan_itf_results.size()));
});
Expand All @@ -271,7 +305,7 @@ int client::_scan_interfaces(std::unique_lock<std::mutex> &lock, const device::p

if (ret == false) {
spdlog::error("timed out waiting for scan results");
spdlog::trace("_scan_itf_count_expected = {}, got = {}", _scan_itf_count_expected, _scan_itf_results.size());
spdlog::error("_scan_itf_count_expected = {}, got = {}", _scan_itf_count_expected, _scan_itf_results.size());
return -1;
}

Expand Down

0 comments on commit f0fc5f0

Please sign in to comment.