Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grafana Integration #425

Open
wants to merge 27 commits into
base: ros2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6c6eef9
First working version of remote_logging
Dec 4, 2024
8662aee
Merge branch 'ros:ros2' into ros2
dwffls Dec 11, 2024
390bb8c
Added more error handling, and skipping values when new line is prese…
dwffls Dec 18, 2024
53b5297
Changed default telegraf url to reflect the change to influxdb_v2_lis…
dwffls Dec 18, 2024
a43c0c0
Changed default telegraf url to reflect the change to influxdb_v2_lis…
dwffls Dec 18, 2024
003e8a5
Made node composable and changed name to influx to better reflect use…
Dec 18, 2024
0d526ee
Added README
Dec 18, 2024
0a7ebbc
Merge branch 'ros:ros2' into influx_db
dwffls Dec 18, 2024
2491bad
Escaping string values fixed
Jan 6, 2025
b97c4de
Build fix on farm
dwffls Jan 8, 2025
3231f91
Adde curl dependency
dwffls Jan 8, 2025
6eb4c49
Removed Curl Dependency as build does not work on farm
dwffls Jan 8, 2025
adbc480
Merge branch 'ros:ros2' into influx_db
dwffls Jan 8, 2025
5e15745
Curl Dependency
dwffls Jan 8, 2025
61c69eb
Update diagnostic_remote_logging/package.xml
dwffls Jan 8, 2025
3c26580
Update diagnostic_remote_logging/package.xml
dwffls Jan 8, 2025
0ece0a2
Added unit tests for influx_line_protocol
Jan 9, 2025
5ca34cd
Only depend on gtest when testing
Jan 9, 2025
733b6fb
Update diagnostic_remote_logging/README.md
dwffls Jan 15, 2025
91880fb
Update diagnostic_remote_logging/README.md
dwffls Jan 15, 2025
87fa500
Update diagnostic_remote_logging/README.md
dwffls Jan 15, 2025
2f16302
Update diagnostic_remote_logging/README.md
dwffls Jan 15, 2025
a73894f
Update diagnostic_remote_logging/README.md
dwffls Jan 15, 2025
6fcf498
Apply suggestions from code review
dwffls Jan 15, 2025
598ed5d
Added diagnostic_remote_logging
Jan 15, 2025
3635eb4
Added BSD clause and added untested function
Jan 15, 2025
09986e3
Apply suggestions from code review
Jan 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ jobs:
package-name: |
diagnostic_aggregator
diagnostic_common_diagnostics
diagnostic_remote_logging
diagnostic_updater
self_test
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
package: [
diagnostic_aggregator,
diagnostic_common_diagnostics,
diagnostic_remote_logging,
diagnostic_updater,
self_test,
]
Expand Down
52 changes: 52 additions & 0 deletions diagnostic_remote_logging/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
cmake_minimum_required(VERSION 3.8)

project(diagnostic_remote_logging)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(dependencies
ament_cmake
rclcpp
rclcpp_components
diagnostic_msgs
CURL
)

foreach(dep ${dependencies})
find_package(${dep} REQUIRED)
endforeach(dep)

include_directories(
src/
include/
)
add_library(influx_component SHARED src/influxdb.cpp)

ament_target_dependencies(influx_component ${dependencies})

ament_export_dependencies(influx_component ${dependencies})

target_compile_features(influx_component PUBLIC c_std_99 cxx_std_17)

rclcpp_components_register_node(
influx_component
PLUGIN "InfluxDB"
EXECUTABLE influx
)
ament_export_targets(export_influx_component)
install(TARGETS influx_component
EXPORT export_influx_component
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
RUNTIME DESTINATION bin
)

if(BUILD_TESTING)
find_package(ament_cmake_gtest REQUIRED)
ament_add_gtest(unit_tests test/influx_line_protocol.cpp)
target_include_directories(unit_tests PRIVATE include)
target_link_libraries(unit_tests influx_component)
ament_target_dependencies(unit_tests ${dependencies} ament_cmake_gtest)
endif()
ament_package()
81 changes: 81 additions & 0 deletions diagnostic_remote_logging/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
General information about this repository, including legal information and known issues/limitations, are given in [README.md](../README.md) in the repository root.

# The diagnostic_remote_logging package

