Skip to content

Commit

Permalink
Add Fargate Princing table (#112)
Browse files Browse the repository at this point in the history
Adds a table for storing the price per second associated to running Fargate
with our configuration.

Drive-by:
- Removes unused fields in the BrokenspokePipeline table.



save

Signed-off-by: Rémy Greinhofer <[email protected]>
  • Loading branch information
rgreinho authored Jul 8, 2024
1 parent 250bdf4 commit 58c2869
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 66 deletions.
1 change: 1 addition & 0 deletions .github/workflows/deployment-staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ jobs:
get-cities-bnas
get-cities-census
get-cities-submissions
get-price-fargate
patch-bnas-analysis
patch-cities-submissions
post-bnas
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ entity = { path = "entity" }
futures = "0.3.21"
http-serde = "2.0.0"
itertools = "0.13.0"
lambda_http = "0.11.0"
lambda_http = "0.12.0"
lambda_runtime = "0.12.0"
migration = { path = "migration" }
nom = "7.1.3"
Expand Down
8 changes: 3 additions & 5 deletions entity/src/entities/brokenspoke_pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.14
use super::sea_orm_active_enums::BrokenspokeState;
use super::sea_orm_active_enums::BrokenspokeStep;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

Expand All @@ -9,17 +9,15 @@ use serde::{Deserialize, Serialize};
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub state_machine_id: Uuid,
#[sea_orm(unique)]
pub scheduled_trigger_id: Option<Uuid>,
pub state: Option<BrokenspokeState>,
pub step: Option<BrokenspokeStep>,
pub sqs_message: Option<Json>,
pub neon_branch_id: Option<String>,
pub fargate_task_arn: Option<String>,
pub s3_bucket: Option<String>,
pub start_time: TimeDateTimeWithTimeZone,
pub end_time: Option<TimeDateTimeWithTimeZone>,
pub torn_down: Option<bool>,
pub results_posted: Option<bool>,
pub cost: Option<Decimal>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
18 changes: 18 additions & 0 deletions entity/src/entities/fargate_price.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.14
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "fargate_price")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub per_second: Decimal,
pub created_at: TimeDateTimeWithTimeZone,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
1 change: 1 addition & 0 deletions entity/src/entities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod brokenspoke_pipeline;
pub mod census;
pub mod city;
pub mod core_services;
pub mod fargate_price;
pub mod features;
pub mod infrastructure;
pub mod opportunity;
Expand Down
1 change: 1 addition & 0 deletions entity/src/entities/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub use super::brokenspoke_pipeline::Entity as BrokenspokePipeline;
pub use super::census::Entity as Census;
pub use super::city::Entity as City;
pub use super::core_services::Entity as CoreServices;
pub use super::fargate_price::Entity as FargatePrice;
pub use super::features::Entity as Features;
pub use super::infrastructure::Entity as Infrastructure;
pub use super::opportunity::Entity as Opportunity;
Expand Down
4 changes: 2 additions & 2 deletions entity/src/entities/sea_orm_active_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub enum BnaRegion {
South,
}
#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "brokenspoke_state")]
pub enum BrokenspokeState {
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "brokenspoke_step")]
pub enum BrokenspokeStep {
#[sea_orm(string_value = "analysis")]
Analysis,
#[sea_orm(string_value = "cleanup")]
Expand Down
51 changes: 23 additions & 28 deletions entity/src/wrappers/brokenspoke_pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,70 @@
use crate::entities::{brokenspoke_pipeline, sea_orm_active_enums};
use sea_orm::{
prelude::{Json, TimeDateTimeWithTimeZone, Uuid},
prelude::{Decimal, Json, TimeDateTimeWithTimeZone, Uuid},
ActiveValue, IntoActiveModel,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BrokenspokePipelinePost {
pub state: Option<sea_orm_active_enums::BrokenspokeState>,
pub state_machine_id: Uuid,
pub scheduled_trigger_id: Option<Uuid>,
pub sqs_message: Option<Json>,
pub neon_branch_id: Option<String>,
pub cost: Option<Decimal>,
pub end_time: Option<TimeDateTimeWithTimeZone>,
pub fargate_task_arn: Option<String>,
pub result_posted: Option<bool>,
pub s3_bucket: Option<String>,
pub sqs_message: Option<Json>,
pub start_time: TimeDateTimeWithTimeZone,
pub end_time: Option<TimeDateTimeWithTimeZone>,
pub state_machine_id: Uuid,
pub step: Option<sea_orm_active_enums::BrokenspokeStep>,
pub torn_down: Option<bool>,
pub result_posted: Option<bool>,
}

impl IntoActiveModel<brokenspoke_pipeline::ActiveModel> for BrokenspokePipelinePost {
fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel {
brokenspoke_pipeline::ActiveModel {
state: ActiveValue::Set(self.state),
state_machine_id: ActiveValue::Set(self.state_machine_id),
sqs_message: ActiveValue::Set(self.sqs_message),
neon_branch_id: ActiveValue::Set(self.neon_branch_id),
cost: ActiveValue::Set(self.cost),
end_time: ActiveValue::Set(self.end_time),
fargate_task_arn: ActiveValue::Set(self.fargate_task_arn),
results_posted: ActiveValue::Set(self.result_posted),
s3_bucket: ActiveValue::Set(self.s3_bucket),
scheduled_trigger_id: ActiveValue::Set(self.scheduled_trigger_id),
sqs_message: ActiveValue::Set(self.sqs_message),
start_time: ActiveValue::Set(self.start_time),
end_time: ActiveValue::Set(self.end_time),
state_machine_id: ActiveValue::Set(self.state_machine_id),
step: ActiveValue::Set(self.step),
torn_down: ActiveValue::Set(self.torn_down),
results_posted: ActiveValue::Set(self.result_posted),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BrokenspokePipelinePatch {
pub state: Option<Option<sea_orm_active_enums::BrokenspokeState>>,
pub scheduled_trigger_id: Option<Option<Uuid>>,
pub sqs_message: Option<Option<Json>>,
pub neon_branch_id: Option<Option<String>>,
pub cost: Option<Option<Decimal>>,
pub end_time: Option<Option<TimeDateTimeWithTimeZone>>,
pub fargate_task_arn: Option<Option<String>>,
pub neon_branch_id: Option<Option<String>>,
pub result_posted: Option<Option<bool>>,
pub s3_bucket: Option<Option<String>>,
pub scheduled_trigger_id: Option<Option<Uuid>>,
pub sqs_message: Option<Option<Json>>,
pub start_time: Option<Option<TimeDateTimeWithTimeZone>>,
pub end_time: Option<Option<TimeDateTimeWithTimeZone>>,
pub state: Option<Option<sea_orm_active_enums::BrokenspokeStep>>,
pub torn_down: Option<Option<bool>>,
pub result_posted: Option<Option<bool>>,
}

impl IntoActiveModel<brokenspoke_pipeline::ActiveModel> for BrokenspokePipelinePatch {
fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel {
brokenspoke_pipeline::ActiveModel {
state_machine_id: ActiveValue::NotSet,
state: self.state.map_or(ActiveValue::NotSet, ActiveValue::Set),
cost: self.cost.map_or(ActiveValue::NotSet, ActiveValue::Set),
step: self.state.map_or(ActiveValue::NotSet, ActiveValue::Set),
sqs_message: self
.sqs_message
.map_or(ActiveValue::NotSet, ActiveValue::Set),
neon_branch_id: self
.neon_branch_id
.map_or(ActiveValue::NotSet, ActiveValue::Set),

fargate_task_arn: self
.fargate_task_arn
.map_or(ActiveValue::NotSet, ActiveValue::Set),
s3_bucket: self.s3_bucket.map_or(ActiveValue::NotSet, ActiveValue::Set),
scheduled_trigger_id: self
.scheduled_trigger_id
.map_or(ActiveValue::NotSet, ActiveValue::Set),
start_time: ActiveValue::NotSet,
end_time: self.end_time.map_or(ActiveValue::NotSet, ActiveValue::Set),
torn_down: self.torn_down.map_or(ActiveValue::NotSet, ActiveValue::Set),
Expand Down
28 changes: 14 additions & 14 deletions entity/src/wrappers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,36 @@ impl FromStr for ApprovalStatus {
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BrokenspokeState {
pub enum BrokenspokeStep {
SqsMessage,
Setup,
Analysis,
Cleanup,
}

impl From<sea_orm_active_enums::BrokenspokeState> for BrokenspokeState {
fn from(value: sea_orm_active_enums::BrokenspokeState) -> Self {
impl From<sea_orm_active_enums::BrokenspokeStep> for BrokenspokeStep {
fn from(value: sea_orm_active_enums::BrokenspokeStep) -> Self {
match value {
sea_orm_active_enums::BrokenspokeState::Analysis => Self::Analysis,
sea_orm_active_enums::BrokenspokeState::Cleanup => Self::Cleanup,
sea_orm_active_enums::BrokenspokeState::Setup => Self::Setup,
sea_orm_active_enums::BrokenspokeState::SqsMessage => Self::SqsMessage,
sea_orm_active_enums::BrokenspokeStep::Analysis => Self::Analysis,
sea_orm_active_enums::BrokenspokeStep::Cleanup => Self::Cleanup,
sea_orm_active_enums::BrokenspokeStep::Setup => Self::Setup,
sea_orm_active_enums::BrokenspokeStep::SqsMessage => Self::SqsMessage,
}
}
}

impl From<BrokenspokeState> for sea_orm_active_enums::BrokenspokeState {
fn from(val: BrokenspokeState) -> Self {
impl From<BrokenspokeStep> for sea_orm_active_enums::BrokenspokeStep {
fn from(val: BrokenspokeStep) -> Self {
match val {
BrokenspokeState::Analysis => sea_orm_active_enums::BrokenspokeState::Analysis,
BrokenspokeState::Cleanup => sea_orm_active_enums::BrokenspokeState::Cleanup,
BrokenspokeState::Setup => sea_orm_active_enums::BrokenspokeState::Setup,
BrokenspokeState::SqsMessage => sea_orm_active_enums::BrokenspokeState::SqsMessage,
BrokenspokeStep::Analysis => sea_orm_active_enums::BrokenspokeStep::Analysis,
BrokenspokeStep::Cleanup => sea_orm_active_enums::BrokenspokeStep::Cleanup,
BrokenspokeStep::Setup => sea_orm_active_enums::BrokenspokeStep::Setup,
BrokenspokeStep::SqsMessage => sea_orm_active_enums::BrokenspokeStep::SqsMessage,
}
}
}

impl FromStr for BrokenspokeState {
impl FromStr for BrokenspokeStep {
type Err = serde_plain::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Expand Down
4 changes: 4 additions & 0 deletions lambdas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ path = "src/cities/get-cities-census.rs"
name = "get-cities-submissions"
path = "src/cities-submissions/get-cities-submissions.rs"

[[bin]]
name = "get-price-fargate"
path = "src/price-fargate/get-price-fargate.rs"

[[bin]]
name = "patch-bnas-analysis"
path = "src/bnas-analysis/patch-bnas-analysis.rs"
Expand Down
108 changes: 108 additions & 0 deletions lambdas/src/price-fargate/get-price-fargate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use dotenv::dotenv;
use effortless::{
api::{entry_not_found, invalid_path_parameter, parse_query_string_parameter},
error::APIError,
fragment::{get_apigw_request_id, BnaRequestExt},
};
use entity::prelude::*;
use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response};
use lambdas::{api_database_connect, build_paginated_response, pagination_parameters};
use sea_orm::{EntityTrait, PaginatorTrait, QueryOrder, QuerySelect};
use serde_json::json;
use tracing::{debug, info};

async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
dotenv().ok();

// Retrieve pagination parameters if any.
let (page_size, page) = match pagination_parameters(&event) {
Ok((page_size, page)) => (page_size, page),
Err(e) => return Ok(e),
};

// Retrieve the ID of the entry to update.
let parameter = "id";
let id = event.path_parameter::<i32>(parameter);

// Set the database connection.
let db = match api_database_connect(&event).await {
Ok(db) => db,
Err(e) => return Ok(e),
};

// Retrieve all entries or a specific one.
debug!("Processing the requests...");
let res = match id {
Some(id) => match id {
Ok(id) => {
let model = FargatePrice::find_by_id(id).one(&db).await?;
let res: Response<Body> = match model {
Some(model) => json!(model).into_response().await,
None => entry_not_found(&event).into(),
};
res
}
Err(e) => {
return Ok(
invalid_path_parameter(&event, parameter, e.to_string().as_str()).into(),
);
}
},
None => {
// Retrieve the filter if any.
let latest: Option<bool> = match parse_query_string_parameter::<bool>(&event, "latest")
{
Ok(latest) => latest,
Err(e) => return Ok(e.into()),
};

// Select the entity.
let mut select = FargatePrice::find();

// Select latest only.
if latest.is_some() {
select = select
.order_by_asc(entity::fargate_price::Column::CreatedAt)
.limit(1);
}

// Select the results.
let query = select
.clone()
.paginate(&db, page_size)
.fetch_page(page - 1)
.await;
let res: Response<Body> = match query {
Ok(models) => {
let total_items = select.count(&db).await?;
build_paginated_response(json!(models), total_items, page, page_size, &event)?
}
Err(e) => APIError::with_pointer(
get_apigw_request_id(&event),
event.uri().path().to_string().as_str(),
e.to_string().as_str(),
)
.into(),
};
res
}
};

Ok(res)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();

run(service_fn(function_handler)).await.map_err(|e| {
info!("{e}");
e
})
}
Loading

0 comments on commit 58c2869

Please sign in to comment.