From d20c38fec905ba90611764d7cd39aead2435291e Mon Sep 17 00:00:00 2001 From: Marco Kirchner Date: Sat, 6 Jan 2024 17:49:40 +0100 Subject: [PATCH] add ability to assign tags & improve group management --- Cargo.lock | 12 +++++ Cargo.toml | 3 +- src/kuma/client.rs | 125 ++++++++++++++++++++++++++++++++++++++++----- src/kuma/models.rs | 66 ++++++++++++++++++++---- src/sync.rs | 69 ++++++++++++++++++++++--- src/util.rs | 53 +++++++++++++++++-- 6 files changed, 294 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7a6421..a86ef0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,6 +84,7 @@ dependencies = [ "bollard", "confique", "cute_custom_default", + "derivative", "futures-util", "itertools", "rust_socketio", @@ -362,6 +363,17 @@ dependencies = [ "serde", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" diff --git a/Cargo.toml b/Cargo.toml index 17072b2..b6f4672 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,9 @@ edition = "2021" [dependencies] bollard = { version = "0.15.0" } -cute_custom_default = { version = "2.1.0" } confique = { version = "0.2.5", default-features = false, features = ["toml"] } +cute_custom_default = { version = "2.1.0" } +derivative = { version = "2.2.0" } futures-util = { version = "0.3.30" } itertools = { version = "0.12.0" } rust_socketio = { git = "https://github.com/1c3t3a/rust-socketio.git", rev = "9ccf67b", features = [ diff --git a/src/kuma/client.rs b/src/kuma/client.rs index 57cf3b7..4a96aab 100644 --- a/src/kuma/client.rs +++ b/src/kuma/client.rs @@ -1,4 +1,4 @@ -use super::{Event, Monitor, MonitorList, MonitorType, Tag, TagList}; +use super::{Event, Monitor, MonitorList, MonitorType, Tag, TagDefinition}; use crate::config::Config; use crate::util::ResultLogger; use futures_util::FutureExt; @@ -26,7 +26,7 @@ struct Worker { config: Arc, socket_io: Arc>>, event_sender: Arc, - tags: Arc>, + tags: Arc>>, monitors: Arc>, is_ready: Arc>, } @@ -81,7 +81,7 @@ impl Worker { .ok_or_else(|| ()) } - async fn get_tags(&self) -> TagList { + async fn get_tags(&self) -> Vec { self.call("getTags", vec![], "/tags", Duration::from_secs(2)) .await .unwrap_or_default() @@ -174,7 +174,7 @@ impl Worker { .unwrap_or_else(|_| false) } - pub async fn add_tag(&self, tag: Tag) -> Tag { + pub async fn add_tag(&self, tag: TagDefinition) -> TagDefinition { self.call( "addTag", vec![serde_json::to_value(tag.clone()).unwrap()], @@ -201,6 +201,38 @@ impl Worker { .unwrap_or_default(); } + pub async fn edit_monitor_tag(&self, monitor_id: i32, tag_id: i32, value: Option) { + let _: bool = self + .call( + "editMonitorTag", + vec![ + json!(tag_id), + json!(monitor_id), + json!(value.unwrap_or_default()), + ], + "/ok", + Duration::from_secs(2), + ) + .await + .unwrap_or_default(); + } + + pub async fn delete_monitor_tag(&self, monitor_id: i32, tag_id: i32, value: Option) { + let _: bool = self + .call( + "deleteMonitorTag", + vec![ + json!(tag_id), + json!(monitor_id), + json!(value.unwrap_or_default()), + ], + "/ok", + Duration::from_secs(2), + ) + .await + .unwrap_or_default(); + } + pub async fn delete_monitor(&self, monitor_id: i32) { let _: bool = self .call( @@ -214,7 +246,9 @@ impl Worker { } async fn resolve_group(&self, monitor: &mut Monitor) -> bool { - if let Some(group_name) = &monitor.common().group { + if let Some(group_name) = monitor.common().parent_name.clone() { + monitor.common_mut().parent_name = None; + if let Some(Some(group_id)) = self .monitors .lock() @@ -227,7 +261,7 @@ impl Worker { && tag .value .as_ref() - .is_some_and(|tag_value| tag_value == group_name) + .is_some_and(|tag_value| tag_value == &group_name) }) }) .map(|x| x.1.common().id) @@ -236,16 +270,80 @@ impl Worker { } else { return false; } + } else { + monitor.common_mut().parent = None; } return true; } async fn update_monitor_tags(&self, monitor_id: i32, tags: &Vec) { - for tag in tags { - if let Some(tag_id) = tag.id { - self.add_monitor_tag(monitor_id, tag_id, tag.value.clone()) + let new_tags = tags + .iter() + .filter_map(|tag| tag.tag_id.and_then(|id| Some((id, tag)))) + .collect::>(); + + if let Some(monitor) = self.monitors.lock().await.get(&monitor_id.to_string()) { + let current_tags = monitor + .common() + .tags + .iter() + .filter_map(|tag| tag.tag_id.and_then(|id| Some((id, tag)))) + .collect::>(); + + let duplicates = monitor + .common() + .tags + .iter() + .duplicates_by(|tag| tag.tag_id) + .filter_map(|tag| tag.tag_id.as_ref().map(|id| (id, tag))) + .collect::>(); + + let to_delete = current_tags + .iter() + .filter(|(id, _)| !new_tags.contains_key(*id) && !duplicates.contains_key(*id)) + .collect_vec(); + + let to_create = new_tags + .iter() + .filter(|(id, _)| !current_tags.contains_key(*id)) + .collect_vec(); + + let to_update = current_tags + .keys() + .filter_map(|id| match (current_tags.get(id), new_tags.get(id)) { + (Some(current), Some(new)) => Some((id, current, new)), + _ => None, + }) + .collect_vec(); + + for (tag_id, tag) in duplicates { + self.delete_monitor_tag(monitor_id, *tag_id, tag.value.clone()) .await; } + + for (tag_id, tag) in to_delete { + self.delete_monitor_tag(monitor_id, *tag_id, tag.value.clone()) + .await; + } + + for (tag_id, tag) in to_create { + self.add_monitor_tag(monitor_id, *tag_id, tag.value.clone()) + .await + } + + for (tag_id, current, new) in to_update { + if current.value != new.value { + self.edit_monitor_tag(monitor_id, *tag_id, new.value.clone()) + .await; + } + } + } else { + for tag in tags { + if let Some(tag_id) = tag.tag_id { + self.add_monitor_tag(monitor_id, tag_id, tag.value.clone()) + .await; + } + } } } @@ -288,11 +386,10 @@ impl Worker { return monitor; } - // TODO: Tags let mut tags = vec![]; mem::swap(&mut tags, &mut monitor.common_mut().tags); - let _: i32 = self + let id: i32 = self .call( "editMonitor", vec![serde_json::to_value(&monitor).unwrap()], @@ -302,6 +399,8 @@ impl Worker { .await .unwrap_or_default(); + self.update_monitor_tags(id, &tags).await; + monitor.common_mut().tags = tags; monitor @@ -408,11 +507,11 @@ impl Client { self.worker.monitors.lock().await.clone() } - pub async fn tags(&self) -> TagList { + pub async fn tags(&self) -> Vec { self.worker.tags.lock().await.clone() } - pub async fn add_tag(&self, tag: Tag) -> Tag { + pub async fn add_tag(&self, tag: TagDefinition) -> TagDefinition { self.worker.add_tag(tag).await } diff --git a/src/kuma/models.rs b/src/kuma/models.rs index 6896db1..36bed6f 100644 --- a/src/kuma/models.rs +++ b/src/kuma/models.rs @@ -1,9 +1,10 @@ -use crate::util::{DeserializeBoolLenient, DeserializeNumberLenient}; +use crate::util::{DeserializeBoolLenient, DeserializeNumberLenient, DeserializeVecLenient}; +use derivative::Derivative; use serde::{Deserialize, Serialize}; use serde_alias::serde_alias; use serde_inline_default::serde_inline_default; use serde_with::{serde_as, skip_serializing_none}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use strum::EnumString; #[derive(Debug, EnumString)] @@ -126,26 +127,59 @@ pub enum HttpMethod { #[skip_serializing_none] #[serde_alias(SnakeCase)] #[serde_as] -#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] -pub struct Tag { +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)] +pub struct TagDefinition { #[serde(rename = "id")] #[serde_as(as = "Option")] - pub id: Option, + pub tag_id: Option, #[serde(rename = "name")] pub name: Option, #[serde(rename = "color")] pub color: Option, +} +#[skip_serializing_none] +#[serde_alias(SnakeCase)] +#[serde_as] +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)] +pub struct Tag { #[serde(rename = "tag_id")] #[serde_as(as = "Option")] pub tag_id: Option, + #[serde(rename = "name")] + pub name: Option, + + #[serde(rename = "color")] + pub color: Option, + #[serde(rename = "value")] pub value: Option, } +impl From for Tag { + fn from(value: TagDefinition) -> Self { + Tag { + name: value.name, + color: value.color, + tag_id: value.tag_id, + value: None, + } + } +} + +impl From for TagDefinition { + fn from(value: Tag) -> Self { + TagDefinition { + tag_id: value.tag_id, + name: value.name, + color: value.color, + } + } +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(tag = "mechanism")] pub enum KafkaProducerSaslOptions { @@ -203,7 +237,8 @@ pub enum KafkaProducerSaslOptions { #[skip_serializing_none] #[serde_alias(SnakeCase)] #[serde_as] -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Derivative, Serialize, Deserialize)] +#[derivative(PartialEq)] pub struct MonitorCommon { #[serde(rename = "id")] #[serde_as(as = "Option")] @@ -237,15 +272,19 @@ pub struct MonitorCommon { #[serde(rename = "parent")] #[serde_as(as = "Option")] + #[serialize_always] pub parent: Option, - #[serde(rename = "group")] - #[serde(skip_serializing)] - pub group: Option, + #[serde(rename = "parent_name")] + #[derivative(PartialEq = "ignore")] + #[derivative(Hash = "ignore")] + pub parent_name: Option, #[serde(rename = "tags")] #[serde(skip_serializing_if = "Vec::is_empty")] #[serde(default)] + #[serde_as(as = "DeserializeVecLenient")] + #[derivative(PartialEq(compare_with = "compare_tags"))] pub tags: Vec, #[serde(rename = "notificationIDList")] @@ -256,6 +295,14 @@ pub struct MonitorCommon { pub accepted_statuscodes: Vec, } +fn compare_tags(a: &Vec, b: &Vec) -> bool { + if a.len() != b.len() { + return false; + } + + a.iter().collect::>() == b.iter().collect::>() +} + #[skip_serializing_none] #[serde_alias(SnakeCase)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -899,4 +946,3 @@ impl Monitor { } pub type MonitorList = HashMap; -pub type TagList = Vec; diff --git a/src/sync.rs b/src/sync.rs index d90dad8..78776ea 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,6 +1,6 @@ use crate::{ config::Config, - kuma::{Client, Monitor, Tag}, + kuma::{Client, Monitor, MonitorType, Tag, TagDefinition}, util::{group_by_prefix, ResultLogger}, }; use bollard::{ @@ -115,7 +115,7 @@ impl Sync { .collect::>() } - async fn get_autokuma_tag(&self, kuma: &Client) -> Tag { + async fn get_autokuma_tag(&self, kuma: &Client) -> TagDefinition { match kuma .tags() .await @@ -124,7 +124,7 @@ impl Sync { { Some(tag) => tag, None => { - kuma.add_tag(Tag { + kuma.add_tag(TagDefinition { name: Some(self.config.kuma.tag_name.clone()), color: Some(self.config.kuma.tag_color.clone()), ..Default::default() @@ -153,6 +153,44 @@ impl Sync { .collect::>() } + fn merge_monitors( + &self, + current: &Monitor, + new: &Monitor, + addition_tags: Option>, + ) -> Monitor { + let mut new = new.clone(); + + let current_tags = current + .common() + .tags + .iter() + .filter_map(|tag| tag.tag_id.as_ref().map(|id| (*id, tag))) + .collect::>(); + + let merged_tags: Vec = new + .common_mut() + .tags + .drain(..) + .chain(addition_tags.unwrap_or_default()) + .map(|new_tag| { + new_tag + .tag_id + .as_ref() + .and_then(|id| { + current_tags.get(id).and_then(|current_tag| { + serde_merge::omerge(current_tag, &new_tag).unwrap() + }) + }) + .unwrap_or_else(|| new_tag) + }) + .collect_vec(); + + new.common_mut().tags = merged_tags; + + serde_merge::omerge(current, new).unwrap() + } + pub async fn run(&self) { let docker = Docker::connect_with_socket(&self.config.docker.socket_path, 120, API_DEFAULT_VERSION) @@ -164,6 +202,17 @@ impl Sync { let containers = self.get_kuma_containers(&docker).await; let new_monitors = self.get_monitors_from_containers(&containers); let current_monitors = self.get_managed_monitors(&kuma).await; + let groups = current_monitors + .iter() + .filter(|(_, monitor)| monitor.monitor_type() == MonitorType::Group) + .filter_map(|(id, monitor)| { + monitor + .common() + .id + .as_ref() + .map(|parent_id| (parent_id, id)) + }) + .collect::>(); let to_delete = current_monitors .iter() @@ -189,7 +238,7 @@ impl Sync { println!("Creating new monitor: {}", id); let mut monitor = monitor.clone(); - let mut tag = autokuma_tag.clone(); + let mut tag = Tag::from(autokuma_tag.clone()); tag.value = Some(id.clone()); monitor.common_mut().tags.push(tag); @@ -198,9 +247,17 @@ impl Sync { } for (id, current, new) in to_update { - let merge: Monitor = serde_merge::omerge(current, new).unwrap(); + let mut tag = Tag::from(autokuma_tag.clone()); + tag.value = Some(id.clone()); + + let merge: Monitor = self.merge_monitors(current, new, Some(vec![tag])); - if current != &merge { + if current != &merge + || merge.common().parent_name.is_some() != current.common().parent.is_some() + || merge.common().parent_name.as_ref().is_some_and(|name| { + Some(name) != current.common().parent.map(|id| groups[&id]) + }) + { println!("Updating monitor: {}", id); kuma.edit_monitor(merge).await; } diff --git a/src/util.rs b/src/util.rs index c44d4c7..431c407 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,6 +1,9 @@ -use std::{collections::BTreeMap, str::FromStr}; +use std::{collections::BTreeMap, marker::PhantomData, str::FromStr}; -use serde::{Deserialize, Serialize}; +use serde::{ + de::{DeserializeOwned, IntoDeserializer}, + Deserialize, Deserializer, Serialize, +}; use serde_json::Value; use serde_with::{DeserializeAs, SerializeAs}; @@ -30,7 +33,9 @@ where std::any::type_name::() )) }), - _ => Err(serde::de::Error::custom("Unexpected type for x")), + _ => Err(serde::de::Error::custom( + "Unexpected type for deserialization", + )), }; result @@ -66,7 +71,9 @@ impl<'de> DeserializeAs<'de, bool> for DeserializeBoolLenient { std::any::type_name::() )) }), - _ => Err(serde::de::Error::custom("Unexpected type for x")), + _ => Err(serde::de::Error::custom( + "Unexpected type for deserialization", + )), }; result @@ -85,6 +92,44 @@ where } } +pub struct DeserializeVecLenient(PhantomData); + +impl<'de, T> DeserializeAs<'de, Vec> for DeserializeVecLenient +where + T: DeserializeOwned, +{ + fn deserialize_as(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer) + .map_err(serde::de::Error::custom)? + .clone(); + + return match value { + Value::Array(_) => { + Vec::::deserialize(value.into_deserializer()).map_err(serde::de::Error::custom) + } + Value::String(s) => serde_json::from_str(&s).map_err(serde::de::Error::custom), + _ => Err(serde::de::Error::custom( + "Unexpected type for deserialization", + )), + }; + } +} + +impl SerializeAs> for DeserializeVecLenient +where + T: Serialize, +{ + fn serialize_as(source: &Vec, serializer: S) -> Result + where + S: serde::Serializer, + { + source.serialize(serializer) + } +} + pub fn group_by_prefix(v: I, delimiter: &str) -> BTreeMap> where A: AsRef,