diff --git a/.github/workflows/deployment-staging.yml b/.github/workflows/deployment-staging.yml index 052ff1f..d6f390f 100644 --- a/.github/workflows/deployment-staging.yml +++ b/.github/workflows/deployment-staging.yml @@ -43,6 +43,7 @@ jobs: get-cities-bnas get-cities-census get-cities-submissions + get-price-fargate patch-bnas-analysis patch-cities-submissions post-bnas diff --git a/Cargo.toml b/Cargo.toml index 989bc48..ca273a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ entity = { path = "entity" } futures = "0.3.21" http-serde = "2.0.0" itertools = "0.13.0" -lambda_http = "0.11.0" +lambda_http = "0.12.0" lambda_runtime = "0.12.0" migration = { path = "migration" } nom = "7.1.3" diff --git a/entity/src/entities/brokenspoke_pipeline.rs b/entity/src/entities/brokenspoke_pipeline.rs index 4e9a92e..53b603e 100644 --- a/entity/src/entities/brokenspoke_pipeline.rs +++ b/entity/src/entities/brokenspoke_pipeline.rs @@ -1,6 +1,6 @@ //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.14 -use super::sea_orm_active_enums::BrokenspokeState; +use super::sea_orm_active_enums::BrokenspokeStep; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -9,17 +9,15 @@ use serde::{Deserialize, Serialize}; pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub state_machine_id: Uuid, - #[sea_orm(unique)] - pub scheduled_trigger_id: Option, - pub state: Option, + pub step: Option, pub sqs_message: Option, - pub neon_branch_id: Option, pub fargate_task_arn: Option, pub s3_bucket: Option, pub start_time: TimeDateTimeWithTimeZone, pub end_time: Option, pub torn_down: Option, pub results_posted: Option, + pub cost: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/entity/src/entities/fargate_price.rs b/entity/src/entities/fargate_price.rs new file mode 100644 index 0000000..649365b --- /dev/null +++ b/entity/src/entities/fargate_price.rs @@ -0,0 +1,18 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.14 + +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] +#[sea_orm(table_name = "fargate_price")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub per_second: Decimal, + pub created_at: TimeDateTimeWithTimeZone, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/entity/src/entities/mod.rs b/entity/src/entities/mod.rs index 372e14a..d8d8709 100644 --- a/entity/src/entities/mod.rs +++ b/entity/src/entities/mod.rs @@ -6,6 +6,7 @@ pub mod brokenspoke_pipeline; pub mod census; pub mod city; pub mod core_services; +pub mod fargate_price; pub mod features; pub mod infrastructure; pub mod opportunity; diff --git a/entity/src/entities/prelude.rs b/entity/src/entities/prelude.rs index 93ac6ea..c1dfd7c 100644 --- a/entity/src/entities/prelude.rs +++ b/entity/src/entities/prelude.rs @@ -4,6 +4,7 @@ pub use super::brokenspoke_pipeline::Entity as BrokenspokePipeline; pub use super::census::Entity as Census; pub use super::city::Entity as City; pub use super::core_services::Entity as CoreServices; +pub use super::fargate_price::Entity as FargatePrice; pub use super::features::Entity as Features; pub use super::infrastructure::Entity as Infrastructure; pub use super::opportunity::Entity as Opportunity; diff --git a/entity/src/entities/sea_orm_active_enums.rs b/entity/src/entities/sea_orm_active_enums.rs index 2513022..34d8574 100644 --- a/entity/src/entities/sea_orm_active_enums.rs +++ b/entity/src/entities/sea_orm_active_enums.rs @@ -30,8 +30,8 @@ pub enum BnaRegion { South, } #[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "brokenspoke_state")] -pub enum BrokenspokeState { +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "brokenspoke_step")] +pub enum BrokenspokeStep { #[sea_orm(string_value = "analysis")] Analysis, #[sea_orm(string_value = "cleanup")] diff --git a/entity/src/wrappers/brokenspoke_pipeline.rs b/entity/src/wrappers/brokenspoke_pipeline.rs index 845721b..b6454fb 100644 --- a/entity/src/wrappers/brokenspoke_pipeline.rs +++ b/entity/src/wrappers/brokenspoke_pipeline.rs @@ -1,75 +1,70 @@ use crate::entities::{brokenspoke_pipeline, sea_orm_active_enums}; use sea_orm::{ - prelude::{Json, TimeDateTimeWithTimeZone, Uuid}, + prelude::{Decimal, Json, TimeDateTimeWithTimeZone, Uuid}, ActiveValue, IntoActiveModel, }; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct BrokenspokePipelinePost { - pub state: Option, - pub state_machine_id: Uuid, - pub scheduled_trigger_id: Option, - pub sqs_message: Option, - pub neon_branch_id: Option, + pub cost: Option, + pub end_time: Option, pub fargate_task_arn: Option, + pub result_posted: Option, pub s3_bucket: Option, + pub sqs_message: Option, pub start_time: TimeDateTimeWithTimeZone, - pub end_time: Option, + pub state_machine_id: Uuid, + pub step: Option, pub torn_down: Option, - pub result_posted: Option, } impl IntoActiveModel for BrokenspokePipelinePost { fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel { brokenspoke_pipeline::ActiveModel { - state: ActiveValue::Set(self.state), - state_machine_id: ActiveValue::Set(self.state_machine_id), - sqs_message: ActiveValue::Set(self.sqs_message), - neon_branch_id: ActiveValue::Set(self.neon_branch_id), + cost: ActiveValue::Set(self.cost), + end_time: ActiveValue::Set(self.end_time), fargate_task_arn: ActiveValue::Set(self.fargate_task_arn), + results_posted: ActiveValue::Set(self.result_posted), s3_bucket: ActiveValue::Set(self.s3_bucket), - scheduled_trigger_id: ActiveValue::Set(self.scheduled_trigger_id), + sqs_message: ActiveValue::Set(self.sqs_message), start_time: ActiveValue::Set(self.start_time), - end_time: ActiveValue::Set(self.end_time), + state_machine_id: ActiveValue::Set(self.state_machine_id), + step: ActiveValue::Set(self.step), torn_down: ActiveValue::Set(self.torn_down), - results_posted: ActiveValue::Set(self.result_posted), } } } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct BrokenspokePipelinePatch { - pub state: Option>, - pub scheduled_trigger_id: Option>, - pub sqs_message: Option>, - pub neon_branch_id: Option>, + pub cost: Option>, + pub end_time: Option>, pub fargate_task_arn: Option>, + pub neon_branch_id: Option>, + pub result_posted: Option>, pub s3_bucket: Option>, + pub scheduled_trigger_id: Option>, + pub sqs_message: Option>, pub start_time: Option>, - pub end_time: Option>, + pub state: Option>, pub torn_down: Option>, - pub result_posted: Option>, } impl IntoActiveModel for BrokenspokePipelinePatch { fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel { brokenspoke_pipeline::ActiveModel { state_machine_id: ActiveValue::NotSet, - state: self.state.map_or(ActiveValue::NotSet, ActiveValue::Set), + cost: self.cost.map_or(ActiveValue::NotSet, ActiveValue::Set), + step: self.state.map_or(ActiveValue::NotSet, ActiveValue::Set), sqs_message: self .sqs_message .map_or(ActiveValue::NotSet, ActiveValue::Set), - neon_branch_id: self - .neon_branch_id - .map_or(ActiveValue::NotSet, ActiveValue::Set), + fargate_task_arn: self .fargate_task_arn .map_or(ActiveValue::NotSet, ActiveValue::Set), s3_bucket: self.s3_bucket.map_or(ActiveValue::NotSet, ActiveValue::Set), - scheduled_trigger_id: self - .scheduled_trigger_id - .map_or(ActiveValue::NotSet, ActiveValue::Set), start_time: ActiveValue::NotSet, end_time: self.end_time.map_or(ActiveValue::NotSet, ActiveValue::Set), torn_down: self.torn_down.map_or(ActiveValue::NotSet, ActiveValue::Set), diff --git a/entity/src/wrappers/mod.rs b/entity/src/wrappers/mod.rs index a110f9b..0897e4a 100644 --- a/entity/src/wrappers/mod.rs +++ b/entity/src/wrappers/mod.rs @@ -44,36 +44,36 @@ impl FromStr for ApprovalStatus { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum BrokenspokeState { +pub enum BrokenspokeStep { SqsMessage, Setup, Analysis, Cleanup, } -impl From for BrokenspokeState { - fn from(value: sea_orm_active_enums::BrokenspokeState) -> Self { +impl From for BrokenspokeStep { + fn from(value: sea_orm_active_enums::BrokenspokeStep) -> Self { match value { - sea_orm_active_enums::BrokenspokeState::Analysis => Self::Analysis, - sea_orm_active_enums::BrokenspokeState::Cleanup => Self::Cleanup, - sea_orm_active_enums::BrokenspokeState::Setup => Self::Setup, - sea_orm_active_enums::BrokenspokeState::SqsMessage => Self::SqsMessage, + sea_orm_active_enums::BrokenspokeStep::Analysis => Self::Analysis, + sea_orm_active_enums::BrokenspokeStep::Cleanup => Self::Cleanup, + sea_orm_active_enums::BrokenspokeStep::Setup => Self::Setup, + sea_orm_active_enums::BrokenspokeStep::SqsMessage => Self::SqsMessage, } } } -impl From for sea_orm_active_enums::BrokenspokeState { - fn from(val: BrokenspokeState) -> Self { +impl From for sea_orm_active_enums::BrokenspokeStep { + fn from(val: BrokenspokeStep) -> Self { match val { - BrokenspokeState::Analysis => sea_orm_active_enums::BrokenspokeState::Analysis, - BrokenspokeState::Cleanup => sea_orm_active_enums::BrokenspokeState::Cleanup, - BrokenspokeState::Setup => sea_orm_active_enums::BrokenspokeState::Setup, - BrokenspokeState::SqsMessage => sea_orm_active_enums::BrokenspokeState::SqsMessage, + BrokenspokeStep::Analysis => sea_orm_active_enums::BrokenspokeStep::Analysis, + BrokenspokeStep::Cleanup => sea_orm_active_enums::BrokenspokeStep::Cleanup, + BrokenspokeStep::Setup => sea_orm_active_enums::BrokenspokeStep::Setup, + BrokenspokeStep::SqsMessage => sea_orm_active_enums::BrokenspokeStep::SqsMessage, } } } -impl FromStr for BrokenspokeState { +impl FromStr for BrokenspokeStep { type Err = serde_plain::Error; fn from_str(s: &str) -> Result { diff --git a/lambdas/Cargo.toml b/lambdas/Cargo.toml index 9c09d8a..e1a904a 100644 --- a/lambdas/Cargo.toml +++ b/lambdas/Cargo.toml @@ -68,6 +68,10 @@ path = "src/cities/get-cities-census.rs" name = "get-cities-submissions" path = "src/cities-submissions/get-cities-submissions.rs" +[[bin]] +name = "get-price-fargate" +path = "src/price-fargate/get-price-fargate.rs" + [[bin]] name = "patch-bnas-analysis" path = "src/bnas-analysis/patch-bnas-analysis.rs" diff --git a/lambdas/src/price-fargate/get-price-fargate.rs b/lambdas/src/price-fargate/get-price-fargate.rs new file mode 100644 index 0000000..65823ff --- /dev/null +++ b/lambdas/src/price-fargate/get-price-fargate.rs @@ -0,0 +1,108 @@ +use dotenv::dotenv; +use effortless::{ + api::{entry_not_found, invalid_path_parameter, parse_query_string_parameter}, + error::APIError, + fragment::{get_apigw_request_id, BnaRequestExt}, +}; +use entity::prelude::*; +use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response}; +use lambdas::{api_database_connect, build_paginated_response, pagination_parameters}; +use sea_orm::{EntityTrait, PaginatorTrait, QueryOrder, QuerySelect}; +use serde_json::json; +use tracing::{debug, info}; + +async fn function_handler(event: Request) -> Result, Error> { + dotenv().ok(); + + // Retrieve pagination parameters if any. + let (page_size, page) = match pagination_parameters(&event) { + Ok((page_size, page)) => (page_size, page), + Err(e) => return Ok(e), + }; + + // Retrieve the ID of the entry to update. + let parameter = "id"; + let id = event.path_parameter::(parameter); + + // Set the database connection. + let db = match api_database_connect(&event).await { + Ok(db) => db, + Err(e) => return Ok(e), + }; + + // Retrieve all entries or a specific one. + debug!("Processing the requests..."); + let res = match id { + Some(id) => match id { + Ok(id) => { + let model = FargatePrice::find_by_id(id).one(&db).await?; + let res: Response = match model { + Some(model) => json!(model).into_response().await, + None => entry_not_found(&event).into(), + }; + res + } + Err(e) => { + return Ok( + invalid_path_parameter(&event, parameter, e.to_string().as_str()).into(), + ); + } + }, + None => { + // Retrieve the filter if any. + let latest: Option = match parse_query_string_parameter::(&event, "latest") + { + Ok(latest) => latest, + Err(e) => return Ok(e.into()), + }; + + // Select the entity. + let mut select = FargatePrice::find(); + + // Select latest only. + if latest.is_some() { + select = select + .order_by_asc(entity::fargate_price::Column::CreatedAt) + .limit(1); + } + + // Select the results. + let query = select + .clone() + .paginate(&db, page_size) + .fetch_page(page - 1) + .await; + let res: Response = match query { + Ok(models) => { + let total_items = select.count(&db).await?; + build_paginated_response(json!(models), total_items, page, page_size, &event)? + } + Err(e) => APIError::with_pointer( + get_apigw_request_id(&event), + event.uri().path().to_string().as_str(), + e.to_string().as_str(), + ) + .into(), + }; + res + } + }; + + Ok(res) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await.map_err(|e| { + info!("{e}"); + e + }) +} diff --git a/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs b/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs index 8aadc55..6341b9a 100644 --- a/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs +++ b/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs @@ -21,12 +21,13 @@ impl MigrationTrait for Migration { manager .create_type( Type::create() - .as_enum(BrokenspokeState::Table) - .values(BrokenspokeState::iter().skip(1)) + .as_enum(BrokenspokeStep::Table) + .values(BrokenspokeStep::iter().skip(1)) .to_owned(), ) .await?; + // Create the Brokenspoke Pipeline table. manager .create_table( Table::create() @@ -39,16 +40,10 @@ impl MigrationTrait for Migration { .primary_key(), ) .col( - ColumnDef::new(BrokenspokePipeline::ScheduledTriggerId) - .uuid() - .unique_key(), - ) - .col( - ColumnDef::new(BrokenspokePipeline::State) - .enumeration(BrokenspokeState::Table, BrokenspokeState::iter().skip(1)), + ColumnDef::new(BrokenspokePipeline::Step) + .enumeration(BrokenspokeStep::Table, BrokenspokeStep::iter().skip(1)), ) .col(ColumnDef::new(BrokenspokePipeline::SqsMessage).json()) - .col(ColumnDef::new(BrokenspokePipeline::NeonBranchId).string()) .col(ColumnDef::new(BrokenspokePipeline::FargateTaskARN).string()) .col(ColumnDef::new(BrokenspokePipeline::S3Bucket).string()) .col( @@ -59,31 +54,61 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(BrokenspokePipeline::EndTime).timestamp_with_time_zone()) .col(ColumnDef::new(BrokenspokePipeline::TornDown).boolean()) .col(ColumnDef::new(BrokenspokePipeline::ResultsPosted).boolean()) + .col(ColumnDef::new(BrokenspokePipeline::Cost).decimal()) + .to_owned(), + ) + .await?; + + // Create the Fargate Price table. + manager + .create_table( + Table::create() + .table(FargatePrice::Table) + .if_not_exists() + .col( + ColumnDef::new(FargatePrice::Id) + .integer() + .auto_increment() + .not_null() + .primary_key(), + ) + .col(ColumnDef::new(FargatePrice::PerSecond).not_null().decimal()) + .col( + ColumnDef::new(FargatePrice::CreatedAt) + .timestamp_with_time_zone() + .default(Expr::current_timestamp()) + .not_null(), + ) .to_owned(), ) - .await + .await?; + + Ok(()) } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .drop_table(Table::drop().table(BrokenspokePipeline::Table).to_owned()) - .await + .await?; + manager + .drop_table(Table::drop().table(FargatePrice::Table).to_owned()) + .await?; + Ok(()) } } #[derive(DeriveIden)] enum BrokenspokePipeline { Table, + Cost, EndTime, FargateTaskARN, - NeonBranchId, ResultsPosted, S3Bucket, - ScheduledTriggerId, SqsMessage, StartTime, - State, StateMachineId, + Step, TornDown, } @@ -96,10 +121,35 @@ pub enum BrokenspokeStatus { } #[derive(Iden, EnumIter)] -pub enum BrokenspokeState { +pub enum BrokenspokeStep { Table, SqsMessage, Setup, Analysis, Cleanup, } +// Pricing is coming from the CloudTempo calculator with the following paramaters: +// - Architecture: x86 +// - Region: US West (Oregon) +// - Duration: 1h +// - vCPU: 2 +// - Memory: 8GB +// - Storage: 200GB +// +// https://cloudtempo.dev/fargate-pricing-calculator +// +// The final value is obtained by converting the price per hour to price per second. +// This is done by dividing the price per hour per 60 and rounding it up to the +// ten thousandths (4th decimal). +// +// For instance: +// Price per hour is $0.137, which gives a price per second of 0.137/60 = 0.00228333333333. +// Rounding it up to the nearest $0.0001 brings a price of $0.0023 per second. +// https://www.calculator.net/rounding-calculator.html?cnum=0.00228333333333&cpre=4&cpren=2&cmode=nearest&sp=0&x=Calculate +#[derive(Iden)] +pub enum FargatePrice { + Table, + Id, + PerSecond, + CreatedAt, +}