Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve usage to send one event per project #143

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/domain/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ into_event!(ResourceDeleted);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageUnitCreated {
pub project_namespace: String,
pub resource_name: String,
pub tier: String,
pub units: i64,
Expand All @@ -152,6 +151,7 @@ pub struct UsageUnitCreated {
pub struct UsageCreated {
pub id: String,
pub cluster_id: String,
pub project_namespace: String,
pub usages: Vec<UsageUnitCreated>,
pub created_at: DateTime<Utc>,
}
Expand Down Expand Up @@ -336,8 +336,8 @@ mod tests {
Self {
id: Uuid::new_v4().to_string(),
cluster_id: Uuid::new_v4().to_string(),
project_namespace: "test".into(),
usages: vec![UsageUnitCreated {
project_namespace: "test".into(),
resource_name: format!("cardanonode-{}", get_random_salt()),
units: 120,
tier: "0".into(),
Expand Down
2 changes: 1 addition & 1 deletion src/domain/usage/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn create(
.iter()
.map(|usage| async {
let Some(resource) = resouce_cache
.find_by_name_for_usage(&usage.project_namespace, &usage.resource_name)
.find_by_name_for_usage(&evt.project_namespace, &usage.resource_name)
.await?
else {
return Err(Error::Unexpected("Resource name has not been found".into()));
Expand Down
49 changes: 29 additions & 20 deletions src/domain/usage/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::sync::Arc;

use chrono::{DateTime, Utc};
use tracing::info;
use futures::future::join_all;
use tracing::{error, info};
use uuid::Uuid;

use crate::domain::{
event::{EventDrivenBridge, UsageCreated, UsageUnitCreated},
Result,
};

use super::UsageUnit;
use super::UsageMetric;

#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
Expand All @@ -19,7 +20,7 @@ pub trait UsageDrivenCluster: Send + Sync {
step: &str,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> Result<Vec<UsageUnit>>;
) -> Result<Vec<UsageMetric>>;
}

pub async fn sync_usage(
Expand All @@ -36,23 +37,31 @@ pub async fn sync_usage(
return Ok(());
}

let evt = UsageCreated {
id: Uuid::new_v4().to_string(),
cluster_id: cluster_id.into(),
usages: usages
.into_iter()
.map(|u| UsageUnitCreated {
project_namespace: u.project_namespace,
resource_name: u.resource_name,
units: u.units,
tier: u.tier,
interval: u.interval,
})
.collect(),
created_at: Utc::now(),
};

event.dispatch(evt.into()).await?;
let tasks = usages.iter().map(|u| async {
let evt = UsageCreated {
id: Uuid::new_v4().to_string(),
cluster_id: cluster_id.into(),
project_namespace: u.project_namespace.clone(),
created_at: Utc::now(),
usages: u
.resources
.iter()
.map(|r| UsageUnitCreated {
resource_name: r.resource_name.clone(),
units: r.units,
tier: r.tier.clone(),
interval: r.interval,
})
.collect(),
};

if let Err(error) = event.dispatch(evt.into()).await {
error!(?error, ?u.project_namespace, "fail to dispatch usage event");
}
});

join_all(tasks).await;

info!(
cursor = cursor.to_string(),
end = end.to_string(),
Expand Down
6 changes: 5 additions & 1 deletion src/domain/usage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ impl Usage {
}

#[derive(Debug)]
pub struct UsageUnit {
pub struct UsageMetric {
pub project_namespace: String,
pub resources: Vec<UsageUnitMetric>,
}
#[derive(Debug, Clone)]
pub struct UsageUnitMetric {
pub resource_name: String,
pub units: i64,
pub tier: String,
Expand Down
77 changes: 41 additions & 36 deletions src/driven/prometheus/usage.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde::Deserialize;
use tracing::error;

use crate::{
domain::{
error::Error,
usage::{cluster::UsageDrivenCluster, UsageUnit},
usage::{cluster::UsageDrivenCluster, UsageMetric, UsageUnitMetric},
Result,
},
driven::prometheus::deserialize_value,
Expand All @@ -20,7 +22,7 @@ impl UsageDrivenCluster for PrometheusUsageDriven {
step: &str,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> Result<Vec<UsageUnit>> {
) -> Result<Vec<UsageMetric>> {
let response = self
.client
.get(format!(
Expand All @@ -46,46 +48,49 @@ impl UsageDrivenCluster for PrometheusUsageDriven {

let response: PrometheusResponse = response.json().await?;

let usage_units: Vec<UsageUnit> = response
.data
.result
.iter()
.map(|r| {
let min = r.values.iter().min_by_key(|v| v.timestamp);
let max = r.values.iter().max_by_key(|v| v.timestamp);
let mut metrics: HashMap<String, UsageMetric> = HashMap::new();
for r in response.data.result.iter() {
let min = r.values.iter().min_by_key(|v| v.timestamp);
let max = r.values.iter().max_by_key(|v| v.timestamp);

let first_timestamp = match min {
Some(v) => v.timestamp,
None => 0,
};
let last_timestamp = match max {
Some(v) => v.timestamp,
None => 0,
};

let first_timestamp = match min {
Some(v) => v.timestamp,
None => 0,
};
let last_timestamp = match max {
Some(v) => v.timestamp,
None => 0,
};
let first_value = match min {
Some(v) => v.value,
None => 0,
};
let last_value = match max {
Some(v) => v.value,
None => 0,
};

let first_value = match min {
Some(v) => v.value,
None => 0,
};
let last_value = match max {
Some(v) => v.value,
None => 0,
};
let interval = last_timestamp - first_timestamp;
let units = last_value - first_value;

let interval = last_timestamp - first_timestamp;
let units = last_value - first_value;
let usage_unit = UsageUnitMetric {
resource_name: r.metric.resource_name.clone(),
units,
interval,
tier: r.metric.tier.clone(),
};

UsageUnit {
metrics
.entry(r.metric.project.clone())
.and_modify(|u| u.resources.push(usage_unit.clone()))
.or_insert(UsageMetric {
project_namespace: r.metric.project.clone(),
resource_name: r.metric.resource_name.clone(),
units,
interval,
tier: r.metric.tier.clone(),
}
})
.collect();
resources: vec![usage_unit],
});
}

Ok(usage_units)
Ok(metrics.into_values().collect())
}
}

Expand Down
Loading