Skip to content

Commit

Permalink
feat: implemented event source validation in daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Oct 21, 2024
1 parent 8fba278 commit 2ef16ee
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ csv = "1.3.0"
handlebars = "6.1.0"
slack-hook = "0.8.0"
include_dir = "0.7.4"
regex = "1.11.0"

[dev-dependencies]
mockall = "0.12.1"
Expand Down
114 changes: 72 additions & 42 deletions src/drivers/monitor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::Result;
use rdkafka::{
consumer::{CommitMode, Consumer, StreamConsumer},
ClientConfig,
ClientConfig, Message,
};
use regex::Regex;
use std::{borrow::Borrow, collections::HashMap, sync::Arc};
use tracing::{error, info, warn};

Expand All @@ -21,58 +22,87 @@ pub async fn subscribe(config: MonitorConfig) -> Result<()> {
let consumer: StreamConsumer = client_config.create()?;
consumer.subscribe(&[&config.topic])?;

let source_regex = Regex::new(r"fabric-.+")?;

info!("Monitor subscribe running");
loop {
match consumer.recv().await {
Err(error) => error!(?error, "kafka subscribe error"),
Ok(message) => match message.borrow().try_into() {
Ok(event) => {
let event_appliclation = {
match &event {
Event::ProjectCreated(evt) => {
project::cluster::apply_manifest(cluster.clone(), evt.clone()).await
}
Event::ProjectDeleted(evt) => {
project::cluster::delete_manifest(cluster.clone(), evt.clone())
.await
}
Event::ResourceCreated(evt) => {
resource::cluster::apply_manifest(cluster.clone(), evt.clone())
.await
}
Event::ResourceUpdated(evt) => {
resource::cluster::patch_manifest(cluster.clone(), evt.clone())
.await
Ok(message) => {
let message = message.borrow();

let payload: serde_json::Value =
serde_json::from_slice(message.payload().unwrap_or_default())?;
match payload.get("annotation") {
Some(annotation) => match annotation.get("source") {
Some(v) => {
let source = v.to_string();
if !source_regex.is_match(&source) {
info!(?source, "bypass event. Event source not allowed");
continue;
}
Event::ResourceDeleted(evt) => {
resource::cluster::delete_manifest(cluster.clone(), evt.clone())
.await
}
None => {
info!("bypass event. Event doesnt have a source");
continue;
}
},
None => {
info!("bypass event. Event doesnt have a source");
continue;
}
};

match message.try_into() {
Ok(event) => {
let event_appliclation = {
match &event {
Event::ProjectCreated(evt) => {
project::cluster::apply_manifest(cluster.clone(), evt.clone())
.await
}
Event::ProjectDeleted(evt) => {
project::cluster::delete_manifest(cluster.clone(), evt.clone())
.await
}
Event::ResourceCreated(evt) => {
resource::cluster::apply_manifest(cluster.clone(), evt.clone())
.await
}
Event::ResourceUpdated(evt) => {
resource::cluster::patch_manifest(cluster.clone(), evt.clone())
.await
}
Event::ResourceDeleted(evt) => {
resource::cluster::delete_manifest(cluster.clone(), evt.clone())
.await
}
_ => {
info!(event = event.key(), "bypass event");
Ok(())
}
}
_ => {
info!(event = event.key(), "bypass event");
Ok(())
};

match event_appliclation {
Ok(()) => {
info!(event = event.key(), "Successfully handled event")
}
Err(err) => warn!(
event = event.key(),
error = err.to_string(),
"Error running event."
),
}
};

match event_appliclation {
Ok(()) => {
info!(event = event.key(), "Successfully handled event")
}
Err(err) => warn!(
event = event.key(),
error = err.to_string(),
"Error running event."
),
consumer.commit_message(&message, CommitMode::Async)?;
}
Err(error) => {
error!(?error, "fail to convert message to event");
consumer.commit_message(&message, CommitMode::Async)?;
}

consumer.commit_message(&message, CommitMode::Async)?;
}
Err(error) => {
error!(?error, "fail to convert message to event");
consumer.commit_message(&message, CommitMode::Async)?;
}
},
}
};
}
}
Expand Down

0 comments on commit 2ef16ee

Please sign in to comment.