diff --git a/.sqlx/query-40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181.json b/.sqlx/query-40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181.json
new file mode 100644
index 0000000..5063e97
--- /dev/null
+++ b/.sqlx/query-40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181.json
@@ -0,0 +1,12 @@
+{
+  "db_name": "SQLite",
+  "query": "\n INSERT INTO usage (\n id,\n resource_id,\n event_id,\n units,\n tier,\n created_at\n )\n VALUES ($1, $2, $3, $4, $5, $6)\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Right": 6
+    },
+    "nullable": []
+  },
+  "hash": "40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181"
+}
diff --git a/Cargo.lock b/Cargo.lock
index 529517a..fc2fa20 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -479,7 +479,7 @@ dependencies = [
 [[package]]
 name = "dmtri"
 version = "0.1.0"
-source = "git+https://github.com/demeter-run/specs.git#9c20f3e4d1165388eadce082bf5dfd44105db0f1"
+source = "git+https://github.com/demeter-run/specs.git#8bc909cfc739f7d770ce3de6aee637b9b468d0b5"
 dependencies = [
  "bytes",
  "pbjson",
diff --git a/examples/config/daemon.toml b/examples/config/daemon.toml
index cb36b06..d50da12 100644
--- a/examples/config/daemon.toml
+++ b/examples/config/daemon.toml
@@ -1,5 +1,9 @@
 topic="events"
 
+cluster_id = "625e6681-8a74-4454-b5ad-861b45c6a42e"
+prometheus_url = "http://prometheus:9090/api/v1"
+delay_sec = 60
+
 [kafka]
 "bootstrap.servers" = "localhost:19092"
 "group.id"= "demeter-daemon"
diff --git a/src/bin/daemon.rs b/src/bin/daemon.rs
index 487bd53..fbe60c8 100644
--- a/src/bin/daemon.rs
+++ b/src/bin/daemon.rs
@@ -1,9 +1,10 @@
-use std::{collections::HashMap, env};
+use std::{collections::HashMap, env, time::Duration};
 
 use anyhow::Result;
 use dotenv::dotenv;
-use fabric::drivers::monitor::MonitorConfig;
-use serde::Deserialize;
+use fabric::drivers::{monitor::MonitorConfig, usage::UsageConfig};
+use serde::{de::Visitor, Deserialize, Deserializer};
+use tokio::try_join;
 use tracing::Level;
 use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
 
@@ -23,11 +24,21 @@ async fn main() -> Result<()> {
 
     let config = Config::new()?;
 
-    fabric::drivers::monitor::subscribe(config.into()).await
+    let schedule = fabric::drivers::usage::schedule(config.clone().into());
+    let subscribe = fabric::drivers::monitor::subscribe(config.clone().into());
+
+    try_join!(schedule, subscribe)?;
+
+    Ok(())
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 struct Config {
+    cluster_id: String,
+    prometheus_url: String,
+    #[serde(deserialize_with = "deserialize_duration")]
+    #[serde(rename(deserialize = "delay_sec"))]
+    delay: Duration,
     topic: String,
     kafka: HashMap<String, String>,
 }
@@ -54,3 +65,38 @@ impl From<Config> for MonitorConfig {
         }
     }
 }
+
+impl From<Config> for UsageConfig {
+    fn from(value: Config) -> Self {
+        Self {
+            cluster_id: value.cluster_id,
+            prometheus_url: value.prometheus_url,
+            delay: value.delay,
+            kafka: value.kafka,
+            topic: value.topic,
+        }
+    }
+}
+
+fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    deserializer.deserialize_map(DurationVisitor)
+}
+
+struct DurationVisitor;
+impl<'de> Visitor<'de> for DurationVisitor {
+    type Value = Duration;
+
+    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+        formatter.write_str("This Visitor expects to receive i64 seconds")
+    }
+
+    fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        Ok(Duration::from_secs(v as u64))
+    }
+}
diff --git a/src/bin/rpc.rs b/src/bin/rpc.rs
index 60b3633..5b9f83d 100644
--- a/src/bin/rpc.rs
+++ b/src/bin/rpc.rs
@@ -4,6 +4,7 @@ use anyhow::Result;
 use dotenv::dotenv;
 use fabric::drivers::{cache::CacheConfig, grpc::GrpcConfig};
 use serde::Deserialize;
+use tokio::try_join;
 use tracing::Level;
 use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
 
@@ -23,11 +24,10 @@ async fn main() -> Result<()> {
 
     let config = Config::new()?;
 
-    futures::future::try_join(
-        fabric::drivers::grpc::server(config.clone().into()),
-        fabric::drivers::cache::subscribe(config.clone().into()),
-    )
-    .await?;
+    let grpc = fabric::drivers::grpc::server(config.clone().into());
+    let subscribe = fabric::drivers::cache::subscribe(config.clone().into());
+
+    try_join!(grpc, subscribe)?;
 
     Ok(())
 }
diff --git a/src/domain/auth/mod.rs b/src/domain/auth/mod.rs
index fb63f59..180be5a 100644
--- a/src/domain/auth/mod.rs
+++ b/src/domain/auth/mod.rs
@@ -1,3 +1,9 @@
+use std::sync::Arc;
+
+use super::{error::Error, project::cache::ProjectDrivenCache};
+
+use crate::domain::Result;
+
 pub type UserId = String;
 pub type SecretId = String;
 
@@ -6,3 +12,30 @@ pub enum Credential {
     Auth0(UserId),
     ApiKey(SecretId),
 }
+
+pub async fn assert_project_permission(
+    project_cache: Arc<dyn ProjectDrivenCache>,
+    credential: &Credential,
+    project_id: &str,
+) -> Result<()> {
+    match credential {
+        Credential::Auth0(user_id) => {
+            let result = project_cache
+                .find_user_permission(user_id, project_id)
+                .await?;
+
+            if result.is_none() {
+                return Err(Error::Unauthorized("user doesnt have permission".into()));
+            }
+
+            Ok(())
+        }
+        Credential::ApiKey(secret_project_id) => {
+            if project_id != secret_project_id {
+                return Err(Error::Unauthorized("secret doesnt have permission".into()));
+            }
+
+            Ok(())
+        }
+    }
+}
diff --git a/src/domain/error.rs b/src/domain/error.rs
index dc9047f..507f5e4 100644
--- a/src/domain/error.rs
+++ b/src/domain/error.rs
@@ -47,3 +47,8 @@ impl From for Error {
         Self::Unexpected(value.to_string())
     }
 }
