From 6c6eef9db0cba8bc1497ff9a82df1d962616153c Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 4 Dec 2024 16:20:29 +0100 Subject: [PATCH 01/24] First working version of remote_logging --- diagnostic_remote_logging/CMakeLists.txt | 38 +++++++ .../influx_line_protocol.hpp | 103 ++++++++++++++++++ .../diagnostic_remote_logging/telegraf.hpp | 63 +++++++++++ diagnostic_remote_logging/package.xml | 20 ++++ diagnostic_remote_logging/src/telegraf.cpp | 89 +++++++++++++++ 5 files changed, 313 insertions(+) create mode 100644 diagnostic_remote_logging/CMakeLists.txt create mode 100644 diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp create mode 100644 diagnostic_remote_logging/include/diagnostic_remote_logging/telegraf.hpp create mode 100644 diagnostic_remote_logging/package.xml create mode 100644 diagnostic_remote_logging/src/telegraf.cpp diff --git a/diagnostic_remote_logging/CMakeLists.txt b/diagnostic_remote_logging/CMakeLists.txt new file mode 100644 index 00000000..6d03c4c8 --- /dev/null +++ b/diagnostic_remote_logging/CMakeLists.txt @@ -0,0 +1,38 @@ +cmake_minimum_required(VERSION 3.8) + +project(diagnostic_remote_logging) +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +set(dependencies + ament_cmake + rclcpp + diagnostic_msgs + CURL + ) + +foreach(dep ${dependencies}) + find_package(${dep} REQUIRED) +endforeach(dep) + +include_directories( + src/ + include/ +) + +add_executable(telegraf + src/telegraf.cpp + ) + +ament_target_dependencies(telegraf ${dependencies}) + +ament_export_dependencies(telegraf ${dependencies}) + +target_compile_features(telegraf PUBLIC c_std_99 cxx_std_17) + +install(TARGETS telegraf + DESTINATION lib/${PROJECT_NAME} + ) + +ament_package() diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp new file mode 100644 index 00000000..0de98eef --- /dev/null +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp @@ -0,0 +1,103 @@ +#pragma once + +#include "diagnostic_msgs/msg/diagnostic_array.hpp" +#include "rclcpp/rclcpp.hpp" + +std::string toInfluxTimestamp(const rclcpp::Time& time) +{ + double timestamp = time.seconds(); + // Extract the integer part (seconds) + uint64_t seconds = static_cast(timestamp); + // Extract the fractional part (nanoseconds) by subtracting the integer part and scaling + uint64_t nanoseconds = static_cast((timestamp - seconds) * 1e9); + + // Convert both parts to strings + std::string secStr = std::to_string(seconds); + std::string nanosecStr = std::to_string(nanoseconds); + + // Ensure the nanoseconds part is zero-padded to 9 digits + nanosecStr = std::string(9 - nanosecStr.length(), '0') + nanosecStr; + + // Concatenate and return the result + return secStr + nanosecStr; +} + +std::string escapeSpace(const std::string& input) +{ + std::string result; + for (char c : input) { + if (c == ' ') { + result += '\\'; // Add a backslash before the space + } + result += c; // Add the original character + } + return result; +} + +std::string formatValues(const std::vector& values) +{ + std::string formatted; + for (const auto& kv : values) { + formatted += escapeSpace(kv.key) + "=" + escapeSpace(kv.value) + ","; + } + if (!formatted.empty()) { + formatted.pop_back(); // Remove the last comma + } + return formatted; +} + +std::pair splitHardwareID(const std::string& input) +{ + size_t first_slash_pos = input.find('/'); + + // If no slash is found, treat the entire input as the node_name + if (first_slash_pos == std::string::npos) { + return {"\"\"", input}; + } + + size_t second_slash_pos = input.find('/', first_slash_pos + 1); + + // If the second slash is found, extract the "ns" and "node" parts + if (second_slash_pos != std::string::npos) { + std::string ns = input.substr(first_slash_pos + 1, second_slash_pos - first_slash_pos - 1); + std::string node = input.substr(second_slash_pos + 1); + return {ns, node}; + } + + // If no second slash is found, everything after the first slash is the node + std::string node = input.substr(first_slash_pos + 1); + return {"\"\"", node}; // ns is empty, node is the remaining string +} + +void statusToInfluxLineProtocol(std::string& output, const diagnostic_msgs::msg::DiagnosticStatus& status, const std::string& timestamp_str) +{ + // hardware_id is empty for analyzer groups, so skip them + if (status.hardware_id.empty()) { + return; + } + + auto [ns, identifier] = splitHardwareID(status.hardware_id); + output += identifier + ",ns=" + ns + " level=" + std::to_string(status.level) + ",message=\"" + status.message + "\""; + if (status.values.size()) { + output += "," + formatValues(status.values); + } + output += " " + timestamp_str + "\n"; +} + +std::string topLevelToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr& msg, const rclcpp::Time& time) +{ + std::string output = msg->name + " level=" + std::to_string(msg->level) + " " + toInfluxTimestamp(time) + "\n"; + return output; +}; + +std::string arrayToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr& diag_msg) +{ + std::string output; + std::string timestamp = toInfluxTimestamp(diag_msg->header.stamp); + + for (auto& status : diag_msg->status) { + statusToInfluxLineProtocol(output, status, timestamp); + } + + return output; +}; \ No newline at end of file diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/telegraf.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/telegraf.hpp new file mode 100644 index 00000000..8f8024a1 --- /dev/null +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/telegraf.hpp @@ -0,0 +1,63 @@ +/********************************************************************* + * Software License Agreement (BSD License) + * + * Copyright (c) 2009, Willow Garage, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * * Neither the name of the Willow Garage nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + *********************************************************************/ + +/** + * \author Daan Wijfels + */ + +#include "diagnostic_msgs/msg/diagnostic_array.hpp" +#include "diagnostic_remote_logging/influx_line_protocol.hpp" +#include "rclcpp/rclcpp.hpp" +#include + +class Telegraf : public rclcpp::Node +{ +public: + Telegraf(); + ~Telegraf(); + +private: + rclcpp::Subscription::SharedPtr diag_sub_; + rclcpp::Subscription::SharedPtr top_level_sub_; + + std::string telegraf_url_; + CURL* curl_; + + void setupConnection(const std::string& telegraf_url); + + void diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg); + void topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg); + + bool sendToTelegraf(const std::string& output); +}; diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml new file mode 100644 index 00000000..605a77bb --- /dev/null +++ b/diagnostic_remote_logging/package.xml @@ -0,0 +1,20 @@ + + + + diagnostic_remote_logging + 0.0.1 + diagnostic_remote_logging + Daan Wijffels + >BSD-3-Clause + + ament_cmake + + diagnostic_msgs + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/diagnostic_remote_logging/src/telegraf.cpp b/diagnostic_remote_logging/src/telegraf.cpp new file mode 100644 index 00000000..984a4e41 --- /dev/null +++ b/diagnostic_remote_logging/src/telegraf.cpp @@ -0,0 +1,89 @@ +#include "diagnostic_remote_logging/telegraf.hpp" + +Telegraf::Telegraf() + : Node("telegraf") +{ + telegraf_url_ = this->declare_parameter("telegraf_url", "http://localhost:8186/telegraf"); + + if (declare_parameter("send_agg", true)) { + diag_sub_ = this->create_subscription("/diagnostics_agg", rclcpp::SensorDataQoS(), std::bind(&Telegraf::diagnosticsCallback, this, std::placeholders::_1)); + } + + if (declare_parameter("send_top_level_state", true)) { + top_level_sub_ = this->create_subscription("/diagnostics_toplevel_state", rclcpp::SensorDataQoS(), std::bind(&Telegraf::topLevelCallback, this, std::placeholders::_1)); + } + + setupConnection(telegraf_url_); +} + +void Telegraf::diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg) +{ + std::string output = arrayToInfluxLineProtocol(msg); + + if (!sendToTelegraf(output)) { + RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_agg to telegraf"); + } + + // RCLCPP_INFO(this->get_logger(), "%s", output.c_str()); +} + +void Telegraf::topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg) +{ + std::string output = topLevelToInfluxLineProtocol(msg, this->get_clock()->now()); + + if (!sendToTelegraf(output)) { + RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_toplevel_state to telegraf"); + } + + // RCLCPP_INFO(this->get_logger(), "%s", output.c_str()); +} + +void Telegraf::setupConnection(const std::string& url) +{ + curl_global_init(CURL_GLOBAL_ALL); + curl_ = curl_easy_init(); + if (!curl_) { + throw std::runtime_error("Failed to initialize curl"); + } + + curl_easy_setopt(curl_, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl_, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); // Use HTTP/1.1 for keep-alive + curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT, 10L); // Set timeout as needed + curl_easy_setopt(curl_, CURLOPT_TCP_KEEPALIVE, 1L); // Enable TCP keep-alive +} + +bool Telegraf::sendToTelegraf(const std::string& data) +{ + if (!curl_) { + RCLCPP_ERROR(this->get_logger(), "cURL not initialized."); + return false; + } + + curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, data.c_str()); + + // Perform the request + CURLcode res = curl_easy_perform(curl_); + + // Check for errors + if (res != CURLE_OK) { + RCLCPP_ERROR(this->get_logger(), "cURL error: %s", curl_easy_strerror(res)); + return false; + } + return true; +} + +Telegraf::~Telegraf() +{ + if (curl_) { + curl_easy_cleanup(curl_); + } + curl_global_cleanup(); +} + +int main(int argc, char* argv[]) +{ + rclcpp::init(argc, argv); + rclcpp::spin(std::make_shared()); + rclcpp::shutdown(); + return 0; +} \ No newline at end of file From 390bb8c7b84b5ef70a6cd30c99eebc2d7bae95f9 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 18 Dec 2024 09:24:36 +0100 Subject: [PATCH 02/24] Added more error handling, and skipping values when new line is present in stat --- .../influx_line_protocol.hpp | 13 ++++++++++--- diagnostic_remote_logging/src/telegraf.cpp | 8 ++++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp index 0de98eef..b256d1f8 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp @@ -38,6 +38,12 @@ std::string formatValues(const std::vector& valu { std::string formatted; for (const auto& kv : values) { + if (kv.value.find("\n") != std::string::npos) { + // If the value contains a newline, skip it + // Telegraf uses this to separate measurements + continue; + } + formatted += escapeSpace(kv.key) + "=" + escapeSpace(kv.value) + ","; } if (!formatted.empty()) { @@ -52,7 +58,7 @@ std::pair splitHardwareID(const std::string& input) // If no slash is found, treat the entire input as the node_name if (first_slash_pos == std::string::npos) { - return {"\"\"", input}; + return {"none", input}; } size_t second_slash_pos = input.find('/', first_slash_pos + 1); @@ -78,8 +84,9 @@ void statusToInfluxLineProtocol(std::string& output, const diagnostic_msgs::msg: auto [ns, identifier] = splitHardwareID(status.hardware_id); output += identifier + ",ns=" + ns + " level=" + std::to_string(status.level) + ",message=\"" + status.message + "\""; - if (status.values.size()) { - output += "," + formatValues(status.values); + auto formatted_key_values = formatValues(status.values); + if (!formatted_key_values.empty()) { + output += "," + formatted_key_values; } output += " " + timestamp_str + "\n"; } diff --git a/diagnostic_remote_logging/src/telegraf.cpp b/diagnostic_remote_logging/src/telegraf.cpp index 984a4e41..40e436e3 100644 --- a/diagnostic_remote_logging/src/telegraf.cpp +++ b/diagnostic_remote_logging/src/telegraf.cpp @@ -69,6 +69,14 @@ bool Telegraf::sendToTelegraf(const std::string& data) RCLCPP_ERROR(this->get_logger(), "cURL error: %s", curl_easy_strerror(res)); return false; } + long response_code; + curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &response_code); + + if (response_code != 204) { + RCLCPP_ERROR(this->get_logger(), "Error (%ld) when sending to telegraf:\n%s", response_code, data.c_str()); + return false; + } + return true; } From 53b5297694e296e6d90362b43646ce5059a7b181 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 18 Dec 2024 09:47:07 +0100 Subject: [PATCH 03/24] Changed default telegraf url to reflect the change to influxdb_v2_listener --- diagnostic_remote_logging/src/telegraf.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diagnostic_remote_logging/src/telegraf.cpp b/diagnostic_remote_logging/src/telegraf.cpp index 40e436e3..695f90c6 100644 --- a/diagnostic_remote_logging/src/telegraf.cpp +++ b/diagnostic_remote_logging/src/telegraf.cpp @@ -3,7 +3,7 @@ Telegraf::Telegraf() : Node("telegraf") { - telegraf_url_ = this->declare_parameter("telegraf_url", "http://localhost:8186/telegraf"); + telegraf_url_ = this->declare_parameter("telegraf_url", "http://localhost:8086/api/v2/write"); if (declare_parameter("send_agg", true)) { diag_sub_ = this->create_subscription("/diagnostics_agg", rclcpp::SensorDataQoS(), std::bind(&Telegraf::diagnosticsCallback, this, std::placeholders::_1)); From a43c0c0be47efbff7431844dd11773642409cf80 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 18 Dec 2024 09:53:18 +0100 Subject: [PATCH 04/24] Changed default telegraf url to reflect the change to influxdb_v2_listener --- .../include/diagnostic_remote_logging/influx_line_protocol.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp index b256d1f8..04f44361 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp @@ -72,7 +72,7 @@ std::pair splitHardwareID(const std::string& input) // If no second slash is found, everything after the first slash is the node std::string node = input.substr(first_slash_pos + 1); - return {"\"\"", node}; // ns is empty, node is the remaining string + return {"none", node}; // ns is empty, node is the remaining string } void statusToInfluxLineProtocol(std::string& output, const diagnostic_msgs::msg::DiagnosticStatus& status, const std::string& timestamp_str) From 003e8a5d1cec9a2a979be272997ecf4a7d14e15d Mon Sep 17 00:00:00 2001 From: Thiever Base Date: Wed, 18 Dec 2024 15:15:11 +0100 Subject: [PATCH 05/24] Made node composable and changed name to influx to better reflect use cases --- diagnostic_remote_logging/CMakeLists.txt | 27 ++-- .../influx_line_protocol.hpp | 41 +++++- .../{telegraf.hpp => influxdb.hpp} | 11 +- diagnostic_remote_logging/src/influxdb.cpp | 139 ++++++++++++++++++ diagnostic_remote_logging/src/telegraf.cpp | 97 ------------ 5 files changed, 203 insertions(+), 112 deletions(-) rename diagnostic_remote_logging/include/diagnostic_remote_logging/{telegraf.hpp => influxdb.hpp} (92%) create mode 100644 diagnostic_remote_logging/src/influxdb.cpp delete mode 100644 diagnostic_remote_logging/src/telegraf.cpp diff --git a/diagnostic_remote_logging/CMakeLists.txt b/diagnostic_remote_logging/CMakeLists.txt index 6d03c4c8..a36c0a1d 100644 --- a/diagnostic_remote_logging/CMakeLists.txt +++ b/diagnostic_remote_logging/CMakeLists.txt @@ -8,6 +8,7 @@ endif() set(dependencies ament_cmake rclcpp + rclcpp_components diagnostic_msgs CURL ) @@ -20,19 +21,27 @@ include_directories( src/ include/ ) - -add_executable(telegraf - src/telegraf.cpp +add_library(influx_component SHARED + src/influxdb.cpp ) -ament_target_dependencies(telegraf ${dependencies}) +ament_target_dependencies(influx_component ${dependencies}) -ament_export_dependencies(telegraf ${dependencies}) +ament_export_dependencies(influx_component ${dependencies}) -target_compile_features(telegraf PUBLIC c_std_99 cxx_std_17) +target_compile_features(influx_component PUBLIC c_std_99 cxx_std_17) -install(TARGETS telegraf - DESTINATION lib/${PROJECT_NAME} - ) +rclcpp_components_register_node( + influx_component + PLUGIN "InfluxDB" + EXECUTABLE influx +) +ament_export_targets(export_influx_component) +install(TARGETS influx_component + EXPORT export_influx_component + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) ament_package() diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp index 04f44361..ff745feb 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp @@ -1,3 +1,42 @@ +/********************************************************************* + * Software License Agreement (BSD License) + * + * Copyright (c) 2009, Willow Garage, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * * Neither the name of the Willow Garage nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + *********************************************************************/ + +/** + * \author Daan Wijfels + */ + + #pragma once #include "diagnostic_msgs/msg/diagnostic_array.hpp" @@ -40,7 +79,7 @@ std::string formatValues(const std::vector& valu for (const auto& kv : values) { if (kv.value.find("\n") != std::string::npos) { // If the value contains a newline, skip it - // Telegraf uses this to separate measurements + // InfluxDB uses this to separate measurements continue; } diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/telegraf.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influxdb.hpp similarity index 92% rename from diagnostic_remote_logging/include/diagnostic_remote_logging/telegraf.hpp rename to diagnostic_remote_logging/include/diagnostic_remote_logging/influxdb.hpp index 8f8024a1..7acc9ff9 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/telegraf.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influxdb.hpp @@ -41,17 +41,18 @@ #include "rclcpp/rclcpp.hpp" #include -class Telegraf : public rclcpp::Node +class InfluxDB : public rclcpp::Node { public: - Telegraf(); - ~Telegraf(); + explicit InfluxDB(const rclcpp::NodeOptions& opt); + ~InfluxDB(); private: rclcpp::Subscription::SharedPtr diag_sub_; rclcpp::Subscription::SharedPtr top_level_sub_; - std::string telegraf_url_; + + std::string post_url_, influx_token_; CURL* curl_; void setupConnection(const std::string& telegraf_url); @@ -59,5 +60,5 @@ class Telegraf : public rclcpp::Node void diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg); void topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg); - bool sendToTelegraf(const std::string& output); + bool sendToInfluxDB(const std::string& data); }; diff --git a/diagnostic_remote_logging/src/influxdb.cpp b/diagnostic_remote_logging/src/influxdb.cpp new file mode 100644 index 00000000..9017171c --- /dev/null +++ b/diagnostic_remote_logging/src/influxdb.cpp @@ -0,0 +1,139 @@ +#include "diagnostic_remote_logging/influxdb.hpp" + +InfluxDB::InfluxDB(const rclcpp::NodeOptions &opt) + : Node("influxdb", opt) +{ + post_url_ = this->declare_parameter("connection.url", "http://localhost:8086/api/v2/write"); + + if (post_url_.empty()) + { + throw std::runtime_error("Parameter connection.url must be set"); + } + + std::string organization = declare_parameter("connection.organization", ""); + std::string bucket = declare_parameter("connection.bucket", ""); + influx_token_ = declare_parameter("connection.token", ""); + + // Check if any of the parameters is set + if (!organization.empty() || !bucket.empty() || !influx_token_.empty()) + { + // Ensure all parameters are set + if (organization.empty() || bucket.empty() || influx_token_.empty()) + { + throw std::runtime_error("All parameters (connection.organization, connection.bucket, connection.token) must be set, or when using a proxy like Telegraf none have to be set."); + } + + // Construct the Telegraf URL + post_url_ += "?org=" + organization; + post_url_ += "&bucket=" + bucket; + } + + setupConnection(post_url_); + + if (declare_parameter("send.agg", true)) + { + diag_sub_ = this->create_subscription("/diagnostics_agg", rclcpp::SensorDataQoS(), std::bind(&InfluxDB::diagnosticsCallback, this, std::placeholders::_1)); + } + + if (declare_parameter("send.top_level_state", true)) + { + top_level_sub_ = this->create_subscription("/diagnostics_toplevel_state", rclcpp::SensorDataQoS(), std::bind(&InfluxDB::topLevelCallback, this, std::placeholders::_1)); + } +} + +void InfluxDB::diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg) +{ + std::string output = arrayToInfluxLineProtocol(msg); + + if (!sendToInfluxDB(output)) + { + RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_agg to telegraf"); + } + + // RCLCPP_INFO(this->get_logger(), "%s", output.c_str()); +} + +void InfluxDB::topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg) +{ + std::string output = topLevelToInfluxLineProtocol(msg, this->get_clock()->now()); + + if (!sendToInfluxDB(output)) + { + RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_toplevel_state to telegraf"); + } + + // RCLCPP_INFO(this->get_logger(), "%s", output.c_str()); +} + +void InfluxDB::setupConnection(const std::string &url) +{ + curl_global_init(CURL_GLOBAL_ALL); + curl_ = curl_easy_init(); + if (!curl_) + { + throw std::runtime_error("Failed to initialize curl"); + } + + struct curl_slist *headers = nullptr; + + if (!influx_token_.empty()){ + headers = curl_slist_append(headers, ("Authorization: Token " + influx_token_).c_str()); + } + + headers = curl_slist_append(headers, "Content-Type: text/plain; charset=utf-8"); + headers = curl_slist_append(headers, "Accept: application/json"); + + curl_easy_setopt(curl_, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl_, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); // Use HTTP/1.1 for keep-alive + curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT, 10L); // Set timeout as needed + curl_easy_setopt(curl_, CURLOPT_TCP_KEEPALIVE, 1L); // Enable TCP keep-alive + curl_easy_setopt(curl_, CURLOPT_POST, 1L); +} + +bool InfluxDB::sendToInfluxDB(const std::string &data) +{ + if (!curl_) + { + RCLCPP_ERROR(this->get_logger(), "cURL not initialized."); + return false; + } + + curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, data.c_str()); + + // Perform the request + CURLcode res = curl_easy_perform(curl_); + + // Check for errors + if (res != CURLE_OK) + { + RCLCPP_ERROR(this->get_logger(), "cURL error: %s", curl_easy_strerror(res)); + return false; + } + long response_code; + curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &response_code); + + if (response_code != 204) + { + RCLCPP_ERROR(this->get_logger(), "Error (%ld) when sending to telegraf:\n%s", response_code, data.c_str()); + return false; + } + + return true; +} + +InfluxDB::~InfluxDB() +{ + if (curl_) + { + curl_easy_cleanup(curl_); + } + curl_global_cleanup(); +} + +#include "rclcpp_components/register_node_macro.hpp" + +// Register the component with class_loader. +// This acts as a sort of entry point, allowing the component to be discoverable when its library +// is being loaded into a running process. +RCLCPP_COMPONENTS_REGISTER_NODE(InfluxDB) \ No newline at end of file diff --git a/diagnostic_remote_logging/src/telegraf.cpp b/diagnostic_remote_logging/src/telegraf.cpp deleted file mode 100644 index 695f90c6..00000000 --- a/diagnostic_remote_logging/src/telegraf.cpp +++ /dev/null @@ -1,97 +0,0 @@ -#include "diagnostic_remote_logging/telegraf.hpp" - -Telegraf::Telegraf() - : Node("telegraf") -{ - telegraf_url_ = this->declare_parameter("telegraf_url", "http://localhost:8086/api/v2/write"); - - if (declare_parameter("send_agg", true)) { - diag_sub_ = this->create_subscription("/diagnostics_agg", rclcpp::SensorDataQoS(), std::bind(&Telegraf::diagnosticsCallback, this, std::placeholders::_1)); - } - - if (declare_parameter("send_top_level_state", true)) { - top_level_sub_ = this->create_subscription("/diagnostics_toplevel_state", rclcpp::SensorDataQoS(), std::bind(&Telegraf::topLevelCallback, this, std::placeholders::_1)); - } - - setupConnection(telegraf_url_); -} - -void Telegraf::diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg) -{ - std::string output = arrayToInfluxLineProtocol(msg); - - if (!sendToTelegraf(output)) { - RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_agg to telegraf"); - } - - // RCLCPP_INFO(this->get_logger(), "%s", output.c_str()); -} - -void Telegraf::topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg) -{ - std::string output = topLevelToInfluxLineProtocol(msg, this->get_clock()->now()); - - if (!sendToTelegraf(output)) { - RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_toplevel_state to telegraf"); - } - - // RCLCPP_INFO(this->get_logger(), "%s", output.c_str()); -} - -void Telegraf::setupConnection(const std::string& url) -{ - curl_global_init(CURL_GLOBAL_ALL); - curl_ = curl_easy_init(); - if (!curl_) { - throw std::runtime_error("Failed to initialize curl"); - } - - curl_easy_setopt(curl_, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl_, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); // Use HTTP/1.1 for keep-alive - curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT, 10L); // Set timeout as needed - curl_easy_setopt(curl_, CURLOPT_TCP_KEEPALIVE, 1L); // Enable TCP keep-alive -} - -bool Telegraf::sendToTelegraf(const std::string& data) -{ - if (!curl_) { - RCLCPP_ERROR(this->get_logger(), "cURL not initialized."); - return false; - } - - curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, data.c_str()); - - // Perform the request - CURLcode res = curl_easy_perform(curl_); - - // Check for errors - if (res != CURLE_OK) { - RCLCPP_ERROR(this->get_logger(), "cURL error: %s", curl_easy_strerror(res)); - return false; - } - long response_code; - curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &response_code); - - if (response_code != 204) { - RCLCPP_ERROR(this->get_logger(), "Error (%ld) when sending to telegraf:\n%s", response_code, data.c_str()); - return false; - } - - return true; -} - -Telegraf::~Telegraf() -{ - if (curl_) { - curl_easy_cleanup(curl_); - } - curl_global_cleanup(); -} - -int main(int argc, char* argv[]) -{ - rclcpp::init(argc, argv); - rclcpp::spin(std::make_shared()); - rclcpp::shutdown(); - return 0; -} \ No newline at end of file From 0d526eebe1428aaf8d95705bc754caefd956da91 Mon Sep 17 00:00:00 2001 From: Thiever Base Date: Wed, 18 Dec 2024 15:15:16 +0100 Subject: [PATCH 06/24] Added README --- diagnostic_remote_logging/README.md | 79 +++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 diagnostic_remote_logging/README.md diff --git a/diagnostic_remote_logging/README.md b/diagnostic_remote_logging/README.md new file mode 100644 index 00000000..7997a29a --- /dev/null +++ b/diagnostic_remote_logging/README.md @@ -0,0 +1,79 @@ +General information about this repository, including legal information and known issues/limitations, are given in [README.md](../README.md) in the repository root. + +# The diagnostic_remote_logging package + +This package provides the `influx` node, which listens to diagnostic messages and integrates with InfluxDB v2 for monitoring and visualization. Specifically, it subscribes to the [`diagnostic_msgs/DiagnosticArray`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_agg` topic and the [`diagnostic_msgs/DiagnosticStatus`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_toplevel_state` topic. The node processes these messages, publishing their statistics and levels to an [`InfluxDB`](http://influxdb.com) database, enabling use with tools like [`Grafana`](https://grafana.com). + +As of now we only support InfluxDB v2, for support with older versions please use a proxy like [`Telegraf`](https://www.influxdata.com/time-series-platform/telegraf/). See section [Telegraf](#telegraf-proxy) for an example on how to setup. + +## Node Configuration + +You can send data to [`InfluxDB`](http://influxdb.com) in two ways: directly to the database or via a proxy like [`Telegraf`](https://www.influxdata.com/time-series-platform/telegraf/). While both methods are valid, using a proxy is generally recommended due to the following benefits: + +- **Efficient Data Transmission**: Telegraf aggregates multiple measurements and sends them in a single request, reducing bandwidth usage and minimizing database load. +- **Enhanced Reliability**: Provides buffering in case of connection issues, ensuring no data is lost. +- **Comprehensive Metric Collection**: Telegraf can send additional system metrics (e.g., RAM, CPU, network usage) with minimal configuration. +- **Data Filtering and Transformation**: Supports preprocessing, such as filtering or transforming data, before sending it to InfluxDB. + +To use either method, ensure you have a running instance of InfluxDB. The simplest way to set this up is through [`InfluxDB Cloud`](https://cloud2.influxdata.com/signup). + +### Parameters + +The `influx` node supports several parameters. Below is an example configuration: + +```yaml +/influx: + ros__parameters: + connection: + url: http://localhost:8086/api/v2/write + token: + bucket: + organization: + send: + agg: true + top_level_state: true +``` + +- `send.agg`: Enables or disables subscription to the `/diagnostics_agg` topic. +- `send.top_level_state`: Enables or disables subscription to the `/diagnostics_toplevel_state` topic. + +## InfluxDB Configuration + +Set the following parameters in your configuration to match your InfluxDB instance: + +- `connection.url`: The URL of your InfluxDB write API endpoint. +- `connection.token`: Your InfluxDB authentication token. +- `connection.bucket`: The target bucket in InfluxDB. +- `connection.organization`: The name of your InfluxDB organization. + +Afterwards run the node with the following command: + +```bash +ros2 run diagnostic_remote_logging influx --ros-args --params-file +``` + +## Using a Telegraf Proxy + +To configure Telegraf as a proxy for InfluxDB: + +1. Ensure Telegraf is set up to send data to your InfluxDB instance via its configuration file (`/etc/telegraf/telegraf.conf`). Check [this link](https://docs.influxdata.com/influxdb/cloud/write-data/no-code/use-telegraf/manual-config/) for an example. +2. Add the following to the telegraf configuration file to enable the InfluxDB v2 listener: + +```toml +[[inputs.influxdb_v2_listener]] + service_address = ":8086" +``` + +3. Update the `influx` node configuration to point to the appropriate URL. For example, if Telegraf is running on the same host as the `influx` node, the default `http://localhost:8086/api/v2/write` should work. + +Leave the following parameters empty in the `influx` node configuration when using Telegraf as a proxy: + +- `connection.token` +- `connection.bucket` +- `connection.organization` + +Afterwards run the node with the following command: + +```bash +ros2 run diagnostic_remote_logging influx --ros-args --params-file +``` From 2491badf1cdd24d4d6098d1e64734dae9924dfb4 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Mon, 6 Jan 2025 11:08:58 +0100 Subject: [PATCH 07/24] Escaping string values fixed --- .../influx_line_protocol.hpp | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp index ff745feb..a09c3e77 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp @@ -73,6 +73,13 @@ std::string escapeSpace(const std::string& input) return result; } +bool is_number(const std::string& s) { + std::istringstream iss(s); + double d; + return iss >> std::noskipws >> d && iss.eof(); +} + + std::string formatValues(const std::vector& values) { std::string formatted; @@ -83,7 +90,16 @@ std::string formatValues(const std::vector& valu continue; } - formatted += escapeSpace(kv.key) + "=" + escapeSpace(kv.value) + ","; + formatted += escapeSpace(kv.key) + "="; + + if (is_number(kv.value)) { + formatted += kv.value; + } else { + formatted += "\"" + kv.value + "\""; + } + formatted += ","; + + } if (!formatted.empty()) { formatted.pop_back(); // Remove the last comma @@ -122,7 +138,7 @@ void statusToInfluxLineProtocol(std::string& output, const diagnostic_msgs::msg: } auto [ns, identifier] = splitHardwareID(status.hardware_id); - output += identifier + ",ns=" + ns + " level=" + std::to_string(status.level) + ",message=\"" + status.message + "\""; + output += escapeSpace(identifier) + ",ns=" + escapeSpace(ns) + " level=" + std::to_string(status.level) + ",message=\"" + status.message + "\""; auto formatted_key_values = formatValues(status.values); if (!formatted_key_values.empty()) { output += "," + formatted_key_values; @@ -146,4 +162,5 @@ std::string arrayToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticArra } return output; -}; \ No newline at end of file +}; + From b97c4de010a93642800c678d0cf3c90feae2b3ee Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 8 Jan 2025 13:38:32 +0100 Subject: [PATCH 08/24] Build fix on farm --- diagnostic_remote_logging/package.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml index 605a77bb..fa63a567 100644 --- a/diagnostic_remote_logging/package.xml +++ b/diagnostic_remote_logging/package.xml @@ -10,6 +10,7 @@ ament_cmake diagnostic_msgs + rclcpp_components ament_lint_auto ament_lint_common From 3231f9165197c9d3f9e72b671de427cded7793e6 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 8 Jan 2025 13:41:50 +0100 Subject: [PATCH 09/24] Adde curl dependency --- diagnostic_remote_logging/package.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml index fa63a567..97a446f8 100644 --- a/diagnostic_remote_logging/package.xml +++ b/diagnostic_remote_logging/package.xml @@ -11,6 +11,7 @@ diagnostic_msgs rclcpp_components + cURL ament_lint_auto ament_lint_common From 6eb4c49cf548e5a87cfe89d5bd72850c87008f9a Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 8 Jan 2025 13:44:29 +0100 Subject: [PATCH 10/24] Removed Curl Dependency as build does not work on farm --- diagnostic_remote_logging/package.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml index 97a446f8..fa63a567 100644 --- a/diagnostic_remote_logging/package.xml +++ b/diagnostic_remote_logging/package.xml @@ -11,7 +11,6 @@ diagnostic_msgs rclcpp_components - cURL ament_lint_auto ament_lint_common From 5e15745d75afcc17bd912d3cee8f8b4fb033e1f6 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 8 Jan 2025 13:54:17 +0100 Subject: [PATCH 11/24] Curl Dependency --- diagnostic_remote_logging/package.xml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml index fa63a567..75743715 100644 --- a/diagnostic_remote_logging/package.xml +++ b/diagnostic_remote_logging/package.xml @@ -11,10 +11,13 @@ diagnostic_msgs rclcpp_components - + libcurl4-openssl-dev + ament_lint_auto ament_lint_common + libcurl4-openssl-dev + ament_cmake From 61c69ebc9e586627c8fb71881c61960b3c928709 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 8 Jan 2025 14:02:01 +0100 Subject: [PATCH 12/24] Update diagnostic_remote_logging/package.xml Co-authored-by: Christian Henkel <6976069+ct2034@users.noreply.github.com> --- diagnostic_remote_logging/package.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml index 75743715..5f1c3dce 100644 --- a/diagnostic_remote_logging/package.xml +++ b/diagnostic_remote_logging/package.xml @@ -11,7 +11,7 @@ diagnostic_msgs rclcpp_components - libcurl4-openssl-dev + curl ament_lint_auto ament_lint_common From 3c26580daa7b62f2a96fbf5217be0609adc3345c Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 8 Jan 2025 14:02:07 +0100 Subject: [PATCH 13/24] Update diagnostic_remote_logging/package.xml Co-authored-by: Christian Henkel <6976069+ct2034@users.noreply.github.com> --- diagnostic_remote_logging/package.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml index 5f1c3dce..981c4dcc 100644 --- a/diagnostic_remote_logging/package.xml +++ b/diagnostic_remote_logging/package.xml @@ -16,7 +16,6 @@ ament_lint_auto ament_lint_common - libcurl4-openssl-dev ament_cmake From 0ece0a258fa84066e14eb722ebd28c506740e0ab Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Thu, 9 Jan 2025 14:00:45 +0000 Subject: [PATCH 14/24] Added unit tests for influx_line_protocol --- diagnostic_remote_logging/CMakeLists.txt | 9 ++ .../influx_line_protocol.hpp | 26 ++--- diagnostic_remote_logging/package.xml | 1 + .../test/influx_line_protocol.cpp | 107 ++++++++++++++++++ 4 files changed, 126 insertions(+), 17 deletions(-) create mode 100644 diagnostic_remote_logging/test/influx_line_protocol.cpp diff --git a/diagnostic_remote_logging/CMakeLists.txt b/diagnostic_remote_logging/CMakeLists.txt index a36c0a1d..f646b4e8 100644 --- a/diagnostic_remote_logging/CMakeLists.txt +++ b/diagnostic_remote_logging/CMakeLists.txt @@ -10,6 +10,7 @@ set(dependencies rclcpp rclcpp_components diagnostic_msgs + ament_cmake_gtest CURL ) @@ -44,4 +45,12 @@ install(TARGETS influx_component RUNTIME DESTINATION bin ) +if(BUILD_TESTING) + ament_add_gtest(unit_tests test/influx_line_protocol.cpp) + target_include_directories(unit_tests PRIVATE include) + target_link_libraries(unit_tests influx_component) + ament_target_dependencies(unit_tests ${dependencies}) +endif() + + ament_package() diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp index a09c3e77..d7d5aa22 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp @@ -36,7 +36,6 @@ * \author Daan Wijfels */ - #pragma once #include "diagnostic_msgs/msg/diagnostic_array.hpp" @@ -44,20 +43,16 @@ std::string toInfluxTimestamp(const rclcpp::Time& time) { - double timestamp = time.seconds(); - // Extract the integer part (seconds) - uint64_t seconds = static_cast(timestamp); - // Extract the fractional part (nanoseconds) by subtracting the integer part and scaling - uint64_t nanoseconds = static_cast((timestamp - seconds) * 1e9); + uint64_t seconds = static_cast(time.seconds()); + uint64_t nanoseconds = static_cast(time.nanoseconds()) % 1000000000; - // Convert both parts to strings + // Convert to strings std::string secStr = std::to_string(seconds); std::string nanosecStr = std::to_string(nanoseconds); - // Ensure the nanoseconds part is zero-padded to 9 digits + // Zero-pad nanoseconds to 9 digits nanosecStr = std::string(9 - nanosecStr.length(), '0') + nanosecStr; - // Concatenate and return the result return secStr + nanosecStr; } @@ -73,13 +68,13 @@ std::string escapeSpace(const std::string& input) return result; } -bool is_number(const std::string& s) { - std::istringstream iss(s); - double d; - return iss >> std::noskipws >> d && iss.eof(); +bool is_number(const std::string& s) +{ + std::istringstream iss(s); + double d; + return iss >> std::noskipws >> d && iss.eof(); } - std::string formatValues(const std::vector& values) { std::string formatted; @@ -98,8 +93,6 @@ std::string formatValues(const std::vector& valu formatted += "\"" + kv.value + "\""; } formatted += ","; - - } if (!formatted.empty()) { formatted.pop_back(); // Remove the last comma @@ -163,4 +156,3 @@ std::string arrayToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticArra return output; }; - diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml index 981c4dcc..02a1737b 100644 --- a/diagnostic_remote_logging/package.xml +++ b/diagnostic_remote_logging/package.xml @@ -15,6 +15,7 @@ ament_lint_auto ament_lint_common + ament_cmake_gtest diff --git a/diagnostic_remote_logging/test/influx_line_protocol.cpp b/diagnostic_remote_logging/test/influx_line_protocol.cpp new file mode 100644 index 00000000..fd398e65 --- /dev/null +++ b/diagnostic_remote_logging/test/influx_line_protocol.cpp @@ -0,0 +1,107 @@ +#include "diagnostic_msgs/msg/diagnostic_array.hpp" +#include "diagnostic_msgs/msg/diagnostic_status.hpp" +#include "diagnostic_msgs/msg/key_value.hpp" +#include +#include + +// Include the functions to test +#include "diagnostic_remote_logging/influx_line_protocol.hpp" // Replace with the actual header file name + +diagnostic_msgs::msg::KeyValue createKeyValue(const std::string& key, const std::string& value) +{ + diagnostic_msgs::msg::KeyValue output; + output.key = key; + output.value = value; + return output; +} + +// Test toInfluxTimestamp +TEST(InfluxTimestampTests, CorrectConversion) +{ + rclcpp::Time time(1672531200, 123456789); // Example time + std::string expected = "1672531200123456789"; + EXPECT_EQ(toInfluxTimestamp(time), expected); +} + +// Test escapeSpace +TEST(EscapeSpaceTests, HandlesSpaces) +{ + EXPECT_EQ(escapeSpace("test string"), "test\\ string"); + EXPECT_EQ(escapeSpace("no_space"), "no_space"); + EXPECT_EQ(escapeSpace("multiple spaces here"), "multiple\\ spaces\\ here"); +} + +// Test is_number +TEST(IsNumberTests, ValidatesNumbers) +{ + EXPECT_TRUE(is_number("123")); + EXPECT_TRUE(is_number("123.456")); + EXPECT_TRUE(is_number("-123.456")); + EXPECT_FALSE(is_number("123abc")); + EXPECT_FALSE(is_number("abc123")); + EXPECT_FALSE(is_number("")); +} + +// Test formatValues +TEST(FormatValuesTests, FormatsKeyValuePairs) +{ + std::vector values; + values.push_back(createKeyValue("key1", "value")); + values.push_back(createKeyValue("key2", "42")); + values.push_back(createKeyValue("key3", "-3.14")); + values.push_back(createKeyValue("key with spaces", "value with spaces")); + + std::string expected = "key1=\"value\",key2=42,key3=-3.14,key\\ with\\ spaces=\"value with spaces\""; + EXPECT_EQ(formatValues(values), expected); +} + +// Test splitHardwareID +TEST(SplitHardwareIDTests, SplitsCorrectly) +{ + EXPECT_EQ(splitHardwareID("node_name"), std::make_pair(std::string("none"), std::string("node_name"))); + EXPECT_EQ(splitHardwareID("/ns/node_name"), std::make_pair(std::string("ns"), std::string("node_name"))); + EXPECT_EQ(splitHardwareID("/ns/prefix/node_name"), std::make_pair(std::string("ns"), std::string("prefix/node_name"))); +} + +// Test statusToInfluxLineProtocol +TEST(StatusToInfluxLineProtocolTests, FormatsCorrectly) +{ + diagnostic_msgs::msg::DiagnosticStatus status; + status.hardware_id = "/ns/node_name"; + status.level = 2; + status.message = "Test message"; + status.values.push_back(createKeyValue("key1", "value1")); + status.values.push_back(createKeyValue("key2", "42")); + + std::string expected = "node_name,ns=ns level=2,message=\"Test message\",key1=\"value1\",key2=42 1672531200123456789\n"; + std::string output; + statusToInfluxLineProtocol(output, status, "1672531200123456789"); + + EXPECT_EQ(output, expected); +} + +// Test arrayToInfluxLineProtocol +TEST(ArrayToInfluxLineProtocolTests, HandlesMultipleStatuses) +{ + auto diag_msg = std::make_shared(); + diag_msg->header.stamp = rclcpp::Time(1672531200, 123456789); + + diagnostic_msgs::msg::DiagnosticStatus status1; + status1.hardware_id = "/ns1/node1"; + status1.level = 1; + status1.message = "First status"; + status1.values.push_back(createKeyValue("keyA", "valueA")); + + diagnostic_msgs::msg::DiagnosticStatus status2; + status2.hardware_id = "node2"; + status2.level = 2; + status2.message = "Second status"; + status2.values.push_back(createKeyValue("keyB", "42")); + + diag_msg->status = {status1, status2}; + + std::string expected = "node1,ns=ns1 level=1,message=\"First status\",keyA=\"valueA\" 1672531200123456789\n" + "node2,ns=none level=2,message=\"Second status\",keyB=42 1672531200123456789\n"; + + EXPECT_EQ(arrayToInfluxLineProtocol(diag_msg), expected); +} \ No newline at end of file From 5ca34cde11574f25857497f3fe40d6dafe2fbda4 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Thu, 9 Jan 2025 14:24:38 +0000 Subject: [PATCH 15/24] Only depend on gtest when testing --- diagnostic_remote_logging/CMakeLists.txt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/diagnostic_remote_logging/CMakeLists.txt b/diagnostic_remote_logging/CMakeLists.txt index f646b4e8..67a9f876 100644 --- a/diagnostic_remote_logging/CMakeLists.txt +++ b/diagnostic_remote_logging/CMakeLists.txt @@ -10,7 +10,6 @@ set(dependencies rclcpp rclcpp_components diagnostic_msgs - ament_cmake_gtest CURL ) @@ -46,10 +45,11 @@ install(TARGETS influx_component ) if(BUILD_TESTING) - ament_add_gtest(unit_tests test/influx_line_protocol.cpp) - target_include_directories(unit_tests PRIVATE include) - target_link_libraries(unit_tests influx_component) - ament_target_dependencies(unit_tests ${dependencies}) + find_package(ament_cmake_gtest REQUIRED) + ament_add_gtest(unit_tests test/influx_line_protocol.cpp) + target_include_directories(unit_tests PRIVATE include) + target_link_libraries(unit_tests influx_component) + ament_target_dependencies(unit_tests ${dependencies} ament_cmake_gtest) endif() From 733b6fbf27f4633dcc609e0c8297566f088be7cd Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 14:10:47 +0100 Subject: [PATCH 16/24] Update diagnostic_remote_logging/README.md Co-authored-by: Christian Henkel <6976069+ct2034@users.noreply.github.com> --- diagnostic_remote_logging/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diagnostic_remote_logging/README.md b/diagnostic_remote_logging/README.md index 7997a29a..497ef6ec 100644 --- a/diagnostic_remote_logging/README.md +++ b/diagnostic_remote_logging/README.md @@ -4,7 +4,7 @@ General information about this repository, including legal information and known This package provides the `influx` node, which listens to diagnostic messages and integrates with InfluxDB v2 for monitoring and visualization. Specifically, it subscribes to the [`diagnostic_msgs/DiagnosticArray`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_agg` topic and the [`diagnostic_msgs/DiagnosticStatus`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_toplevel_state` topic. The node processes these messages, publishing their statistics and levels to an [`InfluxDB`](http://influxdb.com) database, enabling use with tools like [`Grafana`](https://grafana.com). -As of now we only support InfluxDB v2, for support with older versions please use a proxy like [`Telegraf`](https://www.influxdata.com/time-series-platform/telegraf/). See section [Telegraf](#telegraf-proxy) for an example on how to setup. +As of now we only support InfluxDB v2, for support with older versions please use a proxy like [`Telegraf`](https://www.influxdata.com/time-series-platform/telegraf/). See section [Telegraf](## Using a Telegraf Proxy) for an example on how to setup. ## Node Configuration From 91880fbacc8e257a00002126a0fd52683abd6bd9 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 14:23:10 +0100 Subject: [PATCH 17/24] Update diagnostic_remote_logging/README.md Co-authored-by: Christian Henkel <6976069+ct2034@users.noreply.github.com> --- diagnostic_remote_logging/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diagnostic_remote_logging/README.md b/diagnostic_remote_logging/README.md index 497ef6ec..c6dc70b5 100644 --- a/diagnostic_remote_logging/README.md +++ b/diagnostic_remote_logging/README.md @@ -37,7 +37,7 @@ The `influx` node supports several parameters. Below is an example configuration - `send.agg`: Enables or disables subscription to the `/diagnostics_agg` topic. - `send.top_level_state`: Enables or disables subscription to the `/diagnostics_toplevel_state` topic. -## InfluxDB Configuration +#### InfluxDB Configuration Set the following parameters in your configuration to match your InfluxDB instance: From 87fa5004f68034318921984ee5566d077205c02c Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 14:23:21 +0100 Subject: [PATCH 18/24] Update diagnostic_remote_logging/README.md Co-authored-by: Christian Henkel <6976069+ct2034@users.noreply.github.com> --- diagnostic_remote_logging/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diagnostic_remote_logging/README.md b/diagnostic_remote_logging/README.md index c6dc70b5..cc94b5c0 100644 --- a/diagnostic_remote_logging/README.md +++ b/diagnostic_remote_logging/README.md @@ -39,6 +39,8 @@ The `influx` node supports several parameters. Below is an example configuration #### InfluxDB Configuration +## Starting the Node + Set the following parameters in your configuration to match your InfluxDB instance: - `connection.url`: The URL of your InfluxDB write API endpoint. From 2f16302c2c47f5ec4c6ad42c148f08b9bf79b1d3 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 14:23:29 +0100 Subject: [PATCH 19/24] Update diagnostic_remote_logging/README.md Co-authored-by: Christian Henkel <6976069+ct2034@users.noreply.github.com> --- diagnostic_remote_logging/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/diagnostic_remote_logging/README.md b/diagnostic_remote_logging/README.md index cc94b5c0..78920fc6 100644 --- a/diagnostic_remote_logging/README.md +++ b/diagnostic_remote_logging/README.md @@ -61,10 +61,10 @@ To configure Telegraf as a proxy for InfluxDB: 1. Ensure Telegraf is set up to send data to your InfluxDB instance via its configuration file (`/etc/telegraf/telegraf.conf`). Check [this link](https://docs.influxdata.com/influxdb/cloud/write-data/no-code/use-telegraf/manual-config/) for an example. 2. Add the following to the telegraf configuration file to enable the InfluxDB v2 listener: -```toml -[[inputs.influxdb_v2_listener]] - service_address = ":8086" -``` + ```toml + [[inputs.influxdb_v2_listener]] + service_address = ":8086" + ``` 3. Update the `influx` node configuration to point to the appropriate URL. For example, if Telegraf is running on the same host as the `influx` node, the default `http://localhost:8086/api/v2/write` should work. From a73894fe8cc6eafde2c6c210b8a514051604d0b4 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 13:24:53 +0000 Subject: [PATCH 20/24] Update diagnostic_remote_logging/README.md --- diagnostic_remote_logging/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diagnostic_remote_logging/README.md b/diagnostic_remote_logging/README.md index 78920fc6..f27fd753 100644 --- a/diagnostic_remote_logging/README.md +++ b/diagnostic_remote_logging/README.md @@ -2,7 +2,7 @@ General information about this repository, including legal information and known # The diagnostic_remote_logging package -This package provides the `influx` node, which listens to diagnostic messages and integrates with InfluxDB v2 for monitoring and visualization. Specifically, it subscribes to the [`diagnostic_msgs/DiagnosticArray`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_agg` topic and the [`diagnostic_msgs/DiagnosticStatus`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_toplevel_state` topic. The node processes these messages, publishing their statistics and levels to an [`InfluxDB`](http://influxdb.com) database, enabling use with tools like [`Grafana`](https://grafana.com). +This package provides the `influx` node, which listens to diagnostic messages and integrates with InfluxDB v2 for monitoring and visualization. Specifically, it subscribes to the [`diagnostic_msgs/DiagnosticArray`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_agg` topic and the [`diagnostic_msgs/DiagnosticStatus`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_toplevel_state` topic. The node processes these messages, sending their statistics and levels to an [`InfluxDB`](http://influxdb.com) database, enabling use with tools like [`Grafana`](https://grafana.com). As of now we only support InfluxDB v2, for support with older versions please use a proxy like [`Telegraf`](https://www.influxdata.com/time-series-platform/telegraf/). See section [Telegraf](## Using a Telegraf Proxy) for an example on how to setup. From 6fcf4983714cd175b7059cca83ab982c660e4a88 Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 14:34:07 +0100 Subject: [PATCH 21/24] Apply suggestions from code review Co-authored-by: Christian Henkel <6976069+ct2034@users.noreply.github.com> --- diagnostic_remote_logging/CMakeLists.txt | 8 ++------ .../diagnostic_remote_logging/influx_line_protocol.hpp | 4 ++-- diagnostic_remote_logging/package.xml | 4 +--- diagnostic_remote_logging/src/influxdb.cpp | 4 ++-- 4 files changed, 7 insertions(+), 13 deletions(-) diff --git a/diagnostic_remote_logging/CMakeLists.txt b/diagnostic_remote_logging/CMakeLists.txt index 67a9f876..a33edea0 100644 --- a/diagnostic_remote_logging/CMakeLists.txt +++ b/diagnostic_remote_logging/CMakeLists.txt @@ -11,7 +11,7 @@ set(dependencies rclcpp_components diagnostic_msgs CURL - ) +) foreach(dep ${dependencies}) find_package(${dep} REQUIRED) @@ -21,9 +21,7 @@ include_directories( src/ include/ ) -add_library(influx_component SHARED - src/influxdb.cpp - ) +add_library(influx_component SHARED src/influxdb.cpp) ament_target_dependencies(influx_component ${dependencies}) @@ -51,6 +49,4 @@ if(BUILD_TESTING) target_link_libraries(unit_tests influx_component) ament_target_dependencies(unit_tests ${dependencies} ament_cmake_gtest) endif() - - ament_package() diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp index d7d5aa22..fb6dcb59 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp @@ -139,13 +139,13 @@ void statusToInfluxLineProtocol(std::string& output, const diagnostic_msgs::msg: output += " " + timestamp_str + "\n"; } -std::string topLevelToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr& msg, const rclcpp::Time& time) +std::string diagnosticStatusToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr& msg, const rclcpp::Time& time) { std::string output = msg->name + " level=" + std::to_string(msg->level) + " " + toInfluxTimestamp(time) + "\n"; return output; }; -std::string arrayToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr& diag_msg) +std::string diagnosticArrayToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr& diag_msg) { std::string output; std::string timestamp = toInfluxTimestamp(diag_msg->header.stamp); diff --git a/diagnostic_remote_logging/package.xml b/diagnostic_remote_logging/package.xml index 02a1737b..164ca442 100644 --- a/diagnostic_remote_logging/package.xml +++ b/diagnostic_remote_logging/package.xml @@ -2,7 +2,7 @@ diagnostic_remote_logging - 0.0.1 + 4.3.1 diagnostic_remote_logging Daan Wijffels >BSD-3-Clause @@ -16,8 +16,6 @@ ament_lint_auto ament_lint_common ament_cmake_gtest - - ament_cmake diff --git a/diagnostic_remote_logging/src/influxdb.cpp b/diagnostic_remote_logging/src/influxdb.cpp index 9017171c..5d975dad 100644 --- a/diagnostic_remote_logging/src/influxdb.cpp +++ b/diagnostic_remote_logging/src/influxdb.cpp @@ -50,7 +50,7 @@ void InfluxDB::diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray:: RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_agg to telegraf"); } - // RCLCPP_INFO(this->get_logger(), "%s", output.c_str()); + RCLCPP_DEBUG(this->get_logger(), "%s", output.c_str()); } void InfluxDB::topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg) @@ -62,7 +62,7 @@ void InfluxDB::topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::Sh RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_toplevel_state to telegraf"); } - // RCLCPP_INFO(this->get_logger(), "%s", output.c_str()); + RCLCPP_DEBUGRCLCPP_INFO(this->get_logger(), "%s", output.c_str()); } void InfluxDB::setupConnection(const std::string &url) From 598ed5da0ebf8b60f7e679a1db221a0220976bda Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 13:46:50 +0000 Subject: [PATCH 22/24] Added diagnostic_remote_logging --- .github/workflows/lint.yaml | 1 + .github/workflows/test.yaml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 4f926eef..55223155 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -41,5 +41,6 @@ jobs: package-name: | diagnostic_aggregator diagnostic_common_diagnostics + diagnostic_remote_logging diagnostic_updater self_test diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 224a0cf3..d92ce5f4 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -17,6 +17,7 @@ jobs: package: [ diagnostic_aggregator, diagnostic_common_diagnostics, + diagnostic_remote_logging, diagnostic_updater, self_test, ] From 3635eb47da04d42658e24703afabe5e2e8d9fc1f Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 14:02:57 +0000 Subject: [PATCH 23/24] Added BSD clause and added untested function --- .../test/influx_line_protocol.cpp | 59 +++++++++++++++++-- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/diagnostic_remote_logging/test/influx_line_protocol.cpp b/diagnostic_remote_logging/test/influx_line_protocol.cpp index fd398e65..f6f7244d 100644 --- a/diagnostic_remote_logging/test/influx_line_protocol.cpp +++ b/diagnostic_remote_logging/test/influx_line_protocol.cpp @@ -1,3 +1,41 @@ +/********************************************************************* + * Software License Agreement (BSD License) + * + * Copyright (c) 2025, Willow Garage, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * * Neither the name of the Willow Garage nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + *********************************************************************/ + +/** + * \author Daan Wijffels + */ + #include "diagnostic_msgs/msg/diagnostic_array.hpp" #include "diagnostic_msgs/msg/diagnostic_status.hpp" #include "diagnostic_msgs/msg/key_value.hpp" @@ -5,7 +43,7 @@ #include // Include the functions to test -#include "diagnostic_remote_logging/influx_line_protocol.hpp" // Replace with the actual header file name +#include "diagnostic_remote_logging/influx_line_protocol.hpp" diagnostic_msgs::msg::KeyValue createKeyValue(const std::string& key, const std::string& value) { @@ -80,8 +118,8 @@ TEST(StatusToInfluxLineProtocolTests, FormatsCorrectly) EXPECT_EQ(output, expected); } -// Test arrayToInfluxLineProtocol -TEST(ArrayToInfluxLineProtocolTests, HandlesMultipleStatuses) +// Test diagnosticArrayToInfluxLineProtocol +TEST(DiagnosticArrayToInfluxLineProtocolTests, HandlesMultipleStatuses) { auto diag_msg = std::make_shared(); diag_msg->header.stamp = rclcpp::Time(1672531200, 123456789); @@ -103,5 +141,16 @@ TEST(ArrayToInfluxLineProtocolTests, HandlesMultipleStatuses) std::string expected = "node1,ns=ns1 level=1,message=\"First status\",keyA=\"valueA\" 1672531200123456789\n" "node2,ns=none level=2,message=\"Second status\",keyB=42 1672531200123456789\n"; - EXPECT_EQ(arrayToInfluxLineProtocol(diag_msg), expected); -} \ No newline at end of file + EXPECT_EQ(diagnosticArrayToInfluxLineProtocol(diag_msg), expected); +} + +// Test diagnosticStatusToInfluxLineProtocol +TEST(DiagnosticStatusToInfluxLineProtocol, HandlesSingleStatus){ + auto status = std::make_shared(); + status->level = 1; + status->name = "toplevel_state"; + auto time = rclcpp::Time(1672531200, 123456789); + + std::string expected = "toplevel_state level=1 1672531200123456789\n"; + EXPECT_EQ(diagnosticStatusToInfluxLineProtocol(status, time), expected); +} From 09986e36e7bd8ca945f91bf0b952debbdfd9b50c Mon Sep 17 00:00:00 2001 From: Daan Wijffels Date: Wed, 15 Jan 2025 14:03:23 +0000 Subject: [PATCH 24/24] Apply suggestions from code review --- diagnostic_remote_logging/README.md | 16 +-- .../influx_line_protocol.hpp | 4 +- .../diagnostic_remote_logging/influxdb.hpp | 5 +- diagnostic_remote_logging/src/influxdb.cpp | 98 ++++++++++++------- 4 files changed, 74 insertions(+), 49 deletions(-) diff --git a/diagnostic_remote_logging/README.md b/diagnostic_remote_logging/README.md index f27fd753..844b3c72 100644 --- a/diagnostic_remote_logging/README.md +++ b/diagnostic_remote_logging/README.md @@ -68,14 +68,14 @@ To configure Telegraf as a proxy for InfluxDB: 3. Update the `influx` node configuration to point to the appropriate URL. For example, if Telegraf is running on the same host as the `influx` node, the default `http://localhost:8086/api/v2/write` should work. -Leave the following parameters empty in the `influx` node configuration when using Telegraf as a proxy: +4. Leave the following parameters empty in the `influx` node configuration when using Telegraf as a proxy: -- `connection.token` -- `connection.bucket` -- `connection.organization` + - `connection.token` + - `connection.bucket` + - `connection.organization` -Afterwards run the node with the following command: +5. Afterwards run the node with the following command: -```bash -ros2 run diagnostic_remote_logging influx --ros-args --params-file -``` + ```bash + ros2 run diagnostic_remote_logging influx --ros-args --params-file + ``` diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp index fb6dcb59..7eab9d19 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp @@ -1,7 +1,7 @@ /********************************************************************* * Software License Agreement (BSD License) * - * Copyright (c) 2009, Willow Garage, Inc. + * Copyright (c) 2025, Willow Garage, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -33,7 +33,7 @@ *********************************************************************/ /** - * \author Daan Wijfels + * \author Daan Wijffels */ #pragma once diff --git a/diagnostic_remote_logging/include/diagnostic_remote_logging/influxdb.hpp b/diagnostic_remote_logging/include/diagnostic_remote_logging/influxdb.hpp index 7acc9ff9..ab0f7e63 100644 --- a/diagnostic_remote_logging/include/diagnostic_remote_logging/influxdb.hpp +++ b/diagnostic_remote_logging/include/diagnostic_remote_logging/influxdb.hpp @@ -1,7 +1,7 @@ /********************************************************************* * Software License Agreement (BSD License) * - * Copyright (c) 2009, Willow Garage, Inc. + * Copyright (c) 2025, Willow Garage, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -33,7 +33,7 @@ *********************************************************************/ /** - * \author Daan Wijfels + * \author Daan Wijffels */ #include "diagnostic_msgs/msg/diagnostic_array.hpp" @@ -51,7 +51,6 @@ class InfluxDB : public rclcpp::Node rclcpp::Subscription::SharedPtr diag_sub_; rclcpp::Subscription::SharedPtr top_level_sub_; - std::string post_url_, influx_token_; CURL* curl_; diff --git a/diagnostic_remote_logging/src/influxdb.cpp b/diagnostic_remote_logging/src/influxdb.cpp index 5d975dad..b7d915a4 100644 --- a/diagnostic_remote_logging/src/influxdb.cpp +++ b/diagnostic_remote_logging/src/influxdb.cpp @@ -1,25 +1,60 @@ +/********************************************************************* + * Software License Agreement (BSD License) + * + * Copyright (c) 2025, Willow Garage, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * * Neither the name of the Willow Garage nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + *********************************************************************/ + +/** + * \author Daan Wijffels + */ + #include "diagnostic_remote_logging/influxdb.hpp" -InfluxDB::InfluxDB(const rclcpp::NodeOptions &opt) +InfluxDB::InfluxDB(const rclcpp::NodeOptions& opt) : Node("influxdb", opt) { post_url_ = this->declare_parameter("connection.url", "http://localhost:8086/api/v2/write"); - if (post_url_.empty()) - { + if (post_url_.empty()) { throw std::runtime_error("Parameter connection.url must be set"); } std::string organization = declare_parameter("connection.organization", ""); - std::string bucket = declare_parameter("connection.bucket", ""); - influx_token_ = declare_parameter("connection.token", ""); + std::string bucket = declare_parameter("connection.bucket", ""); + influx_token_ = declare_parameter("connection.token", ""); // Check if any of the parameters is set - if (!organization.empty() || !bucket.empty() || !influx_token_.empty()) - { + if (!organization.empty() || !bucket.empty() || !influx_token_.empty()) { // Ensure all parameters are set - if (organization.empty() || bucket.empty() || influx_token_.empty()) - { + if (organization.empty() || bucket.empty() || influx_token_.empty()) { throw std::runtime_error("All parameters (connection.organization, connection.bucket, connection.token) must be set, or when using a proxy like Telegraf none have to be set."); } @@ -30,23 +65,20 @@ InfluxDB::InfluxDB(const rclcpp::NodeOptions &opt) setupConnection(post_url_); - if (declare_parameter("send.agg", true)) - { + if (declare_parameter("send.agg", true)) { diag_sub_ = this->create_subscription("/diagnostics_agg", rclcpp::SensorDataQoS(), std::bind(&InfluxDB::diagnosticsCallback, this, std::placeholders::_1)); } - if (declare_parameter("send.top_level_state", true)) - { + if (declare_parameter("send.top_level_state", true)) { top_level_sub_ = this->create_subscription("/diagnostics_toplevel_state", rclcpp::SensorDataQoS(), std::bind(&InfluxDB::topLevelCallback, this, std::placeholders::_1)); } } void InfluxDB::diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg) { - std::string output = arrayToInfluxLineProtocol(msg); + std::string output = diagnosticArrayToInfluxLineProtocol(msg); - if (!sendToInfluxDB(output)) - { + if (!sendToInfluxDB(output)) { RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_agg to telegraf"); } @@ -55,28 +87,26 @@ void InfluxDB::diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray:: void InfluxDB::topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg) { - std::string output = topLevelToInfluxLineProtocol(msg, this->get_clock()->now()); + std::string output = diagnosticStatusToInfluxLineProtocol(msg, this->get_clock()->now()); - if (!sendToInfluxDB(output)) - { + if (!sendToInfluxDB(output)) { RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_toplevel_state to telegraf"); } - RCLCPP_DEBUGRCLCPP_INFO(this->get_logger(), "%s", output.c_str()); + RCLCPP_DEBUG(this->get_logger(), "%s", output.c_str()); } -void InfluxDB::setupConnection(const std::string &url) +void InfluxDB::setupConnection(const std::string& url) { curl_global_init(CURL_GLOBAL_ALL); curl_ = curl_easy_init(); - if (!curl_) - { + if (!curl_) { throw std::runtime_error("Failed to initialize curl"); } - struct curl_slist *headers = nullptr; + struct curl_slist* headers = nullptr; - if (!influx_token_.empty()){ + if (!influx_token_.empty()) { headers = curl_slist_append(headers, ("Authorization: Token " + influx_token_).c_str()); } @@ -86,15 +116,14 @@ void InfluxDB::setupConnection(const std::string &url) curl_easy_setopt(curl_, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl_, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); // Use HTTP/1.1 for keep-alive curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT, 10L); // Set timeout as needed - curl_easy_setopt(curl_, CURLOPT_TCP_KEEPALIVE, 1L); // Enable TCP keep-alive + curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT, 10L); // Set timeout as needed + curl_easy_setopt(curl_, CURLOPT_TCP_KEEPALIVE, 1L); // Enable TCP keep-alive curl_easy_setopt(curl_, CURLOPT_POST, 1L); } -bool InfluxDB::sendToInfluxDB(const std::string &data) +bool InfluxDB::sendToInfluxDB(const std::string& data) { - if (!curl_) - { + if (!curl_) { RCLCPP_ERROR(this->get_logger(), "cURL not initialized."); return false; } @@ -105,16 +134,14 @@ bool InfluxDB::sendToInfluxDB(const std::string &data) CURLcode res = curl_easy_perform(curl_); // Check for errors - if (res != CURLE_OK) - { + if (res != CURLE_OK) { RCLCPP_ERROR(this->get_logger(), "cURL error: %s", curl_easy_strerror(res)); return false; } long response_code; curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &response_code); - if (response_code != 204) - { + if (response_code != 204) { RCLCPP_ERROR(this->get_logger(), "Error (%ld) when sending to telegraf:\n%s", response_code, data.c_str()); return false; } @@ -124,8 +151,7 @@ bool InfluxDB::sendToInfluxDB(const std::string &data) InfluxDB::~InfluxDB() { - if (curl_) - { + if (curl_) { curl_easy_cleanup(curl_); } curl_global_cleanup();