diff --git a/.github/iac/main.tf b/.github/iac/main.tf index c57918f..cfbe10c 100644 --- a/.github/iac/main.tf +++ b/.github/iac/main.tf @@ -58,7 +58,7 @@ module "fabric_rpc" { namespace = local.namespace image = var.rpc_image broker_urls = local.broker_urls - consumer_name = "rpc-ahid02" + consumer_name = "rpc-ahid03" kafka_username = local.kafka_rpc_username kafka_password = local.kafka_rpc_password kafka_topic = local.kafka_topic diff --git a/.sqlx/query-a8589c22d0d0d06df19bbe4629639652645b0b631069bc6e44301e2d8d24cddf.json b/.sqlx/query-a8589c22d0d0d06df19bbe4629639652645b0b631069bc6e44301e2d8d24cddf.json deleted file mode 100644 index c286edc..0000000 --- a/.sqlx/query-a8589c22d0d0d06df19bbe4629639652645b0b631069bc6e44301e2d8d24cddf.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO resource (\n id,\n project_id,\n kind,\n spec,\n status,\n created_at,\n updated_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 7 - }, - "nullable": [] - }, - "hash": "a8589c22d0d0d06df19bbe4629639652645b0b631069bc6e44301e2d8d24cddf" -} diff --git a/.sqlx/query-fb9ea27b47dbeafed5f943106b26e9cfe737404cd1169c15f5983092cf98aaa5.json b/.sqlx/query-fb9ea27b47dbeafed5f943106b26e9cfe737404cd1169c15f5983092cf98aaa5.json new file mode 100644 index 0000000..f4b6359 --- /dev/null +++ b/.sqlx/query-fb9ea27b47dbeafed5f943106b26e9cfe737404cd1169c15f5983092cf98aaa5.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO resource (\n id,\n project_id,\n name,\n kind,\n spec,\n status,\n created_at,\n updated_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 8 + }, + "nullable": [] + }, + "hash": "fb9ea27b47dbeafed5f943106b26e9cfe737404cd1169c15f5983092cf98aaa5" +} diff --git a/Cargo.lock b/Cargo.lock index a92b97c..b750af8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -965,7 +965,7 @@ dependencies = [ [[package]] name = "dmtri" version = "0.1.0" -source = "git+https://github.com/demeter-run/specs.git#12727b71fe3af229ae79d584471b0b4083fbf108" +source = "git+https://github.com/demeter-run/specs.git#70f4c78447d28f771456cdfeca968c65165a6dd8" dependencies = [ "bytes", "pbjson", @@ -2818,9 +2818,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62871f2d65009c0256aed1b9cfeeb8ac272833c404e13d53d400cd0dad7a2ac0" +checksum = "355ae415ccd3a04315d3f8246e86d67689ea74d88d915576e1589a351062a13b" dependencies = [ "bitflags 2.6.0", ] @@ -3986,7 +3986,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.19", + "winnow 0.6.20", ] [[package]] @@ -4631,9 +4631,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c52ac009d615e79296318c1bcce2d422aaca15ad08515e344feeda07df67a587" +checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" dependencies = [ "memchr", ] diff --git a/bootstrap/daemon/daemon.toml.tftpl b/bootstrap/daemon/daemon.toml.tftpl index c8968af..5795cc2 100644 --- a/bootstrap/daemon/daemon.toml.tftpl +++ b/bootstrap/daemon/daemon.toml.tftpl @@ -1,9 +1,12 @@ topic = "${topic}" cluster_id = "${cluster_id}" -prometheus_url = "${prometheus_url}" delay_sec = ${prometheus_delay_sec} +[prometheus] +url = "${prometheus_url}" +query_step = "${prometheus_query_step}" + [kafka] "bootstrap.servers" = "${broker_urls}" "group.id"= "${consumer_name}" diff --git a/bootstrap/daemon/main.tf b/bootstrap/daemon/main.tf index f22eea6..2eaf6f8 100644 --- a/bootstrap/daemon/main.tf +++ b/bootstrap/daemon/main.tf @@ -54,6 +54,12 @@ variable "prometheus_delay_sec" { default = 60 } +variable "prometheus_query_step" { + type = string + description = "Usage Query Step" + default = "10m" +} + variable "tolerations" { type = list(object({ effect = string diff --git a/src/bin/daemon.rs b/src/bin/daemon.rs index fbe60c8..ce2b12d 100644 --- a/src/bin/daemon.rs +++ b/src/bin/daemon.rs @@ -32,10 +32,16 @@ async fn main() -> Result<()> { Ok(()) } +#[derive(Debug, Deserialize, Clone)] +struct Prometheus { + url: String, + query_step: String, +} + #[derive(Debug, Deserialize, Clone)] struct Config { cluster_id: String, - prometheus_url: String, + prometheus: Prometheus, #[serde(deserialize_with = "deserialize_duration")] #[serde(rename(deserialize = "delay_sec"))] delay: Duration, @@ -70,7 +76,8 @@ impl From for UsageConfig { fn from(value: Config) -> Self { Self { cluster_id: value.cluster_id, - prometheus_url: value.prometheus_url, + prometheus_url: value.prometheus.url, + prometheus_query_step: value.prometheus.query_step, delay: value.delay, kafka: value.kafka, topic: value.topic, diff --git a/src/domain/event/mod.rs b/src/domain/event/mod.rs index 79c91e2..c21c7e7 100644 --- a/src/domain/event/mod.rs +++ b/src/domain/event/mod.rs @@ -99,6 +99,7 @@ pub struct ResourceCreated { pub id: String, pub project_id: String, pub project_namespace: String, + pub name: String, pub kind: String, pub spec: String, pub status: String, @@ -112,6 +113,7 @@ pub struct ResourceUpdated { pub id: String, pub project_id: String, pub project_namespace: String, + pub name: String, pub kind: String, pub spec_patch: String, pub updated_at: DateTime, @@ -121,17 +123,19 @@ into_event!(ResourceUpdated); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResourceDeleted { pub id: String, - pub kind: String, - pub status: String, pub project_id: String, pub project_namespace: String, + pub name: String, + pub kind: String, + pub status: String, pub deleted_at: DateTime, } into_event!(ResourceDeleted); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct UsageUnitCreated { - pub resource_id: String, + pub project_namespace: String, + pub resource_name: String, pub tier: String, pub units: i64, pub interval: u64, @@ -219,6 +223,7 @@ mod tests { project::{ProjectStatus, ProjectUserRole}, resource::ResourceStatus, tests::{PHC, SECRET}, + utils::get_random_salt, }; use super::*; @@ -280,7 +285,8 @@ mod tests { Self { id: Uuid::new_v4().to_string(), project_id: Uuid::new_v4().to_string(), - project_namespace: "prj-test".into(), + project_namespace: "test".into(), + name: format!("cardanonode-{}", get_random_salt()), kind: "CardanoNodePort".into(), spec: "{\"version\":\"stable\",\"network\":\"mainnet\",\"throughputTier\":\"1\"}" .into(), @@ -294,10 +300,11 @@ mod tests { fn default() -> Self { Self { id: Uuid::new_v4().to_string(), + project_id: Uuid::new_v4().to_string(), + project_namespace: "test".into(), + name: format!("cardanonode-{}", get_random_salt()), kind: "CardanoNodePort".into(), status: ResourceStatus::Deleted.to_string(), - project_id: Uuid::new_v4().to_string(), - project_namespace: "prj-test".into(), deleted_at: Utc::now(), } } @@ -308,7 +315,8 @@ mod tests { id: Uuid::new_v4().to_string(), cluster_id: Uuid::new_v4().to_string(), usages: vec![UsageUnitCreated { - resource_id: Uuid::new_v4().to_string(), + project_namespace: "test".into(), + resource_name: format!("cardanonode-{}", get_random_salt()), units: 120, tier: "0".into(), interval: 10, diff --git a/src/domain/project/cluster.rs b/src/domain/project/cluster.rs index 9db951d..edcd0ef 100644 --- a/src/domain/project/cluster.rs +++ b/src/domain/project/cluster.rs @@ -6,6 +6,7 @@ use tracing::info; use crate::domain::{ event::{ProjectCreated, ProjectDeleted}, + utils::cluster_namespace, Result, }; @@ -22,7 +23,7 @@ pub async fn apply_manifest( ) -> Result<()> { let namespace = Namespace { metadata: ObjectMeta { - name: Some(evt.namespace), + name: Some(cluster_namespace(&evt.namespace)), ..Default::default() }, ..Default::default() @@ -41,7 +42,7 @@ pub async fn delete_manifest( ) -> Result<()> { let namespace = Namespace { metadata: ObjectMeta { - name: Some(evt.namespace), + name: Some(cluster_namespace(&evt.namespace)), ..Default::default() }, ..Default::default() diff --git a/src/domain/project/command.rs b/src/domain/project/command.rs index c451116..fe4ea12 100644 --- a/src/domain/project/command.rs +++ b/src/domain/project/command.rs @@ -498,7 +498,7 @@ pub struct CreateCmd { impl CreateCmd { pub fn new(credential: Credential, name: String) -> Self { let id = Uuid::new_v4().to_string(); - let namespace = format!("prj-{}", utils::get_random_name()); + let namespace = utils::get_random_name(); Self { credential, diff --git a/src/domain/resource/cache.rs b/src/domain/resource/cache.rs index a3d1a08..12d9bb2 100644 --- a/src/domain/resource/cache.rs +++ b/src/domain/resource/cache.rs @@ -14,6 +14,9 @@ use chrono::{DateTime, Utc}; pub trait ResourceDrivenCache: Send + Sync { async fn find(&self, project_id: &str, page: &u32, page_size: &u32) -> Result>; async fn find_by_id(&self, id: &str) -> Result>; + async fn find_by_name(&self, project_id: &str, name: &str) -> Result>; + async fn find_by_name_for_usage(&self, namespace: &str, name: &str) + -> Result>; async fn create(&self, resource: &Resource) -> Result<()>; async fn update(&self, resource: &ResourceUpdate) -> Result<()>; async fn delete(&self, id: &str, deleted_at: &DateTime) -> Result<()>; diff --git a/src/domain/resource/cluster.rs b/src/domain/resource/cluster.rs index 14c6ba2..735f662 100644 --- a/src/domain/resource/cluster.rs +++ b/src/domain/resource/cluster.rs @@ -8,6 +8,7 @@ use tracing::info; use crate::domain::{ event::{ResourceCreated, ResourceDeleted, ResourceUpdated}, + utils::cluster_namespace, Result, }; @@ -24,10 +25,10 @@ pub async fn apply_manifest( ) -> Result<()> { let api = build_api_resource(&evt.kind); - let mut obj = DynamicObject::new(&evt.id, &api); + let mut obj = DynamicObject::new(&evt.name, &api); obj.metadata = ObjectMeta { - name: Some(evt.id), - namespace: Some(evt.project_namespace), + name: Some(evt.name), + namespace: Some(cluster_namespace(&evt.project_namespace)), ..Default::default() }; @@ -47,10 +48,10 @@ pub async fn patch_manifest( evt: ResourceUpdated, ) -> Result<()> { let api = build_api_resource(&evt.kind); - let mut obj = DynamicObject::new(&evt.id, &api); + let mut obj = DynamicObject::new(&evt.name, &api); obj.metadata = ObjectMeta { - name: Some(evt.id), - namespace: Some(evt.project_namespace), + name: Some(evt.name), + namespace: Some(cluster_namespace(&evt.project_namespace)), ..Default::default() }; @@ -71,10 +72,10 @@ pub async fn delete_manifest( ) -> Result<()> { let api = build_api_resource(&evt.kind); - let mut obj = DynamicObject::new(&evt.id, &api); + let mut obj = DynamicObject::new(&evt.name, &api); obj.metadata = ObjectMeta { - name: Some(evt.id), - namespace: Some(evt.project_namespace), + name: Some(evt.name), + namespace: Some(cluster_namespace(&evt.project_namespace)), ..Default::default() }; diff --git a/src/domain/resource/command.rs b/src/domain/resource/command.rs index eb38c96..088dac9 100644 --- a/src/domain/resource/command.rs +++ b/src/domain/resource/command.rs @@ -15,7 +15,7 @@ use crate::domain::{ metadata::{KnownField, MetadataDriven}, project::cache::ProjectDrivenCache, resource::{ResourceStatus, ResourceUpdated}, - utils::get_schema_from_crd, + utils::{self, get_schema_from_crd}, Result, PAGE_SIZE_DEFAULT, PAGE_SIZE_MAX, }; @@ -67,6 +67,7 @@ pub async fn fetch_by_id( } pub async fn create( + resource_cache: Arc, project_cache: Arc, metadata: Arc, event: Arc, @@ -74,6 +75,14 @@ pub async fn create( ) -> Result<()> { assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?; + if resource_cache + .find_by_name(&cmd.project_id, &cmd.name) + .await? + .is_some() + { + return Err(Error::Unexpected("invalid random name, try again".into())); + } + let Some(crd) = metadata.find_by_kind(&cmd.kind).await? else { return Err(Error::CommandMalformed("kind not supported".into())); }; @@ -110,6 +119,7 @@ pub async fn create( id: cmd.id, project_id: project.id, project_namespace: project.namespace, + name: cmd.name, kind: cmd.kind.clone(), spec: serde_json::to_string(&spec)?, status: ResourceStatus::Active.to_string(), @@ -143,6 +153,7 @@ pub async fn update( id: cmd.id.clone(), project_id: project.id, project_namespace: project.namespace, + name: resource.name, kind: resource.kind, spec_patch: serde_json::to_string(&cmd.spec)?, updated_at: Utc::now(), @@ -176,10 +187,11 @@ pub async fn delete( let evt = ResourceDeleted { id: cmd.id, - kind: resource.kind.clone(), - status: ResourceStatus::Deleted.to_string(), project_id: project.id, project_namespace: project.namespace, + name: resource.name, + kind: resource.kind.clone(), + status: ResourceStatus::Deleted.to_string(), deleted_at: Utc::now(), }; @@ -202,8 +214,7 @@ pub fn build_key(project_id: &str, resource_id: &str) -> Result> { } pub fn encode_key(key: Vec, prefix: &str) -> Result { - let prefix = format!("dmtr_{}", prefix.to_lowercase().replace("port", "")); - let hrp = Hrp::parse(&prefix)?; + let hrp = Hrp::parse(&prefix.to_lowercase().replace("port", ""))?; let bech = bech32::encode::(hrp, &key)?; Ok(bech) @@ -252,6 +263,7 @@ pub type Spec = serde_json::value::Map; pub struct CreateCmd { pub credential: Credential, pub id: String, + pub name: String, pub project_id: String, pub kind: String, pub spec: Spec, @@ -259,10 +271,16 @@ pub struct CreateCmd { impl CreateCmd { pub fn new(credential: Credential, project_id: String, kind: String, spec: Spec) -> Self { let id = Uuid::new_v4().to_string(); + let name = format!( + "{}-{}", + kind.to_lowercase().replace("port", ""), + utils::get_random_salt() + ); Self { credential, id, + name, project_id, kind, spec, @@ -328,6 +346,7 @@ mod tests { Self { credential: Credential::Auth0("user id".into()), id: Uuid::new_v4().to_string(), + name: format!("cardanonode-{}", utils::get_random_salt()), project_id: Uuid::new_v4().to_string(), kind: "CardanoNodePort".into(), spec: serde_json::Map::default(), @@ -447,6 +466,11 @@ mod tests { #[tokio::test] async fn it_should_create_resource() { + let mut resource_cache = MockResourceDrivenCache::new(); + resource_cache + .expect_find_by_name() + .return_once(|_, _| Ok(None)); + let mut project_cache = MockProjectDrivenCache::new(); project_cache .expect_find_user_permission() @@ -466,6 +490,7 @@ mod tests { let cmd = CreateCmd::default(); let result = create( + Arc::new(resource_cache), Arc::new(project_cache), Arc::new(metadata), Arc::new(event), @@ -477,6 +502,11 @@ mod tests { } #[tokio::test] async fn it_should_fail_create_resource_when_crd_doesnt_exist() { + let mut resource_cache = MockResourceDrivenCache::new(); + resource_cache + .expect_find_by_name() + .return_once(|_, _| Ok(None)); + let mut project_cache = MockProjectDrivenCache::new(); project_cache .expect_find_user_permission() @@ -490,6 +520,7 @@ mod tests { let cmd = CreateCmd::default(); let result = create( + Arc::new(resource_cache), Arc::new(project_cache), Arc::new(metadata), Arc::new(event), @@ -501,6 +532,11 @@ mod tests { } #[tokio::test] async fn it_should_fail_create_resource_when_project_doesnt_exist() { + let mut resource_cache = MockResourceDrivenCache::new(); + resource_cache + .expect_find_by_name() + .return_once(|_, _| Ok(None)); + let mut project_cache = MockProjectDrivenCache::new(); project_cache .expect_find_user_permission() @@ -517,6 +553,7 @@ mod tests { let cmd = CreateCmd::default(); let result = create( + Arc::new(resource_cache), Arc::new(project_cache), Arc::new(metadata), Arc::new(event), @@ -533,12 +570,14 @@ mod tests { .expect_find_user_permission() .return_once(|_, _| Ok(None)); + let resource_cache = MockResourceDrivenCache::new(); let metadata = MockMetadataDriven::new(); let event = MockEventDrivenBridge::new(); let cmd = CreateCmd::default(); let result = create( + Arc::new(resource_cache), Arc::new(project_cache), Arc::new(metadata), Arc::new(event), @@ -549,6 +588,7 @@ mod tests { } #[tokio::test] async fn it_should_fail_create_resource_when_secret_doesnt_have_permission() { + let resource_cache = MockResourceDrivenCache::new(); let project_cache = MockProjectDrivenCache::new(); let metadata = MockMetadataDriven::new(); let event = MockEventDrivenBridge::new(); @@ -559,6 +599,7 @@ mod tests { }; let result = create( + Arc::new(resource_cache), Arc::new(project_cache), Arc::new(metadata), Arc::new(event), diff --git a/src/domain/resource/mod.rs b/src/domain/resource/mod.rs index a474ebe..a634bd7 100644 --- a/src/domain/resource/mod.rs +++ b/src/domain/resource/mod.rs @@ -14,6 +14,7 @@ pub mod command; pub struct Resource { pub id: String, pub project_id: String, + pub name: String, pub kind: String, pub spec: String, pub annotations: Option, @@ -28,6 +29,7 @@ impl TryFrom for Resource { Ok(Self { id: value.id, project_id: value.project_id, + name: value.name, kind: value.kind, spec: value.spec, annotations: None, @@ -84,6 +86,8 @@ impl Display for ResourceStatus { mod tests { use uuid::Uuid; + use crate::domain::utils; + use super::*; impl Default for Resource { @@ -91,6 +95,7 @@ mod tests { Self { id: Uuid::new_v4().to_string(), project_id: Uuid::new_v4().to_string(), + name: format!("cardanonode-{}", utils::get_random_salt()), kind: "CardanoNodePort".into(), spec: "{\"version\":\"stable\",\"network\":\"mainnet\",\"throughputTier\":\"1\"}" .into(), diff --git a/src/domain/usage/cache.rs b/src/domain/usage/cache.rs index e2f0d8e..4627361 100644 --- a/src/domain/usage/cache.rs +++ b/src/domain/usage/cache.rs @@ -1,6 +1,10 @@ use std::sync::Arc; -use crate::domain::{event::UsageCreated, Result}; +use futures::future::try_join_all; + +use crate::domain::{ + error::Error, event::UsageCreated, resource::cache::ResourceDrivenCache, Result, +}; use super::{Usage, UsageReport, UsageReportAggregated}; @@ -17,8 +21,30 @@ pub trait UsageDrivenCache: Send + Sync { async fn create(&self, usage: Vec) -> Result<()>; } -pub async fn create(cache: Arc, evt: UsageCreated) -> Result<()> { - cache.create(evt.into()).await +pub async fn create( + usage_cache: Arc, + resouce_cache: Arc, + evt: UsageCreated, +) -> Result<()> { + let tasks = evt + .usages + .iter() + .map(|usage| async { + let Some(resource) = resouce_cache + .find_by_name_for_usage(&usage.project_namespace, &usage.resource_name) + .await? + else { + return Err(Error::Unexpected("Resource name has not been found".into())); + }; + + let usage = Usage::from_usage_evt(usage, &resource.id, &evt.id, evt.created_at); + Ok(usage) + }) + .collect::>(); + + let usages = try_join_all(tasks).await?; + + usage_cache.create(usages).await } pub async fn find_report_aggregated( @@ -30,16 +56,23 @@ pub async fn find_report_aggregated( #[cfg(test)] mod tests { + use crate::domain::resource::{cache::MockResourceDrivenCache, Resource}; + use super::*; #[tokio::test] async fn it_should_create_usage_cache() { - let mut cache = MockUsageDrivenCache::new(); - cache.expect_create().return_once(|_| Ok(())); + let mut usage_cache = MockUsageDrivenCache::new(); + usage_cache.expect_create().return_once(|_| Ok(())); + + let mut resource_cache = MockResourceDrivenCache::new(); + resource_cache + .expect_find_by_name_for_usage() + .return_once(|_, _| Ok(Some(Resource::default()))); let evt = UsageCreated::default(); - let result = create(Arc::new(cache), evt).await; + let result = create(Arc::new(usage_cache), Arc::new(resource_cache), evt).await; assert!(result.is_ok()); } diff --git a/src/domain/usage/cluster.rs b/src/domain/usage/cluster.rs index 7f2887b..b322061 100644 --- a/src/domain/usage/cluster.rs +++ b/src/domain/usage/cluster.rs @@ -16,6 +16,7 @@ use super::UsageUnit; pub trait UsageDrivenCluster: Send + Sync { async fn find_metrics( &self, + step: &str, start: DateTime, end: DateTime, ) -> Result>; @@ -25,11 +26,12 @@ pub async fn sync_usage( usage: Arc, event: Arc, cluster_id: &str, + step: &str, cursor: DateTime, ) -> Result<()> { let end = Utc::now(); - let usages = usage.find_metrics(cursor, end).await?; + let usages = usage.find_metrics(step, cursor, end).await?; if usages.is_empty() { return Ok(()); } @@ -40,7 +42,8 @@ pub async fn sync_usage( usages: usages .into_iter() .map(|u| UsageUnitCreated { - resource_id: u.resource_id, + project_namespace: u.project_namespace, + resource_name: u.resource_name, units: u.units, tier: u.tier, interval: u.interval, @@ -70,7 +73,7 @@ mod tests { let mut usage = MockUsageDrivenCluster::new(); usage .expect_find_metrics() - .return_once(|_, _| Ok(Default::default())); + .return_once(|_, _, _| Ok(Default::default())); let mut event = MockEventDrivenBridge::new(); event.expect_dispatch().return_once(|_| Ok(())); @@ -80,6 +83,7 @@ mod tests { Arc::new(event), Default::default(), Default::default(), + Default::default(), ) .await; assert!(result.is_ok()); diff --git a/src/domain/usage/mod.rs b/src/domain/usage/mod.rs index 53177d1..0c4184a 100644 --- a/src/domain/usage/mod.rs +++ b/src/domain/usage/mod.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use uuid::Uuid; -use super::event::UsageCreated; +use super::event::UsageUnitCreated; pub mod cache; pub mod cluster; @@ -16,26 +16,29 @@ pub struct Usage { pub interval: u64, pub created_at: DateTime, } -impl From for Vec { - fn from(evt: UsageCreated) -> Self { - evt.usages - .iter() - .map(|usage| Usage { - id: Uuid::new_v4().to_string(), - event_id: evt.id.clone(), - resource_id: usage.resource_id.clone(), - units: usage.units, - tier: usage.tier.clone(), - interval: usage.interval, - created_at: evt.created_at, - }) - .collect() +impl Usage { + pub fn from_usage_evt( + usage: &UsageUnitCreated, + resource_id: &str, + evt_id: &str, + evt_created_at: DateTime, + ) -> Self { + Self { + id: Uuid::new_v4().to_string(), + event_id: evt_id.into(), + resource_id: resource_id.into(), + units: usage.units, + tier: usage.tier.clone(), + interval: usage.interval, + created_at: evt_created_at, + } } } #[derive(Debug)] pub struct UsageUnit { - pub resource_id: String, + pub project_namespace: String, + pub resource_name: String, pub units: i64, pub tier: String, pub interval: u64, diff --git a/src/domain/utils/mod.rs b/src/domain/utils/mod.rs index 675d0cd..a99da7e 100644 --- a/src/domain/utils/mod.rs +++ b/src/domain/utils/mod.rs @@ -16,19 +16,22 @@ fn get_random_word(list: Vec<&str>) -> String { } pub fn get_random_name() -> String { - let salt: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit()) - .take(6) - .map(char::from) - .collect(); - + let salt = get_random_salt(); let adjective = get_random_word(ADJECTIVES.lines().collect()); let noun = get_random_word(NOUNS.lines().collect()); format!("{adjective}-{noun}-{salt}") } +pub fn get_random_salt() -> String { + rand::thread_rng() + .sample_iter(&Alphanumeric) + .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit()) + .take(6) + .map(char::from) + .collect() +} + pub fn get_schema_from_crd( crd: &CustomResourceDefinition, field: &str, @@ -37,3 +40,7 @@ pub fn get_schema_from_crd( let schema = version.schema.clone()?.open_api_v3_schema?.properties?; schema.get(field)?.properties.clone() } + +pub fn cluster_namespace(namespace: &str) -> String { + format!("prj-{}", namespace) +} diff --git a/src/driven/cache/migrations/20240606_tables.sql b/src/driven/cache/migrations/20240606_tables.sql index 3b1fd37..470ab71 100644 --- a/src/driven/cache/migrations/20240606_tables.sql +++ b/src/driven/cache/migrations/20240606_tables.sql @@ -14,6 +14,7 @@ CREATE TABLE IF NOT EXISTS project ( CREATE TABLE IF NOT EXISTS resource ( id TEXT PRIMARY KEY NOT NULL, project_id TEXT NOT NULL, + name TEXT NOT NULL, kind TEXT NOT NULL, spec TEXT NOT NULL, status TEXT NOT NULL, diff --git a/src/driven/cache/resource.rs b/src/driven/cache/resource.rs index 143ee93..e250707 100644 --- a/src/driven/cache/resource.rs +++ b/src/driven/cache/resource.rs @@ -28,6 +28,7 @@ impl ResourceDrivenCache for SqliteResourceDrivenCache { SELECT r.id, r.project_id, + r.name, r.kind, r.spec, r.status, @@ -55,6 +56,7 @@ impl ResourceDrivenCache for SqliteResourceDrivenCache { SELECT r.id, r.project_id, + r.name, r.kind, r.spec, r.status, @@ -71,6 +73,64 @@ impl ResourceDrivenCache for SqliteResourceDrivenCache { Ok(resource) } + async fn find_by_name(&self, project_id: &str, name: &str) -> Result> { + let resource = sqlx::query_as::<_, Resource>( + r#" + SELECT + r.id, + r.project_id, + r.name, + r.kind, + r.spec, + r.status, + r.created_at, + r.updated_at + FROM resource r + WHERE r.project_id = $1 AND r.name = $2 AND r.status != $3; + "#, + ) + .bind(project_id) + .bind(name) + .bind(ResourceStatus::Deleted.to_string()) + .fetch_optional(&self.sqlite.db) + .await?; + + Ok(resource) + } + async fn find_by_name_for_usage( + &self, + namespace: &str, + name: &str, + ) -> Result> { + let resource = sqlx::query_as::<_, Resource>( + r#" + SELECT + r.id, + r.project_id, + r.name, + r.kind, + r.spec, + r.status, + r.created_at, + r.updated_at + FROM + resource r + INNER JOIN project p ON + p.id == r.project_id + WHERE + p.namespace = $1 + AND + r.name = $2; + "#, + ) + .bind(namespace) + .bind(name) + .fetch_optional(&self.sqlite.db) + .await?; + + Ok(resource) + } + async fn create(&self, resource: &Resource) -> Result<()> { let status = resource.status.to_string(); @@ -79,16 +139,18 @@ impl ResourceDrivenCache for SqliteResourceDrivenCache { INSERT INTO resource ( id, project_id, + name, kind, spec, status, created_at, updated_at ) - VALUES ($1, $2, $3, $4, $5, $6, $7) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) "#, resource.id, resource.project_id, + resource.name, resource.kind, resource.spec, status, @@ -173,6 +235,7 @@ impl FromRow<'_, SqliteRow> for Resource { Ok(Self { id: row.try_get("id")?, project_id: row.try_get("project_id")?, + name: row.try_get("name")?, kind: row.try_get("kind")?, spec: row.try_get("spec")?, annotations: None, diff --git a/src/driven/prometheus/usage.rs b/src/driven/prometheus/usage.rs index e41e2f2..8b92807 100644 --- a/src/driven/prometheus/usage.rs +++ b/src/driven/prometheus/usage.rs @@ -17,19 +17,20 @@ use super::PrometheusUsageDriven; impl UsageDrivenCluster for PrometheusUsageDriven { async fn find_metrics( &self, + step: &str, start: DateTime, end: DateTime, ) -> Result> { let response = self .client .get(format!( - "{}/query_range?query=sum by (resource_name, tier) (usage)", + "{}/query_range?query=sum by (project, resource_name, tier) (usage)", &self.url )) .query(&[ ("start", start.timestamp().to_string()), ("end", end.timestamp().to_string()), - ("step", "10m".into()), + ("step", step.into()), ]) .send() .await?; @@ -75,7 +76,8 @@ impl UsageDrivenCluster for PrometheusUsageDriven { let units = last_value - first_value; UsageUnit { - resource_id: r.metric.resource_name.clone(), + project_namespace: r.metric.project.clone(), + resource_name: r.metric.resource_name.clone(), units, interval, tier: r.metric.tier.clone(), @@ -112,6 +114,7 @@ struct PrometheusValue { } #[derive(Debug, Deserialize)] pub struct PrometheusUsageMetric { + project: String, resource_name: String, tier: String, } diff --git a/src/drivers/cache/mod.rs b/src/drivers/cache/mod.rs index 306115a..a29db46 100644 --- a/src/drivers/cache/mod.rs +++ b/src/drivers/cache/mod.rs @@ -79,7 +79,12 @@ pub async fn subscribe(config: CacheConfig) -> Result<()> { resource::cache::delete(resource_cache.clone(), evt.clone()).await } Event::UsageCreated(evt) => { - usage::cache::create(usage_cache.clone(), evt.clone()).await + usage::cache::create( + usage_cache.clone(), + resource_cache.clone(), + evt.clone(), + ) + .await } Event::ResourceUpdated(evt) => { resource::cache::update(resource_cache.clone(), evt.clone()).await diff --git a/src/drivers/grpc/resource.rs b/src/drivers/grpc/resource.rs index b2f3f81..66a197d 100644 --- a/src/drivers/grpc/resource.rs +++ b/src/drivers/grpc/resource.rs @@ -111,6 +111,7 @@ impl proto::resource_service_server::ResourceService for ResourceServiceImpl { let cmd = command::CreateCmd::new(credential, req.project_id, req.kind, spec); command::create( + self.resource_cache.clone(), self.project_cache.clone(), self.metadata.clone(), self.event.clone(), @@ -120,6 +121,7 @@ impl proto::resource_service_server::ResourceService for ResourceServiceImpl { let message = proto::CreateResourceResponse { id: cmd.id, + name: cmd.name, kind: cmd.kind, }; @@ -193,6 +195,7 @@ impl From for proto::Resource { fn from(value: Resource) -> Self { Self { id: value.id, + name: value.name, kind: value.kind, spec: value.spec, annotations: value.annotations, diff --git a/src/drivers/usage/mod.rs b/src/drivers/usage/mod.rs index 5a251b3..50fd4d0 100644 --- a/src/drivers/usage/mod.rs +++ b/src/drivers/usage/mod.rs @@ -23,6 +23,7 @@ pub async fn schedule(config: UsageConfig) -> Result<()> { prometheus_driven.clone(), event_bridge.clone(), &config.cluster_id, + &config.prometheus_query_step, cursor, ) .await; @@ -40,6 +41,7 @@ pub async fn schedule(config: UsageConfig) -> Result<()> { pub struct UsageConfig { pub cluster_id: String, pub prometheus_url: String, + pub prometheus_query_step: String, pub delay: Duration, pub topic: String, pub kafka: HashMap, diff --git a/test/expect b/test/expect index 71ced3c..5caa971 100755 --- a/test/expect +++ b/test/expect @@ -98,7 +98,7 @@ NAMESPACE=$(echo $OUTPUT | jq -r '.namespace') # Check if namespace is created echo "Checking if namespace $NAMESPACE exists" for attempt in $(seq 1 120); do - if kubectl get namespace "$NAMESPACE" &> /dev/null; then + if kubectl get namespace "prj-$NAMESPACE" &> /dev/null; then echo "Namespace $NAMESPACE exists." break else @@ -107,7 +107,7 @@ for attempt in $(seq 1 120); do fi done -if ! kubectl get namespace "$NAMESPACE" &> /dev/null; then +if ! kubectl get namespace "prj-$NAMESPACE" &> /dev/null; then echo "Error: Namespace $NAMESPACE not found after 120 attempts." exit 1 fi diff --git a/test/fabric.manifest.yaml b/test/fabric.manifest.yaml index fcee70d..5802bc4 100644 --- a/test/fabric.manifest.yaml +++ b/test/fabric.manifest.yaml @@ -42,12 +42,15 @@ data: daemon.toml: | topic = "events" cluster_id = "625e6681-8a74-4454-b5ad-861b45c6a42e" - prometheus_url = "http://prometheus:9090/api/v1" delay_sec = 60 [kafka] "bootstrap.servers" = "redpanda.demeter-kafka.svc.cluster.local:19092" - "group.id"= "demeter-daemon" + "group.id" = "demeter-daemon" + + [prometheus] + url = "http://prometheus:9090/api/v1" + query_step = "1m" kind: ConfigMap metadata: name: daemon-config