From ec80dfe4b22140866ae57cab72c1e301b666f2a0 Mon Sep 17 00:00:00 2001 From: Wilfried OLLIVIER Date: Sat, 26 Nov 2022 17:46:39 +0100 Subject: [PATCH] Start sending scaphandre data to ES --- src/exporters/elastic.rs | 59 ++++++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/src/exporters/elastic.rs b/src/exporters/elastic.rs index dacd3016..58ddcd8c 100644 --- a/src/exporters/elastic.rs +++ b/src/exporters/elastic.rs @@ -3,8 +3,9 @@ //! `ElasticExporter` implementation, exposes metrics to //! an [ElasticSearch](https://www.elastic.co/fr/elasticsearch/) server. -use crate::exporters::Exporter; +use super::get_scaphandre_version; use crate::sensors::Sensor; +use crate::{exporters::Exporter, sensors::Topology}; use clap::{Arg, ArgMatches}; use elasticsearch::{ auth::Credentials, @@ -13,7 +14,10 @@ use elasticsearch::{ }; use hyper::StatusCode; use serde::{Deserialize, Serialize}; +use std::thread; +use std::time::Duration; use url::Url; +use uuid::Uuid; /// Default url for Elastic endpoint const DEFAULT_HOST: &str = "localhost"; @@ -24,9 +28,7 @@ const DEFAULT_SCHEME: &str = "http"; /// Exporter that pushes metrics to an ElasticSearch endpoint pub struct ElasticExporter { - /// Sensor instance that is used to generate the Topology and - /// thus get power consumption metrics. - _sensor: Box, + topology: Topology, } impl Exporter for ElasticExporter { @@ -118,30 +120,51 @@ const ES_INDEX_NAME: &str = "scaphandre"; #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct ScaphandreData { - pub wip: i32, + pub scaphandre_version: String, } impl ElasticExporter { /// Instantiates and returns a new ElasticExporter // TODO: make sensor mutable - pub fn new(sensor: Box) -> ElasticExporter { - ElasticExporter { _sensor: sensor } + pub fn new(mut sensor: Box) -> ElasticExporter { + ElasticExporter { + topology: sensor + .get_topology() + .expect("Could'nt generate the Topology."), + } } #[tokio::main] - pub async fn runner(&self, client: Elasticsearch) -> Result<(), Error> { + pub async fn runner(&mut self, client: Elasticsearch) -> Result<(), Error> { self.ensure_index(&client).await?; - // WIP - let create_test_resp = client - .create(CreateParts::IndexId(ES_INDEX_NAME, "42")) - .body(ScaphandreData { wip: 42 }) - .send() - .await?; - - println!("create test resp {}", create_test_resp.status_code()); - - Ok(()) + loop { + self.topology.refresh(); + + match client + .create(CreateParts::IndexId( + ES_INDEX_NAME, + // Looks like rust ES Library do not support autogenerated ids + // for both insert and bulk insert + // https://github.com/elastic/elasticsearch-rs/issues/174 + // forced to create an ID on our side + &Uuid::new_v4().to_string(), + )) + .body(ScaphandreData { + scaphandre_version: get_scaphandre_version(), + }) + .send() + .await + { + Ok(resp) => println!("create test resp {}", resp.status_code()), + Err(e) => println!("Error: {}", e), + } + + println!("loop tick"); + + // TODO @papey: add custom duration + thread::sleep(Duration::new(2, 0)); + } } async fn ensure_index(&self, client: &Elasticsearch) -> Result<(), Error> {