+impl From<reqwest::Error> for Error {
+    fn from(value: reqwest::Error) -> Self {
+        Self::Unexpected(value.to_string())
+    }
+}
diff --git a/src/domain/event/mod.rs b/src/domain/event/mod.rs
index 128c2c5..d672870 100644
--- a/src/domain/event/mod.rs
+++ b/src/domain/event/mod.rs
@@ -90,6 +90,21 @@ pub struct ResourceDeleted {
 }
 into_event!(ResourceDeleted);
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UsageUnitCreated {
+    pub resource_id: String,
+    pub tier: String,
+    pub units: i64,
+}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UsageCreated {
+    pub id: String,
+    pub cluster_id: String,
+    pub usages: Vec<UsageUnitCreated>,
+    pub created_at: DateTime<Utc>,
+}
+into_event!(UsageCreated);
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(untagged)]
 #[allow(clippy::enum_variant_names)]
@@ -101,6 +116,7 @@ pub enum Event {
     ResourceCreated(ResourceCreated),
     ResourceUpdated(ResourceUpdated),
     ResourceDeleted(ResourceDeleted),
+    UsageCreated(UsageCreated),
 }
 impl Event {
     pub fn key(&self) -> String {
@@ -112,6 +128,7 @@ impl Event {
             Event::ResourceCreated(_) => "ResourceCreated".into(),
             Event::ResourceUpdated(_) => "ResourceUpdated".into(),
             Event::ResourceDeleted(_) => "ResourceDeleted".into(),
+            Event::UsageCreated(_) => "UsageCreated".into(),
         }
     }
     pub fn from_key(key: &str, payload: &[u8]) -> Result<Self> {
@@ -125,6 +142,7 @@ impl Event {
             "ResourceCreated" => Ok(Self::ResourceCreated(serde_json::from_slice(payload)?)),
             "ResourceUpdated" => Ok(Self::ResourceUpdated(serde_json::from_slice(payload)?)),
             "ResourceDeleted" => Ok(Self::ResourceDeleted(serde_json::from_slice(payload)?)),
+            "UsageCreated" => Ok(Self::UsageCreated(serde_json::from_slice(payload)?)),
             _ => Err(Error::Unexpected(format!(
                 "Event key '{}' not implemented",
                 key
@@ -202,4 +220,18 @@ mod tests {
             }
         }
     }
+    impl Default for UsageCreated {
+        fn default() -> Self {
+            Self {
+                id: Uuid::new_v4().to_string(),
+                cluster_id: Uuid::new_v4().to_string(),
+                usages: vec![UsageUnitCreated {
+                    resource_id: Uuid::new_v4().to_string(),
+                    units: 120,
+                    tier: "0".into(),
+                }],
+                created_at: Utc::now(),
+            }
+        }
+    }
 }
diff --git a/src/domain/mod.rs b/src/domain/mod.rs
index 9a66bc4..a0dfdb1 100644
--- a/src/domain/mod.rs
+++ b/src/domain/mod.rs
@@ -6,6 +6,7 @@ pub mod event;
 pub mod metadata;
 pub mod project;
 pub mod resource;
+pub mod usage;
 pub mod utils;
 
 pub const PAGE_SIZE_DEFAULT: u32 = 12;
diff --git a/src/domain/resource/cluster.rs b/src/domain/resource/cluster.rs
index 2c30f56..aaf189d 100644
--- a/src/domain/resource/cluster.rs
+++ b/src/domain/resource/cluster.rs
@@ -17,7 +17,6 @@ pub trait ResourceDrivenCluster: Send + Sync {
     async fn update(&self, obj: &DynamicObject) -> Result<()>;
     async fn delete(&self, obj: &DynamicObject) -> Result<()>;
 }
-
 pub async fn apply_manifest(
     cluster: Arc<dyn ResourceDrivenCluster>,
     evt: ResourceCreated,
diff --git a/src/domain/resource/command.rs b/src/domain/resource/command.rs
index e5eb2e6..e58cb50 100644
--- a/src/domain/resource/command.rs
+++ b/src/domain/resource/command.rs
@@ -9,7 +9,7 @@ use tracing::info;
 use uuid::Uuid;
 
 use crate::domain::{
-    auth::Credential,
+    auth::{assert_project_permission, Credential},
     error::Error,
     event::{EventDrivenBridge, ResourceCreated, ResourceDeleted},
     metadata::{KnownField, MetadataDriven},
@@ -26,7 +26,7 @@ pub async fn fetch(
     resource_cache: Arc<dyn ResourceDrivenCache>,
     cmd: FetchCmd,
 ) -> Result<Vec<Resource>> {
-    assert_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
+    assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
 
     resource_cache
         .find(&cmd.project_id, &cmd.page, &cmd.page_size)
         .await
@@ -38,7 +38,7 @@ pub async fn fetch_by_id(
     resource_cache: Arc<dyn ResourceDrivenCache>,
     cmd: FetchByIdCmd,
 ) -> Result<Resource> {
-    assert_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
+    assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
 
     let Some(project) = project_cache.find_by_id(&cmd.project_id).await? else {
         return Err(Error::CommandMalformed("invalid project id".into()));
     };
@@ -59,7 +59,7 @@ pub async fn create(
     event: Arc<dyn EventDrivenBridge>,
     cmd: CreateCmd,
 ) -> Result<()> {
-    assert_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
+    assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
 
     let Some(crd) = metadata.find_by_kind(&cmd.kind).await? else {
         return Err(Error::CommandMalformed("kind not supported".into()));
     };
@@ -120,7 +120,7 @@ pub async fn update(
         return Err(Error::CommandMalformed("invalid resource id".into()));
     };
 
-    assert_permission(project_cache.clone(), &cmd.credential, &resource.project_id).await?;
+    assert_project_permission(project_cache.clone(), &cmd.credential, &resource.project_id).await?;
 
     let Some(project) = project_cache.find_by_id(&resource.project_id).await? else {
         return Err(Error::CommandMalformed("invalid project id".into()));
     };
@@ -150,7 +150,7 @@ pub async fn delete(
     event: Arc<dyn EventDrivenBridge>,
     cmd: DeleteCmd,
 ) -> Result<()> {
-    assert_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
+    assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
 
     let Some(project) = project_cache.find_by_id(&cmd.project_id).await? else {
         return Err(Error::CommandMalformed("invalid project id".into()));
     };
@@ -177,32 +177,6 @@ pub async fn delete(
     Ok(())
 }
-async fn assert_permission(
-    project_cache: Arc<dyn ProjectDrivenCache>,
-    credential: &Credential,
-    project_id: &str,
-) -> Result<()> {
-    match credential {
-        Credential::Auth0(user_id) => {
-            let result = project_cache
-                .find_user_permission(user_id, project_id)
-                .await?;
-
-            if result.is_none() {
-                return Err(Error::Unauthorized("user doesnt have permission".into()));
-            }
-
-            Ok(())
-        }
-        Credential::ApiKey(secret_project_id) => {
-            if project_id != secret_project_id {
-                return Err(Error::Unauthorized("secret doesnt have permission".into()));
-            }
-
-            Ok(())
-        }
-    }
-}
 fn assert_project_resource(project: &Project, resource: &Resource) -> Result<()> {
     if project.id != resource.project_id {
         return Err(Error::CommandMalformed("invalid resource id".into()));
     }
diff --git a/src/domain/usage/cache.rs b/src/domain/usage/cache.rs
new file mode 100644
index 0000000..9c03890
--- /dev/null
+++ b/src/domain/usage/cache.rs
@@ -0,0 +1,48 @@
+use std::sync::Arc;
+
+use crate::domain::{event::UsageCreated, Result};
+
+use super::{Usage, UsageReport};
+
+#[async_trait::async_trait]
+pub trait UsageDrivenCache: Send + Sync {
+    async fn find_report(
+        &self,
+        project_id: &str,
+        page: &u32,
+        page_size: &u32,
+    ) -> Result<Vec<UsageReport>>;
+    async fn create(&self, usage: Vec<Usage>) -> Result<()>;
+}
+
+pub async fn create(cache: Arc<dyn UsageDrivenCache>, evt: UsageCreated) -> Result<()> {
+    cache.create(evt.into()).await
+}
+
+#[cfg(test)]
+mod tests {
+    use mockall::mock;
+
+    use super::*;
+
+    mock! {
+        pub FakeUsageDrivenCache { }
+
+        #[async_trait::async_trait]
+        impl UsageDrivenCache for FakeUsageDrivenCache {
+            async fn find_report(&self, project_id: &str, page: &u32, page_size: &u32,) -> Result<Vec<UsageReport>>;
+            async fn create(&self, usage: Vec<Usage>) -> Result<()>;
+        }
+    }
+
+    #[tokio::test]
+    async fn it_should_create_usage_cache() {
+        let mut cache = MockFakeUsageDrivenCache::new();
+        cache.expect_create().return_once(|_| Ok(()));
+
+        let evt = UsageCreated::default();
+
+        let result = create(Arc::new(cache), evt).await;
+        assert!(result.is_ok());
+    }
+}
diff --git a/src/domain/usage/cluster.rs b/src/domain/usage/cluster.rs
new file mode 100644
index 0000000..10158f6
--- /dev/null
+++ b/src/domain/usage/cluster.rs
@@ -0,0 +1,103 @@
+use std::sync::Arc;
+
+use chrono::{DateTime, Utc};
+use tracing::info;
+use uuid::Uuid;
+
+use crate::domain::{
+    event::{EventDrivenBridge, UsageCreated, UsageUnitCreated},
+    Result,
+};
+
+use super::UsageUnit;
+
+#[async_trait::async_trait]
+pub trait UsageDrivenCluster: Send + Sync {
+    async fn find_metrics(
+        &self,
+        start: DateTime<Utc>,
+        end: DateTime<Utc>,
+    ) -> Result<Vec<UsageUnit>>;
+}
+
+pub async fn sync_usage(
+    usage: Arc<dyn UsageDrivenCluster>,
+    event: Arc<dyn EventDrivenBridge>,
+    cluster_id: &str,
+    cursor: DateTime<Utc>,
+) -> Result<()> {
+    let end = Utc::now();
+
+    let usages = usage.find_metrics(cursor, end).await?;
+    if usages.is_empty() {
+        return Ok(());
+    }
+
+    let evt = UsageCreated {
+        id: Uuid::new_v4().to_string(),
+        cluster_id: cluster_id.into(),
+        usages: usages
+            .into_iter()
+            .map(|u| UsageUnitCreated {
+                resource_id: u.resource_id,
+                units: u.units,
+                tier: u.tier,
+            })
+            .collect(),
+        created_at: Utc::now(),
+    };
+
+    event.dispatch(evt.into()).await?;
+    info!(
+        cursor = cursor.to_string(),
+        end = end.to_string(),
+        "usage collected"
+    );
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use mockall::mock;
+
+    use super::*;
+    use crate::domain::event::Event;
+
+    mock! {
+        pub FakeUsageDriven { }
+
+        #[async_trait::async_trait]
+        impl UsageDrivenCluster for FakeUsageDriven {
+            async fn find_metrics(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<UsageUnit>>;
+        }
+    }
+    mock! {
+        pub FakeEventDrivenBridge { }
+
+        #[async_trait::async_trait]
+        impl EventDrivenBridge for FakeEventDrivenBridge {
+            async fn dispatch(&self, event: Event) -> Result<()>;
+        }
+    }
+
+    #[tokio::test]
+    async fn it_should_sync_usage() {
+        let mut usage = MockFakeUsageDriven::new();
+        usage
+            .expect_find_metrics()
+            .return_once(|_, _| Ok(Default::default()));
+
+        let mut event = MockFakeEventDrivenBridge::new();
+        event.expect_dispatch().return_once(|_| Ok(()));
+
+        let result = sync_usage(
+            Arc::new(usage),
+            Arc::new(event),
+            Default::default(),
+            Default::default(),
+        )
+        .await;
+        assert!(result.is_ok());
+    }
+}
diff --git a/src/domain/usage/command.rs b/src/domain/usage/command.rs
new file mode 100644
index 0000000..f24b524
--- /dev/null
+++ b/src/domain/usage/command.rs
@@ -0,0 +1,150 @@
+use std::sync::Arc;
+
+use crate::domain::{
+    auth::{assert_project_permission, Credential},
+    error::Error,
+    project::cache::ProjectDrivenCache,
+    Result, PAGE_SIZE_DEFAULT, PAGE_SIZE_MAX,
+};
+
+use super::{cache::UsageDrivenCache, UsageReport};
+
+pub async fn fetch_report(
+    project_cache: Arc<dyn ProjectDrivenCache>,
+    usage_cache: Arc<dyn UsageDrivenCache>,
+    cmd: FetchCmd,
+) -> Result<Vec<UsageReport>> {
+    assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
+
+    usage_cache
+        .find_report(&cmd.project_id, &cmd.page, &cmd.page_size)
+        .await
+}
+
+#[derive(Debug, Clone)]
+pub struct FetchCmd {
+    pub credential: Credential,
+    pub project_id: String,
+    pub page: u32,
+    pub page_size: u32,
+}
+impl FetchCmd {
+    pub fn new(
+        credential: Credential,
+        project_id: String,
+        page: Option<u32>,
+        page_size: Option<u32>,
+    ) -> Result<Self> {
+        let page = page.unwrap_or(1);
+        let page_size = page_size.unwrap_or(PAGE_SIZE_DEFAULT);
+
+        if page_size >= PAGE_SIZE_MAX {
+            return Err(Error::CommandMalformed(format!(
+                "page_size exceeded the limit of {PAGE_SIZE_MAX}"
+            )));
+        }
+
+        Ok(Self {
+            credential,
+            project_id,
+            page,
+            page_size,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::{DateTime, Utc};
+    use mockall::mock;
+    use uuid::Uuid;
+
+    use super::*;
+    use crate::domain::{
+        project::{Project, ProjectSecret, ProjectUpdate, ProjectUser},
+        usage::Usage,
+    };
+
+    mock! {
+        pub FakeProjectDrivenCache { }
+
+        #[async_trait::async_trait]
+        impl ProjectDrivenCache for FakeProjectDrivenCache {
+            async fn find(&self, user_id: &str, page: &u32, page_size: &u32) -> Result<Vec<Project>>;
+            async fn find_by_namespace(&self, namespace: &str) -> Result<Option<Project>>;
+            async fn find_by_id(&self, id: &str) -> Result<Option<Project>>;
+            async fn create(&self, project: &Project) -> Result<()>;
+            async fn update(&self, project: &ProjectUpdate) -> Result<()>;
+            async fn delete(&self, id: &str, deleted_at: &DateTime<Utc>) -> Result<()>;
+            async fn create_secret(&self, secret: &ProjectSecret) -> Result<()>;
+            async fn find_secret_by_project_id(&self, project_id: &str) -> Result<Vec<ProjectSecret>>;
+            async fn find_user_permission(&self,user_id: &str, project_id: &str) -> Result<Option<ProjectUser>>;
+        }
+    }
+    mock! {
+        pub FakeUsageDrivenCache { }
+
+        #[async_trait::async_trait]
+        impl UsageDrivenCache for FakeUsageDrivenCache {
+            async fn find_report(&self, project_id: &str, page: &u32, page_size: &u32,) -> Result<Vec<UsageReport>>;
+            async fn create(&self, usage: Vec<Usage>) -> Result<()>;
+        }
+    }
+
+    impl Default for FetchCmd {
+        fn default() -> Self {
+            Self {
+                credential: Credential::Auth0("user id".into()),
+                project_id: Uuid::new_v4().to_string(),
+                page: 1,
+                page_size: 12,
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn it_should_fetch_project_usage_report() {
+        let mut project_cache = MockFakeProjectDrivenCache::new();
+        project_cache
+            .expect_find_user_permission()
+            .return_once(|_, _| Ok(Some(ProjectUser::default())));
+
+        let mut usage_cache = MockFakeUsageDrivenCache::new();
+        usage_cache
+            .expect_find_report()
+            .return_once(|_, _, _| Ok(vec![UsageReport::default()]));
+
+        let cmd = FetchCmd::default();
+
+        let result = fetch_report(Arc::new(project_cache), Arc::new(usage_cache), cmd).await;
+        assert!(result.is_ok());
+    }
+    #[tokio::test]
+    async fn it_should_fail_fetch_project_usage_report_when_user_doesnt_have_permission() {
+        let mut project_cache = MockFakeProjectDrivenCache::new();
+        project_cache
+            .expect_find_user_permission()
+            .return_once(|_, _| Ok(None));
+
+        let usage_cache = MockFakeUsageDrivenCache::new();
+
+        let cmd = FetchCmd::default();
+
+        let result = fetch_report(Arc::new(project_cache), Arc::new(usage_cache), cmd).await;
+        assert!(result.is_err());
+    }
+    #[tokio::test]
+    async fn it_should_fail_fetch_project_usage_report_when_secret_doesnt_have_permission() {
+        let project_cache = MockFakeProjectDrivenCache::new();
+        let usage_cache = MockFakeUsageDrivenCache::new();
+
+        let cmd = FetchCmd {
+            credential: Credential::ApiKey(Uuid::new_v4().to_string()),
+            ..Default::default()
+        };
+
+        let result = fetch_report(Arc::new(project_cache), Arc::new(usage_cache), cmd).await;
+        assert!(result.is_err());
+    }
+}
diff --git a/src/domain/usage/mod.rs b/src/domain/usage/mod.rs
new file mode 100644
index 0000000..670ad44
--- /dev/null
+++ b/src/domain/usage/mod.rs
@@ -0,0 +1,82 @@
+use chrono::{DateTime, Utc};
+use uuid::Uuid;
+
+use super::event::UsageCreated;
+
+pub mod cache;
+pub mod cluster;
+pub mod command;
+
+pub struct Usage {
+    pub id: String,
+    pub event_id: String,
+    pub resource_id: String,
+    pub units: i64,
+    pub tier: String,
+    pub created_at: DateTime<Utc>,
+}
+impl From<UsageCreated> for Vec<Usage> {
+    fn from(evt: UsageCreated) -> Self {
+        evt.usages
+            .iter()
+            .map(|usage| Usage {
+                id: Uuid::new_v4().to_string(),
+                event_id: evt.id.clone(),
+                resource_id: usage.resource_id.clone(),
+                units: usage.units,
+                tier: usage.tier.clone(),
+                created_at: evt.created_at,
+            })
+            .collect()
+    }
+}
+
+pub struct UsageUnit {
+    pub resource_id: String,
+    pub units: i64,
+    pub tier: String,
+}
+
+pub struct UsageReport {
+    pub resource_id: String,
+    pub resource_kind: String,
+    pub resource_spec: String,
+    pub tier: String,
+    pub units: i64,
+    pub period: String,
+}
+
+#[cfg(test)]
+mod tests {
+    use uuid::Uuid;
+
+    use super::*;
+
+    impl Default for Usage {
+        fn default() -> Self {
+            Self {
+                id: Uuid::new_v4().to_string(),
+                event_id: Uuid::new_v4().to_string(),
+                resource_id: Uuid::new_v4().to_string(),
+                units: 120,
+                tier: "0".into(),
+                created_at: Utc::now(),
+            }
+        }
+    }
+
+    impl Default for UsageReport {
+        fn default() -> Self {
+            Self {
+                resource_id: Uuid::new_v4().to_string(),
+                resource_kind: "CardanoNodePort".into(),
+                resource_spec:
+                    "{\"version\":\"stable\",\"network\":\"mainnet\",\"throughputTier\":\"1\"}"
+                        .into(),
+                units: 120,
+                tier: "0".into(),
+                period: "08-2024".into(),
+            }
+        }
+    }
+}
diff --git a/src/driven/cache/migrations/20240821_table_usage.sql b/src/driven/cache/migrations/20240821_table_usage.sql
new file mode 100644
index 0000000..61be093
--- /dev/null
+++ b/src/driven/cache/migrations/20240821_table_usage.sql
@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS usage (
+  id TEXT PRIMARY KEY NOT NULL,
+  event_id TEXT NOT NULL,
+  resource_id TEXT NOT NULL,
+  units INT NOT NULL,
+  tier TEXT NOT NULL,
+  created_at DATETIME NOT NULL,
+  FOREIGN KEY(resource_id) REFERENCES resource(id)
+);
diff --git a/src/driven/cache/mod.rs b/src/driven/cache/mod.rs
index 589a402..60ccecd 100644
--- a/src/driven/cache/mod.rs
+++ b/src/driven/cache/mod.rs
@@ -3,6 +3,7 @@ use std::path::Path;
 
 pub mod project;
 pub mod resource;
+pub mod usage;
 
 pub struct SqliteCache {
     db: sqlx::sqlite::SqlitePool,
@@ -36,3 +37,43 @@ impl SqliteCache {
         Ok(out)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use resource::SqliteResourceDrivenCache;
+
+    use crate::{
+        domain::{
+            project::{cache::ProjectDrivenCache, Project},
+            resource::{cache::ResourceDrivenCache, Resource},
+        },
+        driven::cache::project::SqliteProjectDrivenCache,
+    };
+
+    use super::*;
+
+    pub async fn mock_project(sqlite_cache: Arc<SqliteCache>) -> Project {
+        let cache: Box<dyn ProjectDrivenCache> =
+            Box::new(SqliteProjectDrivenCache::new(sqlite_cache));
+
+        let project = Project::default();
+        cache.create(&project).await.unwrap();
+
+        project
+    }
+    pub async fn mock_resource(sqlite_cache: Arc<SqliteCache>, project_id: &str) -> Resource {
+        let cache: Box<dyn ResourceDrivenCache> =
+            Box::new(SqliteResourceDrivenCache::new(sqlite_cache));
+
+        let resource = Resource {
+            project_id: project_id.to_string(),
+            ..Default::default()
+        };
+
+        cache.create(&resource).await.unwrap();
+
+        resource
+    }
+}
diff --git a/src/driven/cache/resource.rs b/src/driven/cache/resource.rs
index cdaa389..18e7aa1 100644
--- a/src/driven/cache/resource.rs
+++ b/src/driven/cache/resource.rs
@@ -186,23 +186,10 @@ impl FromRow<'_, SqliteRow> for Resource {
 
 #[cfg(test)]
 mod tests {
-    use crate::{
-        domain::project::{cache::ProjectDrivenCache, Project},
-        driven::cache::project::SqliteProjectDrivenCache,
-    };
+    use crate::driven::cache::tests::mock_project;
 
     use super::*;
 
-    async fn mock_project(sqlite_cache: Arc<SqliteCache>) -> Project {
-        let cache: Box<dyn ProjectDrivenCache> =
-            Box::new(SqliteProjectDrivenCache::new(sqlite_cache));
-
-        let project = Project::default();
-        cache.create(&project).await.unwrap();
-
-        project
-    }
-
     #[tokio::test]
     async fn it_should_find_project_resources() {
         let sqlite_cache = Arc::new(SqliteCache::ephemeral().await.unwrap());
diff --git a/src/driven/cache/usage.rs b/src/driven/cache/usage.rs
new file mode 100644
index 0000000..c2eed8d
--- /dev/null
+++ b/src/driven/cache/usage.rs
@@ -0,0 +1,194 @@
+use sqlx::{sqlite::SqliteRow, FromRow, Row};
+use std::sync::Arc;
+
+use crate::domain::{
+    usage::{cache::UsageDrivenCache, Usage, UsageReport},
+    Result,
+};
+
+use super::SqliteCache;
+
+pub struct SqliteUsageDrivenCache {
+    sqlite: Arc<SqliteCache>,
+}
+impl SqliteUsageDrivenCache {
+    pub fn new(sqlite: Arc<SqliteCache>) -> Self {
+        Self { sqlite }
+    }
+}
+#[async_trait::async_trait]
+impl UsageDrivenCache for SqliteUsageDrivenCache {
+    async fn find_report(
+        &self,
+        project_id: &str,
+        page: &u32,
+        page_size: &u32,
+    ) -> Result<Vec<UsageReport>> {
+        let offset = page_size * (page - 1);
+
+        let report = sqlx::query_as::<_, UsageReport>(
+            r#"
+            SELECT
+                r.id as resource_id,
+                r.kind as resource_kind,
+                r.spec as resource_spec,
+                u.tier,
+                SUM(u.units) as units,
+                STRFTIME('%m-%Y', 'now') as period
+            FROM "usage" u
+            INNER JOIN resource r ON r.id == u.resource_id
+            WHERE STRFTIME('%m-%Y', u.created_at) = STRFTIME('%m-%Y', 'now') AND r.project_id = $1
+            GROUP BY resource_id, tier
+            ORDER BY units DESC
+            LIMIT $2
+            OFFSET $3;
+        "#,
+        )
+        .bind(project_id)
+        .bind(page_size)
+        .bind(offset)
+        .fetch_all(&self.sqlite.db)
+        .await?;
+
+        Ok(report)
+    }
+
+    async fn create(&self, usages: Vec<Usage>) -> Result<()> {
+        let mut tx = self.sqlite.db.begin().await?;
+
+        for usage in usages {
+            sqlx::query!(
+                r#"
+                INSERT INTO usage (
+                    id,
+                    resource_id,
+                    event_id,
+                    units,
+                    tier,
+                    created_at
+                )
+                VALUES ($1, $2, $3, $4, $5, $6)
+            "#,
+                usage.id,
+                usage.resource_id,
+                usage.event_id,
+                usage.units,
+                usage.tier,
+                usage.created_at,
+            )
+            .execute(&mut *tx)
+            .await?;
+        }
+
+        tx.commit().await?;
+
+        Ok(())
+    }
+}
+
+impl FromRow<'_, SqliteRow> for Usage {
+    fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
+        Ok(Self {
+            id: row.try_get("id")?,
+            event_id: row.try_get("event_id")?,
+            resource_id: row.try_get("resource_id")?,
+            units: row.try_get("units")?,
+            tier: row.try_get("tier")?,
+            created_at: row.try_get("created_at")?,
+        })
+    }
+}
+
+impl FromRow<'_, SqliteRow> for UsageReport {
+    fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
+        Ok(Self {
+            resource_id: row.try_get("resource_id")?,
+            resource_kind: row.try_get("resource_kind")?,
+            resource_spec: row.try_get("resource_spec")?,
+            units: row.try_get("units")?,
+            tier: row.try_get("tier")?,
+            period: row.try_get("period")?,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::driven::cache::tests::{mock_project, mock_resource};
+
+    use super::*;
+
+    #[tokio::test]
+    async fn it_should_create_usage() {
+        let sqlite_cache = Arc::new(SqliteCache::ephemeral().await.unwrap());
+        let cache = SqliteUsageDrivenCache::new(sqlite_cache.clone());
+
+        let project = mock_project(sqlite_cache.clone()).await;
+        let resource = mock_resource(sqlite_cache.clone(), &project.id).await;
+
+        let usage = Usage {
+            resource_id: resource.id,
+            ..Default::default()
+        };
+
+        let result = cache.create(vec![usage]).await;
+
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn it_should_find_usage_report() {
+        let sqlite_cache = Arc::new(SqliteCache::ephemeral().await.unwrap());
+        let cache = SqliteUsageDrivenCache::new(sqlite_cache.clone());
+
+        let project = mock_project(sqlite_cache.clone()).await;
+        let resource = mock_resource(sqlite_cache.clone(), &project.id).await;
+
+        let usages = vec![
+            Usage {
+                resource_id: resource.id.clone(),
+                ..Default::default()
+            },
+            Usage {
+                resource_id: resource.id.clone(),
+                ..Default::default()
+            },
+        ];
+
+        cache.create(usages).await.unwrap();
+
+        let result = cache.find_report(&project.id, &1, &12).await;
+
+        assert!(result.is_ok());
+        assert!(result.unwrap().len() == 1);
+    }
+
+    #[tokio::test]
+    async fn it_should_find_usage_report_after_tier_updated() {
+        let sqlite_cache = Arc::new(SqliteCache::ephemeral().await.unwrap());
+        let cache = SqliteUsageDrivenCache::new(sqlite_cache.clone());
+
+        let project = mock_project(sqlite_cache.clone()).await;
+        let resource = mock_resource(sqlite_cache.clone(), &project.id).await;
+
+        let usages = vec![
+            Usage {
+                resource_id: resource.id.clone(),
+                tier: "0".into(),
+                ..Default::default()
+            },
+            Usage {
+                resource_id: resource.id.clone(),
+                tier: "1".into(),
+                ..Default::default()
+            },
+        ];
+
+        cache.create(usages).await.unwrap();
+
+        let result = cache.find_report(&project.id, &1, &12).await;
+
+        assert!(result.is_ok());
+        assert!(result.unwrap().len() == 2);
+    }
+}
diff --git a/src/driven/mod.rs b/src/driven/mod.rs
index a6a05f8..a164a49 100644
--- a/src/driven/mod.rs
+++ b/src/driven/mod.rs
@@ -3,3 +3,4 @@ pub mod cache;
 pub mod k8s;
 pub mod kafka;
 pub mod metadata;
+pub mod prometheus;
diff --git a/src/driven/prometheus/mod.rs b/src/driven/prometheus/mod.rs
new file mode 100644
index 0000000..6595b08
--- /dev/null
+++ b/src/driven/prometheus/mod.rs
@@ -0,0 +1,32 @@
+use anyhow::Result as AnyhowResult;
+use reqwest::Client;
+use serde::{Deserialize, Deserializer};
+
+use crate::domain::Result;
+
+pub mod usage;
+
+pub struct PrometheusUsageDriven {
+    client: Client,
+    url: String,
+}
+impl PrometheusUsageDriven {
+    pub async fn new(url: &str) -> AnyhowResult<Self> {
+        let client = Client::new();
+        let url = url.to_string();
+
+        Ok(Self { client, url })
+    }
+}
+
+fn deserialize_value<'de, D>(deserializer: D) -> Result<i64, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let value: Vec<serde_json::Value> = Deserialize::deserialize(deserializer)?;
+    Ok(value.into_iter().as_slice()[1]
+        .as_str()
+        .unwrap()
+        .parse::<i64>()
+        .unwrap())
+}
diff --git a/src/driven/prometheus/usage.rs b/src/driven/prometheus/usage.rs
new file mode 100644
index 0000000..9d2fbaa
--- /dev/null
+++ b/src/driven/prometheus/usage.rs
@@ -0,0 +1,81 @@
+use chrono::{DateTime, Utc};
+use serde::Deserialize;
+use tracing::error;
+
+use crate::{
+    domain::{
+        error::Error,
+        usage::{cluster::UsageDrivenCluster, UsageUnit},
+        Result,
+    },
+    driven::prometheus::deserialize_value,
+};
+
+use super::PrometheusUsageDriven;
+
+#[async_trait::async_trait]
+impl UsageDrivenCluster for PrometheusUsageDriven {
+    async fn find_metrics(
+        &self,
+        start: DateTime<Utc>,
+        end: DateTime<Utc>,
+    ) -> Result<Vec<UsageUnit>> {
+        let since = (end - start).num_seconds();
+
+        let query = format!(
+            "round(sum by (resource_name, tier) (increase(usage{{tier!~\"0\"}}[{since}s] @ {})) > 0)",
+            end.timestamp_millis() / 1000
+        );
+
+        let response = self
+            .client
+            .get(format!("{}/query?query={query}", &self.url))
+            .send()
+            .await?;
+
+        let status = response.status();
+        if status.is_client_error() || status.is_server_error() {
+            error!(status = status.to_string(), "request status code fail");
+            return Err(Error::Unexpected(format!(
+                "Prometheus request error. Status: {} Query: {}",
+                status, query
+            )));
+        }
+
+        let response: PrometheusResponse = response.json().await?;
+
+        let usage_units: Vec<UsageUnit> = response
+            .data
+            .result
+            .iter()
+            .map(|r| UsageUnit {
+                resource_id: r.metric.resource_name.clone(),
+                units: r.value,
+                tier: r.metric.tier.clone(),
+            })
+            .collect();
+
+        Ok(usage_units)
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct PrometheusResponse {
+    data: PrometheusData,
+}
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PrometheusData {
+    result: Vec<PrometheusUsageResult>,
+}
+#[derive(Debug, Deserialize)]
+struct PrometheusUsageResult {
+    metric: PrometheusUsageMetric,
+    #[serde(deserialize_with = "deserialize_value")]
+    value: i64,
+}
+#[derive(Debug, Deserialize)]
+pub struct PrometheusUsageMetric {
+    resource_name: String,
+    tier: String,
+}
diff --git a/src/drivers/cache/mod.rs b/src/drivers/cache/mod.rs
index be9c8e1..15fcb5f 100644
--- a/src/drivers/cache/mod.rs
+++ b/src/drivers/cache/mod.rs
@@ -7,9 +7,10 @@ use std::{borrow::Borrow, collections::HashMap, path::Path, sync::Arc};
 use tracing::{error, info};
 
 use crate::{
-    domain::{event::Event, project, resource},
+    domain::{event::Event, project, resource, usage},
     driven::cache::{
-        project::SqliteProjectDrivenCache, resource::SqliteResourceDrivenCache, SqliteCache,
+        project::SqliteProjectDrivenCache, resource::SqliteResourceDrivenCache,
+        usage::SqliteUsageDrivenCache, SqliteCache,
     },
 };
 
@@ -19,6 +20,7 @@ pub async fn subscribe(config: CacheConfig) -> Result<()> {
 
     let project_cache = Arc::new(SqliteProjectDrivenCache::new(sqlite_cache.clone()));
     let resource_cache = Arc::new(SqliteResourceDrivenCache::new(sqlite_cache.clone()));
+    let usage_cache = Arc::new(SqliteUsageDrivenCache::new(sqlite_cache.clone()));
 
     let mut client_config = ClientConfig::new();
     for (k, v) in config.kafka.iter() {
@@ -57,6 +59,9 @@ pub async fn subscribe(config: CacheConfig) -> Result<()> {
             Event::ResourceDeleted(evt) => {
                 resource::cache::delete(resource_cache.clone(), evt.clone()).await
             }
+            Event::UsageCreated(evt) => {
+                usage::cache::create(usage_cache.clone(), evt.clone()).await
+            }
             Event::ResourceUpdated(evt) => {
                 resource::cache::update(resource_cache.clone(), evt.clone()).await
             }
diff --git a/src/drivers/grpc/mod.rs b/src/drivers/grpc/mod.rs
index dd583d8..d590871 100644
--- a/src/drivers/grpc/mod.rs
+++ b/src/drivers/grpc/mod.rs
@@ -1,6 +1,7 @@
 use anyhow::Result;
 use dmtri::demeter::ops::v1alpha::metadata_service_server::MetadataServiceServer;
 use dmtri::demeter::ops::v1alpha::resource_service_server::ResourceServiceServer;
+use dmtri::demeter::ops::v1alpha::usage_service_server::UsageServiceServer;
 use middlewares::auth::AuthenticatorImpl;
 use std::collections::HashMap;
 use std::net::SocketAddr;
@@ -17,6 +18,7 @@ use crate::domain::error::Error;
 use crate::driven::auth::Auth0Provider;
 use crate::driven::cache::project::SqliteProjectDrivenCache;
 use crate::driven::cache::resource::SqliteResourceDrivenCache;
+use crate::driven::cache::usage::SqliteUsageDrivenCache;
 use crate::driven::cache::SqliteCache;
 use crate::driven::kafka::KafkaProducer;
 use crate::driven::metadata::MetadataCrd;
@@ -25,11 +27,13 @@ mod metadata;
 mod middlewares;
 mod project;
 mod resource;
+mod usage;
 
 pub async fn server(config: GrpcConfig) -> Result<()> {
     let sqlite_cache = Arc::new(SqliteCache::new(Path::new(&config.db_path)).await?);
     let project_cache = Arc::new(SqliteProjectDrivenCache::new(sqlite_cache.clone()));
     let resource_cache = Arc::new(SqliteResourceDrivenCache::new(sqlite_cache.clone()));
+    let usage_cache = Arc::new(SqliteUsageDrivenCache::new(sqlite_cache.clone()));
 
     let event_bridge = Arc::new(KafkaProducer::new(&config.topic, &config.kafka)?);
 
@@ -64,6 +68,9 @@ pub async fn server(config: GrpcConfig) -> Result<()> {
     let metadata_inner = metadata::MetadataServiceImpl::new(metadata.clone());
     let metadata_service = MetadataServiceServer::new(metadata_inner);
 
+    let usage_inner = usage::UsageServiceImpl::new(project_cache.clone(), usage_cache.clone());
+    let usage_service = UsageServiceServer::with_interceptor(usage_inner, auth.clone());
+
     let address = SocketAddr::from_str(&config.addr)?;
 
     info!(address = config.addr, "Server running");
@@ -73,6 +80,7 @@ pub async fn server(config: GrpcConfig) -> Result<()> {
         .add_service(project_service)
         .add_service(resource_service)
         .add_service(metadata_service)
+        .add_service(usage_service)
         .serve(address)
         .await?;
 
diff --git a/src/drivers/grpc/usage.rs b/src/drivers/grpc/usage.rs
new file mode 100644
index 0000000..3436fe9
--- /dev/null
+++ b/src/drivers/grpc/usage.rs
@@ -0,0 +1,64 @@
+use dmtri::demeter::ops::v1alpha::{self as proto};
+use std::sync::Arc;
+use tonic::{async_trait, Status};
+
+use crate::domain::{
+    auth::Credential,
+    project::cache::ProjectDrivenCache,
+    usage::{cache::UsageDrivenCache, command, UsageReport},
+};
+
+pub struct UsageServiceImpl {
+    pub project_cache: Arc<dyn ProjectDrivenCache>,
+    pub usage_cache: Arc<dyn UsageDrivenCache>,
+}
+impl UsageServiceImpl {
+    pub fn new(
+        project_cache: Arc<dyn ProjectDrivenCache>,
+        usage_cache: Arc<dyn UsageDrivenCache>,
+    ) -> Self {
+        Self {
+            project_cache,
+            usage_cache,
+        }
+    }
+}
+
+#[async_trait]
+impl proto::usage_service_server::UsageService for UsageServiceImpl {
+    async fn fetch_usage_report(
+        &self,
+        request: tonic::Request<proto::FetchUsageReportRequest>,
+    ) -> Result<tonic::Response<proto::FetchUsageReportResponse>, tonic::Status> {
+        let credential = match request.extensions().get::<Credential>() {
+            Some(credential) => credential.clone(),
+            None => return Err(Status::unauthenticated("invalid credential")),
+        };
+
+        let req = request.into_inner();
+
+        let cmd = command::FetchCmd::new(credential, req.project_id, req.page, req.page_size)?;
+
+        let usage_report =
+            command::fetch_report(self.project_cache.clone(), self.usage_cache.clone(), cmd)
+                .await?;
+
+        let records = usage_report.into_iter().map(|v| v.into()).collect();
+        let message = proto::FetchUsageReportResponse { records };
+
+        Ok(tonic::Response::new(message))
+    }
+}
+
+impl From<UsageReport> for proto::UsageReport {
+    fn from(value: UsageReport) -> Self {
+        Self {
+            resource_id: value.resource_id,
+            resource_kind: value.resource_kind,
+            resource_spec: value.resource_spec,
+            units: value.units,
+            tier: value.tier,
+            period: value.period,
+        }
+    }
+}
diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs
index 9cd3734..79c68d5 100644
--- a/src/drivers/mod.rs
+++ b/src/drivers/mod.rs
@@ -1,3 +1,4 @@
 pub mod cache;
+pub mod usage;
 pub mod grpc;
 pub mod monitor;
diff --git a/src/drivers/usage/mod.rs b/src/drivers/usage/mod.rs
new file mode 100644
index 0000000..a4a64f8
--- /dev/null
+++ b/src/drivers/usage/mod.rs
@@ -0,0 +1,46 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use anyhow::Result;
+use chrono::Utc;
+use tokio::time::sleep;
+use tracing::{info, warn};
+
+use crate::{
+    domain::usage,
+    driven::{kafka::KafkaProducer, prometheus::PrometheusUsageDriven},
+};
+
+pub async fn schedule(config: UsageConfig) -> Result<()> {
+    let prometheus_driven = Arc::new(PrometheusUsageDriven::new(&config.prometheus_url).await?);
+    let event_bridge = Arc::new(KafkaProducer::new(&config.topic, &config.kafka)?);
+
+    let mut cursor = Utc::now();
+
+    loop {
+        sleep(config.delay).await;
+
+        let result = usage::cluster::sync_usage(
+            prometheus_driven.clone(),
+            event_bridge.clone(),
+            &config.cluster_id,
+            cursor,
+        )
+        .await;
+
+        match result {
+            Ok(()) => {
+                info!("Successfully sync usage");
+                cursor = Utc::now();
+            }
+            Err(err) => warn!(error = err.to_string(), "Error running sync usage"),
+        }
+    }
+}
+
+pub struct UsageConfig {
+    pub cluster_id: String,
+    pub prometheus_url: String,
+    pub delay: Duration,
+    pub topic: String,
+    pub kafka: HashMap<String, String>,
+}
diff --git a/test/fabric.manifest.yaml b/test/fabric.manifest.yaml
index 9f92db6..4aa4317 100644
--- a/test/fabric.manifest.yaml
+++ b/test/fabric.manifest.yaml
@@ -41,6 +41,10 @@ apiVersion: v1
 data:
   daemon.toml: |
     topic = "events"
+    cluster_id = "625e6681-8a74-4454-b5ad-861b45c6a42e"
+    prometheus_url = "http://prometheus:9090/api/v1"
+    delay_sec = 60
+
     [kafka]
     "bootstrap.servers" = "redpanda.demeter-kafka.svc.cluster.local:19092"
    "group.id"= "demeter-daemon"