Skip to content

Commit

Permalink
feat(plugin): netdata collector of thin-edge measurements
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Dec 31, 2024
1 parent a48c009 commit d30331b
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 1 deletion.
132 changes: 131 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"crates/tests/*",
"plugins/c8y_firmware_plugin",
"plugins/c8y_remote_access_plugin",
"plugins/netdata-collector",
"plugins/tedge_apt_plugin",
]
resolver = "2"
Expand Down
21 changes: 21 additions & 0 deletions crates/extensions/netdata_collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "netdata_collector"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = { workspace = true }
netdata-plugin = "0.2.0"
tedge_actors = { workspace = true }
tedge_api = { workspace = true }
time = { workspace = true }

[lints]
workspace = true
88 changes: 88 additions & 0 deletions crates/extensions/netdata_collector/src/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use crate::message::MetricPoints;
use crate::TEdgeNetDataCollectorBuilder;
use netdata_plugin::collector::Collector;
use netdata_plugin::Chart;
use netdata_plugin::Dimension;
use std::collections::HashMap;
use std::collections::HashSet;
use tedge_actors::Actor;
use tedge_actors::LoggingReceiver;
use tedge_actors::MessageReceiver;
use tedge_actors::RuntimeError;

pub struct TEdgeNetDataCollector {
pub(crate) input: LoggingReceiver<MetricPoints>,
}

impl TEdgeNetDataCollector {
pub fn builder() -> TEdgeNetDataCollectorBuilder {
TEdgeNetDataCollectorBuilder::default()
}
}

#[async_trait::async_trait]
impl Actor for TEdgeNetDataCollector {
fn name(&self) -> &str {
"NetData"
}

async fn run(mut self) -> Result<(), RuntimeError> {
let mut writer = std::io::stdout();
let mut c = Collector::new(&mut writer);
let mut charts = HashMap::new();

while let Some(points) = self.input.recv().await {
// Declare any new chart
let updated_charts: HashSet<String> =
points.iter().map(|p| p.chart_id.clone()).collect();
for chart_id in updated_charts.iter() {
if !charts.contains_key(chart_id) {
let chart = new_chart(chart_id);
c.add_chart(&chart).unwrap();
charts.insert(chart_id.to_string(), HashSet::new());
}
}

// Declare any new dimension
for p in points.iter() {
if let Some(dims) = charts.get_mut(&p.chart_id) {
let dim_id = p.dimension_id.clone();
if !dims.contains(&dim_id) {
let dim = new_dim(&dim_id);
c.add_dimension(&p.chart_id, &dim).unwrap();
dims.insert(dim_id);
}
}
}

// Publish the metrics
for p in points {
c.prepare_value(&p.chart_id, &p.dimension_id, p.value)
.unwrap();
}
for chart_id in updated_charts {
c.commit_chart(&chart_id).unwrap();
}
}

Ok(())
}
}

fn new_chart(chart_id: &str) -> Chart {
Chart {
type_id: chart_id,
name: chart_id,
title: chart_id,
units: "units",
..Default::default()
}
}

fn new_dim(dim_id: &str) -> Dimension {
Dimension {
id: dim_id,
name: dim_id,
..Default::default()
}
}
51 changes: 51 additions & 0 deletions crates/extensions/netdata_collector/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use crate::MetricPoints;
use crate::TEdgeNetDataCollector;
use std::convert::Infallible;
use tedge_actors::futures::channel::mpsc;
use tedge_actors::Builder;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LoggingReceiver;
use tedge_actors::MessageSink;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;

pub struct TEdgeNetDataCollectorBuilder {
input: LoggingReceiver<MetricPoints>,
input_sender: DynSender<MetricPoints>,
signal_sender: DynSender<RuntimeRequest>,
}

impl Default for TEdgeNetDataCollectorBuilder {
fn default() -> Self {
let (input_sender, input_receiver) = mpsc::channel(10);
let (signal_sender, signal_receiver) = mpsc::channel(10);
let input = LoggingReceiver::new("NetData".into(), input_receiver, signal_receiver);

TEdgeNetDataCollectorBuilder {
input,
input_sender: input_sender.into(),
signal_sender: signal_sender.into(),
}
}
}

impl MessageSink<MetricPoints> for TEdgeNetDataCollectorBuilder {
fn get_sender(&self) -> DynSender<MetricPoints> {
self.input_sender.sender_clone()
}
}

impl RuntimeRequestSink for TEdgeNetDataCollectorBuilder {
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
self.signal_sender.sender_clone()
}
}

impl Builder<TEdgeNetDataCollector> for TEdgeNetDataCollectorBuilder {
type Error = Infallible;

fn try_build(self) -> Result<TEdgeNetDataCollector, Self::Error> {
Ok(TEdgeNetDataCollector { input: self.input })
}
}
7 changes: 7 additions & 0 deletions crates/extensions/netdata_collector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod actor;
mod builder;
mod message;

pub use actor::*;
pub use builder::*;
pub use message::*;
Loading

0 comments on commit d30331b

Please sign in to comment.