Skip to content

Commit

Permalink
Implemented usage (#86)
Browse files Browse the repository at this point in the history
* feat: implemented usage domain

* feat: implemented stateless usage cursor

* feat: improved usage domain validation and driven query

* chore: updated examples config

* chore: updated fabric test manifest

* feat: implemented usage cache

* feat: added tier field

* chore: removed dbg macro

* chore: updated sqlx files

* feat: implemented command to fetch usage report

* chore: moved assert project permission to utils

* test: fixed project cache driven mock in usage command

* chore: updated usage report query to group by tier too
  • Loading branch information
paulobressan authored Aug 26, 2024
1 parent c4e2048 commit e0444ba
Show file tree
Hide file tree
Showing 28 changed files with 1,022 additions and 60 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions examples/config/daemon.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
topic="events"

cluster_id = "625e6681-8a74-4454-b5ad-861b45c6a42e"
prometheus_url = "http://prometheus:9090/api/v1"
delay_sec = 60

[kafka]
"bootstrap.servers" = "localhost:19092"
"group.id"= "demeter-daemon"
Expand Down
56 changes: 51 additions & 5 deletions src/bin/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{collections::HashMap, env};
use std::{collections::HashMap, env, time::Duration};

use anyhow::Result;
use dotenv::dotenv;
use fabric::drivers::monitor::MonitorConfig;
use serde::Deserialize;
use fabric::drivers::{monitor::MonitorConfig, usage::UsageConfig};
use serde::{de::Visitor, Deserialize, Deserializer};
use tokio::try_join;
use tracing::Level;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

Expand All @@ -23,11 +24,21 @@ async fn main() -> Result<()> {

let config = Config::new()?;

fabric::drivers::monitor::subscribe(config.into()).await
let schedule = fabric::drivers::usage::schedule(config.clone().into());
let subscribe = fabric::drivers::monitor::subscribe(config.clone().into());

try_join!(schedule, subscribe)?;

Ok(())
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
struct Config {
cluster_id: String,
prometheus_url: String,
#[serde(deserialize_with = "deserialize_duration")]
#[serde(rename(deserialize = "delay_sec"))]
delay: Duration,
topic: String,
kafka: HashMap<String, String>,
}
Expand All @@ -54,3 +65,38 @@ impl From<Config> for MonitorConfig {
}
}
}

impl From<Config> for UsageConfig {
fn from(value: Config) -> Self {
Self {
cluster_id: value.cluster_id,
prometheus_url: value.prometheus_url,
delay: value.delay,
kafka: value.kafka,
topic: value.topic,
}
}
}

fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_map(DurationVisitor)
}

struct DurationVisitor;
impl<'de> Visitor<'de> for DurationVisitor {
type Value = Duration;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("This Visitor expects to receive i64 seconds")
}

fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Duration::from_secs(v as u64))
}
}
10 changes: 5 additions & 5 deletions src/bin/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Result;
use dotenv::dotenv;
use fabric::drivers::{cache::CacheConfig, grpc::GrpcConfig};
use serde::Deserialize;
use tokio::try_join;
use tracing::Level;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

Expand All @@ -23,11 +24,10 @@ async fn main() -> Result<()> {

let config = Config::new()?;

futures::future::try_join(
fabric::drivers::grpc::server(config.clone().into()),
fabric::drivers::cache::subscribe(config.clone().into()),
)
.await?;
let grpc = fabric::drivers::grpc::server(config.clone().into());
let subscribe = fabric::drivers::cache::subscribe(config.clone().into());

try_join!(grpc, subscribe)?;

Ok(())
}
Expand Down
33 changes: 33 additions & 0 deletions src/domain/auth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
use std::sync::Arc;

use super::{error::Error, project::cache::ProjectDrivenCache};

use crate::domain::Result;

pub type UserId = String;
pub type SecretId = String;

Expand All @@ -6,3 +12,30 @@ pub enum Credential {
Auth0(UserId),
ApiKey(SecretId),
}