This package provides the `influx` node, which listens to diagnostic messages and integrates with InfluxDB v2 for monitoring and visualization. Specifically, it subscribes to the [`diagnostic_msgs/DiagnosticArray`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_agg` topic and the [`diagnostic_msgs/DiagnosticStatus`](https://index.ros.org/p/diagnostic_msgs) messages on the `/diagnostics_toplevel_state` topic. The node processes these messages, sending their statistics and levels to an [`InfluxDB`](http://influxdb.com) database, enabling use with tools like [`Grafana`](https://grafana.com).

As of now we only support InfluxDB v2, for support with older versions please use a proxy like [`Telegraf`](https://www.influxdata.com/time-series-platform/telegraf/). See section [Telegraf](## Using a Telegraf Proxy) for an example on how to setup.

## Node Configuration

You can send data to [`InfluxDB`](http://influxdb.com) in two ways: directly to the database or via a proxy like [`Telegraf`](https://www.influxdata.com/time-series-platform/telegraf/). While both methods are valid, using a proxy is generally recommended due to the following benefits:

- **Efficient Data Transmission**: Telegraf aggregates multiple measurements and sends them in a single request, reducing bandwidth usage and minimizing database load.
- **Enhanced Reliability**: Provides buffering in case of connection issues, ensuring no data is lost.
- **Comprehensive Metric Collection**: Telegraf can send additional system metrics (e.g., RAM, CPU, network usage) with minimal configuration.
- **Data Filtering and Transformation**: Supports preprocessing, such as filtering or transforming data, before sending it to InfluxDB.

To use either method, ensure you have a running instance of InfluxDB. The simplest way to set this up is through [`InfluxDB Cloud`](https://cloud2.influxdata.com/signup).

### Parameters

The `influx` node supports several parameters. Below is an example configuration:

```yaml
/influx:
ros__parameters:
connection:
url: http://localhost:8086/api/v2/write
token:
bucket:
organization:
send:
agg: true
top_level_state: true
```

- `send.agg`: Enables or disables subscription to the `/diagnostics_agg` topic.
- `send.top_level_state`: Enables or disables subscription to the `/diagnostics_toplevel_state` topic.

#### InfluxDB Configuration

## Starting the Node

Set the following parameters in your configuration to match your InfluxDB instance:
dwffls marked this conversation as resolved.
Show resolved Hide resolved

- `connection.url`: The URL of your InfluxDB write API endpoint.
- `connection.token`: Your InfluxDB authentication token.
- `connection.bucket`: The target bucket in InfluxDB.
- `connection.organization`: The name of your InfluxDB organization.

Afterwards run the node with the following command:

```bash
ros2 run diagnostic_remote_logging influx --ros-args --params-file <path_to_yaml_file>
```

## Using a Telegraf Proxy

To configure Telegraf as a proxy for InfluxDB:

1. Ensure Telegraf is set up to send data to your InfluxDB instance via its configuration file (`/etc/telegraf/telegraf.conf`). Check [this link](https://docs.influxdata.com/influxdb/cloud/write-data/no-code/use-telegraf/manual-config/) for an example.
2. Add the following to the telegraf configuration file to enable the InfluxDB v2 listener:

```toml
[[inputs.influxdb_v2_listener]]
service_address = ":8086"
```

3. Update the `influx` node configuration to point to the appropriate URL. For example, if Telegraf is running on the same host as the `influx` node, the default `http://localhost:8086/api/v2/write` should work.

4. Leave the following parameters empty in the `influx` node configuration when using Telegraf as a proxy:

- `connection.token`
- `connection.bucket`
- `connection.organization`

5. Afterwards run the node with the following command:

```bash
ros2 run diagnostic_remote_logging influx --ros-args --params-file <path_to_yaml_file>
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*********************************************************************
* Software License Agreement (BSD License)
*
* Copyright (c) 2025, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of the Willow Garage nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/

/**
* \author Daan Wijffels
*/

#pragma once

#include "diagnostic_msgs/msg/diagnostic_array.hpp"
#include "rclcpp/rclcpp.hpp"

std::string toInfluxTimestamp(const rclcpp::Time& time)
{
uint64_t seconds = static_cast<uint64_t>(time.seconds());
uint64_t nanoseconds = static_cast<uint64_t>(time.nanoseconds()) % 1000000000;

// Convert to strings
std::string secStr = std::to_string(seconds);
std::string nanosecStr = std::to_string(nanoseconds);

// Zero-pad nanoseconds to 9 digits
nanosecStr = std::string(9 - nanosecStr.length(), '0') + nanosecStr;

return secStr + nanosecStr;
}

std::string escapeSpace(const std::string& input)
{
std::string result;
for (char c : input) {
if (c == ' ') {
result += '\\'; // Add a backslash before the space
}
result += c; // Add the original character
}
return result;
}

bool is_number(const std::string& s)
{
std::istringstream iss(s);
double d;
return iss >> std::noskipws >> d && iss.eof();
}

std::string formatValues(const std::vector<diagnostic_msgs::msg::KeyValue>& values)
{
std::string formatted;
for (const auto& kv : values) {
if (kv.value.find("\n") != std::string::npos) {
// If the value contains a newline, skip it
// InfluxDB uses this to separate measurements
continue;
}

formatted += escapeSpace(kv.key) + "=";

if (is_number(kv.value)) {
formatted += kv.value;
} else {
formatted += "\"" + kv.value + "\"";
}
formatted += ",";
}
if (!formatted.empty()) {
formatted.pop_back(); // Remove the last comma
}
return formatted;
}

std::pair<std::string, std::string> splitHardwareID(const std::string& input)
{
size_t first_slash_pos = input.find('/');

// If no slash is found, treat the entire input as the node_name
if (first_slash_pos == std::string::npos) {
return {"none", input};
}

size_t second_slash_pos = input.find('/', first_slash_pos + 1);

// If the second slash is found, extract the "ns" and "node" parts
if (second_slash_pos != std::string::npos) {
std::string ns = input.substr(first_slash_pos + 1, second_slash_pos - first_slash_pos - 1);
std::string node = input.substr(second_slash_pos + 1);
return {ns, node};
}

// If no second slash is found, everything after the first slash is the node
std::string node = input.substr(first_slash_pos + 1);
return {"none", node}; // ns is empty, node is the remaining string
}

void statusToInfluxLineProtocol(std::string& output, const diagnostic_msgs::msg::DiagnosticStatus& status, const std::string& timestamp_str)
{
// hardware_id is empty for analyzer groups, so skip them
if (status.hardware_id.empty()) {
return;
}

auto [ns, identifier] = splitHardwareID(status.hardware_id);
output += escapeSpace(identifier) + ",ns=" + escapeSpace(ns) + " level=" + std::to_string(status.level) + ",message=\"" + status.message + "\"";
auto formatted_key_values = formatValues(status.values);
if (!formatted_key_values.empty()) {
output += "," + formatted_key_values;
}
output += " " + timestamp_str + "\n";
}

std::string diagnosticStatusToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr& msg, const rclcpp::Time& time)
{
std::string output = msg->name + " level=" + std::to_string(msg->level) + " " + toInfluxTimestamp(time) + "\n";
return output;
};

std::string diagnosticArrayToInfluxLineProtocol(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr& diag_msg)
{
std::string output;
std::string timestamp = toInfluxTimestamp(diag_msg->header.stamp);

for (auto& status : diag_msg->status) {
statusToInfluxLineProtocol(output, status, timestamp);
}

return output;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*********************************************************************
* Software License Agreement (BSD License)
*
* Copyright (c) 2025, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of the Willow Garage nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/

/**
* \author Daan Wijffels
*/

#include "diagnostic_msgs/msg/diagnostic_array.hpp"
#include "diagnostic_remote_logging/influx_line_protocol.hpp"
#include "rclcpp/rclcpp.hpp"
#include <curl/curl.h>

class InfluxDB : public rclcpp::Node
{
public:
explicit InfluxDB(const rclcpp::NodeOptions& opt);
~InfluxDB();

private:
rclcpp::Subscription<diagnostic_msgs::msg::DiagnosticArray>::SharedPtr diag_sub_;
rclcpp::Subscription<diagnostic_msgs::msg::DiagnosticStatus>::SharedPtr top_level_sub_;

std::string post_url_, influx_token_;
CURL* curl_;

void setupConnection(const std::string& telegraf_url);

void diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg);
void topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg);

bool sendToInfluxDB(const std::string& data);
};
22 changes: 22 additions & 0 deletions diagnostic_remote_logging/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>diagnostic_remote_logging</name>
<version>4.3.1</version>
<description>diagnostic_remote_logging</description>
<maintainer email="[email protected]">Daan Wijffels</maintainer>
<license>>BSD-3-Clause</license>

<buildtool_depend>ament_cmake</buildtool_depend>

<build_depend>diagnostic_msgs</build_depend>
<build_depend>rclcpp_components</build_depend>
<build_depend>curl</build_depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>ament_cmake_gtest</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
Loading