From 12f3b2c91905112ea20c8788fb19ef3640928190 Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Fri, 20 Sep 2024 14:30:03 -0300 Subject: [PATCH] Implemented billing driver (#123) * feat: implemented queue EOF and basic billing cli * feat: implemented billing driver and load it from cli * chore: added cli config file example * chore: removed stripe config as required * feat: added config path in cli * feat: improved metrics collection and billing cli * chore: adjusted lint * chore: adjusted lint --- .gitignore | 3 + ...82126fc3e3b328d882989a1475b6b7c62c71.json} | 6 +- Cargo.lock | 202 +++++++++++++++++- Cargo.toml | 7 + examples/README.md | 4 + examples/config/cli.toml | 8 + src/bin/cli.rs | 120 +++++++++++ src/domain/event/mod.rs | 2 + src/domain/usage/cache.rs | 21 +- src/domain/usage/cluster.rs | 1 + src/domain/usage/mod.rs | 22 ++ .../cache/migrations/20240606_tables.sql | 1 + src/driven/cache/usage.rs | 88 +++++++- src/driven/prometheus/mod.rs | 8 +- src/driven/prometheus/usage.rs | 64 ++++-- src/drivers/billing/mod.rs | 152 +++++++++++++ src/drivers/cache/mod.rs | 18 +- src/drivers/mod.rs | 3 +- src/drivers/monitor/mod.rs | 1 + 19 files changed, 695 insertions(+), 36 deletions(-) rename .sqlx/{query-40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181.json => query-6df3bbbabc374168c007ec1c8dea82126fc3e3b328d882989a1475b6b7c62c71.json} (55%) create mode 100644 examples/config/cli.toml create mode 100644 src/bin/cli.rs create mode 100644 src/drivers/billing/mod.rs diff --git a/.gitignore b/.gitignore index 6cc28cf..5a413da 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,11 @@ /target .env dev.db* +dev.billing.db* grpcurl* config.toml +config.billing.toml +config.daemon.toml rpc.toml test/.terraform* test/local.tfstate* diff --git a/.sqlx/query-40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181.json b/.sqlx/query-6df3bbbabc374168c007ec1c8dea82126fc3e3b328d882989a1475b6b7c62c71.json similarity index 55% rename from .sqlx/query-40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181.json rename to .sqlx/query-6df3bbbabc374168c007ec1c8dea82126fc3e3b328d882989a1475b6b7c62c71.json index 5063e97..6b49884 100644 --- a/.sqlx/query-40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181.json +++ b/.sqlx/query-6df3bbbabc374168c007ec1c8dea82126fc3e3b328d882989a1475b6b7c62c71.json @@ -1,12 +1,12 @@ { "db_name": "SQLite", - "query": "\n INSERT INTO usage (\n id,\n resource_id,\n event_id,\n units,\n tier,\n created_at\n )\n VALUES ($1, $2, $3, $4, $5, $6)\n ", + "query": "\n INSERT INTO usage (\n id,\n resource_id,\n event_id,\n units,\n tier,\n interval,\n created_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ", "describe": { "columns": [], "parameters": { - "Right": 6 + "Right": 7 }, "nullable": [] }, - "hash": "40eab1d64c11bb37930c3f21b7536f70fcc59a325e97395d00c3d360131b1181" + "hash": "6df3bbbabc374168c007ec1c8dea82126fc3e3b328d882989a1475b6b7c62c71" } diff --git a/Cargo.lock b/Cargo.lock index fe11a96..4a7c98f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,12 +60,55 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" +[[package]] +name = "anstyle-parse" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.87" @@ -651,6 +694,64 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "clap" +version = "4.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e5a21b8495e732f1b3c364c9949b201ca7bae518c502c80256c96ad79eaf6ac" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf2dd12af7a047ad9d6da2b6b249759a22a7abc0f474c1dae1777afa4b21a73" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "clap_lex" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" + +[[package]] +name = "colorchoice" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" + +[[package]] +name = "comfy-table" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" +dependencies = [ + "crossterm", + "strum", + "strum_macros", + "unicode-width", +] + [[package]] name = "config" version = "0.14.0" @@ -761,6 +862,28 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "crossterm" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" +dependencies = [ + "bitflags 2.6.0", + "crossterm_winapi", + "libc", + "parking_lot", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "crunchy" version = "0.2.2" @@ -777,6 +900,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "der" version = "0.7.9" @@ -924,7 +1068,10 @@ dependencies = [ "base64 0.22.1", "bech32", "chrono", + "clap", + "comfy-table", "config", + "csv", "dmtri", "dotenv", "futures", @@ -1249,6 +1396,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -1576,6 +1729,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.10.5" @@ -2202,7 +2361,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdbb7b706f2afc610f3853550cdbbf6372fd324824a087806bd4480ea4996e24" dependencies = [ - "heck", + "heck 0.4.1", "itertools 0.10.5", "prost 0.11.9", "prost-types 0.11.9", @@ -2459,7 +2618,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools 0.10.5", "lazy_static", "log", @@ -3339,7 +3498,7 @@ checksum = "5833ef53aaa16d860e92123292f1f6a3d53c34ba8b1969f152ef1a7bb803f3c8" dependencies = [ "dotenvy", "either", - "heck", + "heck 0.4.1", "hex", "once_cell", "proc-macro2", @@ -3474,6 +3633,31 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.77", +] + [[package]] name = "subtle" version = "2.6.1" @@ -3998,6 +4182,12 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" +[[package]] +name = "unicode-width" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" + [[package]] name = "unicode_categories" version = "0.1.1" @@ -4033,6 +4223,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.10.0" diff --git a/Cargo.toml b/Cargo.toml index 2026d50..43ab373 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,9 @@ base64 = "0.22.1" json-patch = "2.0.0" aws-config = { version = "1.5.5", features = ["behavior-version-latest"] } aws-sdk-sesv2 = { version = "1.43.0", features = ["behavior-version-latest"] } +clap = { version = "4.5.17", features = ["derive", "env"] } +comfy-table = "7.1.1" +csv = "1.3.0" [dev-dependencies] mockall = "0.12.1" @@ -50,5 +53,9 @@ path = "src/bin/daemon.rs" name = "rpc" path = "src/bin/rpc.rs" +[[bin]] +name = "cli" +path = "src/bin/cli.rs" + [lib] path = "src/lib.rs" diff --git a/examples/README.md b/examples/README.md index 10586aa..48172f7 100644 --- a/examples/README.md +++ b/examples/README.md @@ -13,3 +13,7 @@ Use the [rpc config](config/rpc.toml) It's possible to create a toml file as config and set the file path in the env `DAEMON_CONFIG`, but it's possible to set the config using the prefix `DAEMON_` Use the [daemon config](config/daemon.toml) + +## cli + +Use the [cli config](config/cli.toml) diff --git a/examples/config/cli.toml b/examples/config/cli.toml new file mode 100644 index 0000000..24c2031 --- /dev/null +++ b/examples/config/cli.toml @@ -0,0 +1,8 @@ +db_path="dev.billing.db" +topic="events" + +[kafka_consumer] +"bootstrap.servers" = "localhost:19092" +"group.id"= "billing-stg-local-1" +"auto.offset.reset" = "earliest" +"enable.partition.eof" = true diff --git a/src/bin/cli.rs b/src/bin/cli.rs new file mode 100644 index 0000000..6720d3d --- /dev/null +++ b/src/bin/cli.rs @@ -0,0 +1,120 @@ +use std::{collections::HashMap, path::PathBuf}; + +use anyhow::{bail, Result}; +use clap::{Parser, Subcommand}; +use dotenv::dotenv; +use fabric::drivers::{ + billing::{BillingConfig, BillingTlsConfig, OutputFormat}, + cache::CacheConfig, +}; +use serde::Deserialize; +use tracing::{info, Level}; +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +#[command(propagate_version = true)] +struct Cli { + #[arg(short, long, help = "Cli config path file", env = "CLI_CONFIG")] + config: String, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Parser, Clone)] +pub struct BillingArgs { + /// period to collect the data (month-year) e.g 09-2024 + pub period: String, + + /// format that will be returned table(log in terminal), json(log in terminal), csv(save a file e.g 09-2024.csv) + pub output: String, +} +#[derive(Subcommand)] +enum Commands { + /// Send the billing invoices + Billing(BillingArgs), +} + +#[tokio::main] +async fn main() -> Result<()> { + dotenv().ok(); + + let env_filter = EnvFilter::builder() + .with_default_directive(Level::INFO.into()) + .with_env_var("RUST_LOG") + .from_env_lossy(); + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(env_filter) + .init(); + + let cli = Cli::parse(); + let config = Config::new(&cli.config)?; + + match cli.command { + Commands::Billing(args) => { + info!("sincronizing cache"); + + let output = match args.output.as_str() { + "table" => OutputFormat::Table, + "json" => OutputFormat::Json, + "csv" => OutputFormat::Csv, + _ => bail!("invalid output format"), + }; + + fabric::drivers::cache::subscribe(config.clone().into()).await?; + fabric::drivers::billing::run(config.clone().into(), &args.period, output).await?; + } + } + + Ok(()) +} + +#[derive(Debug, Clone, Deserialize)] +struct TlsConfig { + ssl_crt_path: PathBuf, + ssl_key_path: PathBuf, +} +#[derive(Debug, Clone, Deserialize)] +struct Config { + db_path: String, + topic: String, + tls: Option, + kafka_consumer: HashMap, +} +impl Config { + pub fn new(path: &str) -> Result { + let config = config::Config::builder() + .add_source(config::File::with_name(path).required(true)) + .build()? + .try_deserialize()?; + + Ok(config) + } +} + +impl From for BillingConfig { + fn from(value: Config) -> Self { + Self { + db_path: value.db_path, + kafka: value.kafka_consumer, + topic: value.topic, + tls_config: value.tls.map(|value| BillingTlsConfig { + ssl_key_path: value.ssl_key_path, + ssl_crt_path: value.ssl_crt_path, + }), + } + } +} + +impl From for CacheConfig { + fn from(value: Config) -> Self { + Self { + kafka: value.kafka_consumer, + db_path: value.db_path, + topic: value.topic, + } + } +} diff --git a/src/domain/event/mod.rs b/src/domain/event/mod.rs index f10aa6b..d3f2a30 100644 --- a/src/domain/event/mod.rs +++ b/src/domain/event/mod.rs @@ -123,6 +123,7 @@ pub struct UsageUnitCreated { pub resource_id: String, pub tier: String, pub units: i64, + pub interval: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct UsageCreated { @@ -296,6 +297,7 @@ mod tests { resource_id: Uuid::new_v4().to_string(), units: 120, tier: "0".into(), + interval: 10, }], created_at: Utc::now(), } diff --git a/src/domain/usage/cache.rs b/src/domain/usage/cache.rs index bf4ccf5..e2f0d8e 100644 --- a/src/domain/usage/cache.rs +++ b/src/domain/usage/cache.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::domain::{event::UsageCreated, Result}; -use super::{Usage, UsageReport}; +use super::{Usage, UsageReport, UsageReportAggregated}; #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] @@ -13,6 +13,7 @@ pub trait UsageDrivenCache: Send + Sync { page: &u32, page_size: &u32, ) -> Result>; + async fn find_report_aggregated(&self, period: &str) -> Result>; async fn create(&self, usage: Vec) -> Result<()>; } @@ -20,6 +21,13 @@ pub async fn create(cache: Arc, evt: UsageCreated) -> Resu cache.create(evt.into()).await } +pub async fn find_report_aggregated( + cache: Arc, + period: &str, +) -> Result> { + cache.find_report_aggregated(period).await +} + #[cfg(test)] mod tests { use super::*; @@ -34,4 +42,15 @@ mod tests { let result = create(Arc::new(cache), evt).await; assert!(result.is_ok()); } + + #[tokio::test] + async fn it_should_find_report_aggregated() { + let mut cache = MockUsageDrivenCache::new(); + cache + .expect_find_report_aggregated() + .return_once(|_| Ok(Default::default())); + + let result = find_report_aggregated(Arc::new(cache), "09-2024").await; + assert!(result.is_ok()); + } } diff --git a/src/domain/usage/cluster.rs b/src/domain/usage/cluster.rs index 4dfc51e..7f2887b 100644 --- a/src/domain/usage/cluster.rs +++ b/src/domain/usage/cluster.rs @@ -43,6 +43,7 @@ pub async fn sync_usage( resource_id: u.resource_id, units: u.units, tier: u.tier, + interval: u.interval, }) .collect(), created_at: Utc::now(), diff --git a/src/domain/usage/mod.rs b/src/domain/usage/mod.rs index 670ad44..53177d1 100644 --- a/src/domain/usage/mod.rs +++ b/src/domain/usage/mod.rs @@ -13,6 +13,7 @@ pub struct Usage { pub resource_id: String, pub units: i64, pub tier: String, + pub interval: u64, pub created_at: DateTime, } impl From for Vec { @@ -25,18 +26,22 @@ impl From for Vec { resource_id: usage.resource_id.clone(), units: usage.units, tier: usage.tier.clone(), + interval: usage.interval, created_at: evt.created_at, }) .collect() } } +#[derive(Debug)] pub struct UsageUnit { pub resource_id: String, pub units: i64, pub tier: String, + pub interval: u64, } +#[derive(Debug)] pub struct UsageReport { pub resource_id: String, pub resource_kind: String, @@ -46,6 +51,22 @@ pub struct UsageReport { pub period: String, } +#[derive(Debug)] +pub struct UsageReportAggregated { + pub project_id: String, + pub project_namespace: String, + #[allow(dead_code)] + pub project_billing_provider: String, + pub project_billing_provider_id: String, + pub resource_id: String, + pub resource_kind: String, + pub tier: String, + pub interval: u64, + pub units: i64, + #[allow(dead_code)] + pub period: String, +} + #[cfg(test)] mod tests { use uuid::Uuid; @@ -60,6 +81,7 @@ mod tests { resource_id: Uuid::new_v4().to_string(), units: 120, tier: "0".into(), + interval: 10, created_at: Utc::now(), } } diff --git a/src/driven/cache/migrations/20240606_tables.sql b/src/driven/cache/migrations/20240606_tables.sql index 65d8cf9..3b1fd37 100644 --- a/src/driven/cache/migrations/20240606_tables.sql +++ b/src/driven/cache/migrations/20240606_tables.sql @@ -60,6 +60,7 @@ CREATE TABLE IF NOT EXISTS usage ( resource_id TEXT NOT NULL, units INT NOT NULL, tier TEXT NOT NULL, + interval INT NOT NULL, created_at DATETIME NOT NULL, FOREIGN KEY(resource_id) REFERENCES resource(id) ); diff --git a/src/driven/cache/usage.rs b/src/driven/cache/usage.rs index c2eed8d..e6ff71f 100644 --- a/src/driven/cache/usage.rs +++ b/src/driven/cache/usage.rs @@ -2,7 +2,7 @@ use sqlx::{sqlite::SqliteRow, FromRow, Row}; use std::sync::Arc; use crate::domain::{ - usage::{cache::UsageDrivenCache, Usage, UsageReport}, + usage::{cache::UsageDrivenCache, Usage, UsageReport, UsageReportAggregated}, Result, }; @@ -33,6 +33,7 @@ impl UsageDrivenCache for SqliteUsageDrivenCache { r.kind as resource_kind, r.spec as resource_spec, u.tier, + SUM(u.interval) as interval, SUM(u.units) as units, STRFTIME('%m-%Y', 'now') as period FROM "usage" u @@ -53,10 +54,40 @@ impl UsageDrivenCache for SqliteUsageDrivenCache { Ok(report) } + async fn find_report_aggregated(&self, period: &str) -> Result> { + let report_aggregated = sqlx::query_as::<_, UsageReportAggregated>( + r#" + SELECT + p.id as project_id, + p.namespace as project_namespace, + p.billing_provider as project_billing_provider, + p.billing_provider_id as project_billing_provider_id, + r.id as resource_id, + r.kind as resource_kind, + u.tier as tier, + SUM(u.interval) as interval, + SUM(u.units) as units, + STRFTIME('%m-%Y', 'now') as period + FROM "usage" u + INNER JOIN resource r ON r.id == u.resource_id + INNER JOIN project p ON p.id == r.project_id + WHERE STRFTIME('%m-%Y', u.created_at) = $1 + GROUP BY resource_id, tier + ORDER BY project_namespace, resource_id ASC; + "#, + ) + .bind(period) + .fetch_all(&self.sqlite.db) + .await?; + + Ok(report_aggregated) + } + async fn create(&self, usages: Vec) -> Result<()> { let mut tx = self.sqlite.db.begin().await?; for usage in usages { + let interval = usage.interval as i64; sqlx::query!( r#" INSERT INTO usage ( @@ -65,15 +96,17 @@ impl UsageDrivenCache for SqliteUsageDrivenCache { event_id, units, tier, + interval, created_at ) - VALUES ($1, $2, $3, $4, $5, $6) + VALUES ($1, $2, $3, $4, $5, $6, $7) "#, usage.id, usage.resource_id, usage.event_id, usage.units, usage.tier, + interval, usage.created_at, ) .execute(&mut *tx) @@ -88,12 +121,14 @@ impl UsageDrivenCache for SqliteUsageDrivenCache { impl FromRow<'_, SqliteRow> for Usage { fn from_row(row: &SqliteRow) -> sqlx::Result { + let interval: i64 = row.try_get("interval")?; Ok(Self { id: row.try_get("id")?, event_id: row.try_get("event_id")?, resource_id: row.try_get("resource_id")?, units: row.try_get("units")?, tier: row.try_get("tier")?, + interval: interval as u64, created_at: row.try_get("created_at")?, }) } @@ -112,8 +147,28 @@ impl FromRow<'_, SqliteRow> for UsageReport { } } +impl FromRow<'_, SqliteRow> for UsageReportAggregated { + fn from_row(row: &SqliteRow) -> sqlx::Result { + let interval: i64 = row.try_get("interval")?; + Ok(Self { + project_id: row.try_get("project_id")?, + project_namespace: row.try_get("project_namespace")?, + project_billing_provider: row.try_get("project_billing_provider")?, + project_billing_provider_id: row.try_get("project_billing_provider_id")?, + resource_id: row.try_get("resource_id")?, + resource_kind: row.try_get("resource_kind")?, + tier: row.try_get("tier")?, + interval: interval as u64, + units: row.try_get("units")?, + period: row.try_get("period")?, + }) + } +} + #[cfg(test)] mod tests { + use chrono::Utc; + use crate::driven::cache::tests::{mock_project, mock_resource}; use super::*; @@ -191,4 +246,33 @@ mod tests { assert!(result.is_ok()); assert!(result.unwrap().len() == 2); } + + #[tokio::test] + async fn it_should_find_usage_report_aggregated() { + let sqlite_cache = Arc::new(SqliteCache::ephemeral().await.unwrap()); + let cache = SqliteUsageDrivenCache::new(sqlite_cache.clone()); + + let project = mock_project(sqlite_cache.clone()).await; + let resource = mock_resource(sqlite_cache.clone(), &project.id).await; + + let usages = vec![ + Usage { + resource_id: resource.id.clone(), + ..Default::default() + }, + Usage { + resource_id: resource.id.clone(), + ..Default::default() + }, + ]; + + cache.create(usages).await.unwrap(); + + let result = cache + .find_report_aggregated(&Utc::now().format("%m-%Y").to_string()) + .await; + + assert!(result.is_ok()); + assert!(result.unwrap().len() == 1); + } } diff --git a/src/driven/prometheus/mod.rs b/src/driven/prometheus/mod.rs index eeb77f8..1a66dd8 100644 --- a/src/driven/prometheus/mod.rs +++ b/src/driven/prometheus/mod.rs @@ -22,10 +22,6 @@ fn deserialize_value<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { - let value: Vec = Deserialize::deserialize(deserializer)?; - Ok(value.into_iter().as_slice()[1] - .as_str() - .unwrap() - .parse::() - .unwrap()) + let value = String::deserialize(deserializer)?; + Ok(value.parse().unwrap()) } diff --git a/src/driven/prometheus/usage.rs b/src/driven/prometheus/usage.rs index 9d2fbaa..e41e2f2 100644 --- a/src/driven/prometheus/usage.rs +++ b/src/driven/prometheus/usage.rs @@ -20,16 +20,17 @@ impl UsageDrivenCluster for PrometheusUsageDriven { start: DateTime, end: DateTime, ) -> Result> { - let since = (end - start).num_seconds(); - - let query = format!( - "round(sum by (resource_name, tier) (increase(usage{{tier!~\"0\"}}[{since}s] @ {})) > 0)", - end.timestamp_millis() / 1000 - ); - let response = self .client - .get(format!("{}/query?query={query}", &self.url)) + .get(format!( + "{}/query_range?query=sum by (resource_name, tier) (usage)", + &self.url + )) + .query(&[ + ("start", start.timestamp().to_string()), + ("end", end.timestamp().to_string()), + ("step", "10m".into()), + ]) .send() .await?; @@ -37,8 +38,8 @@ impl UsageDrivenCluster for PrometheusUsageDriven { if status.is_client_error() || status.is_server_error() { error!(status = status.to_string(), "request status code fail"); return Err(Error::Unexpected(format!( - "Prometheus request error. Status: {} Query: {}", - status, query + "Prometheus request error. Status: {}", + status ))); } @@ -48,10 +49,37 @@ impl UsageDrivenCluster for PrometheusUsageDriven { .data .result .iter() - .map(|r| UsageUnit { - resource_id: r.metric.resource_name.clone(), - units: r.value, - tier: r.metric.tier.clone(), + .map(|r| { + let min = r.values.iter().min_by_key(|v| v.timestamp); + let max = r.values.iter().max_by_key(|v| v.timestamp); + + let first_timestamp = match min { + Some(v) => v.timestamp, + None => 0, + }; + let last_timestamp = match max { + Some(v) => v.timestamp, + None => 0, + }; + + let first_value = match min { + Some(v) => v.value, + None => 0, + }; + let last_value = match max { + Some(v) => v.value, + None => 0, + }; + + let interval = last_timestamp - first_timestamp; + let units = last_value - first_value; + + UsageUnit { + resource_id: r.metric.resource_name.clone(), + units, + interval, + tier: r.metric.tier.clone(), + } }) .collect(); @@ -71,6 +99,14 @@ struct PrometheusData { #[derive(Debug, Deserialize)] struct PrometheusUsageResult { metric: PrometheusUsageMetric, + values: Vec, +} +#[derive(Debug, Deserialize)] +struct PrometheusValue { + #[serde(rename = "0")] + timestamp: u64, + + #[serde(rename = "1")] #[serde(deserialize_with = "deserialize_value")] value: i64, } diff --git a/src/drivers/billing/mod.rs b/src/drivers/billing/mod.rs new file mode 100644 index 0000000..262186d --- /dev/null +++ b/src/drivers/billing/mod.rs @@ -0,0 +1,152 @@ +use anyhow::Result; +use comfy_table::Table; +use serde_json::json; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, +}; +use tracing::{error, info}; + +use crate::{ + domain::{self, usage::UsageReportAggregated}, + driven::cache::{usage::SqliteUsageDrivenCache, SqliteCache}, +}; + +pub enum OutputFormat { + Table, + Json, + Csv, +} + +pub async fn run(config: BillingConfig, period: &str, output: OutputFormat) -> Result<()> { + let sqlite_cache = Arc::new(SqliteCache::new(Path::new(&config.db_path)).await?); + sqlite_cache.migrate().await?; + + let usage_cache = Arc::new(SqliteUsageDrivenCache::new(sqlite_cache.clone())); + + info!("Collecting data"); + + let report = domain::usage::cache::find_report_aggregated(usage_cache.clone(), period).await?; + + match output { + OutputFormat::Table => table(report), + OutputFormat::Json => json(report), + OutputFormat::Csv => csv(report, period), + }; + + Ok(()) +} + +fn csv(report: Vec, period: &str) { + let path = format!("{period}.csv"); + let result = csv::Writer::from_path(&path); + if let Err(error) = result { + error!(?error); + return; + } + + let mut wtr = result.unwrap(); + + let result = wtr.write_record([ + "", + "project", + "stripe_id", + "", + "port", + "tier", + "time", + "units", + ]); + if let Err(error) = result { + error!(?error); + return; + } + + for (i, r) in report.iter().enumerate() { + let result = wtr.write_record([ + &(i + 1).to_string(), + &r.project_namespace, + &r.project_billing_provider_id, + &r.resource_id, + &r.resource_kind, + &r.tier, + &format!("{:.1}h", ((r.interval as f64) / 60.) / 60.), + &r.units.to_string(), + ]); + if let Err(error) = result { + error!(?error); + return; + } + } + + let result = wtr.flush(); + if let Err(error) = result { + error!(?error); + return; + } + + println!("File {} created", path) +} + +fn json(report: Vec) { + let mut json = vec![]; + + for r in report { + json.push(json!({ + "project_id": r.project_id, + "project_namespace": r.project_namespace, + "stripe_id": r.project_billing_provider_id, + "resource_id": r.resource_id, + "resource_kind": r.resource_kind, + "tier": r.tier, + "interval": r.interval, + "units": r.units, + })) + } + + println!("{}", serde_json::to_string_pretty(&json).unwrap()); +} + +fn table(report: Vec) { + let mut table = Table::new(); + table.set_header(vec![ + "", + "project", + "stripe_id", + "", + "port", + "tier", + "time", + "units", + ]); + + for (i, r) in report.iter().enumerate() { + table.add_row(vec![ + &(i + 1).to_string(), + &r.project_namespace, + &r.project_billing_provider_id, + &r.resource_id, + &r.resource_kind, + &r.tier, + &format!("{:.1}h", ((r.interval as f64) / 60.) / 60.), + &r.units.to_string(), + ]); + } + + println!("{table}"); +} + +#[derive(Debug)] +pub struct BillingTlsConfig { + pub ssl_crt_path: PathBuf, + pub ssl_key_path: PathBuf, +} + +#[derive(Debug)] +pub struct BillingConfig { + pub db_path: String, + pub topic: String, + pub kafka: HashMap, + pub tls_config: Option, +} diff --git a/src/drivers/cache/mod.rs b/src/drivers/cache/mod.rs index 884db0e..baff708 100644 --- a/src/drivers/cache/mod.rs +++ b/src/drivers/cache/mod.rs @@ -1,6 +1,7 @@ -use anyhow::Result; +use anyhow::{bail, Result}; use rdkafka::{ consumer::{CommitMode, Consumer, StreamConsumer}, + error::KafkaError, ClientConfig, Message, }; use std::{borrow::Borrow, collections::HashMap, path::Path, sync::Arc}; @@ -26,16 +27,21 @@ pub async fn subscribe(config: CacheConfig) -> Result<()> { for (k, v) in config.kafka.iter() { client_config.set(k, v); } + let consumer: StreamConsumer = client_config.create()?; consumer.subscribe(&[&config.topic])?; info!("Subscriber running"); loop { - // If we fail to consume from Kafka, we need a restart. - let message = consumer - .recv() - .await - .expect("Failed to consume from Kafka, restarting"); + let result = consumer.recv().await; + if let Err(error) = result { + return match error { + KafkaError::PartitionEOF(_) => Ok(()), + _ => bail!(error), + }; + } + + let message = result.unwrap(); info!("Consuming from kafka, current offset: {}", message.offset()); match message.borrow().try_into() { diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs index 79c68d5..0a35cbe 100644 --- a/src/drivers/mod.rs +++ b/src/drivers/mod.rs @@ -1,4 +1,5 @@ +pub mod billing; pub mod cache; -pub mod usage; pub mod grpc; pub mod monitor; +pub mod usage; diff --git a/src/drivers/monitor/mod.rs b/src/drivers/monitor/mod.rs index 428d5a3..dcfd724 100644 --- a/src/drivers/monitor/mod.rs +++ b/src/drivers/monitor/mod.rs @@ -77,6 +77,7 @@ pub async fn subscribe(config: MonitorConfig) -> Result<()> { } } +#[derive(Debug)] pub struct MonitorConfig { pub topic: String, pub kafka: HashMap,