pub async fn assert_project_permission(
project_cache: Arc<dyn ProjectDrivenCache>,
credential: &Credential,
project_id: &str,
) -> Result<()> {
match credential {
Credential::Auth0(user_id) => {
let result = project_cache
.find_user_permission(user_id, project_id)
.await?;

if result.is_none() {
return Err(Error::Unauthorized("user doesnt have permission".into()));
}

Ok(())
}
Credential::ApiKey(secret_project_id) => {
if project_id != secret_project_id {
return Err(Error::Unauthorized("secret doesnt have permission".into()));
}

Ok(())
}
}
}
5 changes: 5 additions & 0 deletions src/domain/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ impl From<kube::Error> for Error {
Self::Unexpected(value.to_string())
}
}
impl From<reqwest::Error> for Error {
fn from(value: reqwest::Error) -> Self {
Self::Unexpected(value.to_string())
}
}
32 changes: 32 additions & 0 deletions src/domain/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ pub struct ResourceDeleted {
}
into_event!(ResourceDeleted);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageUnitCreated {
pub resource_id: String,
pub tier: String,
pub units: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageCreated {
pub id: String,
pub cluster_id: String,
pub usages: Vec<UsageUnitCreated>,
pub created_at: DateTime<Utc>,
}
into_event!(UsageCreated);

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
#[allow(clippy::enum_variant_names)]
Expand All @@ -101,6 +116,7 @@ pub enum Event {
ResourceCreated(ResourceCreated),
ResourceUpdated(ResourceUpdated),
ResourceDeleted(ResourceDeleted),
UsageCreated(UsageCreated),
}
impl Event {
pub fn key(&self) -> String {
Expand All @@ -112,6 +128,7 @@ impl Event {
Event::ResourceCreated(_) => "ResourceCreated".into(),
Event::ResourceUpdated(_) => "ResourceUpdated".into(),
Event::ResourceDeleted(_) => "ResourceDeleted".into(),
Event::UsageCreated(_) => "UsageCreated".into(),
}
}
pub fn from_key(key: &str, payload: &[u8]) -> Result<Self> {
Expand All @@ -125,6 +142,7 @@ impl Event {
"ResourceCreated" => Ok(Self::ResourceCreated(serde_json::from_slice(payload)?)),
"ResourceUpdated" => Ok(Self::ResourceUpdated(serde_json::from_slice(payload)?)),
"ResourceDeleted" => Ok(Self::ResourceDeleted(serde_json::from_slice(payload)?)),
"UsageCreated" => Ok(Self::UsageCreated(serde_json::from_slice(payload)?)),
_ => Err(Error::Unexpected(format!(
"Event key '{}' not implemented",
key
Expand Down Expand Up @@ -202,4 +220,18 @@ mod tests {
}
}
}
impl Default for UsageCreated {
fn default() -> Self {
Self {
id: Uuid::new_v4().to_string(),
cluster_id: Uuid::new_v4().to_string(),
usages: vec![UsageUnitCreated {
resource_id: Uuid::new_v4().to_string(),
units: 120,
tier: "0".into(),
}],
created_at: Utc::now(),
}
}
}
}
1 change: 1 addition & 0 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod event;
pub mod metadata;
pub mod project;
pub mod resource;
pub mod usage;
pub mod utils;

pub const PAGE_SIZE_DEFAULT: u32 = 12;
Expand Down
1 change: 0 additions & 1 deletion src/domain/resource/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub trait ResourceDrivenCluster: Send + Sync {
async fn update(&self, obj: &DynamicObject) -> Result<()>;
async fn delete(&self, obj: &DynamicObject) -> Result<()>;
}

pub async fn apply_manifest(
cluster: Arc<dyn ResourceDrivenCluster>,
evt: ResourceCreated,
Expand Down
38 changes: 6 additions & 32 deletions src/domain/resource/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::info;
use uuid::Uuid;

use crate::domain::{
auth::Credential,
auth::{assert_project_permission, Credential},
error::Error,
event::{EventDrivenBridge, ResourceCreated, ResourceDeleted},
metadata::{KnownField, MetadataDriven},
Expand All @@ -26,7 +26,7 @@ pub async fn fetch(
resource_cache: Arc<dyn ResourceDrivenCache>,
cmd: FetchCmd,
) -> Result<Vec<Resource>> {
assert_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;

resource_cache
.find(&cmd.project_id, &cmd.page, &cmd.page_size)
Expand All @@ -38,7 +38,7 @@ pub async fn fetch_by_id(
resource_cache: Arc<dyn ResourceDrivenCache>,
cmd: FetchByIdCmd,
) -> Result<Resource> {
assert_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;

let Some(project) = project_cache.find_by_id(&cmd.project_id).await? else {
return Err(Error::CommandMalformed("invalid project id".into()));
Expand All @@ -59,7 +59,7 @@ pub async fn create(
event: Arc<dyn EventDrivenBridge>,
cmd: CreateCmd,
) -> Result<()> {
assert_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;

let Some(crd) = metadata.find_by_kind(&cmd.kind).await? else {
return Err(Error::CommandMalformed("kind not supported".into()));
Expand Down Expand Up @@ -120,7 +120,7 @@ pub async fn update(
return Err(Error::CommandMalformed("invalid resource id".into()));
};

assert_permission(project_cache.clone(), &cmd.credential, &resource.project_id).await?;
assert_project_permission(project_cache.clone(), &cmd.credential, &resource.project_id).await?;
let Some(project) = project_cache.find_by_id(&resource.project_id).await? else {
return Err(Error::CommandMalformed("invalid project id".into()));
};
Expand Down Expand Up @@ -150,7 +150,7 @@ pub async fn delete(
event: Arc<dyn EventDrivenBridge>,
cmd: DeleteCmd,
) -> Result<()> {
assert_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;
assert_project_permission(project_cache.clone(), &cmd.credential, &cmd.project_id).await?;

let Some(project) = project_cache.find_by_id(&cmd.project_id).await? else {
return Err(Error::CommandMalformed("invalid project id".into()));
Expand All @@ -177,32 +177,6 @@ pub async fn delete(
Ok(())
}

async fn assert_permission(
project_cache: Arc<dyn ProjectDrivenCache>,
credential: &Credential,
project_id: &str,
) -> Result<()> {
match credential {
Credential::Auth0(user_id) => {
let result = project_cache
.find_user_permission(user_id, project_id)
.await?;

if result.is_none() {
return Err(Error::Unauthorized("user doesnt have permission".into()));
}

Ok(())
}
Credential::ApiKey(secret_project_id) => {
if project_id != secret_project_id {
return Err(Error::Unauthorized("secret doesnt have permission".into()));
}

Ok(())
}
}
}
fn assert_project_resource(project: &Project, resource: &Resource) -> Result<()> {
if project.id != resource.project_id {
return Err(Error::CommandMalformed("invalid resource id".into()));
Expand Down
48 changes: 48 additions & 0 deletions src/domain/usage/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::sync::Arc;

use crate::domain::{event::UsageCreated, Result};

use super::{Usage, UsageReport};

#[async_trait::async_trait]
pub trait UsageDrivenCache: Send + Sync {
async fn find_report(
&self,
project_id: &str,
page: &u32,
page_size: &u32,
) -> Result<Vec<UsageReport>>;
async fn create(&self, usage: Vec<Usage>) -> Result<()>;
}

pub async fn create(cache: Arc<dyn UsageDrivenCache>, evt: UsageCreated) -> Result<()> {
cache.create(evt.into()).await
}

#[cfg(test)]
mod tests {
use mockall::mock;

use super::*;

mock! {
pub FakeUsageDrivenCache { }

#[async_trait::async_trait]
impl UsageDrivenCache for FakeUsageDrivenCache {
async fn find_report(&self, project_id: &str, page: &u32, page_size: &u32,) -> Result<Vec<UsageReport>>;
async fn create(&self, usage: Vec<Usage>) -> Result<()>;
}
}

#[tokio::test]
async fn it_should_create_usage_cache() {
let mut cache = MockFakeUsageDrivenCache::new();
cache.expect_create().return_once(|_| Ok(()));

let evt = UsageCreated::default();

let result = create(Arc::new(cache), evt).await;
assert!(result.is_ok());
}
}
Loading

0 comments on commit e0444ba

Please sign in to comment.