Skip to content

Commit

Permalink
add ability to assign tags & improve group management
Browse files Browse the repository at this point in the history
  • Loading branch information
BigBoot committed Jan 6, 2024
1 parent 7fa99a8 commit d20c38f
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 34 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ edition = "2021"

[dependencies]
bollard = { version = "0.15.0" }
cute_custom_default = { version = "2.1.0" }
confique = { version = "0.2.5", default-features = false, features = ["toml"] }
cute_custom_default = { version = "2.1.0" }
derivative = { version = "2.2.0" }
futures-util = { version = "0.3.30" }
itertools = { version = "0.12.0" }
rust_socketio = { git = "https://github.com/1c3t3a/rust-socketio.git", rev = "9ccf67b", features = [
Expand Down
125 changes: 112 additions & 13 deletions src/kuma/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Event, Monitor, MonitorList, MonitorType, Tag, TagList};
use super::{Event, Monitor, MonitorList, MonitorType, Tag, TagDefinition};
use crate::config::Config;
use crate::util::ResultLogger;
use futures_util::FutureExt;
Expand Down Expand Up @@ -26,7 +26,7 @@ struct Worker {
config: Arc<Config>,
socket_io: Arc<Mutex<Option<SocketIO>>>,
event_sender: Arc<Sender>,
tags: Arc<Mutex<TagList>>,
tags: Arc<Mutex<Vec<TagDefinition>>>,
monitors: Arc<Mutex<MonitorList>>,
is_ready: Arc<Mutex<bool>>,
}
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Worker {
.ok_or_else(|| ())
}

async fn get_tags(&self) -> TagList {
async fn get_tags(&self) -> Vec<TagDefinition> {
self.call("getTags", vec![], "/tags", Duration::from_secs(2))
.await
.unwrap_or_default()
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Worker {
.unwrap_or_else(|_| false)
}

pub async fn add_tag(&self, tag: Tag) -> Tag {
pub async fn add_tag(&self, tag: TagDefinition) -> TagDefinition {
self.call(
"addTag",
vec![serde_json::to_value(tag.clone()).unwrap()],
Expand All @@ -201,6 +201,38 @@ impl Worker {
.unwrap_or_default();
}

pub async fn edit_monitor_tag(&self, monitor_id: i32, tag_id: i32, value: Option<String>) {
let _: bool = self
.call(
"editMonitorTag",
vec![
json!(tag_id),
json!(monitor_id),
json!(value.unwrap_or_default()),
],
"/ok",
Duration::from_secs(2),
)
.await
.unwrap_or_default();
}

pub async fn delete_monitor_tag(&self, monitor_id: i32, tag_id: i32, value: Option<String>) {
let _: bool = self
.call(
"deleteMonitorTag",
vec![
json!(tag_id),
json!(monitor_id),
json!(value.unwrap_or_default()),
],
"/ok",
Duration::from_secs(2),
)
.await
.unwrap_or_default();
}

pub async fn delete_monitor(&self, monitor_id: i32) {
let _: bool = self
.call(
Expand All @@ -214,7 +246,9 @@ impl Worker {
}

async fn resolve_group(&self, monitor: &mut Monitor) -> bool {
if let Some(group_name) = &monitor.common().group {
if let Some(group_name) = monitor.common().parent_name.clone() {
monitor.common_mut().parent_name = None;

if let Some(Some(group_id)) = self
.monitors
.lock()
Expand All @@ -227,7 +261,7 @@ impl Worker {
&& tag
.value
.as_ref()
.is_some_and(|tag_value| tag_value == group_name)
.is_some_and(|tag_value| tag_value == &group_name)
})
})
.map(|x| x.1.common().id)
Expand All @@ -236,16 +270,80 @@ impl Worker {
} else {
return false;
}
} else {
monitor.common_mut().parent = None;
}
return true;
}

async fn update_monitor_tags(&self, monitor_id: i32, tags: &Vec<Tag>) {
for tag in tags {
if let Some(tag_id) = tag.id {
self.add_monitor_tag(monitor_id, tag_id, tag.value.clone())
let new_tags = tags
.iter()
.filter_map(|tag| tag.tag_id.and_then(|id| Some((id, tag))))
.collect::<HashMap<_, _>>();

if let Some(monitor) = self.monitors.lock().await.get(&monitor_id.to_string()) {
let current_tags = monitor
.common()
.tags
.iter()
.filter_map(|tag| tag.tag_id.and_then(|id| Some((id, tag))))
.collect::<HashMap<_, _>>();

let duplicates = monitor
.common()
.tags
.iter()
.duplicates_by(|tag| tag.tag_id)
.filter_map(|tag| tag.tag_id.as_ref().map(|id| (id, tag)))
.collect::<HashMap<_, _>>();

let to_delete = current_tags
.iter()
.filter(|(id, _)| !new_tags.contains_key(*id) && !duplicates.contains_key(*id))
.collect_vec();

let to_create = new_tags
.iter()
.filter(|(id, _)| !current_tags.contains_key(*id))
.collect_vec();

let to_update = current_tags
.keys()
.filter_map(|id| match (current_tags.get(id), new_tags.get(id)) {
(Some(current), Some(new)) => Some((id, current, new)),
_ => None,
})
.collect_vec();

for (tag_id, tag) in duplicates {
self.delete_monitor_tag(monitor_id, *tag_id, tag.value.clone())
.await;
}

for (tag_id, tag) in to_delete {
self.delete_monitor_tag(monitor_id, *tag_id, tag.value.clone())
.await;
}

for (tag_id, tag) in to_create {
self.add_monitor_tag(monitor_id, *tag_id, tag.value.clone())
.await
}

for (tag_id, current, new) in to_update {
if current.value != new.value {
self.edit_monitor_tag(monitor_id, *tag_id, new.value.clone())
.await;
}
}
} else {
for tag in tags {
if let Some(tag_id) = tag.tag_id {
self.add_monitor_tag(monitor_id, tag_id, tag.value.clone())
.await;
}
}
}
}

Expand Down Expand Up @@ -288,11 +386,10 @@ impl Worker {
return monitor;
}

// TODO: Tags
let mut tags = vec![];
mem::swap(&mut tags, &mut monitor.common_mut().tags);

let _: i32 = self
let id: i32 = self
.call(
"editMonitor",
vec![serde_json::to_value(&monitor).unwrap()],
Expand All @@ -302,6 +399,8 @@ impl Worker {
.await
.unwrap_or_default();

self.update_monitor_tags(id, &tags).await;

monitor.common_mut().tags = tags;

monitor
Expand Down Expand Up @@ -408,11 +507,11 @@ impl Client {
self.worker.monitors.lock().await.clone()
}

pub async fn tags(&self) -> TagList {
pub async fn tags(&self) -> Vec<TagDefinition> {
self.worker.tags.lock().await.clone()
}

pub async fn add_tag(&self, tag: Tag) -> Tag {
pub async fn add_tag(&self, tag: TagDefinition) -> TagDefinition {
self.worker.add_tag(tag).await
}

Expand Down
66 changes: 56 additions & 10 deletions src/kuma/models.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::util::{DeserializeBoolLenient, DeserializeNumberLenient};
use crate::util::{DeserializeBoolLenient, DeserializeNumberLenient, DeserializeVecLenient};
use derivative::Derivative;
use serde::{Deserialize, Serialize};
use serde_alias::serde_alias;
use serde_inline_default::serde_inline_default;
use serde_with::{serde_as, skip_serializing_none};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use strum::EnumString;

#[derive(Debug, EnumString)]
Expand Down Expand Up @@ -126,26 +127,59 @@ pub enum HttpMethod {
#[skip_serializing_none]
#[serde_alias(SnakeCase)]
#[serde_as]
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
pub struct Tag {
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
pub struct TagDefinition {
#[serde(rename = "id")]
#[serde_as(as = "Option<DeserializeNumberLenient>")]
pub id: Option<i32>,
pub tag_id: Option<i32>,

#[serde(rename = "name")]
pub name: Option<String>,

#[serde(rename = "color")]
pub color: Option<String>,
}

#[skip_serializing_none]
#[serde_alias(SnakeCase)]
#[serde_as]
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
pub struct Tag {
#[serde(rename = "tag_id")]
#[serde_as(as = "Option<DeserializeNumberLenient>")]
pub tag_id: Option<i32>,

#[serde(rename = "name")]
pub name: Option<String>,

#[serde(rename = "color")]
pub color: Option<String>,

#[serde(rename = "value")]
pub value: Option<String>,
}

impl From<TagDefinition> for Tag {
fn from(value: TagDefinition) -> Self {
Tag {
name: value.name,
color: value.color,
tag_id: value.tag_id,
value: None,
}
}
}

impl From<Tag> for TagDefinition {
fn from(value: Tag) -> Self {
TagDefinition {
tag_id: value.tag_id,
name: value.name,
color: value.color,
}
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "mechanism")]
pub enum KafkaProducerSaslOptions {
Expand Down Expand Up @@ -203,7 +237,8 @@ pub enum KafkaProducerSaslOptions {
#[skip_serializing_none]
#[serde_alias(SnakeCase)]
#[serde_as]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Derivative, Serialize, Deserialize)]
#[derivative(PartialEq)]
pub struct MonitorCommon {
#[serde(rename = "id")]
#[serde_as(as = "Option<DeserializeNumberLenient>")]
Expand Down Expand Up @@ -237,15 +272,19 @@ pub struct MonitorCommon {

#[serde(rename = "parent")]
#[serde_as(as = "Option<DeserializeNumberLenient>")]
#[serialize_always]
pub parent: Option<i32>,

#[serde(rename = "group")]
#[serde(skip_serializing)]
pub group: Option<String>,
#[serde(rename = "parent_name")]
#[derivative(PartialEq = "ignore")]
#[derivative(Hash = "ignore")]
pub parent_name: Option<String>,

#[serde(rename = "tags")]
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(default)]
#[serde_as(as = "DeserializeVecLenient<Tag>")]
#[derivative(PartialEq(compare_with = "compare_tags"))]
pub tags: Vec<Tag>,

#[serde(rename = "notificationIDList")]
Expand All @@ -256,6 +295,14 @@ pub struct MonitorCommon {
pub accepted_statuscodes: Vec<String>,
}

fn compare_tags(a: &Vec<Tag>, b: &Vec<Tag>) -> bool {
if a.len() != b.len() {
return false;
}

a.iter().collect::<HashSet<_>>() == b.iter().collect::<HashSet<_>>()
}

#[skip_serializing_none]
#[serde_alias(SnakeCase)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -899,4 +946,3 @@ impl Monitor {
}

pub type MonitorList = HashMap<String, Monitor>;
pub type TagList = Vec<Tag>;
Loading

0 comments on commit d20c38f

Please sign in to comment.