diff --git a/Cargo.lock b/Cargo.lock index 40eb790..749b4c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1230,6 +1230,7 @@ dependencies = [ "protoc-wkt", "rand 0.8.5", "rdkafka", + "regex", "reqwest 0.12.8", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 9b7b81c..953aaf1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ csv = "1.3.0" handlebars = "6.1.0" slack-hook = "0.8.0" include_dir = "0.7.4" +regex = "1.11.0" [dev-dependencies] mockall = "0.12.1" diff --git a/src/driven/kafka/mod.rs b/src/driven/kafka/mod.rs index 3a8855e..560f21c 100644 --- a/src/driven/kafka/mod.rs +++ b/src/driven/kafka/mod.rs @@ -11,6 +11,9 @@ use crate::domain::{ Result, }; +const NAME: &str = env!("CARGO_PKG_NAME"); +const VERSION: &str = env!("CARGO_PKG_VERSION"); + pub struct KafkaProducer { producer: FutureProducer, topic: String, @@ -34,7 +37,14 @@ impl KafkaProducer { #[async_trait::async_trait] impl EventDrivenBridge for KafkaProducer { async fn dispatch(&self, event: Event) -> Result<()> { - let data = serde_json::to_vec(&event)?; + let annotation = HashMap::from([("source", format!("{NAME}-{VERSION}"))]); + + let Some(mut payload) = serde_json::to_value(&event)?.as_object().cloned() else { + return Err(Error::Unexpected("invalid event structure".into())); + }; + payload.insert("annotation".into(), serde_json::to_value(annotation)?); + + let data = serde_json::to_vec(&payload)?; let key = event.key(); self.producer diff --git a/src/drivers/monitor/mod.rs b/src/drivers/monitor/mod.rs index 665fb60..709c464 100644 --- a/src/drivers/monitor/mod.rs +++ b/src/drivers/monitor/mod.rs @@ -1,8 +1,9 @@ use anyhow::Result; use rdkafka::{ consumer::{CommitMode, Consumer, StreamConsumer}, - ClientConfig, + ClientConfig, Message, }; +use regex::Regex; use std::{borrow::Borrow, collections::HashMap, sync::Arc}; use tracing::{error, info, warn}; @@ -21,58 +22,87 @@ pub async fn subscribe(config: MonitorConfig) -> Result<()> { let consumer: StreamConsumer = client_config.create()?; consumer.subscribe(&[&config.topic])?; + let source_regex = Regex::new(r"fabric-.+")?; + info!("Monitor subscribe running"); loop { match consumer.recv().await { Err(error) => error!(?error, "kafka subscribe error"), - Ok(message) => match message.borrow().try_into() { - Ok(event) => { - let event_appliclation = { - match &event { - Event::ProjectCreated(evt) => { - project::cluster::apply_manifest(cluster.clone(), evt.clone()).await - } - Event::ProjectDeleted(evt) => { - project::cluster::delete_manifest(cluster.clone(), evt.clone()) - .await - } - Event::ResourceCreated(evt) => { - resource::cluster::apply_manifest(cluster.clone(), evt.clone()) - .await - } - Event::ResourceUpdated(evt) => { - resource::cluster::patch_manifest(cluster.clone(), evt.clone()) - .await + Ok(message) => { + let message = message.borrow(); + + let payload: serde_json::Value = + serde_json::from_slice(message.payload().unwrap_or_default())?; + match payload.get("annotation") { + Some(annotation) => match annotation.get("source") { + Some(v) => { + let source = v.to_string(); + if !source_regex.is_match(&source) { + info!(?source, "bypass event. Event source not allowed"); + continue; } - Event::ResourceDeleted(evt) => { - resource::cluster::delete_manifest(cluster.clone(), evt.clone()) - .await + } + None => { + info!("bypass event. Event doesnt have a source"); + continue; + } + }, + None => { + info!("bypass event. Event doesnt have a source"); + continue; + } + }; + + match message.try_into() { + Ok(event) => { + let event_appliclation = { + match &event { + Event::ProjectCreated(evt) => { + project::cluster::apply_manifest(cluster.clone(), evt.clone()) + .await + } + Event::ProjectDeleted(evt) => { + project::cluster::delete_manifest(cluster.clone(), evt.clone()) + .await + } + Event::ResourceCreated(evt) => { + resource::cluster::apply_manifest(cluster.clone(), evt.clone()) + .await + } + Event::ResourceUpdated(evt) => { + resource::cluster::patch_manifest(cluster.clone(), evt.clone()) + .await + } + Event::ResourceDeleted(evt) => { + resource::cluster::delete_manifest(cluster.clone(), evt.clone()) + .await + } + _ => { + info!(event = event.key(), "bypass event"); + Ok(()) + } } - _ => { - info!(event = event.key(), "bypass event"); - Ok(()) + }; + + match event_appliclation { + Ok(()) => { + info!(event = event.key(), "Successfully handled event") } + Err(err) => warn!( + event = event.key(), + error = err.to_string(), + "Error running event." + ), } - }; - match event_appliclation { - Ok(()) => { - info!(event = event.key(), "Successfully handled event") - } - Err(err) => warn!( - event = event.key(), - error = err.to_string(), - "Error running event." - ), + consumer.commit_message(message, CommitMode::Async)?; + } + Err(error) => { + error!(?error, "fail to convert message to event"); + consumer.commit_message(message, CommitMode::Async)?; } - - consumer.commit_message(&message, CommitMode::Async)?; - } - Err(error) => { - error!(?error, "fail to convert message to event"); - consumer.commit_message(&message, CommitMode::Async)?; } - }, + } }; } }